从上面多篇的讨论中我们了解到scalaz-stream代表一串连续无穷的数据或者程序。对这个数据流的处理过程就是一个状态机器(state
machine)的状态转变过程。这种模式与我们通常遇到的程序流程很相似:通过程序状态的变化来推进程序进展。传统OOP式编程可能是通过一些全局变量来记录当前程序状态,而FP则是通过函数组合来实现状态转变的。这个FP模式讲起来有些模糊和抽象,但实际上通过我们前面长时间对FP编程的学习了解到FP编程讲究避免使用任何局部中间变量,更不用说全局变量了。FP程序的数据A是包嵌在算法F[A]内的。FP编程模式提供了一整套全新的数据更新方法来实现对F[A]中数据A的操作。对许多编程人员来讲,FP的这种编程方式会显得很别扭、不容易掌握。如果我们仔细观察分析,会发觉scalaz-stream就是一种很好的FP编程工具:它的数据也是不可变的(immutable),并且是包嵌在高阶类型结构里的,是通过Process状态转变来标示数据处理过程进展的。scalaz-stream的数据处理是有序流程,这样可以使我们更容易分析理解程序的运算过程,它的三个大环节包括:数据源,数据传换(transducer)及数据终点(Sink/Channel)可以很形象地描绘一个程序运算的全过程。scalaz-stream在运算过程中的并行运算方式(parallel
computaion)、安全资源使用(resource safety)和异常处理能力(exception
handling)是实现泛函多线程编程最好的支持。我们先来看看scalaz-stream里的一个典型函数:

   
scalaz-stream是一个泛函数据流配件库(functional stream combinator
library),特别适用于函数式编程。scalar-stream是由一个以上各种状态的Process串联组成。stream代表一连串的元素,可能是自动产生或者由外部的源头输入,如:一连串鼠标位置;文件中的文字行;数据库记录;又或者一连串的HTTP请求等。Process就是stream转换器(transducer),它可以把一种stream转换成另一种stream。Process的类型款式如下:

/**   * Await the given `F` request and use its result.   * If you need to specify fallback, use `awaitOr`   */  def await[F[_], A, O](req: F[A])(rcv: A => Process[F, O]): Process[F, O] =    awaitOr(Halt.apply)/**   * Await a request, and if it fails, use `fb` to determine the next state.   * Otherwise, use `rcv` to determine the next state.   */  def awaitOr[F[_], A, O](req: F[A])(fb: EarlyCause => Process[F, O])(rcv: A => Process[F, O]): Process[F, O] =    Await(req,(r: EarlyCause / A) => Trampoline.delay(Try(r.fold)
sealed trait Process[+F[_], +O]

这个await函数可以说是一个代表完整程序流程的典范。注意,awaitOr里的Await是个数据结构。这样我们在递归运算await时可以避免StackOverflowError的发生。req:
F[A]代表与外界交互的一个运算,如从外部获取输入、函数rcv对这个req产生的运算结果进行处理并设定程序新的状态。

其中F是个高阶类,是一种算法,O是Process的运算值。从类型款式上看Process是个对O类型值进行F运算的节点,那么scalaz-stream就应该是个运算流了。Process包含以下几种状态:

1 import scalaz.stream._2 import scalaz.concurrent._3 object streamApps {4 import Process._5   def getInput: Task[Int] = Task.delay { 3 }      //> getInput: => scalaz.concurrent.Task[Int]6   val prg = await(i => emit(i * 3))     //> prg  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@4973813a,<function1>,<function1>)7   prg.runLog.run                                  //> res0: Vector[Int] = Vector8 }
case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O]

case class Await[+F[_], A, +O](
    req: F[A]
    , rcv: (EarlyCause / A) => Trampoline[Process[F, O]] @uncheckedVariance
    , preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing])
    ) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] {
...
}
case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing]

case class Append[+F[_], +O](
    head: HaltEmitOrAwait[F, O]
    , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance
    ) extends Process[F, O] {
...
}   

这是一个一步计算程序。我们可以再加一步:

scalaz-stream是个主动读取模式的流(pull
model stream),Process转换stream的方式不是以Stream[I] =>
Stream[O]这种函数方式,而是一种状态转换方式进行(state
transition),所以这些状态就等于向一个驱动程序发出的请求:

1  val add10 = await1[Int].flatMap{i => emit(i + 10)}2                                                   //> add10  : scalaz.stream.Process[[x]scalaz.stream.Process.Env[Int,Any]#Is[x],Int] = Await(Left,<function1>,<function1>)3   val prg1 = await(i => emit(i * 3) |> add10)4                                                   //> prg1  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@6737fd8f,<function1>,<function1>)5   prg1.runLog.run                                 //> res0: Vector[Int] = Vector

Emit[+O]:请求发一个O值

add10是新增的一个运算步骤,是个transducer所以调用了Process1的函数await1,并用pipe来连接。实际上我们可以用组合方式把add10和prg组合起来:

Await[+F[_],A,+O]:要求运算F[A],得出F[A]的结果A后输入函数rcv再运算得出下一个Process状态。这个是flatMap函数的结构化版本

