之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个
思路:
网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是和上一次同步开始时间比较,这样不管数据多少,更新耗时多少都不会少数据,因为原则是同步不漏掉任何数据,也可以程序多开将时间差和间隔时间差异化,因为用mysql中一个id当作es中的id,也避免了重复数据
使用:
只需要按照escongif.py写配置文件,然后写sql文件,最后直接执行mstes.py就可以了,我这个也是参考logstash-input-jdbc的配置形式
MsToEs
|----esconfig.py(配置文件)
|----mstes.py(同步程序)
|----sql_manage.py(数据库管理)
|----aa.sql(需要用到sql文件)
|----bb.sql(需要用到sql文件)
sql_manage.py:
# -*-coding:utf-8 -*-__author__ = "ZJL"from sqlalchemy.pool import QueuePoolfrom sqlalchemy import create_enginefrom sqlalchemy.orm import sessionmaker, scoped_sessionimport tracebackimport esconfig# 用于不需要回滚和提交的操作def find(func): def wrapper(self, *args, **kwargs): try: return func(self, *args, **kwargs) except Exception as e: print(traceback.format_exc()) print(str(e)) return traceback.format_exc() finally: self.session.close() return wrapperclass MysqlManager(object): def __init__(self): mysql_connection_string = esconfig.mysql.get("mysql_connection_string") self.engine = create_engine('mysql+pymysql://'+mysql_connection_string+'?charset=utf8', poolclass=QueuePool, pool_recycle=3600) # self.DB_Session = sessionmaker(bind=self.engine) # self.session = self.DB_Session() self.DB_Session = sessionmaker(bind=self.engine, autocommit=False, autoflush=True, expire_on_commit=False) self.db = scoped_session(self.DB_Session) self.session = self.db() @find def select_all_dict(self, sql, keys): a = self.session.execute(sql) a = a.fetchall() lists = [] for i in a: if len(keys) == len(i): data_dict = {} for k, v in zip(keys, i): data_dict[k] = v lists.append(data_dict) else: return False return lists # 关闭 def close(self): self.session.close()
aa.sql:
select CONVERT(c.`id`,CHAR) as id, c.`code` as code, c.`project_name` as project_name, c.`name` as name, date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time, from `cc` c where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';
新闻热点
疑难解答