在我之前发表的文章中,我提到我最近热衷于Complex Event Processing (CEP)(复杂事件处理)。简单来说,CEP把数据流作为输入,根据一系列预定义的规则,把数据(或部分数据)重定向给监听者们;又或者是当发现数据中隐含的模式(Pattern)时,触发事件。在大量数据被产生出来并需要进行实时分析的场景下,CEP特别有用。
import java.util.Date;

/**
 * A single stock tick: a symbol, a price and the moment the tick was created.
 *
 * <p>Declared as a regular top-level class: the {@code static} modifier is not
 * permitted on a top-level class declaration, so a file that starts with the
 * import above and declares {@code public static class Tick} does not compile.
 * The getters expose the values as {@code price}, {@code symbol} and
 * {@code timeStamp}.
 */
public class Tick {
    String symbol;
    Double price;
    Date timeStamp;

    /**
     * Creates a tick.
     *
     * @param s the stock symbol
     * @param p the price of the tick
     * @param t the tick time in milliseconds, passed to {@link Date#Date(long)}
     */
    public Tick(String s, double p, long t) {
        symbol = s;
        price = p;
        timeStamp = new Date(t);
    }

    /** @return the price of this tick */
    public double getPrice() {
        return price;
    }

    /** @return the stock symbol of this tick */
    public String getSymbol() {
        return symbol;
    }

    /** @return the time stamp of this tick */
    public Date getTimeStamp() {
        return timeStamp;
    }

    /** @return the price and the time stamp in the form {@code "Price: <price> time: <date>"} */
    @Override
    public String toString() {
        return "Price: " + price.toString() + " time: " + timeStamp.toString();
    }
}
import com.espertech.esper.client.*; public class main { public static void main(String [] args){ //The Configuration is meant only as an initialization-time object. Configuration cepConfig = new Configuration(); // We register Ticks as objects the engine will have to handle cepConfig.addEventType("StockTick",Tick.class.getName()); // We setup the engine EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine",cepConfig); }}
import java.util.Random;import com.espertech.esper.client.*; public class exampleMain { private static Random generator=new Random(); public static void GenerateRandomTick(EPRuntime cepRT){ double price = (double) generator.nextInt(10); long timeStamp = System.currentTimeMillis(); String symbol = "AAPL"; Tick tick= new Tick(symbol,price,timeStamp); System.out.println("Sending tick:" + tick); cepRT.sendEvent(tick);} public static void main(String[] args) { //The Configuration is meant only as an initialization-time object. Configuration cepConfig = new Configuration(); cepConfig.addEventType("StockTick",Tick.class.getName()); EPServiceProvider cep=EPServiceProviderManager.getProvider("myCEPEngine",cepConfig); EPRuntime cepRT = cep.getEPRuntime(); }}
public static void main(String[] args) { //The Configuration is meant only as an initialization-time object. Configuration cepConfig = new Configuration(); cepConfig.addEventType("StockTick",Tick.class.getName()); EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine",cepConfig); EPRuntime cepRT = cep.getEPRuntime(); // We register an EPL statement EPAdministrator cepAdm = cep.getEPAdministrator(); EPStatement cepStatement = cepAdm.createEPL("select * from " + "StockTick(symbol='AAPL').win:length(2) " + "having avg(price) > 6.0"); }
cepStatement.addListener(new CEPListener());
public static class CEPListener implements UpdateListener { public void update(EventBean[] newData, EventBean[] oldData) { System.out.println("Event received: " + newData[0].getUnderlying()); }}
// Send five random ticks to the runtime, one per iteration.
for (int sent = 0; sent < 5; sent++) {
    GenerateRandomTick(cepRT);
}
import com.espertech.esper.client.*;import java.util.Random;import java.util.Date; public class exampleMain { public static class Tick { String symbol; Double price; Date timeStamp; public Tick(String s, double p, long t) { symbol = s; price = p; timeStamp = new Date(t); } public double getPrice() {return price;} public String getSymbol() {return symbol;} public Date getTimeStamp() {return timeStamp;} @Override public String toString() { return "Price: " + price.toString() + " time: " + timeStamp.toString(); } } private static Random generator = new Random(); public static void GenerateRandomTick(EPRuntime cepRT) { double price = (double) generator.nextInt(10); long timeStamp = System.currentTimeMillis(); String symbol = "AAPL"; Tick tick = new Tick(symbol, price, timeStamp); System.out.println("Sending tick:" + tick); cepRT.sendEvent(tick); } public static class CEPListener implements UpdateListener { public void update(EventBean[] newData, EventBean[] oldData) { System.out.println("Event received: " + newData[0].getUnderlying()); } } public static void main(String[] args) { //The Configuration is meant only as an initialization-time object. Configuration cepConfig = new Configuration(); cepConfig.addEventType("StockTick", Tick.class.getName()); EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine", cepConfig); EPRuntime cepRT = cep.getEPRuntime(); EPAdministrator cepAdm = cep.getEPAdministrator(); EPStatement cepStatement = cepAdm.createEPL("select * from " + "StockTick(symbol='AAPL').win:length(2) " + "having avg(price) > 6.0"); cepStatement.addListener(new CEPListener()); // We generate a few ticks... for (int i = 0; i < 5; i++) { GenerateRandomTick(cepRT); } }}
log4j:WARN No appenders could be found for logger (com.espertech.esper.epl.metric.MetricReportingPath).
log4j:WARN Please initialize the log4j system properly.
Sending tick:Price: 6.0 time: Tue Jul 21 01:11:15 CEST 2009
Sending tick:Price: 0.0 time: Tue Jul 21 01:11:15 CEST 2009
Sending tick:Price: 7.0 time: Tue Jul 21 01:11:15 CEST 2009
Sending tick:Price: 4.0 time: Tue Jul 21 01:11:15 CEST 2009
Sending tick:Price: 9.0 time: Tue Jul 21 01:11:15 CEST 2009
Event received: Price: 9.0 time: Tue Jul 21 01:11:15 CEST 2009
import org.apache.log4j.ConsoleAppender;import org.apache.log4j.SimpleLayout;import org.apache.log4j.Level;import org.apache.log4j.Logger;//and this in the main function before the rest of your code:public static void main(String [] args){ SimpleLayout layout = new SimpleLayout(); ConsoleAppender appender = new ConsoleAppender(new SimpleLayout()); Logger.getRootLogger().addAppender(appender); Logger.getRootLogger().setLevel((Level) Level.WARN);(...)
在下一篇文章中,我将更深入地探索EPL语句,并提供一些代码,把两个引擎连接起来,实现所谓的“事件精化”(event refinement)。(译者注:作者之后似乎再也没有更新过,所以不要指望后续了 :) )