首页 > 学院 > 开发设计 > 正文

JDBC 操作:数据库同步(全量 + 增量)、线程控制、批处理

2019-11-08 20:59:32
字体:
来源:转载
供稿:网友

此次更新在前一篇文章的基础上做了优化,原文见:http://blog.csdn.net/gooooa/article/details/54615455


页面展示: 这里写图片描述

表现层代码:

package com.zntz.web.admin.controller.jdbc;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import java.util.Date;import java.util.List;import java.util.Map;import org.sPRingframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.ResponseBody;import com.zntz.web.admin.model.base.ResultModel;import com.zntz.web.admin.model.vo.Operate.SyncDataVo;import com.zntz.web.admin.service.operate.SyncDataService;import com.zntz.web.admin.shiro.LoginUserUtils;import com.zntz.web.admin.utils.common.page.Pagination;import com.zntz.web.admin.utils.tuple.BeanTransMap;@Controller@RequestMapping("/MySQL/syn")public class JdbcController { @Autowired SyncDataService syncDataService; /** * 进入列表页面 * @return */ @RequestMapping(value = "/list", method = RequestMethod.GET) public String list() { return "mysql/syn/syn_list"; } @ResponseBody @RequestMapping(value = "/list", method = RequestMethod.POST) public Pagination<Map<String, Object>> list(Map<String, Object> modelMap, SyncDataVo vo) { Pagination<Map<String, Object>> pager = new Pagination<Map<String, Object>>(); pager.setStartIndex(vo.getStart()); pager.setPageSize(vo.getLength()); Map<String, Object> map = BeanTransMap.transBean2Map(vo); pager.setCondition(map); pager = syncDataService.getSyncDataVoListByConditions(pager); return pager; } /**新建 * @param rolevo * @param errors * @return */ @RequestMapping(value = "/add", method = RequestMethod.GET) public String create(Map<String, Object> modelMap) { return "mysql/syn/syn_create"; } /**保存 * @param rolevo * @return */ @ResponseBody @RequestMapping(value = "/save", method = RequestMethod.POST) public ResultModel save(SyncDataVo syncDataVo) { ResultModel result = new ResultModel(); try{ if 
(syncDataVo.getId() == null) { syncDataVo.setIsAll(true); syncDataVo.setIncDate(new Date()); syncDataVo.setIsTimer(false); syncDataVo.setTimerContent(null); syncDataVo.setCreateTime(new Date()); syncDataVo.setModifyTime(new Date()); syncDataVo.setCreateUser(LoginUserUtils.getLoginUser().getUserCode()); syncDataVo.setModifyUser(LoginUserUtils.getLoginUser().getUserCode()); syncDataService.addSyncDataVo(syncDataVo); } else { syncDataVo.setModifyTime(new Date()); syncDataVo.setModifyUser(LoginUserUtils.getLoginUser().getUserCode()); syncDataService.updateSyncDataVo(syncDataVo); } }catch(Exception e){ result.setCode("1"); } return result; } /** 修改 * @param id * @param modelMap * @return */ @RequestMapping(value = "/edit/{id}", method = RequestMethod.GET) public String edit(@PathVariable Long id, Map<String, Object> modelMap) { SyncDataVo vo = syncDataService.getSyncDataVoById(id); modelMap.put("syn", vo); return "mysql/syn/syn_edit"; } /**删除 * @param allroleids * @return */ @ResponseBody @RequestMapping(value = "/delItem", method = RequestMethod.POST) public ResultModel delItem(Long id) { ResultModel result = new ResultModel(); Integer delresult = syncDataService.deleteSyncDataVo(id); if (delresult == null || delresult < 1) result.setSuccess(false); return result; } @ResponseBody @RequestMapping("/syncData") public ResultModel syncData(Long id){ ResultModel model=new ResultModel(); SyncDataVo syncData = syncDataService.getSyncDataVoById(id); try { String dbsource=syncData.getDbSource(); String dbdestination=syncData.getDbDestination(); String source="jdbc:mysql://localhost:3306/"+dbsource+"?user=root&passWord=123456&useUnicode=true&characterEncoding=UTF8"; String destination="jdbc:mysql://localhost:3306/"+dbdestination+"?user=root&password=123456&useUnicode=true&characterEncoding=UTF8"; SynchronizationController.init(source, destination,syncData.getIsAll(),syncData.getIncDate()); } catch (Exception e) { model.setMessage("系统异常"); model.setData(e); model.setCode("1"); } 
return model; } @ResponseBody @RequestMapping("/testConnection") public ResultModel testConnection(String url){ ResultModel model=new ResultModel(); String url_source=url; try { Connection con=DriverManager.getConnection(url_source); con.close(); } catch (SQLException e) { model.setCode("1"); model.setMessage("数据库连接异常,请确认连接是否正确"); } return model; }}

