分类 |
一个 |
多个 |
同步 |
T / Try[T] |
Iterable[T] |
异步 |
Future[T] |
Observable[T] |
异常作为副作用(Exception as an Effect)
| trait Adventure { def collectCoins(): List[Coin] def buyTreasure(coins: List[Coin]): Treasure }
| def collectCoins(): List[Coin] = { if (eatenByMonster(this)) throw new GameOverException("Ooops") List(Gold, Gold, Silver) } def buyTreasure(coins: List[Coin]): Treasure = { if (coins.sumBy(_.value) throw new GameOverException("Nice try!") Diamond }
| val adventure = Adventure() val coins = adventure.collectCoins() val treasure = adventure.buyTreasure(coins)
通过引入 Try
| abstract class Try[T] case class Success[T](elem: T) extends Try[T] case class Failure(t: Throwable) extends Try[Nothing] trait Adventure { def collectCoins(): Try[List[Coin]] def buyTreasure(coins: List[Coin]): Try[Treasure] } val adventure = Adventure() val coins: Try[List[Coin]] = adventure.collectCoins() val treasure: Try[Treasure] = coins match { case Success(cs) => adventure.buyTreasure(cs) case failure @ Failure(t) => failure }
其实 Try
| def flatMap[S](f: T=>Try[S]): Try[S] def flatten[U Try[T]]: Try[U] def map[S](f: T=>S): Try[T] def filter(p: T=>Boolean): Try[T] def recoverWith(f: PartialFunction[Throwable, Try[T]]): Try[T]
| val treasure: Try[Treasure] = adventure.collectCoins().flatMap(coins => { adventure.buyTreasure(coins) })
如果用 for
| val treasure: Try[Treasure] = for { coins treasure } yield treasure
是单子,它的 map
和 apply
| def map[S](f: T=>S): Try[S] = this match { case Success(value) => Try(f(value)) case failure @ Failure(t) => failure } object Try { def apply[T](r: =>T): Try[T] = { try { Success(r) } catch { case t => Failure(t) } }
练习:下面哪个函数是正确的 Try
的 flatMap
| def flatMap[S](f: T=>Try[S]): Try[S] = this match { case Success(values) => f(value) case failure @ Failure(t) => failure } def flatMap[S](f: T=>Try[S]): Try[S] = this match { case Success(value) => Try(f(value)) case failure @ Failure(t) => failure } def flatMap[S](f: T=>Try[S]): Try[S] = this match { case Success(value) => try { f(value) } catch { case t => Failure(t) } case failure @ Failure(t) => failure }
答案是 c。a 中调用 f
时没有处理异常,b 中返回值类型为 Try[Try[T]]
延迟作为副作用(Latency as an Effect)
| trait Socket { def readFromMemory(): Array[Byte] def sendToEurope(packet: Array[Byte]): Array[Byte] } val socket = Socket() val packet = socket.readFromMemory() val confirmation = socket.sendToEurope(packet)
| import scala.concurrent._ import scala.concurrent.ExecutionContext.Implicits.global trait Future[T] { def onComplete(callback: Try[T] => Unit)(implicit executor: ExecutionContext): Unit def onComplete(success: T => Unit, failed: Throwable => Unit): Unit def onComplete(callback: Observer[T]): Unit } trait Observer[T] { def onNext(value: T): Unit def onError(error: Throwable): Unit }
一般在另外的线程中运行,因此回调函数 onComplete
会提供一个运行环境参数 executor
| val socket = Socket() val packet: Future[Array[Byte]] = socket.readFromMemory() val confirmation: Future[Array[Byte]] = packet onComplete { case Success(p) => socket.sendToEurope(p) case Failure(t) => ... }
的类型并不是 Future[Array[Byte]]
| packet onComplete { case Success(p) => { val confirmation: Future[Array[Byte]] = socket.sendToEurope(p) } case Failure(t) => ... }
暂时先不管 Future
的使用形式问题,来看看如何创建一个 Future
| object Future { def apply(body: =>T)(implicit context: ExecutionContext): Future[T] }
| import scala.concurrent.ExecutionContext.Implicits.global import akka.serializer._ val memory = Queue[EMailMessage]( EMailMessage(from = "Erik", to = "Roland"), EMailMessage(from = "Martin", to = "Erik"), EMailMessage(from = "Roland", to = "Martin")) def readFromMemory(): Future[Array[Byte]] = Future { val email = queue.dequeue() val serializer = serialization.findSerializerFor(email) serializer.toBinary(email) } val packet: Future[Array[Byte]] = socket.readFromMemory() packet onSuccess { case bs => socket.sendToEurope(p) } packet onSuccess { case bs => socket.sendToEurope(p) }
当其执行完时 email
答案是 2 封。尽管调用了两次 onSuccess
,但是 Future
对象会缓存执行完的结果,两个 onSuccess
Future 上的组合子
Scala 中的 Future
| trait Awaitable[T] extends AnyRef { abstract def ready(atMost: Duration): Unit abstract def result(atMost: Duration): T } trait Future[T] extends Awaitable[T] { def filter(p: T => Boolean): Future[T] def flatMap[S](f: T => Future[S]): Future[U] def map[S](f: T => S): Future[S] def recoverWith(f: PartialFunction[Throwable, Future[T]]): Future[T] } object Future { def apply[T](body: => T): Future[T] }
假设有一个 Http
| object Http { def apply(url: URL, req: Request): Future[Response] = {... runs the http request asynchronously ...} } def sendTo(url: URL, packet: Array[Byte]): Future[Array[Byte]] = Http(url, Request(packet)).filter(response => response.isOK).map(response => response.toByteArray) def sendToAndBackup(packet: Array[Byte]): Future[(Array[Byte], Array[Byte])] = { val europeConfirm = sendTo(mailServer.europe, packet) val usaConfirm = sendTo(mailServer.usa, packet) europeConfirm.zip(usaConfirm) }
实际上 Future
| def recover(f: PartialFunction[Throwable,T]): Future[T] def recoverWith(f: PartialFunction[Throwable,Future[T]]): Future[T]
用一个同步的过程来恢复,而 recoverWith
| def sendToSafe(packet: Array[Byte]): Future[Array[Byte]] = sendTo(mailServer.europe, packet) recoverWith { case europeError => sendTo(mailServer.usa, packet) recover { case usaError => usaError.getMessage.toByteArray } }
虽然过程上正确,但当两个网络请求都失败时返回的异常是 usaError
,并不是想要的 europeError
。因此想增加一个 fallbackTo
| def sendToSafe(packet: Array[Byte]): Future[Array[Byte]] = sendTo(mailServer.europe, packet) fallbackTo { sendTo(mailServer.usa, packet) } recover { case europeError => europeError.getMessage.toByteArray } def fallbackTo(that: => Future[T]): Future[T] = { this recoverWith { case _ => that recoverWith { case _ => this } } }
这时程序就是正确并鲁棒的了,而且 fallbackTo
练习:现在想定义一个支持 Future
的 Try
| object Try { def apply(f: Future[T]): Future[Try[T]] = { ... } }
它的 apply
| f onComplete { x => x } f recoverWith { case t => Future.failed(t) } f.map(x => Try(x)) f.map(s => Success(s)) recover { case t => Failure(t) }
答案为 d。a 中返回值为 Unit
,b 和 c 都只考虑了 Try
的一个 case。
之前看到 Future
继承了 Awaitable
| trait Awaitable[T] extends AnyRef { abstract def ready(atMost: Duration): Unit abstract def result(atMost: Duration): T }
| val socket = Socket() val packet: Future[Array[Byte]] = socket.readFromMemory() val confirmation: Future[Array[Byte]] = packet.flatMap(socket.sendToSafe(_)) val c = Await.result(confirmation, 2 seconds) println(c.toText)
Future 的应用
因为 Future
是单子,所以可以使用 for
| val socket = Socket() val confirmation: Future[Array[Byte]] = for { packet confirmation } yield confirmation
现在想定义一个不断尝试执行 Future
最多 n 次直至成功的函数,一个可行的实现是:
| def retry(noTimes: Int)(block: => Future[T]): Future[T] = { if (noTimes == 0) { Future.failed(new Exception("Sorry")) } else { block fallbackTo { retry(noTimes–1) { block } } } }
Recursion is the GOTO of Functional Programming - ErikMeijer
这个人其实就是本课的老师……另外一个方法就是使用 foldRight
或 foldLeft
| def retry(noTimes: Int)(block: =>Future[T]): Future[T] = { val ns: Iterator[Int] = (1 to noTimes).iterator val attempts: Iterator[Future[T]] = ns.map(_=> ()=>block) val failed = Future.failed(new Exception) attempts.foldLeft(failed)((a,block) => a recoverWith { block() }) } def retry(noTimes: Int)(block: =>Future[T]): Future[T] = { val ns: Iterator[Int] = (1 to noTimes).iterator val attempts: Iterator[Future[T]] = ns.map(_=> ()=>block) val failed = Future.failed(new Exception) attempts.foldRight(() => failed) ((block, a) => () => { block() fallbackTo { a() } }) }
现在觉得显式的副作用并不方便,能不能将其变成隐式的呢?比如 Future
,能否将 T => Future[S]
变为 T => S
?答案是可以的,Scala 提供了 async { ... await{ ... } ... }
| import scala.async.Async._ def async[T](body: =>T)(implicit context: ExecutionContext): Future[T] def await[T](future: Future[T]): T
会将异步非阻塞的代码变成同步阻塞的,外部的 async
依旧返回异步非阻塞的 Future
外部的 async
不能用在 by-name 参数中
不能包含 return
不能包含在 try-catch
使用了 await
| def retry(noTimes: Int)(block: =>Future[T]): Future[T] = async { var i = 0 var result: Try[T] = Failure(new Exception("sorry man!")) while (i result = await { Try(block) } i += 1 } result.get }
使用 await
重写 filter
| def filter(p: T => Boolean): Future[T] = async { val x = await { this } if (!p(x)) { throw new NoSuchElementException() } else { x } }
练习:如何用 await
实现 flatMap
| def flatMap[S](f: T => Future[S]): Future[S] = async { await { f( await { this } ) } }
如果不用 await
,可以使用 Promise
实现 filter
| def filter(pred: T => Boolean): Future[T] = { val p = Promise[T]() this onComplete { case Failure(e) => p.failure(e) case Success(x) => if (!pred(x)) p.failure(new NoSuchElementException) else p.success(x) } p.future }
包含一个 Future
,当调用 Promise
的 complete
方法时 Promise
会调用自身 Future
和 complete
方法类似,只是当该 Promise
已经完成后 tryComplete
会返回 false
和 failure
| trait Promise[T] { def future: Future[T] def complete(result: Try[T]): Unit def tryComplete(result: Try[T]): Boolean def success(value: T): Unit = this.complete(Success(value)) def failure(t: Throwable): Unit = this.complete(Failure(t)) }
之前的 zip
| def zip[S, R](that: Future[S], f: (T, S) => R): Future[R] = { val p = Promise[R]() this onComplete { case Failure(e) => p.failure(e) case Success(x) => that onComplete { case Failure(e) => p.failure(e) case Success(y) => p.success(f(x, y)) } } p.future }
如果用 await
| def zip[S, R](p: Future[S], f: (T, S) => R): Future[R] = async { f(await { this }, await { that }) }
现在用 await
定义一个队列函数,它会依次执行队列中的 Future
| def sequence[T](fs: List[Future[T]]): Future[List[T]] = async { var _fs = fs val r = ListBuffer[T]() while (_fs != Nil) { r += await { _fs.head } _fs = _fs.tail } f.result }
如果用 Promise
| def sequence[T](fs: List[Future[T]]): Future[List[T]] = { val successful = Promise[List[T]]() successful.success(Nil) fs.foldRight(successful.future) { (f, acc) => for { x yield x :: xs } }