登入帳戶  | 訂單查詢  | 購物車/收銀台( 0 ) | 在線留言板  | 付款方式  | 運費計算  | 聯絡我們  | 幫助中心 |  加入書簽
會員登入 新用戶登記
HOME新書上架暢銷書架好書推介特價區會員書架精選月讀2023年度TOP分類瀏覽雜誌 臺灣用戶
品種:超過100萬種各類書籍/音像和精品,正品正價,放心網購,悭钱省心 服務:香港台灣澳門海外 送貨:速遞郵局服務站

新書上架簡體書 繁體書
暢銷書架簡體書 繁體書
好書推介簡體書 繁體書

四月出版:大陸書 台灣書
三月出版:大陸書 台灣書
二月出版:大陸書 台灣書
一月出版:大陸書 台灣書
12月出版:大陸書 台灣書
11月出版:大陸書 台灣書
十月出版:大陸書 台灣書
九月出版:大陸書 台灣書
八月出版:大陸書 台灣書
七月出版:大陸書 台灣書
六月出版:大陸書 台灣書
五月出版:大陸書 台灣書
四月出版:大陸書 台灣書
三月出版:大陸書 台灣書
二月出版:大陸書 台灣書

『簡體書』Elasticsearch集成Hadoop最佳实践

書城自編碼: 3014954
分類:簡體書→大陸圖書→計算機/網絡程序設計
作者: [著] [美]Vishal Shukla ,[译]贾传青
國際書號(ISBN): 9787302469674
出版社: 清华大学出版社
出版日期: 2017-06-01
版次: 1 印次: 1
頁數/字數: 186页
書度/開本: 32开 釘裝: 平装

售價:HK$ 79.8

我要買

 

** 我創建的書架 **
未登入.


新書推薦:
山河不足重,重在遇知己
《 山河不足重,重在遇知己 》

售價:HK$ 54.0
独自走过悲喜
《 独自走过悲喜 》

售價:HK$ 81.6
永不停步:玛格丽特·阿特伍德传
《 永不停步:玛格丽特·阿特伍德传 》

售價:HK$ 94.8
假努力:方向不对,一切白费
《 假努力:方向不对,一切白费 》

售價:HK$ 71.8
北京三万里
《 北京三万里 》

售價:HK$ 93.6
争吵的恋人:我们为什么相爱,又为什么争吵
《 争吵的恋人:我们为什么相爱,又为什么争吵 》

售價:HK$ 70.8
秘史:英国情报机构的崛起
《 秘史:英国情报机构的崛起 》

售價:HK$ 81.6
李鸿章及其时代:中西方世界的历史撞击
《 李鸿章及其时代:中西方世界的历史撞击 》

售價:HK$ 70.8

 

建議一齊購買:

+

HK$ 243.6
《 深度学习 》
+

HK$ 129.1
《 Hadoop构建数据仓库实践 》
+

HK$ 114.6
《 深入理解Elasticsearch(原书第2版) 》
+

HK$ 146.9
《 Apache Kafka源码剖析 》
+

HK$ 129.1
《 Spring Cloud微服务实战 》
+

HK$ 100.1
《 基于Apache Kylin构建大数据分析平台 》
編輯推薦:
Hadoop已经是业界公认的大数据处理的事实标准,对海量数据的存储与处理都是不在话下,不过,随着业务的复杂化以及需求的多变,分析师们迫切地想要看到结果从而进行分析,而目前Hadoop离线 批量处理的方式稍微有点力不从心,有没有一种能够更灵活地使用和加工数据的方法呢?鉴于此,Elasticsearch脱颖而出,Elasticsearch可以很方便地对海量数据进行搜索与聚合,并且能够实时响应,用户无需为每次不同的查询而重新定义Schema或者预处理。
数据科学家可以非常灵活地对数据进行各种维度的钻取与分析,把更多的精力关注在业务本身和挖掘数据本身的价值,并且可以随时根据上一个查询得到的线索随时去构建新的查询从而继续挖掘,这在以前是不敢想象的。另外,Elasticsearch还能和Hadoop进行结合,Hadoop 适合海量数据的归档与离线预处理,Elasticsearch则进行实时检索与分析,而本书是目前介绍这两方面结合使用的非常好的中文资料,相信对您一定会大有裨益。

Medcl
Elastic中文社区发起人
Elastic工程师与布道师

