#53 - document and utility monad forwarding functions for iteratees

chris-twiner committed Jan 10, 2024
1 parent 4dab0a0 commit d92bdb5
Showing 13 changed files with 728 additions and 256 deletions.
5 changes: 2 additions & 3 deletions aalto/src/test/scala/scales/xml/AaltoPullTest.scala
@@ -1,6 +1,5 @@
package scales.xml
/*

class AaltoPullTest extends PullTest {

}
*/
}
@@ -1,7 +1,8 @@
package scales.xml.parser.pull.aalto

import scalaz.Free.Trampoline
import scalaz.iteratee.Input.{Element, Empty, Eof}
import scalaz.iteratee.Iteratee
import scalaz.iteratee.{Iteratee, IterateeT}
import scalaz.iteratee.Iteratee.{peek, iteratee => siteratee}
import scalaz.iteratee.StepT.Done
/*
@@ -69,7 +70,6 @@ class AsyncPullTest extends junit.framework.TestCase {
/**
* ensure that the enumerator doesn't break basic assumptions when it can get
* all the data
*/
def testFlatMapMultipleDones = {
val url = sresource(this, "/data/BaseXmlTest.xml")
@@ -100,16 +100,19 @@ class AsyncPullTest extends junit.framework.TestCase {
parser.closeResource
wrapped.closeResource
}
*/
def testSimpleLoadAndFold =
doSimpleLoadAndFold{
def testSimpleLoadAndFold(): Unit =
doSimpleLoadAndFold[Trampoline]{
(p, iter, wrapped) =>
val enumeratee = enumToMany(iter)(AsyncParser.parse(p))
val (e,cont) = (enumeratee &= dataChunkerEnumerator(wrapped)).run
e
}
for {
r <- (enumeratee &= dataChunkerEnumerator[DataChunk, Trampoline](wrapped)).run
(e,cont) = r
} yield e
} run
def doSimpleLoadAndFold[T](test: (AsyncParser, Iteratee[PullType, List[String]], ReadableByteChannelWrapper[DataChunk ]) => List[String] ) : Unit = {
def doSimpleLoadAndFold[F[_]: Monad](test: (AsyncParser, IterateeT[PullType, F, List[String]], ReadableByteChannelWrapper[DataChunk ]) => F[List[String]] ) : F[Unit] = {
val url = sresource(this, "/data/BaseXmlTest.xml")
val channel = Channels.newChannel(url.openStream())
@@ -119,29 +122,32 @@ class AsyncPullTest extends junit.framework.TestCase {
val ns = Namespace("urn:default")
val iter = foldOnDoneIter( List[String](),
onQNames(List(ns("Default"), "NoNamespace"l,"DontRedeclare"l))){
(l, qmatch) => qmatch._2. // its empty as it was eof
map(p => qname(p.tree) :: l).getOrElse( l )
onQNames[F](List(ns("Default"), "NoNamespace"l,"DontRedeclare"l))){
(l, qmatch) => qmatch._2. // its empty as it was eof
map(p => qname(p.tree) :: l).getOrElse( l )
}
val wrapped = new ReadableByteChannelWrapper(channel)
val e = test(parser, iter, wrapped)
assertEquals(2, e.size)
e match {
case List("DontRedeclare", "DontRedeclare") => ()
case _ => fail("got "+e)
for {
e <- test(parser, iter, wrapped)
} yield {
assertEquals(2, e.size)
e match {
case List("DontRedeclare", "DontRedeclare") => ()
case _ => fail("got " + e)
}
}
}
def testSimpleLoadAndFoldAsync =
// should have collected all anyway
doSimpleLoadAndFold{
doSimpleLoadAndFold[Trampoline]{
(p, iter, wrapped) =>
val enumeratee = enumToMany(iter)(AsyncParser.parse(p))
val (e,cont) = (enumeratee &= dataChunkerEnumerator(wrapped)).run
e
}
(enumeratee &= dataChunkerEnumerator[DataChunk, Trampoline](wrapped)).run map {
_._1
}
} run
// here
// Just using the parser
@@ -210,18 +216,18 @@ class AsyncPullTest extends junit.framework.TestCase {
var res = Vector.empty[PullType]
var c = AsyncParser.parse(parser)
var c = AsyncParser.parse[Trampoline](parser)
var b : DataChunk = EmptyData
while(b != EOFData) {
def input =
if (b.isEOF)
Eof[DataChunk]
else
if (b.isEmpty) {
Empty[DataChunk]
} else
Element(b)
if (b.isEOF)
Eof[DataChunk]
else
if (b.isEmpty)
Empty[DataChunk]
else
Element(b)
def nextC =
c.foldT (
166 changes: 163 additions & 3 deletions docs/Getting_Started/release_notes/0.6.0.md
@@ -8,7 +8,7 @@ Build wise it's moved to maven and github actions to simplify rolling out any fu

## Scalaz Iteratee

The pull api (and async pull api) remains the same, however user iteratee usage will have to change. Largely this involves swapping Cont and Done for cont and done and migrating enumeratee / run usage to:
The pull api (and async pull api) has changed to be fully monadic and will require code changes: the default Id implementation is not tail recursive and leads to SOEs. Direct usage of iteratees largely involves swapping Cont and Done for cont and done and migrating enumeratee / run usage to:

```scala
(iteratee &= iteratorEnumerator(pull.it)) run
@@ -20,9 +20,169 @@ instead of
iteratee(pull.it) run
```

this is because enumerators are no longer implicitly bound, things are notably more verbose. Scalaz provides a good selection of useful starting enumerators.
This is because enumerators are no longer implicitly bound, so things are notably more verbose. Scalaz now provides a good selection of useful starting enumerators.

Also note that Iteratee[E,A] is a type alias for Iteratee[E,Id,A] so everything is "wrapped" in an Id container (Id[X]=X), as such you may need to specify types to get a proper compilation.
Iteratee[E,A] is a type alias for IterateeT[E,Id,A], so everything is "wrapped" in an Id container (Id[X]=X); as such you may need to specify an appropriate Monad type, e.g. IO or Trampoline, to get a proper compilation and run. Indeed, if the compiler doesn't report an error the monad is typically derived as Id, which can SOE.
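
For example, a plain scalaz iteratee can be pinned to Trampoline simply by supplying the type parameters explicitly. A minimal sketch (peek and enumStream are scalaz's own functions, the Int stream is purely illustrative):

```scala
import scalaz.Scalaz._
import scalaz.Free.Trampoline
import scalaz.iteratee.Iteratee.peek
import scalaz.iteratee.EnumeratorT.enumStream

// without the explicit [Int, Trampoline] both peek and enumStream would be inferred as Id
val peeked: Trampoline[Option[Int]] =
  (peek[Int, Trampoline] &= enumStream[Int, Trampoline](Stream(1, 2, 3))).run

peeked.run // Some(1) - the Trampoline is only forced here
```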

To aid with this, and with type inference given a specific monad, the following helper function and defaults are provided:

```scala
def iterateesOf[F[_]]: IterateeFunctions[F]

val ioIteratees = iterateesOf[IO]
val trampolineIteratees = iterateesOf[Trampoline]
// not recommended but may help migrations
val idIteratees = iterateesOf[Id]

```

The functions in IterateeFunctions already have F captured as the type TheF (although F is not stored as an implicit, as that conflicts with import Scalaz._). So using:

```scala
import scales.utils._
import scales.utils.trampolineIteratee._
```

will bring in shadowing functions that are already fixed to Trampoline, allowing better type inference.
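
As an illustration of the shadowing/forwarding idea, the sketch below pins the monad once in a small object (this is not the actual scales IterateeFunctions implementation - the object and member names are hypothetical):

```scala
import scalaz.Free.Trampoline
import scalaz.iteratee.{Iteratee, IterateeT}

// hypothetical forwarding object: F is fixed once so call sites never repeat it
object trampolinePinned {
  type TheF[X] = Trampoline[X]

  // forwarders that pin F = Trampoline for the underlying scalaz functions
  def peek[E]: IterateeT[E, TheF, Option[E]] = Iteratee.peek[E, TheF]
  def head[E]: IterateeT[E, TheF, Option[E]] = Iteratee.head[E, TheF]
}

// usage - the monad is inferred as Trampoline without any annotations
import trampolinePinned._
val first = peek[Int]
```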

### Upgrading to Monadic usage

IterV allowed non-monadic usage with tail recursion; this is no longer directly possible with Scalaz 7 IterateeT usage. A "great" example of this is the foldOnDone method implementation; the original IterV based code is fairly simple:

```scala

var currentI = initResumable(it).eval
var isdone = isDone(currentI)
var currentA = initAcc
while( !isdone || (isdone && !isEOF(currentI)) ) {

if (isdone) {
val a = extract(currentI)
if (!a.isDefined)
return currentA
else {
currentA = f(currentA, a.get)
currentI = extractCont(currentI)
}
}

currentI = currentI(it).eval
isdone = isDone(currentI)
}
currentA
```

i.e. keep extracting the next continuation and call the f fold function when you find a done. The code runs fast (400ms on the dev box) and in constant space, as you'd expect. An "obvious" translation to a monadic version is:

```scala
val starter = (initResumable &= e).eval

val r =
(foldIM[ACC,F,(ACC, ResumableIter[E,F,A], Boolean)]((p, a) => {
val (currentA, itr, _) = a
for {
isdone <- isDone(itr)
iseof <- isEOF(itr)

res <-
if (isdone && !iseof) {
val a = extract(itr)
F.map(a) { a =>
if (!a.isDefined)
(currentA, itr, true)
else
(f(currentA, a.get), extractCont(itr), false)
}
} else
F.point((currentA, itr, true))
} yield {
val (currentA, itr, done) = res

(currentA, (itr &= e).eval, done)
}
})(init = (initAcc, starter, false), stopOn = a => a._3) &= repeat[ACC,F](initAcc) ) run

F.map( r ) { r =>
val ((acc, nr, b), cont) = r
acc
}
```

We have to loop, so we use a fold with an early exit and drive it through a never-ending enumerator (whose value "p" above is ignored). Unlike foldI, foldIM expects an accumulator wrapped in the monad. As we are looping we cannot return early and have to handle all exit criteria directly, so more "if" branches are introduced.

Finally, as we must stay in the monad there are lots of direct map/point calls and of course the hidden bind/flatMaps. This code runs *very* slowly - 20 seconds (on the dev box). We've gone from a non-monadic version at 400ms to 20s - why?

[This SO post](https://stackoverflow.com/a/41128527/1028537) does an excellent job of explaining how seemingly innocuous and correct code can become a very large chain of O(N) calls under the hood. This version of "r" brings us back to the 400-450ms mark (still a bit slower but no SOEs and monadic):

```scala
val r =
(foldIM[ACC,F,(ACC, ResumableIter[E,F,A], Boolean)]((p, a) => {
val (currentA, itr, _) = a
for {
step <- itr.value
isdone = isDoneS(step)
iseof = isEOFS(step)

shouldStop = (currentA, itr, true)

res =
if (isdone && !iseof) {
val a = extractS(step)
if (a.isEmpty)
shouldStop
else
(f(currentA, a.get), extractContS(step), false)
} else
shouldStop

} yield {
val (currentA, itr, done) = res

(currentA, (itr &= e).eval, done)
}
})(init = (initAcc, starter, false), stopOn = a => a._3) &= repeat[ACC,F](initAcc) ) run
```

Here, instead of two flatMaps and a flatMap containing a map that is itself mapped, we are left with only one flatMap - that of the underlying step. To enable this there are now new functions ending in S, designed to behave like the old IterV based functions and stay out of the monad. If your code runs slower than expected this is a likely culprit.

Per the above example you can use foldIM with a repeating enumerator and a stopOn clause, or, if you know how many times you wish to repeat, you can use the Scalaz foldM and a fixed enumerator (e.g. iteratorEnumerator).
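
For the fixed enumerator case a plain scalaz sketch looks like the following (assuming scalaz's foldM(init)(f) shape and enumStream; the Int values are illustrative):

```scala
import scalaz.Monad
import scalaz.Free.Trampoline
import scalaz.iteratee.Iteratee.foldM
import scalaz.iteratee.EnumeratorT.enumStream

val F = Monad[Trampoline]

// fold over a fixed enumerator, the accumulator stays inside the monad
val summed: Trampoline[Int] =
  (foldM[Int, Trampoline, Int](0)((acc, e) => F.point(acc + e)) &=
    enumStream[Int, Trampoline](Stream(1, 2, 3, 4))).run

summed.run // 10 - the trampoline is only forced here
```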

### BEWARE THE ITERATOR

As trampolining is now used, anything that generates via side effects, e.g. Iterator[PullType], can cause significant oddities when a part of the code you'd reasoned was finished starts up again and calls next.

To help with this there is a memory and state trade-off to be had by wrapping the iterator in an EphemeralStream. Three functions are introduced:

```scala
package scales.utils.iteratee

object EphemeralStreamEnum {
def enumEphemeralStream[E, F[_] : Monad](xs: EphemeralStream[E]): EnumeratorT[E, F]

def enumEphemeralStreamF[E, F[_] : Monad](state: EphemeralStream[E] => Unit)(xs: EphemeralStream[E]): EnumeratorT[E, F]

def toEphemeral[A](iterator: Iterator[A]): EphemeralStream[A]
}
```

toEphemeral safely wraps an iterator in an EphemeralStream, ensuring trampolining does not trigger further .next calls.
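
A minimal sketch using the signatures above (a plain Int iterator and scalaz's peek stand in for real pull parsing values):

```scala
import scalaz.Free.Trampoline
import scalaz.iteratee.Iteratee.peek
import scales.utils.iteratee.EphemeralStreamEnum.{enumEphemeralStream, toEphemeral}

// wrap the side-effecting iterator exactly once; .next is only driven on demand
val wrapped = toEphemeral(Iterator(1, 2, 3))

val first: Trampoline[Option[Int]] =
  (peek[Int, Trampoline] &= enumEphemeralStream[Int, Trampoline](wrapped)).run

first.run // Some(1)
```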

The enumEphemeralStreamF variant lets you use a workaround to keep track of progress and restart processing of the stream:

```scala
var theStream: EphemeralStream[PullType] = toEphemeral(xmlpull: Iterator[PullType])
val func = (e: EphemeralStream[PullType]) => { theStream = e } // capture the remaining stream so processing can be restarted

def enum(e: EphemeralStream[PullType]) = enumEphemeralStreamF[PullType, TheF](func)(e)

val starter = (ionDone &= enum(theStream)).eval

// some time later, restart the stream processing
(extractCont(starter) &= enum(theStream)).eval
```

!!! Note "Consider iteratee composition"
    If you are already using iteratees, consider composing them via for-comprehensions and only processing the stream once (i.e. not using extractCont) if intermediate values aren't needed - see the sketch below.
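
A minimal sketch of such composition using only plain scalaz iteratees (drop and head are assumed available from scalaz.iteratee.Iteratee; the values are illustrative):

```scala
import scalaz.Free.Trampoline
import scalaz.iteratee.Iteratee.{drop, head}
import scalaz.iteratee.EnumeratorT.enumStream

// compose two iteratees with a for-comprehension; the enumerator below is driven only once
val headAfterDrop =
  for {
    _ <- drop[Int, Trampoline](2) // consume the first two elements
    h <- head[Int, Trampoline]    // then take the next one
  } yield h

(headAfterDrop &= enumStream[Int, Trampoline](Stream(1, 2, 3, 4))).run.run // Some(3)
```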

## Scala 2.13 support

2 changes: 1 addition & 1 deletion docs/Parsing_XML/PullParsing.md
@@ -13,7 +13,7 @@ Some of the advanced features of Pull Parsing may require importing Scalaz as we
```scala
import scalaz._
import Scalaz._
import IterV._ // may not always be required
import iteratee._ // may not always be required
```

## Pull Model
2 changes: 1 addition & 1 deletion docs/Parsing_XML/RepeatedSections.md
@@ -6,7 +6,7 @@ Many documents however have a more complex structure, of many repeated or altern

## Supported Repeating Section Examples

Its far easier to discuss the solution with a few examples of the problem:
It's far easier to discuss the solution with a few examples of the problem:

### Alternating and Repeating Elements

2 changes: 1 addition & 1 deletion docs/index.md
@@ -17,7 +17,7 @@ Scales XML is an alternative XML library for Scala, its design started with the

The answer for XML tends naturally to trees and zippers, enabling a combined model for both XML Tree handling and XML Event handling. This allows opportunities for saving memory usage and increasing performance.

The design aims of Scales Xml also target correctness first, an Iteratee based processing for Pull, an XPath like syntax for querying and manipulation and deep support for JAXP.
The design aims of Scales Xml also target correctness first, Iteratee based processing for Pull, an XPath like syntax for querying and manipulation, and deep support for JAXP. See the release notes for [Iteratee upgrade information](Getting_Started/release_notes/0.6.0.md#scalaz-iteratee).

__The main focus areas are__

5 changes: 2 additions & 3 deletions jaxen/src/test/scala/scales/xml/jaxen/JaxenPullTest.scala
@@ -1,8 +1,7 @@
package scales.xml.jaxen

import scales.xml._
/*

class JaxenPullTest extends PullTest {

}
*/
}
5 changes: 2 additions & 3 deletions saxon-tests/src/test/scala/scales/xml/SaxonPullTest.scala
@@ -1,6 +1,5 @@
package scales.xml
/*

class SaxonPullTest extends PullTest {

}
*/
}