数据库同步的业务处理:

package com.zntz.web.admin.controller.jdbc;import java.sql.Connection;import java.sql.DatabaseMetaData;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.ResultSetMetaData;import java.sql.SQLException;import java.sql.Statement;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.Map;import java.util.Set;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.zntz.web.admin.model.constant.Constant;public class SynchronizationController{ //数据源库// static String url_source="jdbc:mysql://localhost:3306/zntz?user=root&password=123456&useUnicode=true&characterEncoding=UTF8"; //目标库// static String url_destination="jdbc:mysql://localhost:3306/xx02?user=root&password=123456&useUnicode=true&characterEncoding=UTF8"; //同步源// static String url_source="";//"jdbc:mysql://localhost:3306/zntz?user=root&password=123456&useUnicode=true&characterEncoding=UTF8"; //目标库// static String url_destination="";//"jdbc:mysql://localhost:3306/oil?user=root&password=123456&useUnicode=true&characterEncoding=UTF8"; private static Logger logger=LoggerFactory.getLogger(SynchronizationController.class); private static Connection conn_source = null; private static Connection conn_destination = null; private static String url_source = ""; private static String url_destination = ""; static{ try { Class.forName("com.mysql.jdbc.Driver"); } catch (ClassNotFoundException e) { e.printStackTrace(); } } /** * * @param url_source 源数据库连接 * @param url_destination 目标数据库连接 * @param is_all 是否全量更新 * @throws InterruptedException * @throws SQLException */ public static void init(String source,String destination,boolean is_all,Date date) throws InterruptedException, SQLException{ url_source= source; url_destination=destination; try { conn_source = DriverManager.getConnection(url_source); conn_destination= 
DriverManager.getConnection(url_destination); synchronizationTables(); addData(is_all,date); logger.info("*******本次导入结束********"); } catch (SQLException e) { e.printStackTrace(); } finally { while(true){ if(InsertThread.getThreadCounts()>0){ Thread.sleep(1000); }else{ break; } } conn_source.close(); conn_destination.close(); } } //本地获取表名获取表名 public static Set<String> getTableName() { Set<String> set = new HashSet<String>(); try { DatabaseMetaData meta = conn_source.getMetaData(); ResultSet rs = meta.getTables(null, null, null,new String[] { "TABLE" }); while (rs.next()) { set.add(rs.getString("TABLE_NAME"));// String s = rs.getString("TABLE_NAME");// String type = rs.getString("TABLE_TYPE");// System.out.println(s+"======"+type);// getTableDDL(rs.getString("TABLE_NAME"), con); } } catch (Exception e) { logger.error(""+e); } return set; } //目标数据库 public static Map<String,String> getTableNameToMap() { Map<String,String> map=new HashMap<String,String>(); try { DatabaseMetaData meta = conn_destination.getMetaData(); ResultSet rs = meta.getTables(null, null, null,new String[] { "TABLE" }); while (rs.next()) { map.put(rs.getString("TABLE_NAME"),"1"); } } catch (Exception e) { logger.error(""+e); } return map; } //创建表 public static void createTable(String sql_ddl) throws SQLException { Statement stmt = conn_destination.createStatement(); int result = stmt.executeUpdate(sql_ddl);// executeUpdate语句会返回一个受影响的行数,如果返回-1就没有成功 if (result != -1) { logger.info("表创建成功"); }else{ logger.error("表创建失败:"+sql_ddl); } } //创建sql public static String getTableField(String tableName) throws SQLException{ String sql = "select * from "+tableName +" limit 1,2"; Statement state = conn_source.createStatement(); ResultSet rs = state.executeQuery(sql); ResultSetMetaData rsd = rs.getMetaData() ; StringBuffer sql_model=new StringBuffer("insert into "+ tableName +" ("); StringBuffer sql_param=new StringBuffer(" VALUES("); for(int i = 1; i <= rsd.getColumnCount(); i++) { 
sql_model.append(rsd.getColumnName(i)); sql_param.append("?"); if (i < rsd.getColumnCount()) { sql_model.append(","); sql_param.append(","); } } sql_model.append(") ");sql_param.append(") "); logger.info(sql_model.toString()+sql_param.toString()); return sql_model.toString()+sql_param.toString(); } //创建增量同步的sql public static String getTableField2(String tableName,Date date) throws SQLException, Exception{ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateString = formatter.format(date); String sql = "select * from "+tableName +" where modify_time > '"+dateString+"'"; System.out.println("----------------------------"+sql); Statement state = conn_source.createStatement(); ResultSet rs = state.executeQuery(sql); ResultSetMetaData rsd = rs.getMetaData() ; StringBuffer sql_model=new StringBuffer("insert into "+ tableName +" ("); StringBuffer sql_param=new StringBuffer(" VALUES("); for(int i = 1; i <= rsd.getColumnCount(); i++) { sql_model.append(rsd.getColumnName(i)); sql_param.append("?"); if (i < rsd.getColumnCount()) { sql_model.append(","); sql_param.append(","); } } sql_model.append(") ");sql_param.append(") "); logger.info(sql_model.toString()+sql_param.toString()); return sql_model.toString()+sql_param.toString(); } //获取表结构ddl public static String getTableDDL(String tableName) throws SQLException{ ResultSet rs = null; PreparedStatement ps = null; ps = conn_source.prepareStatement("show create table "+tableName); rs = ps.executeQuery(); StringBuffer ddl=new StringBuffer(); while (rs.next()) { ddl.append(rs.getString(rs.getMetaData().getColumnName(2))); } return ddl.toString(); } /** * 检查本地库所有表在B库里是否存在,是否一致 * A本地库 B目标库 */ public static void synchronizationTables(){ Set<String> a_set=getTableName(); Map<String,String> b_map=getTableNameToMap(); Iterator<String> it=a_set.iterator(); while(it.hasNext()){ String n=it.next(); if(b_map.get(n)==null){ logger.info("表名:"+n+" 不在目标库里"); String create_table_ddl=""; try { create_table_ddl = 
getTableDDL(n); createTable(create_table_ddl); } catch (SQLException e) { logger.error(create_table_ddl+"--------"+e); } }else { clearTableData(n); } } } //清除表数据 public static boolean clearTableData(String tableName){ try { Statement stmt = conn_destination.createStatement(); String sql = "TRUNCATE TABLE "+tableName; stmt.executeUpdate(sql); logger.info(tableName+":表数据已被清空"); } catch (SQLException e) { logger.error("异常表:"+tableName+"----数据清空失败"); logger.error(""+e); return false; } return true; } /** * * @param conn_source * @param conn_destination * @param is_all 是否全量 * @throws SQLException * @throws InterruptedException */ public static void addData(boolean is_all,Date date) throws SQLException{ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateString = formatter.format(date); Statement stmt_source = conn_source.createStatement(); Set<String> tableNameSet=getTableName(); Iterator<String> it = tableNameSet.iterator(); //遍历表 while (it.hasNext()) { long start = System.currentTimeMillis(); String str = it.next(); if(is_all){ try{ while(true){ if(InsertThread.getThreadCounts()>0){ Thread.sleep(1000); }else{ break; } } String sql_insert=getTableField(str); //获取总条数 分页查询 long count_start=System.currentTimeMillis(); logger.info("-------------------------------------------------------------------"); String sql_count="select count(*) from "+ str; ResultSet rs = stmt_source.executeQuery(sql_count); rs.next(); int totalCount=rs.getInt(1); long count_end=System.currentTimeMillis(); logger.info("查询记录数耗时:"+(count_end-count_start)/1000); if(totalCount>500000) logger.info("xxxxxxxxxxxxxxx start implement table:"+str+",共"+totalCount+"条 xxxxxxxxxxxxxxxxxxxxxxxxxxx"); if(totalCount> Constant.pageSize){ int max=totalCount%Constant.pageSize==0 ? 
totalCount/Constant.pageSize : totalCount/Constant.pageSize+1; for(int i=0;i<max;i++){ synchronized (InsertThread.class) { String sql_data="select * from "+str+" limit "+ i*Constant.pageSize + " , "+Constant.pageSize; System.out.println("==================="+sql_data); int tCount = InsertThread.getThreadCounts(); while (tCount >= Constant.max_thread_size) { logger.info("系统当前线程数为:" + tCount+ ",已达到最大线程数 "+Constant.max_thread_size+",请等待其他线程执行完毕并释放系统资源"); InsertThread.class.wait(); tCount = InsertThread.getThreadCounts(); } // 重新启动一个子线程 Thread td = new InsertThread(sql_data, sql_insert, url_destination, url_source); td.start(); logger.info("已创建新的子线程: " + td.getName()); } } }else{ String sql_data="select * from "+str; System.out.println("=================="+sql_data); Thread td = new InsertThread(sql_data, sql_insert, url_destination, url_source); td.start(); logger.info("已创建新的子线程: " + td.getName()); } long end = System.currentTimeMillis(); logger.warn("表"+str+" ===================<<<全量>>>数据导入完成,耗时:"+(end-start)/1000+"秒,"+(end-start)/60000+"分钟 ========================="); }catch(Exception e){ logger.error(e+""); } }else { try{ while(true){ if(InsertThread.getThreadCounts()>0){ Thread.sleep(1000); }else{ break; } } String sql_insert=getTableField(str); //获取总条数 分页查询 long count_start=System.currentTimeMillis(); logger.info("-------------------------------------------------------------------"); String sql_count="select count(*) from "+ str; ResultSet rs = stmt_source.executeQuery(sql_count); rs.next(); int totalCount=rs.getInt(1); long count_end=System.currentTimeMillis(); logger.info("查询记录数耗时:"+(count_end-count_start)/1000); if(totalCount>500000) logger.info("xxxxxxxxxxxxxxxxx start implement table:"+str+",共"+totalCount+"条 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"); if(totalCount> Constant.pageSize){ int max=totalCount%Constant.pageSize==0 ? 
totalCount/Constant.pageSize : totalCount/Constant.pageSize+1; for(int i=0;i<max;i++){ synchronized (InsertThread.class) { String sql_data="select * from "+str+" where modify_time > '"+dateString+"' "+" limit "+ i*Constant.pageSize + " , "+Constant.pageSize; System.out.println("==================="+sql_data); int tCount = InsertThread.getThreadCounts(); while (tCount >= Constant.max_thread_size) { logger.info("系统当前线程数为:" + tCount+ ",已达到最大线程数 "+Constant.max_thread_size+",请等待其他线程执行完毕并释放系统资源"); InsertThread.class.wait(); tCount = InsertThread.getThreadCounts(); } // 重新启动一个子线程 Thread td = new InsertThread(sql_data, sql_insert, url_destination, url_source); td.start(); logger.info("已创建新的子线程: " + td.getName()); } } }else{ String sql_data="select * from "+str+" where modify_time > '"+dateString+"'"; System.out.println("=================="+sql_data); Thread td = new InsertThread(sql_data, sql_insert, url_destination, url_source); td.start(); logger.info("已创建新的子线程: " + td.getName()); } long end = System.currentTimeMillis(); logger.warn("表"+str+" ===================<<<增量>>>数据导入完成,耗时:"+(end-start)/1000+"秒,"+(end-start)/60000+"分钟 ========================="); }catch(Exception e){ logger.error(e+""); } } } }}

线程处理:

package com.zntz.web.admin.controller.jdbc;import java.sql.BatchUpdateException;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.ResultSetMetaData;import java.sql.SQLException;import java.sql.Statement;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.zntz.web.admin.model.constant.Constant;public class InsertThread extends Thread { private Logger logger=LoggerFactory.getLogger(InsertThread.class); private String sql_data; private String sql_insert; private String url_destination; private String url_source; private Connection conA; private Connection conB; // 线程计数器 static private int threadCounts; // 线程名称池 static private String threadNames[]; static { // 假设这里允许系统同时运行最大线程数为10个 threadNames = new String[Constant.max_thread_size]; // 初始化线程名称池 for (int i = 1; i <= Constant.max_thread_size; i++) { threadNames[i - 1] = "子线程_" + i; } } public InsertThread() { // 临界资源锁定 synchronized (InsertThread.class) { // 线程总数加1 threadCounts++; // 从线程名称池中取出一个未使用的线程名 for (int i = 0; i < threadNames.length; i++) { if (threadNames[i] != null) { String temp = threadNames[i]; // 名被占用后清空 threadNames[i] = null; // 初始化线程名称 this.setName(temp); break; } } } } public void run() { try { conA=DriverManager.getConnection(url_source); conB= DriverManager.getConnection(url_destination); conB.setAutoCommit(false); Long start = System.currentTimeMillis(); Statement stmt_source = conA.createStatement(); ResultSet rs_sql_data = stmt_source.executeQuery(sql_data); ResultSetMetaData rsmd = rs_sql_data.getMetaData(); PreparedStatement ps = conB.prepareStatement(sql_insert); int columnCount=rsmd.getColumnCount(); int count=1; while (rs_sql_data.next()) { count++; for(int k=1;k<=columnCount;k++){ ps.setString(k, rs_sql_data.getString(k)); } ps.addBatch(); if(count % Constant.batchSize == 0) { myBatchUpdate(ps); } } myBatchUpdate(ps); Long end = System.currentTimeMillis(); 
logger.info(this.getName()+",耗时:"+(end-start)/1000 + "秒"); stmt_source.close(); rs_sql_data.close(); ps.close(); } catch (Exception e) { logger.error(""+e); } finally { synchronized (InsertThread.class) { // 释放线程名称 String[] threadName = this.getName().split("_"); // 线程名使用完后放入名称池 threadNames[Integer.parseInt(threadName[1]) - 1] = this.getName(); // 线程运行完毕后减1 threadCounts--; /* * 通知其他被阻塞的线程,但如果其他线程要执行,则该同步块一定要运行结束(即直 * 到释放占的锁),其他线程才有机会执行,所以这里的只是唤醒在此对象监视器上等待 * 的所有线程,让他们从等待池中进入对象锁池队列中,而这些线程重新运行时它们一定 * 要先要得该锁后才可能执行,这里的notifyAll是不会释放锁的,试着把下面的睡眠语 * 句注释去掉,即使你已调用了notify方法,发现CreateThread中的同步块还是好 * 像一直处于对象等待状态,其实调用notify方法后,CreateThread线程进入了对象锁 * 池队列中了,只要它一获取到锁,CreateThread所在线程就会真真的被唤醒并运行。 */ InsertThread.class.notifyAll(); logger.info("----" + this.getName() + " 所占用资源释放完毕,当前系统正在运行的子线程数:"+ threadCounts); try { conA.close(); conB.close(); } catch (SQLException e) { logger.error("关闭连接异常"); } } } } static public int getThreadCounts() { synchronized (InsertThread.class) { return threadCounts; } } public InsertThread(String sql_data, String sql_insert, String url_destination, String url_source) { super(); this.sql_data = sql_data; this.sql_insert = sql_insert; this.url_destination = url_destination; this.url_source = url_source; // 临界资源锁定 synchronized (InsertThread.class) { // 线程总数加1 threadCounts++; // 从线程名称池中取出一个未使用的线程名 for (int i = 0; i < threadNames.length; i++) { if (threadNames[i] != null) { String temp = threadNames[i]; // 名被占用后清空 threadNames[i] = null; // 初始化线程名称 this.setName(temp); break; } } } } public void myBatchUpdate(PreparedStatement ps){ try { ps.executeBatch(); conB.commit(); }catch (BatchUpdateException e) { for (int i = 0; i < e.getUpdateCounts().length; i++) { if(e.getUpdateCounts()[i]<0){ logger.error(sql_insert+"*********"+e.getUpdateCounts()[i] + "*********" +e.getMessage()+ "*********"+e.getErrorCode()+ "*********"+e.getSQLState()); } } } catch (SQLException e) { logger.error(""+e); } }}
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表