Elasticsearch是近年来大数据领
內容簡介:
ElasticSearch是一个开源的分布式搜索引擎,具有高可靠性,支持非常多的企业级搜索用例。Elasticsearch Hadoop作为一个完美的工具,用来连接 Elasticsearch 和 Hadoop 的生态系统。通过Kibana技术,Elasticsearch Hadoop很容易从Hadoop 生态系统中获得大数据分析的结果。
本书全面介绍Elasticsearch Hadoop技术用于大数据分析以及数据可视化的方法。内容共分7章,包括Hadoop、Elasticsearch、 Marvel和 Kibana 安装;通过编写 MapReduce 作业,把Hadoop数据导入 Elasticsearch;全面分析 Elasticsearch本质,如全文本搜索分析、 查询、 筛选器和聚合;使用 Kibana创建各种可视化和交互式仪表板,并使用Storm和 Elasticsearch分类现实世界的流数据以及相关的其他主题。
本书适合从事大数据分析人员、大数据应用开发的人员参考,也适合高等院校及培训机构相关专业的师生教学参考。
關於作者:
贾传青,数据架构师,Oracle OCM,DB2迁移之星,TechTarget特约作家,从数据库向大数据转型的先行者,酷爱摄影。曾服务于中国联通、中国电信、建设银行、PICC等,目前供职于一家大数据解决方案提供商,致力于大数据技术的应用与实践。著有《开源大数据分析引擎Impala实战》一书。
目錄
目录
第1章 环境部署 1
1.1 安装部署Hadoop集群 1
Java安装和配置 2
用户添加和配置 2
SSH认证配置 3
Hadoop下载 4
环境变量配置 4
Hadoop配置 5
配置core-site.xml 6
配置hdfs-site.xml 6
配置yarn-site.xml 6
配置mapred-site.xml 7
格式化HDFS 7
启动Hadoop进程 8
1.2 安装Elasticsearch及相关插件 8
下载Elasticsearch 9
配置Elasticsearch 9
安装Head插件 11
安装Marvel插件 11
启动Elasticsearch 12
1.3 运行WordCount示例 13
下载编译示例程序 13
将示例文件上传到HDFS 13
运行第一个作业 14
1.4 使用Head 和 Marvel浏览数据 16
使用Head浏览数据 16
初识Marvel 18
使用Sense浏览数据 19
小结 21
第2章 初识ES-Hadoop 22
2.1 理解WordCount程序 23
理解Mapper 23
理解Reducer 24
理解Driver 25
使用旧的APIorg.apache.hadoop.mapred 28
2.2 实际案例网络数据监控 28
获取并理解数据 28
明确问题 29
解决方案 30
解决方案1预聚合结果 30
解决方案2直接查询聚合结果 32
2.3 开发MapReduce作业 33
编写Mapper类 34
编写Driver 37
编译作业 38
上传数据到HDFS 41
运行作业 41
查看TOP N结果 42
2.4 将数据从Elasticsearch写回HDFS 44
了解Twitter数据集 44
导入Elasticsearch 45
创建MapReduce作业 46
编写Tweets2HdfsMapper 46
运行示例 50
确认输出 50
小结 52
第3章 深入理解Elasticsearch 53
3.1 理解搜索 53
观念转换 54
索引 54
类型 55
文档 55
字段 55
3.2 与Elasticsearch交互 56
Elasticsearch的CRUD 56
创建文档 56
获取文档 57
更新文档 58
删除文档 58
创建索引 58
映射 59
数据类型 60
创建映射 61
索引模板 62
3.3 控制索引过程 63
什么是反转索引 63
输入数据分析 64
停止词 64
大小写 65
词根 65
同义词 65
分析器 65
3.4 Elastic查询 67
编写查询语句 68
URI查询 68
match_all查询 68
term查询 68
boolean查询 70
match查询 71
range查询 72
wildcard查询 73
过滤器 73
3.5 聚合查询 75
执行聚合查询 76
terms聚合 76
histogram聚合 78
range聚合 78
geo distance聚合 79
嵌套聚合 81
自测题 82
小结 82
第4章 利用Kibana进行大数据可视化 83
4.1 安装部署 83
Kibana安装 84
准备数据 84
自测题 85
启动Kibana 86
4.2 数据发现 87
4.3 数据可视化 90
饼图 91
堆积柱状图 94
使用堆积柱状图完成日期直方图 96
面积图 97
饼图组图 98
环形图 98
瓦片地图 99
自测题 100
4.4 动态图表 101
小结 104
第5章 实时分析 105
5.1 了解Twitter趋势分析器 105
实现目标 106
Apache Storm安装 107
5.2 将流式数据接入Storm 107
编写Storm spout 108
编写Storm bolt 110
创建Storm topology 112
编译运行Storm作业 113
5.3 趋势分析 114
significant term聚合 114
使用Kibana分析趋势 116
5.4 使用Percolator对推文分类 117
Percolator 118
Percolator优化 120
推文分类 121
小结 124
第6章 ES-Hadoop配置 125
6.1 分布式环境中的Elasticsearch 125
集群和节点 126
节点类型 126
节点发现 128
数据分布 129
分片 129
副本 129
分片分配 130
6.2 ES-Hadoop架构 132
动态并行 132
写入Elasticsearch 133
从Elasticsearch中读取 134
失败捕获 134
数据本地化 135
6.3 生产环境配置 135
硬件 135
内存 135
CPU 135
磁盘 136
网络 136
集群安装 137
集群拓扑结构 137
设置名称 138
设置路径 138
设置内存 139
脑裂问题 140
设置恢复参数 141
预设配置 142
数据导入 142
全文检索 144
快速聚合 144
生产环境部署检查列表 145
6.4 集群管理 146
监控集群健康 146
备份和恢复 149
数据备份 149
数据恢复 150
小结 151
第7章 与Hadoop生态系统集成 152
7.1 与Pig集成 152
Pig安装 154
向Elasticsearch中导入数据 155
从JSON源写数据 157
类型转换 157
从Elasticsearch中读取数据 158
7.2 与Hive集成 158
安装Apache Hive 158
向Elasticsearch中导入数据 159
从JSON源写数据 161
类型转换 161
从Elasticsearch中读取数据 162
7.3 与Cascading集成 163
向Elasticsearch中导入数据 163
编写一个Cascading作业 163
运行作业 164
从Elasticsearch中读取数据 165
编写一个reader作业 165
使用Lingual 165
7.4 与Spark集成 167
安装Spark 168
向Elasticsearch中导入数据 168
使用SparkSQL向Elasticsearch中导入数据 169
从Elasticsearch中读取数据 170
使用SparkSQL从Elasticsearch中读取数据 170
7.5 与YARN集成 171
小结 172
附录 配置 174
基本配置 174
es.resource 174
es.resource.read 174
es.resource.write 175
es.nodes 175
es.port 175
读写配置 175
es.query 175
es.input.json 176
es.write.operation 177
es.update.script 177
es.update.script.lang 177
es.update.script.params 177
es.update.script.params.json 178
es.batch.size.bytes 178
es.batch.size.entries 178
es.batch.write.refresh 178
es.batch.write.retry.count 178
es.batch.write.retry.wait 179
es.ser.reader.value.class 179
es.ser.writer.value.class 179
es.update.retry.on.conflict 179
映射配置 179
es.mapping.id 179
es.mapping.parent 180
es.mapping.version 180
es.mapping.version.type 180
es.mapping.routing 180
es.mapping.ttl 180
es.mapping.timestamp 181
es.mapping.date.rich 181
es.mapping.include 181
es.mapping.exclude 181
索引配置 181
es.index.auto.create 181
es.index.read.missing.as.empty 182
es.field.read.empty.as.null 182
es.field.read.validate.presence 182
网络配置 182
es.nodes.discovery 182
es.nodes.client.only 183
es.http.timeout 183
es.http.retries 183
es.scroll.keepalive 183
es.scroll.size 183
es.action.heart.beat.lead 183
认证配置 184
es.net.http.auth.user 184
es.net.http.auth.pass 184
SSL配置 184
es.net.ssl 184
es.net.ssl.keystore.location 184
es.net.ssl.keystore.pass 184
es.net.ssl.keystore.type 184
es.net.ssl.truststore.location 184
es.net.ssl.truststore.pass 185
es.net.ssl.cert.allow.self.signed 185
es.net.ssl.protocol 185
es.scroll.size 185
代理配置 185
es.net.proxy.http.host 185
es.net.proxy.http.port 185
es.net.proxy.http.user 185
es.net.proxy.http.pass 186
es.net.proxy.http.use.system.props 186
es.net.proxy.socks.host 186
es.net.proxy.socks.port 186
es.net.proxy.socks.user 186
es.net.proxy.socks.pass 186
es.net.proxy.socks.use.system.props 186
內容試閱
前 言
在2004年到2006年期间,关于Hadoop的核心组件的讨论都是围绕MapReduce的。Hadoop天生具有分布式运算能力和水平扩展能力,这些特性使其在各个行业被广泛应用。那些超大型的组织认识到Hadoop带来的价值,包括处理TB和PB级数据、采集处理社交数据、利用廉价的商业硬件存储海量数据等。然而,大数据解决方案除了这些以外,还需要解决数据处理的实时性问题,尤其是对非结构化数据的实时性处理。Elasticsearch是一款高效的分布式搜索及分析引擎,可以让你实时了解你的海量数据。它丰富的查询能力可以帮助你进行复杂的全文检索、基于地理位置的分析及异常检测等。Elasticsearch-Hadoop也被简称为ES-Hadoop,是Elasticsearch和Hadoop的连接器,通过它可以非常方便地在Hadoop生态系统和Elasticsearch之间进行数据交互。你也可以将流式数据从Apache Storm或者Apache Spark写入Elasticsearch进行实时分析。本书的目标是让你获得真正的利用Hadoop和Elasticsearch的能力。我将带你一步一步地对海量数据进行数据发现和数据探索。你将学习如何将Elasticsearch与Pig、Hive、Cascading、Apache Storm和Apache Spark等Hadoop生态系统工具进行无缝集成。通过本书的学习,你可以使用Elasticsearch创建自己的分析报表。通过强大的数据分析和可视化平台Kibana,你可以对要展示的图形、大小、颜色等进行控制。在本书中我使用了不少很有意思的数据集,通过这些数据集你将获得真实的数据探索体验。因此,你可以使用我们介绍的工具和技术非常快速地构建基于特定行业的解决方案。我衷心希望阅读本书能够给你带来有趣的学习体验。


