首页 > 编程 > Python > 正文

python消费kafka数据批量插入到es的方法

2020-01-04 13:41:23
字体:
来源:转载
供稿:网友

1、es的批量插入

这是为了方便后期配置的更改,把配置信息放在logging.conf中

用elasticsearch来实现批量操作,先安装依赖包,sudo pip install Elasticsearch2

from elasticsearch import Elasticsearch class ImportEsData:  logging.config.fileConfig("logging.conf")  logger = logging.getLogger("msg")  def __init__(self,hosts,index,type):    self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)    self.index = index    self.type = type  def set_date(self,data):     # 批量处理     # es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()})    self.es.index(index=self.index,doc_type=self.index,body=data)

2、使用pykafka消费kafka

1.因为kafka是0.8,pykafka不支持zk,只能用get_simple_consumer来实现

2.为了实现多个应用同时消费而且不重消费,所以一个应用消费一个partition

3. 为是确保消费数据量在不满足10000这个批量值,能在一个时间范围内插入到es中,这里设置consumer_timeout_ms一个超时等待时间,退出等待消费阻塞。

4.退出等待消费阻塞后导致无法再消费数据,因此在获取self.consumer 的外层加入了while True 一个死循环

#!/usr/bin/python# -*- coding: UTF-8 -*-from pykafka import KafkaClientimport loggingimport logging.configfrom ConfigUtil import ConfigUtilimport datetimeclass KafkaPython:  logging.config.fileConfig("logging.conf")  logger = logging.getLogger("msg")  logger_data = logging.getLogger("data")  def __init__(self):    self.server = ConfigUtil().get("kafka","kafka_server")    self.topic = ConfigUtil().get("kafka","topic")    self.group = ConfigUtil().get("kafka","group")    self.partition_id = int(ConfigUtil().get("kafka","partition"))    self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms"))    self.consumer = None    self.hosts = ConfigUtil().get("es","hosts")    self.index_name = ConfigUtil().get("es","index_name")    self.type_name = ConfigUtil().get("es","type_name")  def getConnect(self):    client = KafkaClient(self.server)    topic = client.topics[self.topic]    p = topic.partitions    ps={p.get(self.partition_id)}    self.consumer = topic.get_simple_consumer(      consumer_group=self.group,      auto_commit_enable=True,      consumer_timeout_ms=self.consumer_timeout_ms,      # num_consumer_fetchers=1,      # consumer_id='test1',      partitions=ps      )    self.starttime = datetime.datetime.now()  def beginConsumer(self):    print("beginConsumer kafka-python")    imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name)    #创建ACTIONS     count = 0    ACTIONS = []     while True:      endtime = datetime.datetime.now()      print (endtime - self.starttime).seconds      for message in self.consumer:        if message is not None:          try:            count = count + 1            # print(str(message.partition.id)+","+str(message.offset)+","+str(count))            # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))            action = {               "_index": self.index_name,               "_type": self.type_name,               "_source": message.value            }            ACTIONS.append(action)            if len(ACTIONS) >= 10000:              imprtEsData.set_date(ACTIONS)              ACTIONS = []              self.consumer.commit_offsets()              endtime = datetime.datetime.now()              print (endtime - self.starttime).seconds              #break          except (Exception) as e:            # self.consumer.commit_offsets()            print(e)            self.logger.error(e)            self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"/n")            # self.logger_data.error(message.value+"/n")          # self.consumer.commit_offsets()      if len(ACTIONS) > 0:        self.logger.info("等待时间超过,consumer_timeout_ms,把集合数据插入es")        imprtEsData.set_date(ACTIONS)        ACTIONS = []        self.consumer.commit_offsets()  def disConnect(self):    self.consumer.close()from elasticsearch import Elasticsearch from elasticsearch.helpers import bulkclass ImportEsData:  logging.config.fileConfig("logging.conf")  logger = logging.getLogger("msg")  def __init__(self,hosts,index,type):    self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)    self.index = index    self.type = type  def set_date(self,data):     # 批量处理     success = bulk(self.es, data, index=self.index, raise_on_error=True)     self.logger.info(success) 

3、运行

if __name__ == '__main__':  kp = KafkaPython()  kp.getConnect()  kp.beginConsumer()  # kp.disConnect()

注:简单的写了一个从kafka中读取数据到一个list里,当数据达到一个阈值时,在批量插入到 es的插件

现在还在批量的压测中。。。

以上这篇python消费kafka数据批量插入到es的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持VEVB武林网。


注:相关教程知识阅读请移步到python教程频道。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表