Commit

#53 - safety commit - async iteratees working - one fail in pulltests left

chris-twiner committed Jan 15, 2024
1 parent 6bd5120 commit 1c0f5b4
Showing 5 changed files with 104 additions and 51 deletions.
21 changes: 11 additions & 10 deletions aalto/src/main/scala/scales/xml/parser/pull/aalto/AsyncParser.scala
@@ -277,7 +277,6 @@ object AsyncParser {
// Duplicates happen when restarting the processing in the face of trampolines. Using run without asynchronous empties does not suffer this issue
None


def EOF: ResumableStep[DataChunk, F, EphemeralStream[PullType]] = {
parser.closeResource

@@ -297,20 +296,21 @@ object AsyncParser {
//println("Did get a large chunk " + new String(e.array, e.offset, e.length, "UTF-8") + " e " + System.identityHashCode(e) + " r " + System.identityHashCode(r))

r(el = es => {
// println("got " + es.toList)
//println("got el with es " + es.isEmpty + " feeder " + parser.feeder.needMoreInput)
Done((es,
iterateeT(Monad[F].point(Cont(
step
)))), Input.Empty[DataChunk])
},
empty =
//println("empty from input")
//emptyness
Cont(step)
,
eof =
eof = {
EOF
}
)
}
},
@@ -320,8 +320,9 @@ object AsyncParser {
//Done((EphemeralStream.empty, Cont(step)), IterV.Empty[DataChunk]) // nothing that can be done on empty
Cont(step)
},
eof =
eof = {
EOF
}
)
))

@@ -8,6 +8,7 @@ import scalaz.iteratee.StepT.Done
import scalaz._
import Scalaz._
import junit.framework.Assert.assertTrue
import scales.utils.iteratee.functions.referenceDedup
import scales.utils.trampolineIteratees._

class AsyncPullTest extends junit.framework.TestCase {
@@ -346,8 +347,6 @@ class AsyncPullTest extends junit.framework.TestCase {
r <- itr
// stopped on EOF for the cont - but never pushes the eof to the parser
((cont, _, count), remainingCont) = r
// for side effect of closing
res <- (cont &= dataChunkerEnumerator(wrapped)).run
//step <- c.value
step <- cont.value
} yield {
12 changes: 12 additions & 0 deletions docs/Getting_Started/release_notes/0.6.0.md
@@ -312,6 +312,18 @@ val starter = (ionDone &= enum(theStream)).eval
!!! Note "Consider iteratee composition"
    If you are using iteratees already, consider composing them via a for-comprehension and processing the stream only once (i.e. not using extractCont) if intermediate values aren't needed.
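
As a rough sketch of that composition (hypothetical iteratees `countElems` and `collectMisc` standing in for your own, with the usual scalaz iteratee imports in scope), a for-comprehension sequences them over a single pass of the stream:

```scala
// Sketch only: countElems and collectMisc are placeholders for
// iteratees you already have; the second consumes whatever input
// remains after the first is Done, so the stream is read once.
def combined[F[_]](implicit F: Monad[F]): IterateeT[PullType, F, (Long, List[PullType])] =
  for {
    count <- countElems[F]
    misc  <- collectMisc[F]
  } yield (count, misc)

// driven once, e.g. val res = (combined[F] &= enum(theStream)).run
```

The intermediate extractCont step disappears because the second iteratee simply picks up where the first finished.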

### Beware Side-Effecting Output State as Well

Although you may have written your iteratees in a functional fashion (passing state on to the next steps), if you are using side-effecting state (such as the base use case of processing a stream of xml pull events) and you are not running your IterateeT with ".run", you may fall afoul of trampolining restarting your state.

This can be seen as duplicate events being sent or, as a result, events seemingly arriving out of order. This does not happen when running in Id, nor (at least it has not been observed to be the case) when using .run.
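
To make the distinction concrete, here is a minimal sketch (not code from this commit, but following the Cont/Done step pattern used in Iteratees.scala and assuming the same scalaz imports): the first counter threads its state through each step, while the second closes over a var and is the shape that can observe duplicates.

```scala
// State passed to the next step: a replayed step recomputes from the
// same acc, so nothing is counted twice.
def countFunctional[E, F[_]](implicit F: Monad[F]): IterateeT[E, F, Long] = {
  def step(acc: Long)(input: Input[E]): IterateeT[E, F, Long] =
    iterateeT(F.point(input(
      el = _ => Cont(step(acc + 1)),
      empty = Cont(step(acc)),
      eof = Done(acc, Eof[E])
    )))
  iterateeT(F.point(Cont(step(0L))))
}

// State held outside the steps: a replayed step increments seen again
// for the same element, which is the duplicate-event symptom above.
def countSideEffecting[E, F[_]](implicit F: Monad[F]): IterateeT[E, F, Long] = {
  var seen = 0L
  def step(input: Input[E]): IterateeT[E, F, Long] =
    iterateeT(F.point(input(
      el = _ => { seen += 1; Cont(step) },
      empty = Cont(step),
      eof = Done(seen, Eof[E])
    )))
  iterateeT(F.point(Cont(step)))
}
```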

Using eval or evalAcceptEmpty to process continuations of the stream when faced with empty input (instead of Eof, as with .run) can leave the iteratee needing to restart outside of the trampoline mechanism. As of Scalaz 7.3.8 this triggers restarts of at least the previous iteratee - not the resulting next cont (again, this does not happen with Id).
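
In code the two driving styles referred to here look roughly as follows (`iter` is a placeholder iteratee; `enum` and `theStream` are as in the earlier example):

```scala
// .run pushes Eof into the iteratee and yields the final value in F;
// the computation finishes and nothing is left to restart.
val finished = (iter &= enum(theStream)).run

// .eval (or evalAcceptEmpty) drives only as far as the currently
// available input, returning on empty so the continuation can be fed
// again later - the resumable path where restarts have been observed.
val resumable = (iter &= enum(theStream)).eval
```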

As counter-intuitive as it seems, this requires managing duplicate-feed detection in the iteratee. Both pushXmlIter and AsyncParser face this issue: state is moved into a var closure and each next input is checked against the previous input. Given their use cases this is acceptable, but each iteratee that manages state needs to verify this for itself.
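
The guard itself reduces to remembering the previously fed input in a var closure and comparing references; a stripped-down sketch of the idea follows (the commented-out referenceDedup later in this commit explores a full iteratee built around the same check):

```scala
// Sketch of duplicate-feed detection: a replayed trampoline step feeds
// the very same object again, so reference equality is enough to spot it.
final class DuplicateFeedGuard[E] {
  private var prev: AnyRef = null

  // true when the same reference is fed twice in a row
  def isDuplicate(e: E): Boolean = {
    val ref = e.asInstanceOf[AnyRef]
    if (ref eq prev) true
    else { prev = ref; false }
  }
}
```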

It is highly likely this behaviour is tied to that of the iterator warning above, and possibly to an underlying bug.

## Scala 2.13 support

2.13 re-worked much of the internal collection logic, including CanBuildFrom. Scales required the ability to swap out the actual container used for Trees in order to reduce allocation cost (yielding better performance).
@@ -219,46 +219,42 @@ trait ReadableByteChannelWrapperImplicits {
def apply(stepT: StepT[E, F, A], count: Int): IterateeT[E, F, A] = stepT match {
case i if chunker.underlyingClosed || chunker.isClosed => iterateeT(F.point(i))
case i@Done(acc, input) =>
val c = acc
val in = input
iterateeT(F.point(i))
case Cont(k) =>
val realChunk = F.point(chunker.nextChunk)
iterateeT(
F.bind(realChunk) { realChunk =>
val realChunk = chunker.nextChunk

val nextChunk = realChunk.asInstanceOf[E]
val nextI =
if (realChunk.isEOF)
// println("actual data was EOF !!!")
k(Eof[E])
else
if (realChunk.isEmpty)
k(Empty[E])
else
k(Element(nextChunk))

// if (realChunk != EOFData) {
// println("got chunk " + new String(realChunk.array, realChunk.offset, realChunk.length))
// }
iterateeT(
nextI.value >>= { nextIStep =>

val nextChunk = realChunk.asInstanceOf[E]
val nextI =
if (realChunk.isEOF)
// println("actual data was EOF !!!")
k(Eof[E])
val res = {
val nc =
if (realChunk.isEmpty && !isDoneS(nextIStep))
count + 1
else
if (realChunk.isEmpty)
k(Empty[E])
else
k(Element(nextChunk))

nextI.value >>= { nextIStep =>

val res = {
val nc =
if (realChunk.isEmpty && !isDoneS(nextIStep))
count + 1
else
0

if ((contOnCont != INFINITE_RETRIES) && (nc > contOnCont))
//println("had cont on cont count, returning")
nextI
else
apply(nextIStep, nc)
}
res.value
}
0

if ((contOnCont != INFINITE_RETRIES) && (nc > contOnCont))
//println("had cont on cont count, returning")
nextI
else
apply(nextIStep, nc)
}
)
res.value
})

}

apply(i, 0)
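
Stripped of the iteratee plumbing, the retry accounting in the hunk above amounts to the following (a simplified sketch: a negative contOnCont stands in for INFINITE_RETRIES, and nextChunk returning None models an empty read):

```scala
// Count consecutive empty reads; any real progress resets the counter;
// exceeding contOnCont hands control back to the caller instead of
// spinning on empties.
def drive[E](nextChunk: () => Option[E],
             consume: E => Boolean,        // true once the consumer is done
             contOnCont: Int): Unit = {
  @annotation.tailrec
  def loop(count: Int): Unit =
    nextChunk() match {
      case None =>
        val nc = count + 1
        if (contOnCont >= 0 && nc > contOnCont) ()  // give up for now
        else loop(nc)
      case Some(e) =>
        if (!consume(e)) loop(0)                    // progress: reset the count
    }
  loop(0)
}
```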
55 changes: 50 additions & 5 deletions scales-xml/src/main/scala/scales/utils/iteratee/Iteratees.scala
@@ -881,13 +881,13 @@ object functions {

if (Eof.unapply(y) && !internalEOF) {
// signal the end here to toMany, don't care about result, tested by testSimpleLoad and testRandomAmounts in AsyncPullTest
iterateeT(F.map(returnThis.value) { v =>
toMany.foldT(done = (a1, y1) => F.point(false),
iterateeT(F.bind(returnThis.value) { v =>
toMany.foldT(done = (a1, y1) => returnThis.value,
cont = k => {
k(Eof[E]);
F.point(false)
F.map(k(Eof[E]).value){
_ => v
}
})
v
})
} else
returnThis
@@ -934,6 +934,51 @@ object functions {

through(itr)
}


/**
* Drives an iteratee, only calling Done when the Done(Element) is not the same reference (element ne prev_element);
* this allows iteratees to be side-effect free even when trampolining restarts.
*
* This iteratee _does_, however, use state; you cannot copy an interim value and re-use it safely.
*
* When EOF is reached the last seen E will be returned (or the call will fail should there never have been one).
*
* @tparam E
* @tparam F
* @return
commented out as, although it works, it doesn't solve the problem of restarted iteratees sending duplicate state under trampolines
def referenceDedup[E, F[_]](implicit F: Monad[F]): IterateeT[E, F, E] = {
var prev: Option[E] = None
def usePrev(nextE: E): Option[E] = prev.fold{
prev = Some(nextE)
prev
} {prevI =>
if (System.identityHashCode(nextE) != System.identityHashCode(prevI)) {
prev = Some(nextE)
prev
} else None
}
def step(input: Input[E]): IterateeT[E, F, E] =
iterateeT(F.point(input(
el = e => {
val r = usePrev(e)
r.fold{
Cont(step)
}{ e =>
Done(e, Empty[E])
}
},
empty = Cont(step),
eof = Done(prev.get, Eof[E])
)))
iterateeT(F.point(Cont(step)))
} */
}

trait Iteratees {
