前回 は Stream オブジェクトをリモートアクターに転送するにはどうすればいいか?という話でした。
せっかく Straem を使うわけですから。List に変換したり、転送時に Stream を foreach で走査して、再構築するなんて事はしないで。値の評価の遅延だけでなく、プロセス間のデータ転送まで遅延したいところです。
という事で。Stream のデータ転送を遅延する、抽象化機構を作ってみました。
この Stream を抽象化するデータ構造を AStream とここでは名づけます [*1]。
まず。リモートアクターに Stream 転送する際、直接 Stream を送らないで別のデータ構造に変換してからリモートに転送するようにします。
そこで、入力に Stream オブジェクトを取る set メソッドを作り、出力に入力した Stream に可換な別のデータ構造を吐き出すようにします。具体的に、送信側では以下のようにします。
[送信側の実装の抜粋]
|
... // 送信する先 val server = select(Node(host, port), 'Server) server ! AStream.set(Stream.cons("A", Stream.cons("B", Stream.empty))) |
AStream.set は以下のケースクラスのインスタンスをリモートに送ります。
|
case class SendStream[A](host: String, port: Int, id: Symbol) |
リモートでデータを受け取り、Stream を取り出す実装は以下のようにします。
[受信側の実装の抜粋]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
... def act { alive(port) register('TestServer, self) loop { receive { ... case SendStream(host, port, id) => { // Stream を抽出 val stream = AStream.get(host, port, id) println("strm : " + strm); // 参照時に送信元からデータを取ってくる println("strm.tail : " + strm.tail); } ... } } } |
リモートの受信側では、SendStream の各メンバを引数に AStream オブジェクトの get メソッドを呼び出し、Stream を抽出します。
以下が get メソッドの実装です。ここまでの話は、かなり手続的な内容でしたが。ここでようやく Stream の本領発揮です。
[AStream のコード抜粋]
|
def get[A](host: String, port: Int, id: Symbol) : Stream[A] = { val server = select(Node(host, port), id) var retStream = Stream[A]() server !? "get" match { case Nil => { retStream = Stream.empty } case value => { retStream = Stream.cons(value.asInstanceOf[A], get(host, port, id)) } } return retStream; } |
ここでの処理は、送信側に Stream の中身を問い合わせて、先頭要素だけを送ってもらうという事をしています(もちろん送信側では、この処理の後に先頭要素を次要素にスライドさせます)。ただし、Stream.empty は、空リスト (Nil) に変換して送ってもらい、リモート側で Stream.empty に戻します。
ミソは 10 行目の再起が Stream によって遅延評価される事です。この時点では 10 行目の get メソッドは評価されず、「次に評価するのは僕ですよ」という事だけ教えて Stream を返します。なのでここでの処理は一瞬で終わります。
実際に先頭以降の要素を送信側から取ってくるのは、値が参照されたとき ([受信側の実装] の tail が実行されたとき) になります。このときに始めて AStream.get メソッドで返された Stream の次の要素 (get メソッド) の評価が行われ、送信側から次の要素が入った Stream が返されます。
こうすることで、データ転送を遅延する Stream の転送が実現できました。
こんな事しなくても、別の (もっと簡単な) 方法を知ってるぜ。という方は、是非コメントください。
[*1] AStream のコード : AStream.scala
関連文書:
最近のコメント