第5章 实时分析
我们已经了解了如何以批处理方式对不同的数据源进行数据采集、分析和可视化。如果缩短分析的时间周期可以节省时间和金钱成本,那么实时分析就非常有必要了。当金融交易的交易量骤减或者某些商品的库存量变得太小时,我们希望立刻就能看到数据分析的结果来寻找原因,而不是让数据在晚上经历批处理任务,第二天才能从分析师那里看到分析的报告。在本章中,我们会讨论如何使用Apache Storm将数据导入Elasticsearch并进行实时分析。在很多场景下我们将要介绍的这些Elasticsearch高级特性会成为我们的瑞士军刀。在本章中,我们将介绍以下内容:● 了解Twitter趋势分析器● 将流式数据接入Storm● 趋势分析● 使用Percolator对推文分类5.1了解Twitter趋势分析器要学习一个东西最好的方式就是运用它。这也正是我们本章要做的。所以,让我们先来了解一下本章要做的东西。实现目标Twitter是可能包括你的客户在内的很多人发表自己见解的地方。通过从海量的推文中分析特定的关键字或者话题可以进行趋势分析。在本章中,我们将使用Apache Storm和Elasticsearch开发自己的Twitter趋势分析器。当然,中间还需要使用ES-Hadoop来做这两个组件的整合。通常情况下,趋势分析器可以根据关键字或者话题展示其趋势。比如,#elasticsearch和#apachestorm就是上个月的热门话题。我们还可以找出像Big Data这样更高级别的热门话题。这个趋势其实是通过字符串进行精确匹配,将所有大数据生态系统的术语进行聚合得到的。类似地,我们可以通过添加和某个类别相关的关键词或者话题来创建一个新的类别。我们将使用分类技术来对流式的推文数据进行类别匹配。不止如此,你还可以以天、周、年等不同的时间周期来查看趋势。图5-1是我们要开发的趋势分析器的整理流程图。
图5-1如图5-1中显示,我们将使用Storm Spout来接收实时推文数据,将数据按照不同类别分类之后导入Elasticsearch中,在Elasticsearch中使用segnificant terms聚合计算出不同的趋势,然后使用Kibana进行展示。这个流程看起来挺简单,然而基于海量数据完成这个处理流程并不容易。同样的,分类的过程也不简单。我们将使用Elasticsearch中的Percolator来解决分类问题。我们需要先安装Apache Storm,用它来接收数据,并把数据导入Storm。Apache Storm安装Apache Storm是一个分布式实时计算引擎。它可以对那些Hadoop使用批处理模式处理的数据进行实时计算。使用如下命令下载Apache Storm的相应版本(撰写本书时,Storm的稳定版本是0.9.5):$ cd usrlocal$ sudo wget http:www.apache.orgdyncloser.cgistormapache-storm-0.9.5 apache-storm-0.9.5.tar.gz使用如下命令将下载的文件解压到storm目录:$ sudo tar -zxvf apache-storm-0.9.5.tar.gz$ sudo mv apache-storm-0.9.5 storm通过如下命令在~.bashrc文件中将storm二进制添加到PATH变量中:$ export PATH=$PATH:usrlocalstorm-0.9.5bin我们现在安装的storm是以本地模式安装的。如果你需要在生产环境中以集群模式安装storm,就需要安装Zookeeper,还需要独立的nimbus和supervisor。5.2将流式数据接入Storm很多读者可能已经对Storm有了充分的了解。但是,在这里我还是要对那些不太了解Storm的读者进行一个简单的介绍。Storm为流式数据提供了一个实时计算框架。因此,流是Storm的数据抽象,是由无限制的tuple组成的序列。在Storm术语中,tuple是流式数据的一个单元。Storm作业的工作组件包括spout和bolt。spout是流的数据源,而bolt消费这些流。你可以通过对spout和bolt进行级联生成topology。topology是我们可以向集群提交执行的顶层抽象。图5-2是一个Storm的topology示例,显示数据是如何从数据源经过处理并存储的。
图5-2现在我们编写一个Storm作业,它负责监听实时的推文数据,并把它们导入Elasticsearch中。简单起见,我们只是实现简单的监听功能,不对推文进行分类处理,直接把我们需要的数据导入Elasticsearch中。图5-3显示了我们将要实现的Twitter趋势分析器的topology。
图5-3编写Storm spout我们使用Twitter4j API接收实时的Twitter数据流。然后,我们创建了一个状态监听器,它负责接收twitter4j.Status对象中的推文。从如下代码片段中可以看到监听类是在Storm spout中以内部类的形式定义的:public class TweetsCollectorSpout extends BaseRichSpout {
String consumerKey = "";String consumerSecret = "";String accessToken = "";String accessTokenSecret = "";String[] keyWords = {};在上述代码中,我们创建的类TweetsCollectorSpout继承了Apache Storm提供的BaseRichSpout类。我们初始化了Twitter API认证需要的相关变量。为了消费Twitter API,我们需要提供注册应用程序时得到的用户key和secret。另外,我们还需要访问令牌的key和secret。你可以访问https:dev.twitter.com获取更多的相关信息。SpoutOutputCollector collector;LinkedBlockingQueue queue = null;TwitterStream twitterStream;
@Overridepublic void openMap conf, TopologyContext context,SpoutOutputCollector collector {queue = new LinkedBlockingQueue1000;this.collector = collector;
StatusListener listener = new StatusListener {public void onStatusStatus status {queue.offerstatus;}}; Get TwitterStream instance Register StatusListener with TwitterStream Configure OAuth access tokens..要接收Twitter的数据流,我们需要实现twitter4j API中的StatusListener接口,使用StatusListener接口的onStatus方法可以获取用户实时发送的推文。我们在初始化spout时初始化了这个监听器。你也可以在BaseRichSpout中的open方法中写自定义的spout初始化代码。另外,我们还需要使用spout的nextTuple方法从StatusListener中接收推文。为了实现这个目的,我们需要将推文写入LinkedBlockingQueue,然后在nextTuple中轮询这个队列。然后,我们创建了一个TwitterStream,将监听器传入刚刚创建的数据流,把用户应用证书和访问令牌相关信息也传进去。这个数据流可以随机获取推文数据。另外,我们也可以通过指定关键字对推文进行过滤。其他相关的代码不是我们本章关注的重点,所以并没有体现在上述代码中。下面列出nextTuple方法和declareOutputFields的代码:public void nextTuple {Status status = queue.poll;if status == null {Utils.sleep50;} else {collector.emitnew Valuesstatus;}}
public void declareOutputFieldsOutputFieldsDeclarer declarer{declarer.declarenew Fields"tweet";}Storm通过spout中的nextTuple方法获取下一个tuple。在nextTuple中,我们对推文队列进行轮询,然后调用SpoutOutputCollector中的emit方法将轮询的结果传入后续的bolt中。另外,我们还需要在declareOutputFields方法中声明输出字段。我们向下一个bolt传输的只是一个简单的String类型的status消息,请看如下代码:@Overridepublic Map getComponentConfiguration {Config config = new Config;config.setMaxTaskParallelism5;return config;}在上述代码中,我们还可以设置任务的并行度。编写Storm boltStorm bolt接收spout发送的字段tweet的tuple。TweetsParserBolt解析推文,抽取出相应字段,然后将字段提交到topology中的下一个bolt。以下是TweetsParserBolt将数据提交给EsBolt的代码片段:public class TweetsParserBolt extends BaseRichBolt {private OutputCollector collector;
public void prepareMap stormConf, TopologyContext context,OutputCollector collector {this.collector = collector;}在上述代码中,我们创建的TweetsParserBolt类继承了BaseRichBolt。在prepare方法中提供了相应的OutputCollector实例。@Overridepublic void executeTuple input {Status status = Status input.getValueByField"tweet";
String tweet = status.getText;String source = status.getSource;Date createdDate = status.getCreatedAt;HashtagEntity entities[] = status.getHashtagEntities;long retweetCount = status.getRetweetCount;long favoriteCount = status.getFavoriteCount;UserMentionEntity mentions[] = status.getUserMentionEntities;String lang = status.getLang;
Extract hashtagsif entities != null {for HashtagEntity entity : entities {String hashTag = entity.getText;hashtagList.addhashTag;}}我们可以覆盖BaseRichBolt的execute方法进行逻辑处理。在上述代码中,我们只是从Status中抽取了我们感兴趣的字段。if"en".equalsIgnoreCaselang{System.out.println"Emitting : " userHandle " - " tweet;collector.emitinput, new Valuesuser, userHandle, tweet,createdDate, location, country, strHashtag, source, lang,retweetCount, favoriteCount, strUserMention; }为了避免非英文字符对分析的影响,我们仅仅过滤出英文推文索引到Elasticsearch中。最后,我们将处理后的值发送给EsBolt。请参考如下代码:public void declareOutputFieldsOutputFieldsDeclarer declarer {declarer.declarenew Fields"user", "userHandle", "tweet","time", "location", "country", "hashtags","source","lang", "retweetCount", "favoriteCount","mentions";}和本节前面介绍的类似,declareOutputFields方法负责将我们选择的字段发送给EsBolt。创建Storm topology正如我们前面介绍的,Storm topology把spout和bolt处理流程连接起来,构造了一个流程图。让我们创建一个topology来实现spout和bolt的相互通信:public class Topology {
public static void mainString[] args throwsInterruptedException {
TopologyBuilder builder = new TopologyBuilder;builder.setSpout"tweets-collector", newTweetsCollectorSpout,1;builder.setBolt"tweets-parser-bolt", newTweetsParserBolt.shuffleGrouping"tweets-collector";在上述代码中,我们在TopologyBuilder对象中设置了TweetsCollectorSpout和TweetsParserBolt。我们使用tweets-parse-bolt的shuffleGrouping方法来监听tweets-collector的tuple。Map config = new HashMap;builder.setBolt"es-bolt", new EsBolt"es-stormstorm-tweets",config.shuffleGrouping"tweets-parser-bolt".addConfigurationConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2;
LocalCluster cluster = new LocalCluster;cluster.submitTopology"twitter-test", null,builder.createTopology;}}ES-Hadoop提供了单独的EsSpout和EsBolt与Elasticsearch进行数据交互。在上述代码中,我们使用EsBolt接收来自tweets-parse-bolt的tuple。最后,我们调用TopologyBuilder对象的createTopology方法创建了一个topology实例,然后使用submitTopology方法向集群提交这个topology。编译运行Storm作业我们可以使用Maven将其编译成内置了JAR依赖的JAR文件。编译成功之后,我们就可以使用以下命令来运行Storm作业。在执行该命令之前需要先启动运行Elasticsearch:$ storm jar ch05-0.0.1-job.jar com.packtpub.esh.streaming.Topology我们执行的上述命令将引导Storm和Zookeeper服务器、启动worker,并且让spout和bolt做好准备接收推文。下面的文本片段就是Storm作业启动后控制台的输出信息。在这段文本的后半部分我们可以看到数据被导入Elasticsearch并在控制台打印输出:16048 [Thread-11-tweets-collector] INFO backtype.storm.daemon.executor - Opening spout tweets-collector:3 16081 [Thread-13-tweets-parser-bolt] INFO backtype.storm.daemon.executor - Preparing bolt tweets-parser-bolt:4 16085 [Thread-13-tweets-parser-bolt] INFO backtype.storm.daemon.executor - Prepared bolt tweets-parser-bolt:4 16100 [Thread-9-es-bolt] INFO backtype.storm.daemon.executor - Preparing bolt es-bolt:2 16108 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Preparing bolt __acker:1 16108 [Thread-15-__system] INFO backtype.storm.daemon.executor - Preparing bolt __system:-1 16116 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Prepared bolt __acker:1 16120 [Thread-15-__system] INFO backtype.storm.daemon.executor - Prepared bolt __system:-1 16211 [Thread-11-tweets-collector] INFO backtype.storm.daemon.executor - Opened spout tweets-collector:3 16213 [Thread-11-tweets-collector] INFO backtype.storm.daemon.executor - Activating spout tweets-collector:3 16213 [Twitter Stream consumer-1[initializing]] INFO twitter4j.TwitterStreamImpl - Establishing connection. 16982 [Thread-9-es-bolt] INFO org.elasticsearch.hadoop.util.Version - Elasticsearch Hadoop v2.1.0.Beta4 [2c62e273d2] 16982 [Thread-9-es-bolt] INFO org.elasticsearch.storm.EsBolt - Writing to [es-stormstorm-tweets] 17028 [Thread-9-es-bolt] INFO backtype.storm.daemon.executor - Prepared bolt es-bolt:2 19823 [Twitter Stream consumer-1[Establishing connection]] INFO twitter4j.TwitterStreamImpl - Connection established. 19823 [Twitter Stream consumer-1[Establishing connection]] INFO twitter4j.TwitterStreamImpl - Receiving status stream.5.3趋势分析在将推文导入Elasticsearch之后,我们就可以利用Elasticsearch的分析能力来对推文进行分析。在对推文的分析中,我们着重对趋势进行分析。在进行趋势分析之前,我们首先要知道什么是趋势。趋势就是某件事情在某个时间区间、某个地理位置出现得有多频繁的描述。换句话说,通过趋势我们可以发现平常中的异常。我们将在普通的场景中寻找某些隐藏其中的变化。这样的过程也叫作异常检测。它从整个数据集中找出哪些数据和整体数据表现的特性是偏离的。我们把整个数据集称为背景数据集,而我们感兴趣的某个时间区间或者某个地理位置的数据被称为前景数据集。比如,在1000000推文的背景数据集中,单词Dornier大约出现1次,然而某一天的前景数据集的100条推文中,这个单词出现了5次。这就表示这一天这个单词出现了异常。significant term聚合Elasticsearch的significant term聚合对异常检测提供了开箱支持。现在我们基于导入Elasticsearch中的推文数据集进行日趋势变化的查询:$ curl -XPOST d http:localhost:9200es-storm_search{ "size": 0, "query": {"range": { "time": {"gte": "now-1d","lte": "now" }} }, "aggs": {"significant_hashtags": { "significant_terms": {"field": "hashtags","size": 3 }} }}在上述查询中,基于范围的查询过滤出了前景数据,而索引中所有的文档都被作为了背景数据。你也可以使用background_filter来过滤背景数据。在笔者的测试环境中,上面的查询语句返回如下结果:{ "took": 31, "timed_out": false, ... ..., "aggregations": { "significant_hashtags": {"doc_count": 42760,"buckets": [{ "key": "ohnoharry", "doc_count": 381, "score": 0.03435635054048922, "bg_count": 381},{ "key": "otrasandiego", "doc_count": 228, "score": 0.018093820892682595, "bg_count": 252},{ "key": "msgblessedbygod", "doc_count": 190, "score": 0.01713308819604449, "bg_count": 190}] } }}返回的结果中,doc_count和bg_count分别表示在前景数据集和背景数据集中匹配到的文档个数。其中的score是基于前景数据集和背景数据集匹配到的文档数计算出来的排名。 significant term聚合使用了子聚合,在进行基于某个匹配项,基于地理位置或者基于范围的异常时非常有用。
使用Kibana分析趋势让我们使用Kibana来查看Twitter趋势分析器的趋势变化情况。我们将使用子聚合来创建一个柱状图查看最近一天、最近一周、最近一年的趋势变化情况。执行步骤如下:1. 在Kibana的Visualize选项卡中选择Vertical bar chart。2. 在x轴中选择time字段中的 Date Range Aggregation。3. 添加最近一天、最近一周、最近一年三个时间范围。4. 使用Split Bars添加子聚合,对hashtags进行significant term聚合。你可能不想对所有推文的话题进行展示。5. 选择想展示的话题个数,这里我们选择4。6. 在Options选项卡中将Bar Mode设置为percentage。7. 生成图表。 为了使图表中展示的结果更有意义,我们需要让Storm作业跑得时间稍微长一点,采集到足够多的数据。另外,你在选择图表的数据范围时也需要与采集的数据时间范围相对应。比如,我们需要看基于月的趋势变化,至少需要一到两个月的背景数据。
图5-4显示了我们按照最近一天、最近一周、最近一年生成的趋势变化图。
图5-45.4使用Percolator对推文分类到目前为止,我们已经开发了一个非常简单的趋势分析器。在5.1节中,我们讨论的趋势分析器不止限于按照话题进行分析,功能更复杂。在本节中,我们将修改Storm的bolt,对传入的实时数据进行分类处理。我们通过检查推文的话题是否满足某个标准来对其进行分类。基于这个标准,我们将相应的文档标记为某个类别。Percolator 我们可以使用Elasticsearch查询来执行分类的标准。当数据存入Elasticsearch时,我们可以使用Percolator对文档进行匹配。通常情况下,当进行查询时,我们需要向搜索引擎提交查询语句,搜索引擎就会把匹配到的文档返回给我们。Percolator与普通的查询过程完全相反。普通的查询过程要求文档需要被索引在Elasticsearch中,而Percolator则要求Elasticsearch查询需要事先被索引在Elasticsearch中。普通的查询过程是根据查询去匹配文档,而Percolator则是根据文档来匹配索引在Elasticsearch中的查询。图5-5展示了Percolator是如何工作的。
图5-5在图5-5中我们可以看到Elasticsearch是如何将不同的查询本身索引在Elasticsearch中的。我们定义了Big DataHacking和Agile三个不同的分类标准。当我们进行查询时,Elasticsearch会从已经被索引的查询中匹配到对应的查询,然后把匹配到的查询返回。创建Percolator查询只需要在Elasticsearch索引中添加一个新的文档。这个文档也以JSON的格式索引在Elasticsearch中。如下命令展示了如何创建Big Data类别的Percolator查询:$ curl -XPOST ''http:localhost:9200es-storm.percolator1{"query" : {"match" : {"tweet" : "bigdata analytics hadoop spark elasticsearch eshadoop nosql mongo mongodb cassandra hbase titan orientdb neo4j storm pig hive cloudera hortonworks"}}}''当执行上述命令时,我们将会创建一个匹配Big Data生态系统关键词的Percolator查询。另外一个问题是如何执行这个Percolator查询。这个查询可以根据文档提供与定义在Percolator中的查询匹配的能力。这也意味着我们在Percolator查询请求中需要提供文档本身。请看以下命令:$ curl -XGET ''http:localhost:9200es-stormstorm-tweets_percolate'' d ''{"doc" : {"tweet" : "I can''t believe that I can now analyse trends from the hadoop data in a snap using Elasticsearch-Hadoop."}}''上述的_percolate请求会对我们在请求体中的文档与我们已经创建的Percolator查询进行匹配。返回结果如下:{ "took": 31, "_shards": { "total": 5, "successful": 5, "failed": 0 }, "total": 1, "matches": [ {"_index": "es-storm","_id": "1" } ]}在上述返回结果中,matches字段列出了所有与我们提供的文档匹配的percolator查询,而_id字段指的是我们创建Percolator时使用的id。 要得到上述结果,需要已经在相应的索引上创建了对应的Percolator。
Percolator优化在上一部分中,我们使用了一个Big Data的Percolator查询。在这个查询中,我们使用了一系列与Big Data相关的关键词,但是要确定和某个类别相关的关键词也不太容易。最好的方式是从用户发送的推文中寻找。要人工对全部推文进行分析来确定与某个类别相关的关键词是不切实际的。使用Elasticsearch的significant term聚合可以在推文中找出比Big Data出现更频繁的并且和Big Data相关的关键词。我们在创建Percolator查询时可以以这些高频词作为参考。现在,让我们来回答一个问题:在包含bigdata hadoop spark elasticsearch eshadoop nosql任何词的推文中找出出现频率最高的单词。以下是查询语句:$ curl -XPOST http:localhost:9200es-stormstorm-tweets_search -d ''{ "query": {"match": {"tweet": "bigdata hadoop spark elasticsearch eshadoop nosql" } }, "aggs": {"bigdata_suggestions": { "significant_terms": {"field": "tweet" }} }, "size": 0}''当我们执行上述查询时,我们可以得到analytics、mining、data warehouse和ETL这些关键词,这些词和这个类别并不相关,却为人工选择关键词提供了参考依据。我们根据包含某些关键词的推文查询出频度高的单词,然后针对包含高频单词的推文进行迭代查询,这样就把关键词选择变成了一个需要很少人工干预的自学习过程。推文分类使用Percolator,我们可以将文档与Percolator查询匹配。我们可以把Percolator的id映射为相应的分类名称。我们将在之前创建的TweetsPasrserBolt中实现分类功能。以下是在ElasticSearchService类中实现Percolator查询的代码示例:public class ElasticSearchService {
private TransportClient client;
public ElasticSearchService{Settings settings = ImmutableSettings.settingsBuilder.put"cluster.name", "eshadoopcluster".build;this.client = new TransportClientsettings;client.addTransportAddressnew InetSocketTransportAddress"localhost", 9300;}上述代码在该类的构造函数中创建了一个TransportClient来与Elasticsearch集群建立连接。public List percolateMap map{List ids = new ArrayList;PercolateRequest request = new PercolateRequest;request.indices"es-storm";request.documentType"storm-tweets";ActionFuture responseFuture =client.percolaterequest.sourcemap;PercolateResponse response = responseFuture.actionGet;PercolateResponse.Match[] matches = response.getMatches;forPercolateResponse.Match match: matches{ids.addmatch.getId.toString;}return ids;}}percolate方法将文档以java.util.Map对象的形式传入PercolateRequest。PercolateResponse包含了与文档匹配的查询的id。现在我们调用percolate方法对推文进行分类,具体调用在classify方法中:private String classifyString tweet {StringBuilder categoriesBuilder = new StringBuilder;ElasticSearchService service = new ElasticSearchService;Map main = new HashMap;Map doc = new HashMap;doc.put"tweet",tweet;main.put"doc",doc;List ids = service.percolatemain;forString id :ids{categoriesBuilder.appendgetCategoryNameid " ";}return categoriesBuilder.toString;}
public String getCategoryNameString id {switch id {case "1":return "BigData";case "2":return "Relational Database";case "3":return "Sports";case "4":return "Agile";case "5":return "Business";default:return "Other";} }我们为将要索引在Elasticsearch中的文档添加category字段。前面的classify方法以推文文本作为输入参数,返回以空格分隔的该推文所属的所有类别的字符串。该方法中构造了一个Map对象作为percolate的输入参数。在getCategoryName方法中实现了percolate返回的类别id与类别名称的对应关系。你也可以重新运行修改过的Storm作业,确认一下新写入的文档是否包含了category字段、category字段中是否包含了类别名称信息。你可以使用如下查询查看最近一周的推文类别趋势变化情况:$ curl -XPOST d http:localhost:9200es-storm_search{ "size": 0, "query": {"range": { "time": {"gte": "now-1w","lte": "now" }} }, "aggs": {"significant_categories": { "significant_terms": {"field": "categories","size": 3 }} }} 如果你仍然想在Kibana中对趋势进行可视化展示,你需要在配置选项卡中重新加载索引的字段列表,让我们新增的category字段信息也体现在Kibana中。
小结在本章中,我们讨论了如何在本地环境中进行Storm的安装。我们使用Twitter趋势分析器作为示例介绍了如何对实时的流式数据进行实时分析。我们创建了Storm spout和bolt来获取并处理推文。我们还创建了Storm topology,在其中使用ES-Hadoop的EsBolt配置了spout和bolt,完成将推文导入Elasticsearch的过程。我们使用Elasticsearch中的significant term聚合进行趋势发现和异常检测。我们还使用Percolater存储的查询对文档进行分类。在下一章中,我们将进一步理解Elasticsearch和ES-Hadoop中的重要概念,这些概念包括分片、副本、数据托管和其他高级配置参数。理解这些概念和参数可以帮助我们更好地在生产环境中使用Elasticsearch和ES-Hadoop。

 

 

書城介紹  | 合作申請 | 索要書目  | 新手入門 | 聯絡方式  | 幫助中心 | 找書說明  | 送貨方式 | 付款方式 香港用户  | 台灣用户 | 大陸用户 | 海外用户
megBook.com.hk
Copyright © 2013 - 2024 (香港)大書城有限公司  All Rights Reserved.