1 val prg3 = prg |> add10                         //> prg3  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append ,Vector(<function1>))2   prg3.runLog.run                               //> res1: Vector[Int] = Vector

Halt:停止发送

我们同样可以增加一步输出运算:

Append:连接前后两个Process

1  val outResult: Sink[Task,Int] = sink.lift { i => Task.delay{println(s"the result is: $i")}}2                                                   //> outResult  : scalaz.stream.Sink[scalaz.concurrent.Task,Int] = Append(Emit(Vector(<function1>)),Vector(<function1>))3   val prg4 = prg1 to outResult                    //> prg4  : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],Unit] = Append,Vector(<function1>, <function1>))4   prg4.run.run                                    //> the result is: 19

可以看到Emit,Await,Halt,Append都是Process类型的结构化状态。其中Await就是flatMap函数的结构化,Emit就像Return,所以Process就是一个Free
Monad。

scalaz-stream的输出类型是Sink,我们用to来连接。那么如果需要不断重复运算呢:

Emit的作用是发出一个O值,Await的作用是运算F然后连接下一个Process,
Append的作用则是把前一个Process的信息传递到下一个Process。Await和Append分别是不同方式的Process连接方式。

 1 import scalaz._ 2 import Scalaz._ 3 import scalaz.concurrent._ 4 import scalaz.stream._ 5 import Process._ 6 object streamAppsDemo extends App { 7   def putLine(line: String) = Task.delay { println } 8   def getLine = Task.delay { Console.readLine } 9   val readL = putLine("Enter:>").flatMap {_ => getLine}10   val readLines =  repeatEval11   val echoLine = readLines.flatMap {line => eval(putLine}  12   echoLine.run.run13 }

Process又分以下几类:

这是一个无穷运算程序:不停地把键盘输入回响到显示器上。下面是一些测试结果:

  type Process0[+O] = Process[Nothing,O]

  /**
   * A single input stream transducer. Accepts input of type `I`,
   * and emits values of type `O`.
   */
  type Process1[-I,+O] = Process[Env[I,Any]#Is, O]

  /**
   * A stream transducer that can read from one of two inputs,
   * the 'left' (of type `I`) or the 'right' (of type `I2`).
   * `Process1[I,O] <: Tee[I,I2,O]`.
   */
  type Tee[-I,-I2,+O] = Process[Env[I,I2]#T, O]

  /**
   * A stream transducer that can read from one of two inputs,
   * non-deterministically.
   */
  type Wye[-I,-I2,+O] = Process[Env[I,I2]#Y, O]

  /**
   * An effectful sink, to which we can send values. Modeled
   * as a source of effectful functions.
   */
  type Sink[+F[_],-O] = Process[F, O => F[Unit]]

  /**
   * An effectful channel, to which we can send values and
   * get back responses. Modeled as a source of effectful
   * functions.
   */
  type Channel[+F[_],-I,O] = Process[F, I => F[O]]
1 Enter:>2 hello world!3 hello world!4 Enter:>5 how are you?6 how are you?7 Enter:>

Process[F[_],O]:source:运算流源点,由此发送F[O]运算

当然,我们也可以把上面的程序表达的更形象些:

Process0[+O]:>>>Process[Nothing,+O]:source:纯数据流源点,发送O类型元素

1   val outLine: Sink[Task,String] = constant(putLine _).toSource2   val echoInput: Process[Task,Unit] = readLines to outLine3   //echoLine.run.run4   echoInput.run.run 

Process1[-I,+O]:一对一的数据转换节点:接收一个I类型输入,经过处理转换成O类型数据输出

用to
Sink来表述可能更形象。这个程序没有任何控制:甚至无法有意识地退出。我们试着加一些控制机制:

Tee[-I1,-I2,+O]:二对一的有序输入数据转换节点:从左右两边一左一右有顺接受I1,I2类型输入后转换成O类型数据输出

 1   def lines: Process[Task,String] = { 2     def go(line: String): Process[Task,String] =  3         line.toUpperCase match { 4           case "QUIT" => halt 5           case _ => emit ++ await 6         }  7     await 8   } 9   10   val prg = lines to outLine11   prg.run.run 

Wye[-I1,-I2,+O]:二对一的无序输入数据转换节点:不按左右顺序,按上游数据发送情况接受I1,I2类型输入后转换成O类型数据输出

在rcv函数里检查输入是否quit,如果是就halt,否则重复运算await。现在可以控制终止程序了。

Sink[+F[_],-O]:运算终点,在此对O类型数据进行F运算,不返回值:O
=> F[Unit]

下面再示范一下异常处理机制:看看能不能有效的捕捉到运行时的错误:

Channel[+F[_],-I,O]:运算终点,接受I类型输入,进行F运算后返回F[O]:I
=> F[O]

1   def mul = await1[String].flatMap { s => emit((s.toDouble * i).toString) }.repeat2   val prg = (lines |> mul(5)) to outLine  3   prg.run.run 

以下是一些简单的Process构建方法:

发表评论

电子邮件地址不会被公开。 必填项已用*标注