elasticsearch-hadoop: Elasticsearch Integration with Apache Storm

elasticsearch-hadoop provides integration support between Elasticsearch and Apache Storm: data read from Elasticsearch is processed inside Storm in the form of Tuples.
Dependency versions:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-starter</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hdfs</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.3.2</version>
</dependency>
These jars also need to be placed in Storm's extlib directory.
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Normalizes the fields read from Elasticsearch: missing or "null" values
// are replaced with "NA" before the tuple is passed downstream.
public class HandleBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private OutputCollector collector = null;

    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        String name = "NA";
        if (input.contains("name")) {
            name = input.getStringByField("name");
        }
        String phone = "NA";
        if (input.contains("phone")) {
            phone = input.getStringByField("phone");
        }
        String rcall = "NA";
        if (input.contains("rcall")) {
            rcall = input.getStringByField("rcall");
            rcall = null == rcall || "null".equals(rcall) ? "NA" : rcall;
        }
        String address = "NA";
        if (input.contains("address")) {
            address = input.getStringByField("address");
            address = null == address || "null".equals(address) ? "NA" : address;
        }
        String email = "NA";
        if (input.contains("email")) {
            email = input.getStringByField("email");
            email = null == email || "null".equals(email) ? "NA" : email;
        }
        String idCard = "NA";
        if (input.contains("idCard")) {
            idCard = input.getStringByField("idCard");
            idCard = null == idCard || "null".equals(idCard) ? "NA" : idCard;
        }
        this.collector.emit(new Values(name, phone, rcall, address, email, idCard));
        this.collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("name", "phone", "rcall", "address", "email", "idCard"));
    }
}
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.starter.bolt.PrinterBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.platform.storm.elasticsearch.spout.ESSpout;

// Reads documents from Elasticsearch via ESSpout, normalizes them with
// HandleBolt, and writes the results to HDFS via HdfsBolt.
public class ES2StormTopology {

    private static final String TOPOLOGY_NAME = "es-storm-topology";

    public static void main(String[] args) {
        if (args.length != 1) {
            System.exit(0);
        }
        boolean isCluster = Boolean.parseBoolean(args[0]);
        TopologyBuilder builder = new TopologyBuilder();
        String target = "operator/telecom";
        String query = "?q=*";
        Map<Object, Object> configuration = new HashMap<Object, Object>();
        configuration.put("es.nodes", "192.168.10.20:9200");
        configuration.put("es.read.field.include", "name,phone,rcall,email,idCard,zipCode,address");
        configuration.put("es.storm.spout.fields", "name,phone,rcall,email,idCard,zipCode,address");
        builder.setSpout("es-storm-spout", new ESSpout(target, query, configuration), 1);
        builder.setBolt("storm-print-bolt", new PrinterBolt()).shuffleGrouping("es-storm-spout");
        builder.setBolt("storm-handle-bolt", new HandleBolt()).shuffleGrouping("es-storm-spout");
        // Write ":"-delimited records, sync every 10 tuples, rotate files every minute.
        RecordFormat recordFormat = new DelimitedRecordFormat().withFieldDelimiter(":");
        SyncPolicy syncPolicy = new CountSyncPolicy(10);
        FileRotationPolicy fileRotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm/")
                .withPrefix("es_").withExtension(".log");
        HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://centos.host1:9000")
                .withFileNameFormat(fileNameFormat).withRecordFormat(recordFormat)
                .withRotationPolicy(fileRotationPolicy).withSyncPolicy(syncPolicy);
        builder.setBolt("storm-hdfs-bolt", hdfsBolt).globalGrouping("storm-handle-bolt");
        Config config = new Config();
        config.setDebug(true);
        if (isCluster) {
            try {
                config.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(
                        TOPOLOGY_NAME, config, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
Note: the EsSpout class shipped with elasticsearch-hadoop was built against an older Storm version, so an ESSpout was rewritten here against the newer Storm API to replace it.
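The rewritten ESSpout itself is not reproduced in the post. As a rough sketch of the shape such a replacement can take against the Storm 1.x API (the constructor matches the new ESSpout(target, query, configuration) call above; the scroll/fetch logic and all internals here are assumptions, not the actual implementation):

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;

// Hypothetical skeleton only: a Storm 1.x spout wrapping the elasticsearch-hadoop
// read logic. The actual ESSpout referenced above is not included in the post.
public class ESSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    private final String target;                      // index/type to read, e.g. "operator/telecom"
    private final String query;                       // query string, e.g. "?q=*"
    private final Map<Object, Object> configuration;  // es.* settings

    private SpoutOutputCollector collector;

    public ESSpout(String target, String query, Map<Object, Object> configuration) {
        this.target = target;
        this.query = query;
        this.configuration = configuration;
    }

    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        // Assumption: open a scroll query against "es.nodes" here using target/query.
    }

    public void nextTuple() {
        // Assumption: pull the next hit from the scroll and emit its field values, e.g.
        // collector.emit(new Values(name, phone, rcall, email, idCard, zipCode, address));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Field names mirror the "es.storm.spout.fields" setting used above.
        declarer.declare(new Fields("name", "phone", "rcall", "email", "idCard", "zipCode", "address"));
    }
}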
$ bin/storm jar /home/hadoop/Documents/esstorm-0.0.1-SNAPSHOT.jar org.platform.storm.elasticsearch.ES2StormTopology false
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.bolt.PrinterBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.platform.storm.elasticsearch.bolt.ESBolt;
import org.platform.storm.elasticsearch.spout.ESSpout;

// Reads documents from one Elasticsearch index via ESSpout and writes them
// into another index via ESBolt.
public class Storm2ESTopology {

    private static final String TOPOLOGY_NAME = "storm-es-topology";

    public static void main(String[] args) {
        if (args.length != 1) {
            System.exit(0);
        }
        boolean isCluster = Boolean.parseBoolean(args[0]);
        TopologyBuilder builder = new TopologyBuilder();
        String target = "operator/telecom";
        String query = "?q=*";
        Map<Object, Object> spoutConf = new HashMap<Object, Object>();
        spoutConf.put("es.nodes", "192.168.10.20:9200");
        spoutConf.put("es.read.field.include", "name,phone,rcall,email,idCard,zipCode,address");
        spoutConf.put("es.storm.spout.fields", "name,phone,rcall,email,idCard,zipCode,address");
        builder.setSpout("es-storm-spout", new ESSpout(target, query, spoutConf), 1);
        builder.setBolt("storm-print-bolt", new PrinterBolt()).shuffleGrouping("es-storm-spout");
        Map<Object, Object> boltConf = new HashMap<Object, Object>();
        boltConf.put("es.nodes", "192.168.10.20:9200");
        boltConf.put("es.index.auto.create", "true");
        boltConf.put("es.ser.writer.bytes.class", "org.platform.storm.elasticsearch.bolt.StormTupleBytesConverter");
        //boltConf.put("es.input.json", "true");
        builder.setBolt("storm-es-bolt", new ESBolt("data/telecom", boltConf))
                .globalGrouping("es-storm-spout");
        Config config = new Config();
        config.setDebug(true);
        if (isCluster) {
            try {
                config.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(
                        TOPOLOGY_NAME, config, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
Note: the EsBolt and StormTupleBytesConverter classes shipped with elasticsearch-hadoop were also built against an older Storm version, so an ESBolt and a StormTupleBytesConverter were rewritten against the newer Storm API to replace them.
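As with ESSpout, these rewritten classes are not reproduced in the post. A minimal sketch of an ESBolt matching the new ESBolt("data/telecom", boltConf) call above (the indexing logic is elided and assumed; the real implementation would delegate to the elasticsearch-hadoop writer configured via "es.ser.writer.bytes.class"):

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Hypothetical skeleton only: a Storm 1.x bolt that indexes tuples into
// Elasticsearch. The actual ESBolt referenced above is not included in the post.
public class ESBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private final String target;                // target index/type, e.g. "data/telecom"
    private final Map<Object, Object> boltConf; // es.* settings

    private OutputCollector collector;

    public ESBolt(String target, Map<Object, Object> boltConf) {
        this.target = target;
        this.boltConf = boltConf;
    }

    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Assumption: initialize the elasticsearch-hadoop write channel here
        // from boltConf ("es.nodes", "es.index.auto.create", serializer class).
    }

    public void execute(Tuple input) {
        // Assumption: serialize the tuple (e.g. via StormTupleBytesConverter)
        // and index it into the target, then ack.
        this.collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: no downstream streams are declared.
    }
}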
$ bin/storm jar /home/hadoop/Documents/esstorm-0.0.1-SNAPSHOT.jar org.platform.storm.elasticsearch.Storm2ESTopology false