www.ysbm.net > kAFkA spArkstrEAming

kAFkA spArkstrEAming

我这边的应用是这样的: ①采集程序:使用avro方式将自定义对象序列化成字节流存入Kafka ②spark streaming:获取Kafka中的字节流,使用avro反序列化为自定义对象

前面应该还有个数据生产者,比如flume. flume负责生产数据,发送至kafka. spark streaming作为消费者,实时的从kafka中获取数据进行计算. 计算结果保存至redis,供实时推荐使用. flume+kafka+spark+redis是实时数据收集与计算的一套经典架构

spark streaming接收kafka数据 用spark streaming流式处理kafka中的数据,第一步当然是先把数据接收过来,转换为spark streaming中的数据结构dstream.接收数据的方式有两种:1.利用receiver接收数据,2.直接从kafka读取数据.基于receiver的方式 这种方式利用接收器(receiver)来接收kafka中的数据,其最基本是使用kafka高阶用户api接口.对于所有的接收器,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据.如下图:

spark streaming从1.2开始提供了数据的零知丢失,想享受这个特性,道需要满足如下条件回: 1.数据输入需要可靠的答sources和可靠的receivers 2.应用metadata必须通过应用driver checkpoint 3.WAL(write ahead log)

日志采集.线上数据一般主要是落地文件或者通过socket传输给另外一个系统.这种情况下,你很难推动线上应用或服务去修改接口,直接向kafka里写数据.这时候你可能就需要flume这样的系统帮你去做传输.

曾经试过了用 spark streaming 读取 logstash 启动的 TCP Server 的数据.不过如果你有多台 logstash 的时候,这种方式就比较难办了 即使你给 logstash 集群申请一个 VIP,也很难确定说转发完全符合.所以一般来说,更多的选择是采用 kafka 等队列方式由 spark streaming 去作为订阅者获取数据.

driect方式的sparkstreaming kafka需要zookeeperKafkaUtils.createDstream构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] ) 使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的

应该是jar包导入的问题,不同的jar包有相同的方法参数导致的冲突

解决的方法是:分别从Kafka中获得某个Topic当前每个partition的offset,再从Zookeeper中获得某个consumer消费当前Topic中每个partition的offset,最后再这两个根据项目情况进行合并,就可以了.一、具体实现1、程序实现,如下:public

邮箱发给你

网站地图

All rights reserved Powered by www.ysbm.net

copyright ©right 2010-2021。
www.ysbm.net内容来自网络,如有侵犯请联系客服。zhit325@qq.com