Skip to content

Commit

Permalink
Added serde factories
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed May 5, 2024
1 parent 5b6cd8c commit a6ef0f0
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 30 deletions.
@@ -0,0 +1,32 @@
package com.sksamuel.centurion.avro.io

import java.util.concurrent.ConcurrentHashMap
import kotlin.reflect.KClass

/**
* A [CachedReflectionSerdeFactory] will create a [Serde] once for a given type via delegation
* to a [ReflectionSerdeFactory] and return that cached [Serde] upon future invocations.
*
* This instance is thread safe.
*/
object CachedReflectionSerdeFactory {

private val cache = ConcurrentHashMap<KClass<*>, Serde<*>>()

/**
* Creates or returns a [Serde] for the given [kclass].
*/
fun <T : Any> create(
kclass: KClass<T>,
options: SerdeOptions = SerdeOptions()
): Serde<T> {
return cache.getOrPut(kclass) { ReflectionSerdeFactory.create(kclass, options) } as Serde<T>
}

/**
* Creates or returns a [Serde] from the given type parameter [T].
*/
inline fun <reified T : Any> create(options: SerdeOptions = SerdeOptions()): Serde<T> {
return create(T::class, options)
}
}
@@ -0,0 +1,34 @@
package com.sksamuel.centurion.avro.io

import com.sksamuel.centurion.avro.decoders.SpecificRecordDecoder
import com.sksamuel.centurion.avro.encoders.SpecificRecordEncoder
import com.sksamuel.centurion.avro.generation.ReflectionSchemaBuilder
import kotlin.reflect.KClass

/**
* A [ReflectionSerdeFactory] will create a [Serde] for any given type using reflection based builders.
*
* This instance is thread safe.
*/
object ReflectionSerdeFactory {

/**
* Creates a [Serde] reflectively from the given [kclass] using a [ReflectionSchemaBuilder].
*/
fun <T : Any> create(
kclass: KClass<T>,
options: SerdeOptions = SerdeOptions()
): Serde<T> {
val schema = ReflectionSchemaBuilder(true).schema(kclass)
val encoder = SpecificRecordEncoder(kclass)
val decoder: SpecificRecordDecoder<T> = SpecificRecordDecoder(kclass)
return Serde(schema, encoder, decoder, options)
}

/**
* Creates a [Serde] reflectively from the given type parameter [T] using a [ReflectionSchemaBuilder].
*/
inline fun <reified T : Any> create(options: SerdeOptions = SerdeOptions()): Serde<T> {
return create(T::class, options)
}
}
@@ -1,22 +1,21 @@
package com.sksamuel.centurion.avro.io

import com.sksamuel.centurion.avro.decoders.Decoder
import com.sksamuel.centurion.avro.decoders.SpecificRecordDecoder
import com.sksamuel.centurion.avro.encoders.Encoder
import com.sksamuel.centurion.avro.encoders.SpecificRecordEncoder
import com.sksamuel.centurion.avro.generation.ReflectionSchemaBuilder
import org.apache.avro.Schema
import org.apache.avro.file.Codec
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.EncoderFactory
import kotlin.reflect.KClass

/**
* A [Serde] provides an easy way to convert between data classes and avro encoded bytes.
* A [Serde] provides an easy way to convert between a specific data class [T] and avro encoded bytes
* by delegating to an [Encoder] and [Decoder] that handles that type.
*
* This class is thread safe.
* If you wish to create a [Serde] reflectively, see [ReflectionSerdeFactory].
*
* This class is thread safe once constructed.
*/
class Serde<T : Any>(
schema: Schema,
Expand All @@ -30,30 +29,6 @@ class Serde<T : Any>(
GenericData.get().setFastReaderEnabled(true)
}

companion object {

/**
* Creates a [Schema], [Encoder] and [Decoder] reflectively from the given [kclass]
* using a [ReflectionSchemaBuilder].
*/
operator fun <T : Any> invoke(
kclass: KClass<T>,
options: SerdeOptions = SerdeOptions()
): Serde<T> {
val schema = ReflectionSchemaBuilder(true).schema(kclass)
val encoder = SpecificRecordEncoder(kclass)
val decoder: SpecificRecordDecoder<T> = SpecificRecordDecoder(kclass)
return Serde(schema, encoder, decoder, options)
}

/**
* Creates a [Schema] reflectively from the given type parameter [T] using a [ReflectionSchemaBuilder].
*/
inline operator fun <reified T : Any> invoke(options: SerdeOptions = SerdeOptions()): Serde<T> {
return Serde(T::class, options)
}
}

private val encoderFactory = EncoderFactory()
.configureBufferSize(options.encoderBufferSize)
.configureBlockSize(options.blockBufferSize)
Expand Down

0 comments on commit a6ef0f0

Please sign in to comment.