Mechanism to use / extend ClassResolver #287

Closed
ilx opened this issue Apr 19, 2023 · 1 comment

Comments


ilx commented Apr 19, 2023

Hi, let me explain the original issue first:

  • we use cglib to create proxy classes; we mark the generated classes with a custom interface and provide a custom serializer
  • when Kryo 5 serializes an immutable (non-generic) collection, the elements are serialized by this snippet in CollectionSerializer:
	} else {
		for (Object element : collection)
			kryo.writeClassAndObject(output, element);
	}
  • kryo.writeClassAndObject calls writeClass(output, object.getClass()), which writes the generated class name:
	public Registration writeClass (Output output, Class type) {
		if (output == null) throw new IllegalArgumentException("output cannot be null.");
		try {
			return classResolver.writeClass(output, type);
		} finally {
			if (depth == 0 && autoReset) reset();
		}
	}
  • the ClassResolver (DefaultClassResolver) writes the class name:
	public Registration writeClass (Output output, Class type) {
		if (type == null) {
			if (TRACE || (DEBUG && kryo.getDepth() == 1)) log("Write", null, output.position());
			output.writeByte(Kryo.NULL);
			return null;
		}
		Registration registration = kryo.getRegistration(type);
		if (registration.getId() == NAME)
			writeName(output, type, registration);
  • where the writeName method uses:
if (registration.isTypeNameAscii())
	output.writeAscii(type.getName());
else
	output.writeString(type.getName());

In other words, Kryo writes the name of the generated class (something like $Configuration$$EnhancerByCGLIB$$d0595dcf), then finds the registered serializer and serializes the cglib proxy object with it.

The problem occurs when the other side attempts deserialization: there is no class named $Configuration$$EnhancerByCGLIB$$d0595dcf on that side, because it was generated in a different JVM.
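A stdlib-only sketch (no Kryo involved) of the lookup the receiving JVM has to perform makes the failure concrete. It assumes the reader turns the written name back into a class with `Class.forName`; `GeneratedNameLookup` is a hypothetical helper, and the generated name is the one quoted in the issue.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper: models the name-to-class step on the receiving side.
object GeneratedNameLookup {
  // Name from the issue; the class was generated in the sending JVM only.
  val generatedName = "$Configuration$$EnhancerByCGLIB$$d0595dcf"

  // Assumption: a name-based resolver ultimately loads classes via a ClassLoader.
  def resolve(name: String): Try[Class[_]] =
    Try(Class.forName(name, false, Thread.currentThread.getContextClassLoader))

  def describe(name: String): String = resolve(name) match {
    case Success(cls)                       => s"resolved ${cls.getName}"
    case Failure(_: ClassNotFoundException) => s"cannot resolve $name"
    case Failure(other)                     => s"unexpected failure: $other"
  }
}
```

A JDK class such as `java.lang.String` resolves everywhere, while the cglib name resolves nowhere except the JVM that generated it.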

One way around this problem could be to override the ClassResolver (since all of the class resolvers extend DefaultClassResolver) and provide a custom implementation of writeName.
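If a subclass could be plugged in, overriding writeName would need no reflection, because a subclass may override a protected hook and call `super`. The sketch below models this with plain Scala stand-ins: `BaseResolver`, `StableNameResolver`, `ResolvableItem` and `GeneratedProxy` are hypothetical, not Kryo types; in Kryo 5 the corresponding hook is `DefaultClassResolver.writeName(Output, Class, Registration)`, the method the reflection code in this issue invokes.

```scala
// Stands in for ResolvableLazyConfigurationItem (hypothetical).
class ResolvableItem

// Stands in for a cglib-generated subclass whose name only exists locally.
class GeneratedProxy extends ResolvableItem

// Hypothetical base resolver with a protected hook shaped after writeName.
class BaseResolver {
  // Default behaviour: write the runtime class name unchanged.
  protected def writeName(out: StringBuilder, cls: Class[_]): Unit = {
    out.append(cls.getName)
    ()
  }

  final def writeClass(out: StringBuilder, cls: Class[_]): Unit = writeName(out, cls)
}

// Overrides the protected hook directly instead of invoking it reflectively.
class StableNameResolver(stable: Class[_]) extends BaseResolver {
  override protected def writeName(out: StringBuilder, cls: Class[_]): Unit =
    if (stable.isAssignableFrom(cls)) super.writeName(out, stable)
    else super.writeName(out, cls)
}

object StableNameDemo {
  // Returns the name a resolver would put on the wire for `value`.
  def written(resolver: BaseResolver, value: AnyRef): String = {
    val out = new StringBuilder
    resolver.writeClass(out, value.getClass)
    out.toString
  }
}
```

With `StableNameResolver(classOf[ResolvableItem])`, a `GeneratedProxy` is written under the stable `ResolvableItem` name, while unrelated classes keep their own names.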

Providing a custom CollectionSerializer does not look like a viable option (see EsotericSoftware/kryo#943 for an attempt to do something similar).

Kryo allows passing a custom ClassResolver, but KryoSerializer is quite opinionated (it provides its own class resolvers), so I had to resort to reflection:

// inside KryoInit extends DefaultKryoInitializer {

    val classResolver = kryo.getClassResolver
    classResolver match {
      case resolver: DefaultClassResolver =>
        // I really did not want to use reflection, but the alternative is to re-implement/copy KryoSerializer (from altoo) in order to use a custom class resolver
        replaceClassResolver(kryo, resolver)
      case _ =>
        throw new IllegalStateException(s"Unable to replace unknown classResolver type ${classResolver.getClass.getName}")
    }
  }

  private def replaceClassResolver(kryo: ScalaKryo, original: DefaultClassResolver): Unit = {
    // Kryo might use CollectionSerializer under the hood to serialize the "syntheticProperties" collection.
    // If this happens, it writes the name of each element's class and then serializes the element using the registered serializer.
    // If a collection element is a cglib proxy, its class was generated only on the node where serialization happens.
    // This is a problem because other nodes in the cluster do not know about this generated class; those nodes do have the serializer
    // and can deserialize a reference into their own proxy.
    // That's why we want to tell Kryo to use "ResolvableLazyConfigurationItem" as the class name instead of the one created by cglib.
    // The use of KryoSerializer and Akka makes this difficult to test on a single node, and with a cluster you have to debug through the Kryo codebase
    // to see which serializers and class names are being picked up.
    val kryoClazz = classOf[Kryo]
    val classResolverField = kryoClazz.getDeclaredField("classResolver")
    classResolverField.setAccessible(true)
    val xlrClassResolver = new XlrClassResolver(original)
    classResolverField.set(kryo, xlrClassResolver)
    xlrClassResolver.setKryo(kryo)
  }

and then again in the custom class resolver:

class XlrClassResolver(delegate: DefaultClassResolver) extends ClassResolver with Logging {
  private val resolvableLazyConfigurationItemClazz = classOf[ResolvableLazyConfigurationItem]
  private val m = classOf[DefaultClassResolver].getDeclaredMethod("writeName", classOf[Output], classOf[Class[_]], classOf[Registration])
  m.setAccessible(true)

  override def setKryo(kryo: Kryo): Unit = {
    delegate.setKryo(kryo)
  }

  def doWriteName(output: Output, typ: Class[_], registration: Registration): Unit = {
    m.invoke(delegate, output, typ, registration)
  }

  override def register(registration: Registration): Registration = delegate.register(registration)

  override def unregister(classID: Int): Registration = delegate.unregister(classID)

  override def registerImplicit(typ: Class[_]): Registration = delegate.registerImplicit(typ)

  override def getRegistration(typ: Class[_]): Registration = delegate.getRegistration(typ)

  override def getRegistration(classID: Int): Registration = delegate.getRegistration(classID)

  override def writeClass(output: Output, typ: Class[_]): Registration = {
    if (null != typ && resolvableLazyConfigurationItemClazz.isAssignableFrom(typ)) {
      val registration = delegate.getRegistration(resolvableLazyConfigurationItemClazz)
      doWriteName(output, resolvableLazyConfigurationItemClazz, registration)
      registration
    } else {
      delegate.writeClass(output, typ)
    }
  }

  override def readClass(input: Input): Registration = delegate.readClass(input)

  override def reset(): Unit = delegate.reset()
}

I would like to avoid reflection: directly extend the class resolvers used by KryoSerializer and pass a custom class resolver (which should extend the "expected" class resolver type for the particular akka-kryo configuration) to akka-kryo-serialization via config.

Something like:

    akka-kryo-serialization {
        type = "graph"
        id-strategy = "default"
        buffer-size = 4096
        max-buffer-size = -1
        use-manifests = true
        use-unsafe = false
        post-serialization-transformations = "lz4"
        kryo-initializer = "custom.KryoInit"
        implicit-registration-logging = true
        kryo-trace = false
        class-resolver = "custom.MyClassResolver" // new option...
    }

io.altoo.akka.serialization.kryo.KryoSerializer#getKryo would then become a bit more complicated, as it would have to check whether a custom class-resolver is configured and whether it extends the expected class resolver.
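The check described above could look roughly like this minimal sketch. `ResolverLoader.load` is a hypothetical helper, not part of akka-kryo-serialization; it assumes the configured class has an accessible no-argument constructor and that the caller passes the resolver type it would otherwise have created.

```scala
import scala.util.Try

object ResolverLoader {
  // Hypothetical check for a class name taken from config (e.g. the proposed
  // `class-resolver` option): load the class, reject it unless it extends the
  // expected resolver type, then create it via its no-argument constructor.
  def load[T](className: String, expected: Class[T]): Either[String, T] =
    for {
      cls <- Try(Class.forName(className)).toEither.left.map(e => s"cannot load $className: $e")
      _ <- Either.cond(expected.isAssignableFrom(cls), (), s"$className does not extend ${expected.getName}")
      instance <- Try(cls.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]).toEither
        .left.map(e => s"cannot instantiate $className: $e")
    } yield expected.cast(instance)
}
```

In a serializer, `expected` would be something like `classOf[DefaultClassResolver]` and `className` the configured value; returning `Either` lets the caller fail fast with a readable message at startup.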

Alternatively, just make getKryo protected so we can extend KryoSerializer and provide our own implementation of getKryo.
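A minimal sketch of the "make getKryo protected" alternative, assuming the serializer builds its class resolver through an overridable protected method. All names here (`Resolver`, `DefaultResolver`, `BaseSerializer`, `createResolver`, `StableNameSerializer`) are hypothetical stand-ins, not the akka-kryo-serialization API.

```scala
// Hypothetical stand-ins: Resolver plays the class resolver,
// BaseSerializer plays KryoSerializer.
trait Resolver {
  def nameFor(cls: Class[_]): String
}

class DefaultResolver extends Resolver {
  def nameFor(cls: Class[_]): String = cls.getName
}

class BaseSerializer {
  // Protected factory: subclasses may replace or wrap the resolver.
  protected def createResolver(): Resolver = new DefaultResolver

  // Lazy, so the factory runs on first use rather than during construction.
  private lazy val resolver: Resolver = createResolver()

  final def writtenName(value: AnyRef): String = resolver.nameFor(value.getClass)
}

// Wraps the base resolver instead of swapping a private field via reflection.
class StableNameSerializer(stable: Class[_]) extends BaseSerializer {
  override protected def createResolver(): Resolver = {
    val base = super.createResolver()
    new Resolver {
      def nameFor(cls: Class[_]): String =
        if (stable.isAssignableFrom(cls)) stable.getName else base.nameFor(cls)
    }
  }
}
```

The lazy initialization is a design choice: calling an overridable method from a base-class constructor is fragile, since the override may observe subclass state that is not yet initialized.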

Member

nvollmar commented Jul 7, 2023

Sorry, this kind of went under the radar.
I guess the option to provide a class-resolver via config would make sense for such scenarios, but the class resolver chosen depends on other settings:

    val classResolver =
      if (settings.idStrategy == "incremental") new KryoClassResolver(settings.implicitRegistrationLogging)
      else if (settings.resolveSubclasses) new SubclassResolver()
      else new DefaultClassResolver()
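Because the expected base type depends on these settings, a configured resolver would have to be validated against whichever resolver the settings select. The sketch below models that with hypothetical stubs (the real resolvers take constructor arguments); the subclass relationships follow the issue's statement that all the resolvers extend DefaultClassResolver.

```scala
// Hypothetical stubs mirroring the three resolvers chosen above.
class DefaultClassResolverStub
class KryoClassResolverStub extends DefaultClassResolverStub
class SubclassResolverStub extends DefaultClassResolverStub

final case class ResolverSettings(idStrategy: String, resolveSubclasses: Boolean)

object ExpectedResolver {
  // Same precedence as the snippet: incremental id strategy first,
  // then subclass resolution, otherwise the default resolver.
  def expectedFor(s: ResolverSettings): Class[_ <: DefaultClassResolverStub] =
    if (s.idStrategy == "incremental") classOf[KryoClassResolverStub]
    else if (s.resolveSubclasses) classOf[SubclassResolverStub]
    else classOf[DefaultClassResolverStub]

  // A custom resolver is acceptable only if it extends the resolver
  // these settings would otherwise have created.
  def accepts(s: ResolverSettings, custom: Class[_]): Boolean =
    expectedFor(s).isAssignableFrom(custom)
}
```

For example, with `id-strategy = "incremental"` a resolver that only extends the default resolver would be rejected, since it lacks the incremental resolver's behaviour.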
