新聞中心
Storm是一個(gè)開(kāi)源的分布式實(shí)時(shí)計(jì)算系統(tǒng),它可以處理大量的數(shù)據(jù)流并進(jìn)行實(shí)時(shí)分析,在實(shí)際應(yīng)用中,單詞計(jì)數(shù)是一種常見(jiàn)的需求,可以通過(guò)Storm來(lái)實(shí)現(xiàn),下面將詳細(xì)介紹如何使用Storm實(shí)現(xiàn)單詞計(jì)數(shù)。

創(chuàng)新互聯(lián)長(zhǎng)期為上1000+客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開(kāi)放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為烏審企業(yè)提供專(zhuān)業(yè)的網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè),烏審網(wǎng)站改版等技術(shù)服務(wù)。擁有十多年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開(kāi)發(fā)。
我們需要?jiǎng)?chuàng)建一個(gè)Storm拓?fù)浣Y(jié)構(gòu),Storm拓?fù)溆梢粋€(gè)或多個(gè)Spouts(數(shù)據(jù)源)和Bolts(數(shù)據(jù)處理單元)組成,在這個(gè)例子中,我們將使用一個(gè)簡(jiǎn)單的Spout來(lái)生成單詞流,然后使用一個(gè)Bolt來(lái)計(jì)算每個(gè)單詞的出現(xiàn)次數(shù)。
1. 創(chuàng)建Spout:Spout是Storm拓?fù)涞臄?shù)據(jù)源,它負(fù)責(zé)生成數(shù)據(jù)流,在這個(gè)例子中,我們可以使用隨機(jī)數(shù)生成器來(lái)模擬單詞流,創(chuàng)建一個(gè)名為WordSpout的Java類(lèi),繼承自BaseRichSpout類(lèi),重寫(xiě)nextTuple方法,每次調(diào)用時(shí)生成一個(gè)隨機(jī)單詞作為輸出。
import backtype.storm.spout.BaseRichSpout;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import java.util.Map;
import java.util.Random;
public class WordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private Random random;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.random = new Random();
}
@Override
public void nextTuple() {
String word = "word" + random.nextInt(100);
collector.emit(new Values(word));
}
}
2. 創(chuàng)建Bolt:Bolt是Storm拓?fù)涞臄?shù)據(jù)處理單元,它負(fù)責(zé)對(duì)數(shù)據(jù)流進(jìn)行處理,在這個(gè)例子中,我們可以使用HashMap來(lái)存儲(chǔ)每個(gè)單詞的出現(xiàn)次數(shù),創(chuàng)建一個(gè)名為WordCounterBolt的Java類(lèi),繼承自BaseRichBolt類(lèi),重寫(xiě)execute方法,每次接收到一個(gè)單詞時(shí),將其出現(xiàn)次數(shù)加一,使用collector將結(jié)果發(fā)送出去。
import backtype.storm.bolt.BaseRichBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;
import java.util.Map.Entry;
public class WordCounterBolt extends BaseRichBolt {
private OutputCollector collector;
private Map wordCounts;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.wordCounts = new HashMap<>();
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
int count = wordCounts.containsKey(word) ? wordCounts.get(word) + 1 : 1;
wordCounts.put(word, count);
collector.emit(new Values(word, count));
}
}
3. 配置拓?fù)洌航酉聛?lái),我們需要配置Storm拓?fù)?,?chuàng)建一個(gè)名為WordCountTopology的Java類(lèi),繼承自BaseMainClass類(lèi),重寫(xiě)buildTopology方法,設(shè)置Spout和Bolt的配置參數(shù),啟動(dòng)拓?fù)洹?/p>
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import storm_wordcount_example.*; // 導(dǎo)入自定義的Spout和Bolt類(lèi)
public class WordCountTopology {
public static void main(String[] args) throws Exception {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", buildTopology());
Utils.sleep(10000); // 等待10秒后關(guān)閉集群
cluster.shutdown();
}
private static TopologyBuilder buildTopology() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-spout", new WordSpout(), 5); // 設(shè)置Spout的并發(fā)度為5
builder.setBolt("word-counter", new WordCounterBolt(), 5).shuffleGrouping("word-spout"); // 設(shè)置Bolt的并發(fā)度為5,并指定分組策略為隨機(jī)分組(shuffle grouping)
return builder;
}
}
4. 運(yùn)行拓?fù)洌哼\(yùn)行WordCountTopology類(lèi),觀察單詞計(jì)數(shù)的結(jié)果,在Storm UI中,可以看到每個(gè)單詞的出現(xiàn)次數(shù)以及總計(jì)數(shù),還可以查看拓?fù)涞臓顟B(tài)、任務(wù)分配等信息。
新聞名稱(chēng):storm怎么記
標(biāo)題鏈接:http://m.jiaoqi3.com/article/djoihgh.html


咨詢(xún)
建站咨詢(xún)
