首页 > 开发 > Java > 正文

详解spring batch的使用和定时器Quart的使用

2024-07-13 10:10:38
字体:
来源:转载
供稿:网友

spring Batch是一个基于Spring的企业级批处理框架,它通过配合定时器Quartz来轻易实现大批量的数据读取或插入,并且全程自动化,无需人员管理。

在使用spring batch之前,得对spring batch的流程有一个基本了解

每个batch它都包含了一个job,而一个job中却有可能包含多个step,整个batch中干活的是step,batch主要是用来对数据的操作,所以step就有三个操作数据的东西,一个是ItemReader用来读取数据的,一个是ItemProcessor用来处理数据的,一个是ItemWriter用来写数据(可以是文件也可以是插入sql语句),JobLauncher用来启动Job,JobRepository是上述处理提供的一种持久化机制,它为JobLauncher,Job,和Step实例提供CRUD操作。

pom.xml  三个batch的jar包

<dependency>      <groupId>org.springframework</groupId>      <artifactId>spring-batch-core</artifactId>      <version>2.1.8.RELEASE</version>     </dependency>          <dependency>      <groupId>org.springframework</groupId>      <artifactId>spring-batch-infrastructure</artifactId>      <version>2.1.8.RELEASE</version> <dependency>           <dependency>      <groupId>org.springframework</groupId>      <artifactId>spring-batch-test</artifactId>      <version>2.1.8.RELEASE</version>     </dependency>  

batch.xml

<beans xmlns="http://www.springframework.org/schema/beans"   xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   xsi:schemaLocation="http://www.springframework.org/schema/batch     http://www.springframework.org/schema/batch/spring-batch-2.1.xsd     http://www.springframework.org/schema/beans      http://www.springframework.org/schema/beans/spring-beans-3.1.xsd   ">     <bean id="jobLauncher"     class="org.springframework.batch.core.launch.support.SimpleJobLauncher">     <property name="jobRepository" ref="jobRepository" />   </bean>    <bean id="jobRepository"     class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">     <property name="validateTransactionState" value="false" />    </bean> <!--一个job-->     <batch:job id="writerteacherInterview">         <batch:step id="teacherInterview">           <batch:tasklet>             <batch:chunk reader="jdbcItemReaderTeacherInterview" writer="teacherInterviewItemWriter"               processor="teacherInterviewProcessor" commit-interval="10">             </batch:chunk>           </batch:tasklet>         </batch:step>       </batch:job>        <!--job的读取数据操作-->   <bean id="jdbcItemReaderTeacherInterview"     class="org.springframework.batch.item.database.JdbcCursorItemReader"     scope="step">     <property name="dataSource" ref="dataSource" />     <property name="sql"       value="select distinct teacherName ,count(teacherName) as num from examininterviewrecord  where pdate >'${detail_startime}' and pdate < '${detail_endtime}' GROUP BY teacherName " />     <property name="rowMapper" ref="teacherInterviewMapper">     </property>   </bean>   </beans> 

读取数据    teacherInterviewMapper

package com.yc.batch;  import java.sql.ResultSet; import java.sql.SQLException; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Component; import com.yc.vo.TeacherInterviewdetail; import com.yc.vo.TeacherWorkdetail; import com.yc.vo.Workdetail; @Component("teacherInterviewMapper")  public class TeacherInterviewMapper implements RowMapper {    @Override   public Object mapRow(ResultSet rs, int rowNum) throws SQLException {           TeacherInterviewdetail TId=new TeacherInterviewdetail();     TId.setTeacherName(rs.getString("teacherName"));      TId.setNum(rs.getInt("num"));     return TId;    } }  

处理数据  teacherInterviewProcessor ,这个处理数据方法,一般都是在这里在这里进行一些数据的加工,比如有些数据没有读到,你也可以在这个方法和后面那个写入数据的类里面写,所以就导致了这个类里面你可以什么都不敢,直接把数据抛到后面去,让后面的写数据类来处理;我这里就是处理数据的这个类什么都没写,但是最好还是按它的规则来!

package com.yc.batch;  import org.hibernate.engine.transaction.jta.platform.internal.SynchronizationRegistryBasedSynchronizationStrategy; import org.springframework.batch.item.ItemProcessor; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service;  import com.yc.vo.TeacherInterviewdetail; import com.yc.vo.TeacherWorkdetail; import com.yc.vo.Workdetail;   //业务层 @Component("teacherInterviewProcessor") public class TeacherInterviewProcessor implements ItemProcessor<TeacherInterviewdetail, TeacherInterviewdetail> {    @Override   public TeacherInterviewdetail process(TeacherInterviewdetail teacherInterviewdetail) throws Exception {           return teacherInterviewdetail;   } } 

