Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] PullByteBufferOut as a default ordering #1728

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from

Conversation

ianoc-stripe
Copy link
Contributor

First extraction.

Right now i don't think we need the macros and can probably drop them again.

Naming/code organization and how this should all get imported open to opinions

We could have the defaults show up in the package object ordered serialization itself?

thoughts ? @johnynek @travisbrown

@@ -0,0 +1,18 @@
/*
Copyright 2015 Twitter, Inc.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we skip the bogus copyright?

  1. year is wrong
  2. stripe is maybe paying you to write this code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah i was going to ask you if you thought we need to put the licence and/or copyright in these at all

*/
package com.twitter.scalding.serialization

case class Exported[T](instance: T) extends AnyVal
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this pattern. Can you document it or link to an explanation? I know @travisbrown likes it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The general gist is just that by supplying instances of Exported you can import an object/package supplying them and inject them at a lower priority than a normal import.

So here if you import the default object it would just supply low priority implicits so won't override a user supplied one.

https://github.com/milessabin/export-hook

(i can add a link in the code too)

import com.twitter.scalding.serialization.{OrderedSerialization, DefaultOrderedSerialization}
import scala.reflect.macros.whitebox

class ExportMacros(val c: whitebox.Context) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we actually calling this anywhere?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, i think its killable/not sure if adds something. Was discussing that with @travisbrown towards EOD here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now the only exported instance has a concrete type, but once you have generically derived ones the macro here will be necessary (although it turns out it doesn't need to be whitebox, I believe). I've got some notes on this from when I was prepping this talk—I'll try to find them.

if (staticSize.isEmpty)
in.readPosVarInt

_root_.scala.util.Success(unsafeRead(in))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the _root_ just an artifact of a previous macro context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, i've cleaned these up i think so it should be more sane now

import com.twitter.scalding.serialization.{OrderedSerialization, DefaultOrderedSerialization}
import scala.reflect.macros.whitebox

class ExportMacros(val c: whitebox.Context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now the only exported instance has a concrete type, but once you have generically derived ones the macro here will be necessary (although it turns out it doesn't need to be whitebox, I believe). I've got some notes on this from when I was prepping this talk—I'll try to find them.

@@ -510,7 +512,7 @@ class MacroOrderingProperties
}

test("Test out ByteBuffer") {
BinaryOrdering.ordSer[ByteBuffer]
implicitly[OrderedSerialization[ByteBuffer]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nice to have a test confirming that an explicit instance doesn't get overridden by the DefaultOrderedSerialization import.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, did a test with a companion object defined ordered serialization not being overridden

@ianoc
Copy link
Collaborator

ianoc commented Sep 27, 2017

Ok I think this is looking a bit more cleaned up and may be worth looking at again design wise. I'd be slow to merge this as-is if we think develop should always be ready to released since i think for a release we would want to get everything from the macro out and under provided split up. It would be binary breaking since today a single method can be used to do everything. (Though we could possibly inject a different/new whitebox macro to smooth over most of this... but i'm not sure its worth that vs just moving onto a nicer split up world).

There is also a pre-release question here if the old school macros should be under the same package/import as say our ByteBuffer implementations. This would make it harder to shop around and use the ByteBuffer implementation, but then also use Shapeless.

private[this] def noLengthWrite(element: T, outerOutputStream: OutputStream): Unit = {
// Start with pretty big buffers because reallocation will be expensive
val baos = new ByteArrayOutputStream(512)
unsafeWrite(baos, element)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this implies that unsafeWrite means no size. Can we add that to the comments below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It implies no outer size, it can be a bit wasteful. I'm adding a comment to the definition of this method that I hope will help here a little:

  // This will write out the interior data as a blob with no prepended length
  // This means binary compare cannot skip on this data.
  // However the contract remains that one should be able to _read_ the data
  // back out again.
  def unsafeWrite(out: java.io.OutputStream, t: T): Unit


trait HasUnsafeCompareBinary[T] extends OrderedSerialization[T] {
def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int
def unsafeWrite(out: java.io.OutputStream, t: T): Unit
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we drop java.io? Also, I think this has to be the unsized output if there is ever a size header added, right? It seems confusing above since it is used that way, but it is not clear.

It may or may not be sized? Can you add some laws about how to reason about these things?

def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int
def unsafeWrite(out: java.io.OutputStream, t: T): Unit
def unsafeRead(in: java.io.InputStream): T
def unsafeSize(t: T): Option[Int]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the contract here? Again similar concerns as above.

// Members declared in com.twitter.scalding.serialization.Serialization
def read(in: java.io.InputStream): scala.util.Try[T] = o.read(in)
def staticSize: Option[Int] = o.staticSize
def unsafeWrite(out: java.io.OutputStream, t: T): Unit = o.write(out, t).get
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here, unsafeWrite could have a size if the original did, but then I guess you could add two sizes, couldn't you (since above we might call noLengthWrite)?

}

def unsafeRead(inputStream: java.io.InputStream): ByteBuffer = {
val lenA = inputStream.readPosVarInt
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we write an additional length header on this thing currently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

@johnynek johnynek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about this: can you update the PR to include 3 hand-written combinators:

Tuple2OrderedSerialization EitherOrderedSerialization and ListOrderedSerialization. I think if we can do those three (static-sized product, static-sized sum/union, dynamic-sized product) I think we will see what methods we want to have to enable.

You can tell something is wrong with OrderedSerialization because in our current tuple2 we have no way to avoid deserializing the second part.

I think if we exercise your code in the same PR that does those three we will be able to see more clearly if we have the API improved or not, or if we are still missing something.

@@ -53,6 +54,11 @@ trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable {
* otherwise the caller should just serialize into an ByteArrayOutputStream
*/
def dynamicSize(t: T): Option[Int]

// Override this to provide more efficient
def skip(in: InputStream): Try[Unit] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may want def skip(count: Int, in: InputStream): Try[Unit] so in say List[T] we can skip the rest of the collection. If count <= 0 do nothing, and otherwise in the worst case just read and throw them away as you do below.

* This compares two InputStreams. After this call, the position in
* the InputStreams may or may not be at the end of the record.
*/
def compareBinaryNoConsume(a: InputStream, b: InputStream): OrderedSerialization.Result = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get really worried about how to compose methods like this that lack a strong contract. Also, I don't see that we ever call this.

Failure(e)
}

override def compareBinaryNoConsume(inputStreamA: InputStream, inputStreamB: InputStream): OrderedSerialization.Result =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be final?

OrderedSerialization.CompareFailure(e)
}

override def compareBinary(inputStreamA: InputStream, inputStreamB: InputStream): OrderedSerialization.Result =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be final?

@CLAassistant
Copy link

CLAassistant commented Nov 16, 2019

CLA assistant check
All committers have signed the CLA.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants