2012年9月1日土曜日

Machines

先日rpscalaに行った時、@xuwei_kさんにscala-machinesというものを教えてもらったので調べてみました。

Example.scalaを参考に、コードを読んでいきます。

import java.io._
import scalaz.{ Reader => _, _ }, Scalaz._
import scalaz.effect._, IO._
import com.clarifi.machines._
object Main extends SafeApp {
import Plan._
def getFileLines[A](m: Process[String, A])(f: File): Procedure[IO, A] =
new Procedure[IO, A] {
type K = String => Any
val machine: Machine[K, A] = m
def withDriver[R](k: Driver[IO, K] => IO[R]): IO[R] = {
bufferFile(f).bracket(closeReader)(r => {
val d = new Driver[IO, String => Any] {
def apply(k: String => Any): IO[Option[Any]] = rReadLn(r) map (_ map k)
}
k(d)
})
}
}
def bufferFile(f: File): IO[BufferedReader] =
IO { new BufferedReader(new FileReader(f)) }
def rReadLn(r: BufferedReader): IO[Option[String]] = IO { Option(r.readLine) }
def closeReader(r: Reader): IO[Unit] = IO { r.close }
def countLn: Process[String, Int] = Process(_ => 1)
def exec[A: Monoid](f: File => Procedure[IO, A]): IO[Unit] =
(f(new File("test.csv")).execute >>= putOut) >> putStrLn("")
override def runc = exec(getFileLines(countLn))
}
view raw machines.scala hosted with ❤ by GitHub
少々書き換えました。

ぱっと見ても全くわからないですね。

IO


副作用(アクション)を表します。
IO.applyは値をcall by nameでとります。
unsafePerformIOを呼ぶことで、アクションを実行します。

bufferFile


def bufferFile(f: File): IO[BufferedReader] =
IO { new BufferedReader(new FileReader(f)) }
ファイルをとってBufferedReaderを返す関数です。
IOでラップしています。

rReadLn


def rReadLn(r: BufferedReader): IO[Option[String]] = IO { Option(r.readLine) }
view raw rReadLn.scala hosted with ❤ by GitHub
BufferedReaderから一行読み取ります。
readLineは文字列かnullを返すのでOption.applyを使用しています。
これもIOでラップしています。

closeReader


def closeReader(r: Reader): IO[Unit] = IO { r.close }
Readerを閉じます。
当然副作用なのでIOでラップされます。

Process


Process[I, O]はI => Oをラップしたものと考えていい・・・・はず。
Process[I, O]はMachine[I => Any, O]のシノニムで、Machine[K, O]はPlan[K, O, Nothing]のシノニムなので、実体はPlanなわけですが、まだ私は理解できていません。
IterateeのIteratee,Enumeratee,Enumeratorの内のIterateeにあたるようなものだと思われます。
なのでストリームの要素はこいつを使って処理していきます。

countLn


def countLn: Process[String, Int] = Process(_ => 1)
view raw countLn.scala hosted with ❤ by GitHub
文字列をとって1を返すProcessです。

getFileLines


def getFileLines[A](m: Process[String, A])(f: File): Procedure[IO, A] =
new Procedure[IO, A] {
type K = String => Any
val machine: Machine[K, A] = m
def withDriver[R](k: Driver[IO, K] => IO[R]): IO[R] = {
bufferFile(f).bracket(closeReader)(r => {
val d = new Driver[IO, String => Any] {
def apply(k: String => Any): IO[Option[Any]] = rReadLn(r) map (_ map k)
}
k(d)
})
}
}
型パラメータAとProcess[String, A]とFileをとり、Procedure[IO, A]を返します。
モナド変換子と同じく、Procedureから値Aを取り出す時にIOにラップされるようになります。
ProcedureはEnumeratorのようなもので、ストリームとそれを処理するMachineを持ちます。
ProcedureはMachineの第一型パラメータとMachine、抽象メソッドのwithDriverを定義します。
Process[I, O]は先述の通りMachine[I => Any, O]と定義されているので、抽象タイプメンバにはString => Any、Machineは引数としてとったProcessで定義します。
Driverでは入力を定義するのですが、当然、入力にはリソースが必要になります。
そこで、このwithDriverではリソースの作成、開放、Driver(入力)の作成、Driverの適用を定義します。
リソースの作成はbufferedReaderを使います。
IO#bracket(f)(g)は処理gの後に必ず処理fが処理されます。
事後処理としてcloseReaderを渡し、Driverの作成と適用をします。
DriverにはrReadLnを使い、入力を適用するapplyを定義します。
最後にwithDriverがとる関数kにDriverを適用します。

exec


def exec[A: Monoid](f: File => Procedure[IO, A]): IO[Unit] =
(f(new File("test.csv")).execute >>= putOut) >> putStrLn("")
view raw exec.scala hosted with ❤ by GitHub
ファイルを関数に適用し、実行、出力する関数です。
executeはProcedureの結果の型Aがモノイドである必要があります。
ストリームを処理した結果をaccumulatorに加える関数で合成していきます。

runc


override def runc = exec(getFileLines(countLn))
view raw runc.scala hosted with ❤ by GitHub
main関数で呼ばれる関数で、裏でunsafePerformIOを呼びます。

CSVファイルの読み込みと要素のカウントを書いてみた。

import java.io._
import scalaz.{ Reader => _, _ }, Scalaz._
import scalaz.effect._, IO._
import com.clarifi.machines._
object CSV extends SafeApp {
import Plan._
def getCSVFile[A](m: Process[List[String], A])(f: File): Procedure[IO, A] =
new Procedure[IO, A] {
type K = List[String] => Any
val machine = m
def withDriver[R](k: Driver[IO, K] => IO[R]): IO[R] = {
bufferFile(f).bracket(closeReader)(r => {
val d = new Driver[IO, List[String] => Any] {
def apply(k: List[String] => Any): IO[Option[Any]] = rReadLn(r) map (_ map (s => k(s.split(",").toList)))
}
k(d)
})
}
}
def bufferFile(f: File): IO[BufferedReader] =
IO { new BufferedReader(new FileReader(f)) }
def rReadLn(r: BufferedReader): IO[Option[String]] = IO { Option(r.readLine) }
def closeReader(r: Reader): IO[Unit] = IO { r.close }
def countLn: Process[List[String], Int] = Process(_.size)
def exec[A: Monoid](f: File => Procedure[IO, A]): IO[Unit] =
(f(new File("test.csv")).execute >>= putOut) >> putStrLn("")
override def runc = exec(getCSVFile(countLn))
}
view raw CSV.scala hosted with ❤ by GitHub

まとめ


IterateeよりはIOに特化していると思われる。
リソースの処理をProcedureでやっているところとか。
ProcessやProcedureを自分で書くのは、IterateeやEnumeratorを自分で書くよりは楽・・・・な気がしなくもない。

今回は本当にExampleをさわっただけで、詳細はわかっていない。
作者のドキュメントに期待です。

0 件のコメント:

コメントを投稿