Future[T] and Try[T] are duals

trait Future[T] {
  def onComplete[U](func: Try[T] => U)(implicit ex: ExecutionContext): Unit
}

OnComplete 方法的类型进行化简(U 化简为 Unit),得到

(Try[T] => Unit) => Unit

Flipping the arrows of this type gives:

Unit => (Unit => Try[T])

Simplifying further gives:

() => (() => Try[T]) ≈ Try[T]

In other words, for the method

def asynchronous(): Future[T] = { ... }

you obtain the Try[T] by passing in a callback (Try[T] => Unit), whereas the method

def synchronous(): Try[T] = { ... }

blocks until it returns the Try[T].
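A minimal sketch of the two calling styles, using only the standard library's scala.concurrent. The names synchronous, asynchronous and asCallback are illustrative; asCallback is a hypothetical helper that exposes a Future as the simplified type (Try[T] => Unit) => Unit derived above.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Success, Try}
import ExecutionContext.Implicits.global

// Pull: the caller blocks until the Try[Int] is returned.
def synchronous(): Try[Int] = Try(21 * 2)

// Push: the caller gets a Future back immediately; the value arrives later.
def asynchronous(): Future[Int] = Future(21 * 2)

// A Future viewed as (Try[T] => Unit) => Unit: give it a callback,
// and it calls the callback once with the result.
def asCallback[T](f: Future[T]): (Try[T] => Unit) => Unit =
  callback => f.onComplete(callback)

val pulled: Try[Int] = synchronous()

// Bridge the callback into a Promise only so that this demo can wait for the value.
val promise = Promise[Int]()
asCallback(asynchronous())(result => promise.complete(result))
val pushed: Int = Await.result(promise.future, 5.seconds)
```

The pulled value is available on the next line; the pushed one only reaches us through the callback.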

Synchronous data streams: Iterable[T]

Iterable[T] is the base trait of Scala's collection types. It defines an iterator method for traversing the collection's elements one at a time.

trait Iterable[T] { def iterator(): Iterator[T] }

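An iterator is a data structure for stepping through the elements of a sequence. Its hasNext method checks whether another element exists, and its next method returns that element.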

trait Iterator[T] { def hasNext: Boolean; def next(): T }

As a diagram:

[Figure: Iterable]

Higher-order functions on Iterable[T] include:

def flatMap[B](f: A=>Iterable[B]): Iterable[B]
def map[B](f: A=>B): Iterable[B]
def filter(p: A=>Boolean): Iterable[A]
def take(n: Int): Iterable[A]
def takeWhile(p: A=>Boolean): Iterable[A]
def toList(): List[A]
def zip[B](that: Iterable[B]): Iterable[(A, B)]

This makes Iterable a monad.
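Because Iterable has map and flatMap, Scala's for-comprehensions work on it. A small sketch with plain standard-library collections (the values are arbitrary) that writes one desugaring out by hand and tries a couple of the other combinators:

```scala
val xs: Iterable[Int] = List(1, 2, 3)

// A for-comprehension over Iterable...
val viaFor: Iterable[Int] =
  for {
    x <- xs
    y <- List(x, x * 10)
  } yield y + 1

// ...is rewritten by the compiler into flatMap and map.
val viaCalls: Iterable[Int] = xs.flatMap(x => List(x, x * 10).map(y => y + 1))

// Two more combinators from the list above.
val small: Iterable[Int] = xs.takeWhile(_ < 3)
val zipped: Iterable[(Int, String)] = xs.zip(List("a", "b"))
```

zip stops at the shorter input, so the third element of xs has no partner.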

Types like this are often drawn as marble diagrams.

[Figure: marble diagram]

If we scale the latencies of different operations up to human time, they look like this:

[Figure: timings on a human scale]

On that scale, reading a file from disk through an Iterator blocks the program for a very long time:

import scala.io.Source

def ReadLinesFromDisk(path: String): Iterator[String] = {
  Source.fromFile(path).getLines()
}
val lines = ReadLinesFromDisk("c:\\tmp.txt")
for (line <- lines) {
  DoWork(line) // some per-line processing
}
// 2 weeks per line.

Now use the same dualization trick to turn this pull model into a push model.

Step 0: simplify. Start from the earlier signatures:

trait Iterable[T] {
  def iterator(): Iterator[T]
}
trait Iterator[T] {
  def hasNext: Boolean
  def next(): T
}

and abstract them into a single type:

() => (() => Try[Option[T]])
  • the outer () => ( ... ) comes from iterator(),
  • () => Try[Option[T]] comes from next(),
  • Option encodes hasNext (None means there are no more elements),
  • Try makes errors explicit.
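The encoding above can be written out directly. In this sketch Pull, toPull and drain are illustrative names (not library APIs); toPull wraps a standard Iterator in the type above, and drain pulls values until it sees None or a Failure:

```scala
import scala.util.{Failure, Success, Try}

// Calling the outer function plays the role of iterator();
// calling the inner one plays the role of hasNext + next().
type Pull[T] = () => (() => Try[Option[T]])

def toPull[T](xs: Iterable[T]): Pull[T] = () => {
  val it = xs.iterator
  // None = no more elements; Failure = the iterator threw.
  () => Try(if (it.hasNext) Some(it.next()) else None)
}

// Pull values until None (finished) or Failure (error).
def drain[T](pull: Pull[T]): Try[List[T]] = {
  val next = pull()
  @annotation.tailrec
  def loop(acc: List[T]): Try[List[T]] = next() match {
    case Success(Some(x)) => loop(x :: acc)
    case Success(None)    => Success(acc.reverse)
    case Failure(e)       => Failure(e)
  }
  loop(Nil)
}

// An Iterable whose iterator fails on its second element (10 / 0).
val failing: Iterable[Int] = new Iterable[Int] {
  def iterator: Iterator[Int] = Iterator(1, 0).map(d => 10 / d)
}
```

The ArithmeticException from the second element surfaces as a Failure value instead of escaping as an exception.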

Step 1: flip.

() => (() => Try[Option[T]])

becomes:

(Try[Option[T]] => Unit) => Unit

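Step 2: simplify. Split the combined callback into three, one for each case of Try[Option[T]]: Success(Some(value)), Failure(error) and Success(None):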

( T => Unit,
  Throwable => Unit,
  () => Unit
) => Unit
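Step 2 can be checked in code. A sketch with hypothetical names (Push, combine, pushList): combine folds the three callbacks back into a single Try[Option[T]] => Unit callback, which shows that both forms carry the same information.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

// The flipped type from step 1: hand it a callback and it pushes every event into it.
type Push[T] = (Try[Option[T]] => Unit) => Unit

// Three callbacks are equivalent to one Try[Option[T]] callback.
def combine[T](onNext: T => Unit,
               onError: Throwable => Unit,
               onCompleted: () => Unit): Try[Option[T]] => Unit = {
  case Success(Some(value)) => onNext(value)
  case Success(None)        => onCompleted()
  case Failure(error)       => onError(error)
}

// A hypothetical push source: emits each list element, then signals completion.
def pushList[T](xs: List[T]): Push[T] = callback => {
  xs.foreach(x => callback(Success(Some(x))))
  callback(Success(None))
}

val log = ListBuffer.empty[String]
pushList(List(1, 2))(combine[Int](
  v => log += s"next $v",
  e => log += s"error ${e.getMessage}",
  () => log += "completed"))
```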

Step 3: complexify. Give the pieces names to obtain the corresponding signatures:

trait Observable[T] {
  def subscribe(observer: Observer[T]): Subscription
}
trait Observer[T] {
  def onNext(value: T): Unit
  def onError(error: Throwable): Unit
  def onCompleted(): Unit
}
trait Subscription {
  def unsubscribe(): Unit
}

Comparing the two sets of signatures shows that Iterable[T] and Observable[T] are duals.
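To see the push direction concretely, here is a deliberately simplified, synchronous implementation of the three traits. fromIterable is a hypothetical helper rather than RxScala's API, and because it finishes before subscribe returns, it cannot be cancelled midway:

```scala
import scala.collection.mutable.ListBuffer

trait Subscription { def unsubscribe(): Unit }
trait Observer[T] {
  def onNext(value: T): Unit
  def onError(error: Throwable): Unit
  def onCompleted(): Unit
}
trait Observable[T] { def subscribe(observer: Observer[T]): Subscription }

// A cold, synchronous Observable over an Iterable: the source drives the loop
// and pushes each value into the observer (the dual of pulling with next()).
def fromIterable[T](xs: Iterable[T]): Observable[T] = new Observable[T] {
  def subscribe(observer: Observer[T]): Subscription = {
    try {
      val it = xs.iterator
      while (it.hasNext) observer.onNext(it.next())
      observer.onCompleted()
    } catch {
      // For brevity this also catches exceptions thrown by the observer itself.
      case e: Exception => observer.onError(e)
    }
    // Everything was pushed already, so there is nothing left to cancel.
    new Subscription { def unsubscribe(): Unit = () }
  }
}

val log = ListBuffer.empty[String]
fromIterable(List(1, 2, 3)).subscribe(new Observer[Int] {
  def onNext(value: Int): Unit = log += s"next $value"
  def onError(error: Throwable): Unit = log += s"error ${error.getMessage}"
  def onCompleted(): Unit = log += "completed"
})
```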

Comparing Future and Observable

First, the signatures:

Observable[T] = (Try[Option[T]] => Unit) => Unit
Future[T] = (Try[T] => Unit) => Unit

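Observable[T] has an extra Option, which lets it deliver any number of values (and signal the end with None), whereas a Future delivers a single result.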
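A Future can therefore be viewed as an Observable that emits at most one value. The sketch below maps a Future's single result onto Observer-style callbacks with the standard onComplete; subscribeFuture is a hypothetical helper, not an RxScala API.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Success: exactly one onNext followed by onCompleted. Failure: a single onError.
def subscribeFuture[T](f: Future[T])(onNext: T => Unit,
                                     onError: Throwable => Unit,
                                     onCompleted: () => Unit)(implicit ec: ExecutionContext): Unit =
  f.onComplete {
    case Success(value) => onNext(value); onCompleted()
    case Failure(error) => onError(error)
  }

// parasitic runs callbacks on the calling thread, so an already-completed
// Future delivers its events before subscribeFuture returns (convenient for a demo).
implicit val ec: ExecutionContext = ExecutionContext.parasitic

val log = ListBuffer.empty[String]
subscribeFuture(Future.successful(42))(
  v => log += s"next $v", e => log += s"error ${e.getMessage}", () => log += "completed")
subscribeFuture(Future.failed[Int](new RuntimeException("boom")))(
  v => log += s"next $v", e => log += s"error ${e.getMessage}", () => log += "completed")
```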

How do they differ when it comes to concurrency?

object Future {
  def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]
}
trait Observable[T] {
  def observeOn(scheduler: Scheduler): Observable[T]
}

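A Future runs its computation once and only needs an ExecutionContext to run it on, whereas an Observable emits values many times and needs a Scheduler to control where those values are delivered.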

Observable basics

Here is an example that uses Observable:

import rx.lang.scala.Observable
import scala.concurrent.duration._

val ticks: Observable[Long] = Observable.interval(1.second)
val evens: Observable[Long] = ticks.filter(s => s % 2 == 0)
val bufs: Observable[Seq[Long]] = ticks.buffer(2, 1)
val s = bufs.subscribe(b => println(b))
scala.io.StdIn.readLine()
s.unsubscribe()

Step by step:

val ticks: Observable[Long] = Observable.interval(1.second)

[Figure: marble diagram of ticks]

val evens: Observable[Long] = ticks.filter(s => s % 2 == 0)

[Figure: marble diagram of evens]

val bufs: Observable[Seq[Long]] = ticks.buffer(2, 1)

[Figure: marble diagram of bufs]
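The same transformations can be tried on a finite list as a pull-based analog. This is only an approximation: the standard sliding(2, 1) yields the same overlapping pairs that buffer(2, 1) emits on an unbounded stream of ticks, but it is not the Rx operator and may differ when the stream completes.

```scala
// Ticks 0, 1, 2, ... as a finite list instead of a timer.
val ticks: List[Long] = (0L to 5L).toList

// filter keeps the even ticks.
val evens: List[Long] = ticks.filter(s => s % 2 == 0)

// Windows of 2 consecutive ticks, starting a new window at every tick.
val bufs: List[Seq[Long]] = ticks.sliding(2, 1).toList
```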

Exercise: consider

val xs = Observable.range(1, 10)

whose marble diagram is:

[Figure: marble diagram of range]

What, then, is the marble diagram of the following?

val ys = xs.map(x => x + 1)

[Figure: answer options A to D]

The answer is B.

Combinators on Observable

Higher-order functions on Observable[T] include:

def flatMap[B](f: A=>Observable[B]): Observable[B]
def map[B](f: A=>B): Observable[B]
def filter(p: A=>Boolean): Observable[A]
def take(n: Int): Observable[A]
def takeWhile(p: A=>Boolean): Observable[A]
def toList(): List[A]
def zip[B](that: Observable[B]): Observable[(A, B)]

The marble diagram of map:

[Figure: marble diagram of map]

flatMap is defined as:

def flatMap[S](f: T => Observable[S]): Observable[S] = { map(f).flatten() }

Its marble diagram:

[Figure: marble diagram of flatMap]

There are two ways to flatten a nested stream. The first is flatten:

val xs: Observable[Int] = Observable(3, 2, 1)
val yss: Observable[Observable[Int]] =
  xs.map(x => Observable.interval(x.seconds).map(_ => x).take(2))
val zs: Observable[Int] = yss.flatten()

[Figure: result of flatten]

[Figure: marble diagram of merge]

The second is concat:

val xs: Observable[Int] = Observable(3, 2, 1)
val yss: Observable[Observable[Int]] =
  xs.map(x => Observable.interval(x.seconds).map(_ => x).take(2))
val zs: Observable[Int] = yss.concat()

[Figure: result of concat]

[Figure: marble diagram of concat]
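The difference between the two can be reproduced with a small timing model. It is a sketch built on assumptions, not RxScala: each inner stream is a list of (time, value) events, Observable(3, 2, 1) is assumed to emit all three values at time 0, and simultaneous events are ordered by their position in the outer stream.

```scala
// An inner stream: (seconds after it is subscribed, value) events.
type Inner = List[(Int, Int)]

// Observable.interval(x.seconds).map(_ => x).take(2), subscribed at time 0,
// emits x at times x and 2x and then completes.
def ticksOf(x: Int): Inner = List((x, x), (2 * x, x))

val inners: List[Inner] = List(3, 2, 1).map(ticksOf)

// flatten (merge): all inner streams run at once; values come out in time order.
// sortBy is stable, so simultaneous events keep the outer order.
def merge(streams: List[Inner]): List[Int] =
  streams.flatten.sortBy(_._1).map(_._2)

// concat: subscribe to the next inner stream only when the previous one
// completes (at its last event), so each stream's clock starts later.
def concat(streams: List[Inner]): List[Int] = {
  val (_, out) = streams.foldLeft((0, List.empty[(Int, Int)])) {
    case ((start, acc), inner) =>
      val shifted = inner.map { case (t, v) => (start + t, v) }
      (shifted.map(_._1).max, acc ++ shifted)
  }
  out.map(_._2)
}
```

merge interleaves the inner streams, while concat keeps them in the order of the outer stream at the cost of waiting.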

The following earthquake-notification example shows how to map and filter an asynchronous data stream. The basic definitions are:

def usgs(): Observable[EarthQuake] = { ... }
class EarthQuake {
  ...
  def magnitude: Double
  def location: GeoCoordinate
}
object Magnitude extends Enumeration {
  def apply(magnitude: Double): Magnitude = { ... }
  type Magnitude = Value
  val Micro, Minor, Light, Moderate, Strong, Major, Great = Value
}

Using them looks roughly like this:

import Magnitude._

val quakes = usgs()
val major = quakes
  .map(q => (q.location, Magnitude(q.magnitude)))
  .filter { case (loc, mag) => mag >= Major }
major.subscribe({ case (loc, mag) =>
  println(s"Magnitude $mag quake at $loc")
})
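A runnable stand-in for the pipeline above, with a List in place of usgs() so that it needs no network. GeoCoordinate, the sample quakes and the thresholds in Magnitude.apply are all assumptions for illustration; the thresholds follow the commonly used magnitude descriptor table.

```scala
// Hypothetical stand-ins for the types above.
case class GeoCoordinate(latitude: Double, longitude: Double)
case class EarthQuake(magnitude: Double, location: GeoCoordinate)

object Magnitude extends Enumeration {
  type Magnitude = Value
  val Micro, Minor, Light, Moderate, Strong, Major, Great = Value

  // Assumed thresholds; always call this with a Double.
  def apply(magnitude: Double): Magnitude =
    if (magnitude < 2.0) Micro
    else if (magnitude < 4.0) Minor
    else if (magnitude < 5.0) Light
    else if (magnitude < 6.0) Moderate
    else if (magnitude < 7.0) Strong
    else if (magnitude < 8.0) Major
    else Great
}
import Magnitude._

// Made-up sample data.
val quakes = List(
  EarthQuake(7.3, GeoCoordinate(10.0, 20.0)),
  EarthQuake(4.1, GeoCoordinate(30.0, 40.0)),
  EarthQuake(8.1, GeoCoordinate(50.0, 60.0)))

// Enumeration values are ordered by declaration, which is what makes >= Major work.
val major = quakes
  .map(q => (q.location, Magnitude(q.magnitude)))
  .filter { case (_, mag) => mag >= Major }

major.foreach { case (loc, mag) => println(s"Magnitude $mag quake at $loc") }
```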

Next, we want to call a network service that turns each quake's coordinates into a country:

def reverseGeocode(c: GeoCoordinate): Future[Country] = { ... }
val withCountry: Observable[Observable[(EarthQuake, Country)]] =
  usgs().map(quake => {
    val country: Future[Country] = reverseGeocode(quake.location)
    Observable(country.map(c => (quake, c)))
  })
// This
val merged: Observable[(EarthQuake, Country)] = withCountry.flatten()
// Or this?
val merged: Observable[(EarthQuake, Country)] = withCountry.concat()

So which should we use, flatten or concat? With flatten:

[Figure: reverse geocoding with flatten]

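the quake notifications can arrive out of order, because each reverse-geocoding call takes a different amount of time. With concat, the original order is preserved: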

[Figure: reverse geocoding with concat]
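A simple timing model makes the difference concrete. It is a hypothetical sketch: the report times and geocoding latencies are made up, and because each geocoding Future starts as soon as its quake arrives, concat is modeled as emitting each result at the later of its own ready time and the previous emission.

```scala
// Quake `quake` is reported at `reportedAt`; its geocoding Future completes `latency` seconds later.
case class Pending(quake: String, reportedAt: Int, latency: Int) {
  def readyAt: Int = reportedAt + latency
}

val pending = List(
  Pending("q0", reportedAt = 0, latency = 5),
  Pending("q1", reportedAt = 1, latency = 1),
  Pending("q2", reportedAt = 2, latency = 2))

// flatten (merge): each result is emitted as soon as its Future completes.
val flattened: List[String] = pending.sortBy(_.readyAt).map(_.quake)

// concat: results come out in report order; a result that is ready early waits
// until every earlier quake has been emitted, then goes out immediately.
val concatenated: List[(String, Int)] = {
  val (_, out) = pending.foldLeft((0, List.empty[(String, Int)])) {
    case ((prevTime, acc), p) =>
      val emittedAt = math.max(prevTime, p.readyAt)
      (emittedAt, acc :+ (p.quake -> emittedAt))
  }
  out
}
```

concat trades latency for order: q1 and q2 are ready early but are held back until q0 has been emitted.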

Next, a new function, groupBy:

def groupBy[K](keySelector: T => K): Observable[(K, Observable[T])]

Its marble diagram:

[Figure: marble diagram of groupBy]

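The source elements are split into two groups by shape, which gives three streams in total: the outer stream of groups plus one inner stream per group.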

To sort the incoming quakes by country, we can write:

val merged: Observable[(EarthQuake, Country)] = withCountry.flatten()
val byCountry: Observable[(Country, Observable[(EarthQuake, Country)])] =
  merged.groupBy { case (q, c) => c }

[Figure: quakes grouped by country]

Exercise:

Suppose we want a running average of quake magnitudes for each country. Part of the code is:

val byCountry: Observable[(Country, Observable[(EarthQuake, Country)])]
def runningAverage(s: Observable[Double]): Observable[Double] = { ... }
val runningAveragePerCountry: Observable[(Country, Observable[Double])]

How should runningAveragePerCountry be implemented?

// a)
val runningAveragePerCountry = byCountry.map {
  case (country, quakes) => (country, runningAverage(quakes))
}
// b)
val runningAveragePerCountry = byCountry.map {
  case (country, quakes) => (country, runningAverage(quakes.map(_.magnitude)))
}
// c)
val runningAveragePerCountry = byCountry.map {
  case (country, cqs) => (country, runningAverage(cqs.map(_._1.magnitude)))
}

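Following the types, the answer is C. Each group is an Observable[(EarthQuake, Country)], so the magnitude has to be extracted with _._1.magnitude to get the Observable[Double] that runningAverage expects. Option a passes tuples instead of Doubles, and in option b the elements are tuples, which have no magnitude field.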
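Answer C can be tried out with a pull-based analog on Lists. runningAverage and groupInOrder are hypothetical implementations written for this sketch, countries are plain strings, and the magnitudes are made-up sample values:

```scala
case class EarthQuake(magnitude: Double)
type Country = String

// The i-th output is the mean of the first i inputs.
def runningAverage(s: List[Double]): List[Double] =
  s.scanLeft((0.0, 0)) { case ((sum, n), x) => (sum + x, n + 1) }
    .tail
    .map { case (sum, n) => sum / n }

// groupBy that keeps groups in order of first appearance, as a stream of groups would.
def groupInOrder[T, K](xs: List[T])(key: T => K): List[(K, List[T])] =
  xs.map(key).distinct.map(k => (k, xs.filter(x => key(x) == k)))

val merged: List[(EarthQuake, Country)] = List(
  (EarthQuake(2.0), "A"), (EarthQuake(5.0), "B"), (EarthQuake(4.0), "A"), (EarthQuake(6.0), "A"))

val byCountry = groupInOrder(merged) { case (_, c) => c }

// Option c): pull the magnitude out of each (quake, country) pair first.
val runningAveragePerCountry: List[(Country, List[Double])] = byCountry.map {
  case (country, cqs) => (country, runningAverage(cqs.map(_._1.magnitude)))
}
```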

Subscriptions

How do we unsubscribe? Like this:

val quakes: Observable[EarthQuake] = ...
val s: Subscription = quakes.subscribe(...)
s.unsubscribe()

There are two kinds of Observable. A hot Observable shares one source among all of its subscribers:

[Figure: hot Observable]

A cold Observable gives every subscriber its own private source:

[Figure: cold Observable]

Note that unsubscribing does not mean shutting down the source, because other subscribers may still be attached.
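A single-threaded sketch of the two kinds. ColdObservable and HotObservable are hypothetical classes for illustration, not RxScala types. The cold one replays its data for each subscriber; the hot one only forwards values emitted while a subscriber is attached, and one subscriber leaving does not stop the source:

```scala
import scala.collection.mutable.ListBuffer

trait Subscription { def unsubscribe(): Unit }

// Cold: each subscriber triggers its own private run over the data.
final class ColdObservable[T](data: List[T]) {
  def subscribe(onNext: T => Unit): Subscription = {
    data.foreach(onNext)
    new Subscription { def unsubscribe(): Unit = () }
  }
}

// Hot: one shared source; unsubscribing removes only that subscriber.
final class HotObservable[T] {
  private val observers = ListBuffer.empty[T => Unit]
  def subscribe(onNext: T => Unit): Subscription = {
    observers += onNext
    new Subscription { def unsubscribe(): Unit = observers -= onNext }
  }
  def emit(value: T): Unit = observers.toList.foreach(observer => observer(value))
}

val cold = new ColdObservable(List(1, 2, 3))
val c1 = ListBuffer.empty[Int]
val c2 = ListBuffer.empty[Int]
cold.subscribe(c1 += _)
cold.subscribe(c2 += _)

val hot = new HotObservable[Int]
val h1 = ListBuffer.empty[Int]
val h2 = ListBuffer.empty[Int]
val s1 = hot.subscribe(h1 += _)
hot.subscribe(h2 += _)
hot.emit(1)
s1.unsubscribe() // h1 leaves; the source keeps emitting to h2
hot.emit(2)
```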

The basic definition of Subscription is:

trait Subscription {
  def unsubscribe(): Unit
}
object Subscription {
  def apply(unsubscribe: => Unit): Subscription
}

The Subscription family includes these members:

trait BooleanSubscription extends Subscription {
  def isUnsubscribed: Boolean
}
trait CompositeSubscription extends BooleanSubscription {
  def +=(s: Subscription): this.type
  def -=(s: Subscription): this.type
}
trait MultipleAssignmentSubscription extends BooleanSubscription {
  def subscription: Subscription
  def subscription_=(that: Subscription): this.type
}

In the code below, unsubscribe is called twice on the same subscription:

val subscription = Subscription {
  println("bye, bye, I’m out fishing")
}
subscription.unsubscribe()
subscription.unsubscribe()

Only the first call prints the message. In other words, unsubscribe may be called any number of times, so it must be idempotent.
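One way to get idempotence is an atomic flag. A sketch, assuming a hypothetical BooleanSubscription factory modeled on the one used in this section; compareAndSet ensures the action runs at most once, even if several threads call unsubscribe concurrently:

```scala
import java.util.concurrent.atomic.AtomicBoolean

trait Subscription { def unsubscribe(): Unit }
trait BooleanSubscription extends Subscription { def isUnsubscribed: Boolean }

object BooleanSubscription {
  def apply(action: => Unit): BooleanSubscription = new BooleanSubscription {
    private val done = new AtomicBoolean(false)
    def isUnsubscribed: Boolean = done.get
    // Only the caller that flips the flag from false to true runs the action.
    def unsubscribe(): Unit = if (done.compareAndSet(false, true)) action
  }
}

var calls = 0
val subscription = BooleanSubscription { calls += 1 }
val before = subscription.isUnsubscribed
subscription.unsubscribe()
subscription.unsubscribe() // the second call is a no-op
```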

BooleanSubscription adds an isUnsubscribed method that tells whether the subscription has already been cancelled:

val subscription = BooleanSubscription {
  println("bye, bye, I’m out fishing")
}
println(subscription.isUnsubscribed)
subscription.unsubscribe()
println(subscription.isUnsubscribed)

[Figure: BooleanSubscription example]

A CompositeSubscription can hold many subscriptions; unsubscribing it also unsubscribes all of the subscriptions it contains:

val a = BooleanSubscription { println("A") }
val b = Subscription { println("B") }
val composite = CompositeSubscription(a, b)
println(composite.isUnsubscribed)
composite.unsubscribe()
println(composite.isUnsubscribed)
println(a.isUnsubscribed)
composite += Subscription { println("C") }

[Figure: CompositeSubscription example]

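When a subscription is added to a CompositeSubscription that has not been unsubscribed, the new subscription's state is left unchanged; if the CompositeSubscription has already been unsubscribed, the new subscription is unsubscribed immediately: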

[Figure: adding a subscription to a CompositeSubscription]
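The rules above fit in a short single-threaded sketch. These classes are hypothetical stand-ins written for illustration, not RxScala's implementation:

```scala
import scala.collection.mutable.ListBuffer

trait Subscription { def unsubscribe(): Unit }

class BooleanSubscription(action: () => Unit = () => ()) extends Subscription {
  private var unsubscribed = false
  def isUnsubscribed: Boolean = unsubscribed
  def unsubscribe(): Unit =
    if (!unsubscribed) { unsubscribed = true; action() } // idempotent
}

class CompositeSubscription(initial: Subscription*) extends BooleanSubscription() {
  private val children = ListBuffer.from(initial)

  // Unsubscribing the composite unsubscribes every child it holds.
  override def unsubscribe(): Unit =
    if (!isUnsubscribed) {
      super.unsubscribe()
      children.foreach(_.unsubscribe())
      children.clear()
    }

  def +=(s: Subscription): this.type = {
    if (isUnsubscribed) s.unsubscribe() // too late: cancel the newcomer at once
    else children += s
    this
  }
}

val log = ListBuffer.empty[String]
val a = new BooleanSubscription(() => log += "A")
val b = new BooleanSubscription(() => log += "B")
val composite = new CompositeSubscription(a, b)
composite.unsubscribe()
composite += new BooleanSubscription(() => log += "C")
```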

A MultipleAssignmentSubscription holds only one inner subscription at a time, and it also carries an implicit subscription of its own:

[Figure: MultipleAssignmentSubscription example]

Assigning a new subscription behaves like adding one to a CompositeSubscription:

[Figure: assigning a subscription to a MultipleAssignmentSubscription]

Unsubscribing the inner subscription does not unsubscribe the MultipleAssignmentSubscription's own implicit subscription:

[Figure: unsubscribing the inner subscription of a MultipleAssignmentSubscription]
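A matching single-threaded sketch of MultipleAssignmentSubscription, again a hypothetical stand-in and not RxScala's code. In this sketch, replacing the inner subscription leaves the old one untouched, and unsubscribing the outer subscription also cancels the current inner one:

```scala
import scala.collection.mutable.ListBuffer

trait Subscription { def unsubscribe(): Unit }

class BooleanSubscription(action: () => Unit = () => ()) extends Subscription {
  private var unsubscribed = false
  def isUnsubscribed: Boolean = unsubscribed
  def unsubscribe(): Unit =
    if (!unsubscribed) { unsubscribed = true; action() }
}

// One inner subscription at a time, on top of its own (implicit) one.
class MultipleAssignmentSubscription extends BooleanSubscription() {
  private var current: Subscription = new BooleanSubscription()

  def subscription: Subscription = current

  // After unsubscribe, a newly assigned subscription is cancelled immediately.
  def subscription_=(that: Subscription): Unit =
    if (isUnsubscribed) that.unsubscribe() else current = that

  override def unsubscribe(): Unit =
    if (!isUnsubscribed) { super.unsubscribe(); current.unsubscribe() }
}

val log = ListBuffer.empty[String]
val m = new MultipleAssignmentSubscription
val inner = new BooleanSubscription(() => log += "inner")
m.subscription = inner
inner.unsubscribe()                      // cancelling the inner subscription...
val outerStillActive = !m.isUnsubscribed // ...leaves m itself active
m.unsubscribe()
m.subscription = new BooleanSubscription(() => log += "late") // cancelled at once
```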

Exercise:

Consider the following code:

val a = BooleanSubscription { println("A") }
val b = Subscription { println("B") }
val c = CompositeSubscription(a, b)
val m = MultipleAssignmentSubscription()
m.subscription = c
c.unsubscribe()

Which of the following is true?

a) b.isUnsubscribed == true
b) a.isUnsubscribed == false
c) m.isUnsubscribed == true
d) c.isUnsubscribed == true

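The answer is D. Unsubscribing c also unsubscribes a and b, so option b is false, and option a is not even well-typed with the definitions above, since a plain Subscription has no isUnsubscribed. m stays subscribed, because cancelling its inner subscription does not cancel m itself, so option c is false.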