首页 > 学院 > 开发设计 > 正文

FunDA(10)- 用户功能函数模式:User Function Model

2019-11-08 19:43:42
字体:
来源:转载
供稿:网友

   前面我们提过:FunDA就像一个管道(PipeLine)。管道内流动着一串数据(Data)或者运算指令(Action)。管道的源头就是能产生纯数据的数据源(Source),跟着在管道的中间会有一些节点(WorkNode),我们可以在这些节点施用(apply)用户提供的功能函数(Task)。用户功能函数可以截取并使用管道中流动的数据或者指令,然后利用一种水龙头开关机制(Valve)来影响流动元素:可以截住、直接传送、传送修改版本、插入新数据。作为FunDA的用户,需要掌握用户功能函数编写模式。我们先从一个简单的用户函数开始介绍:

//定义一个用户作业函数:列印数据,完全不影响数据流  def PRintAlbums: FDATask[FDAROW] = row => {    row match {      case album: Album =>        println("____________________")        println(s"品名:${album.title}")        println(s"演唱:${album.artist}")        println(s"年份:${album.year}")        println(s"发行:${album.publisher}")//原封不动直接传下去        fda_next(album)      case r@ _ => fda_next(r)    }  }上面这个用户函数的类型是FDATask[FDAROW],这是一个函数类型:

    //作业类型    type FDATask[ROW] = ROW => Option[List[ROW]]所以我们用lambda来代表函数内容:row => {函数功能}。lambda为用户函数提供了当前元素。我们用下面方式调用这个用户函数:

  val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _)  val albumStream = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()//定义一个用户作业函数:列印数据,完全不影响数据流  def printAlbums: FDATask[FDAROW] = row => {    row match {      case album: Album =>        println("____________________")        println(s"品名:${album.title}")        println(s"演唱:${album.artist}")        println(s"年份:${album.year}")        println(s"发行:${album.publisher}")//原封不动直接传下去        fda_next(album)      case r@ _ => fda_next(r)    }  } albumStream.appendTask(printAlbums).startRun我们把用户函数printAlbums传入appendTask来对数据流进行施用。我们可以在appendTask后面再接一个用户函数,这个用户函数截取到的数据流元素是原装的数据源,因为在任何情况下printAlbums都会原封不动地把截获的元素用fda_next()传下去。运行一下下面这个就清楚了:

 albumStream.appendTask(printAlbums).appendTask(printAlbums).startRun相反情况我们只需要做下面的修改把fda_next替换成fda_skip就可以证实了:

//原封不动直接传下去        fda_skip//        fda_next(album)我们也可以根据当前元素情况生成一条FDAActionROW,它的定义是这样的:

  type FDAAction = DBIO[Int]  case class FDAActionRow(action: FDAAction) extends FDAROW  def fda_mkActionRow(action: FDAAction): FDAActionRow = FDAActionRow(action)  class FDAActionRunner(slickProfile: JdbcProfile) {    import slickProfile.api._    def fda_execAction(action: FDAAction)(slickDB: Database): Int =      Await.result(slickDB.run(action), Duration.Inf)  }  object FDAActionRunner {    def apply(slickProfile: JdbcProfile): FDAActionRunner = new FDAActionRunner(slickProfile)  }我们可以把一条FDAActionRow传下去:

  def updateYear: FDATask[FDAROW] = row => {      row match {        case album: Album => {          val updateAction = albums.filter(r => r.title === album.title)          .map(_.year)          .update(Some(2017))          fda_next(FDAActionRow(updateAction))        }        case others@ _ => fda_next(others)      }  }我们也可以把原数据同时传下去:

  def updateYear: FDATask[FDAROW] = row => {      row match {        case album: Album => {          val updateAction = albums.filter(r => r.title === album.title)          .map(_.year)          .update(Some(2017))          fda_next(FDAActionRow(updateAction))          fda_next(album)        }        case others@ _ => fda_next(others)      }  }我们需要FDAActionRunner来运算action:

val runner = FDAActionRunner(slick.driver.H2Driver)  def runActions: FDATask[FDAROW] = row => {    row match {      case FDAActionRow(action) =>        runner.fda_execAction(action)(db)        fda_skip      case others@ _ => fda_next(others)    }  }现在试试运转这个管道:
  albumStream.appendTask(updateYear).appendTask(runActions).appendTask(printAlbums).startRun实际上updateYear和runActions可以一步完成。但细化拆分功能就是函数式编程的一个特点,因为能够更自由的进行组合,这其中就包括了并行运算组合。

下面是这篇讨论的示范源代码:

package com.bayakala.funda.fdasources.examplesimport slick.driver.H2Driver.api._import com.bayakala.funda.fdasources.FDADataStream._import com.bayakala.funda.samples._import com.bayakala.funda.fdarows._import com.bayakala.funda.fdapipes._import FDAValves._import com.bayakala.funda.fdarows.FDARowTypes._import scala.concurrent.duration._object Example2 extends App {   val albums = SlickModels.albums   val companies = SlickModels.companies//数据源query   val albumsInfo = for {     (a,c) <- albums join companies on (_.company === _.id)   } yield (a.title,a.artist,a.year,c.name)//query结果强类型(用户提供)  case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW//转换函数(用户提供)  def toTypedRow(row: (String, String, Option[Int], String)): Album =    Album(row._1, row._2, row._3.getOrElse(2000), row._4)  val db = Database.forConfig("h2db")  val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _)  val albumStream = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()//定义一个用户作业函数:列印数据,完全不影响数据流  def printAlbums: FDATask[FDAROW] = row => {    row match {      case album: Album =>        println("____________________")        println(s"品名:${album.title}")        println(s"演唱:${album.artist}")        println(s"年份:${album.year}")        println(s"发行:${album.publisher}")//原封不动直接传下去//        fda_skip        fda_next(album)      case r@ _ => fda_next(r)    }  }// albumStream.appendTask(printAlbums).appendTask(printAlbums).startRun  def updateYear: FDATask[FDAROW] = row => {      row match {        case album: Album => {          val updateAction = albums.filter(r => r.title === album.title)          .map(_.year)          .update(Some(2017))          fda_next(FDAActionRow(updateAction))          fda_next(album)        }        case others@ _ => fda_next(others)      }  }  val runner = FDAActionRunner(slick.driver.H2Driver)  def runActions: FDATask[FDAROW] = row => {    row match {      case FDAActionRow(action) =>        runner.fda_execAction(action)(db)        fda_skip      case others@ _ => fda_next(others)    }  }  albumStream.appendTask(updateYear).appendTask(runActions).appendTask(printAlbums).startRun}


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表