import scala.actors.Actor import scala.actors.Actor._ import scala.actors.remote.Node import scala.actors.remote.RemoteActor._ import java.util.UUID /* * Abstract Stream * * @author Hiroyasu OHYAMA */ case class SendStream[A](host: String, port: Int, id: Symbol) class KSMaster[A](id: Symbol, var data: Stream[A]) extends Actor { val port = 10100 def act { alive(port) register(id, self) loop { receive { case "get" => { if(! data.isEmpty) { reply(data.head) data = data.tail } else { reply(Nil) } } case _ => { reply(Nil) } } } } } object AStream { def set[A](s : Stream[A]) : SendStream[A] = { val id = Symbol(UUID.randomUUID.toString) val master = new KSMaster(id, s) master.start return SendStream("localhost", 10100, id) } def get[A](host: String, port: Int, id: Symbol) : Stream[A] = { val server = select(Node(host, port), id) var retStream = Stream[A]() server !? "get" match { case Nil => { retStream = Stream.empty } case value => { retStream = Stream.cons(value.asInstanceOf[A], get(host, port, id)) } } return retStream; } }