写数据 teacherInterviewItemWriter 这个类里面主要是把数据写进一个文件里,同时我这个类里面还有一些数据处理

package com.yc.batch;  import java.io.InputStream;  import java.text.NumberFormat; import java.util.ArrayList; import java.util.List; import java.util.Properties; import javax.annotation.Resource; import org.springframework.batch.item.ItemWriter; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import com.yc.biz.ExamineeClassBiz; import com.yc.biz.WorkBiz; import com.yc.utils.CsvUtils; import com.yc.vo.TeacherInterviewdetail; import com.yc.vo.TeacherWorkdetail; import com.yc.vo.Workdetail; import net.sf.ehcache.util.PropertyUtil; //写 @Component("teacherInterviewItemWriter") public class TeacherInterviewItemWriter implements ItemWriter<TeacherInterviewdetail>{    @Override   public void write(List<? extends TeacherInterviewdetail> teacherInterviewdetails) throws Exception {     Properties props = new Properties();     InputStream in= PropertyUtil.class.getClassLoader().getResourceAsStream("connectionConfig.properties");     props.load(in);     String time=props.getProperty("detail_time");     CsvUtils cu=new CsvUtils();      List<Object> works=new ArrayList<Object>();      for(TeacherInterviewdetail t:teacherInterviewdetails){        works.add(t);      }           String path=this.getClass().getResource("/").getPath();     path=path.substring(0,path.lastIndexOf("/"));     path=path.substring(0,path.lastIndexOf("/"));     path=path.substring(0,path.lastIndexOf("/"));     path=path.substring(0,path.lastIndexOf("/"));     cu.writeCsv(path+"/csv/teacherInterview_"+time+".csv",works );    }  } 

我这里有用到一个吧数据写进CSV文件的jar包

<dependency>      <groupId>net.sourceforge.javacsv</groupId>      <artifactId>javacsv</artifactId>      <version>2.0</version>     </dependency> 

CsvUtils帮助类的写入CSV文件方法

/**    * 写入CSV文件    * @throws IOException    */    public void writeCsv(String path,List<Object> t) throws IOException{      String csvFilePath = path;     String filepath=path.substring(0,path.lastIndexOf("/"));     File f=new File(filepath);     if(!f.exists()){       f.mkdirs();     }     File file=new File(path);     if(!file.exists()){       file.createNewFile();     }     CsvWriter wr =new CsvWriter(csvFilePath,',',Charset.forName("GBK"));     try {        for(Object obj:t){         String[] contents=obj.toString().split(",");         wr.writeRecord(contents);        }       wr.close();      } catch (IOException e) {        e.printStackTrace();      }    }  

就这样一个基本的batch流程就跑起来了,它通过从数据里读取一些数据,然后经过处理后,被存进服务器下的一个文件里面,之后像这种数据的读取就不需要去数据库里面查询了,而是可以直接通过读取CSV文件来处理这个业务。一般使用这个的都会配一个定时器,让它们每隔一段时间跑一次,从而获得较新的数据

下面是定时器的配置

定时器的配置非常简单,我是使用注解方式来配置的

定时器任务类

package com.yc.task.impl; import javax.transaction.Transactional; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.batch.item.ItemProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service;  import com.yc.batch.ClassBatch; import com.yc.batch.MessageItemBatch; import com.yc.batch.TeacherInterviewBatch; import com.yc.batch.TearcherBatch; import com.yc.po.Work; import com.yc.task.WorkTask; import com.yc.vo.Workdetail; @Service public class WorkTaskImpl implements WorkTask{    @Autowired   private TeacherInterviewBatch teacherInterviewBatch;//教师访谈记录   public void setTeacherInterviewBatch(TeacherInterviewBatch teacherInterviewBatch) {     this.teacherInterviewBatch = teacherInterviewBatch;   }      @Scheduled(cron= "0 30 22 * * ?")  //每天晚上十点30执行一次 这个注解会让框架会自动把这个方法看成任务启动方法    @Override   public void task() {     try {       teacherInterviewBatch.test();//教师访谈     } catch (Exception e) {       e.printStackTrace();     }        }  } 

定时器所真正要执行的方法

package com.yc.batch;  import javax.annotation.Resource; import org.apache.commons.jexl2.Main; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;  @Component public class TeacherInterviewBatch {    private Job job;   private JobLauncher launcher;    @Resource(name="writerteacherInterview")   public void setJob(Job job) {     this.job = job;   }    @Autowired    public void setLauncher(JobLauncher launcher) {     this.launcher = launcher;   } 

  以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持VeVb武林网。


注:相关教程知识阅读请移步到JAVA教程频道。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表