Skip to content

Commit

Permalink
Added data reader/writer
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed May 5, 2024
1 parent 1537180 commit 1c45b96
Show file tree
Hide file tree
Showing 10 changed files with 321 additions and 183 deletions.
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
package com.sksamuel.centurion.avro.io

import org.apache.avro.Schema
import org.apache.avro.file.Codec
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DecoderFactory
import java.io.InputStream
import java.nio.ByteBuffer

/**
* Creates a [BinaryReaderFactory] for a given schema which can then be used
* to create [BinaryReader]s.
*
* All readers created from this factory share a thread safe [DatumReader] for efficiency.
* All [BinaryReader]s created from this factory share a thread safe [DatumReader] for efficiency.
*
* Pass in a pre-created [DecoderFactory] if you wish to configure buffer size.
*/
Expand All @@ -27,7 +25,44 @@ class BinaryReaderFactory(
constructor(reader: Schema, writer: Schema) : this(reader, writer, DecoderFactory.get())
constructor(schema: Schema, factory: DecoderFactory) : this(schema, schema, factory)

private val datumReader = GenericDatumReader<GenericRecord>(/* writer = */ writer, /* reader = */ reader)
private val datum = GenericDatumReader<GenericRecord>(/* writer = */ writer, /* reader = */ reader)

companion object {

/**
* Reads a [GenericRecord] from the given [bytes].
*
* This method is a convenience function that is useful when you want to read a single record.
* If you wish to read multiple records, create a [BinaryReader] using a [BinaryReaderFactory].
* This will also allow customization of the [DecoderFactory] and schema evolution.
*/
fun fromBytes(schema: Schema, bytes: ByteArray): GenericRecord {
val datumReader = GenericDatumReader<GenericRecord>(/* writer = */ schema, /* reader = */ schema)
return BinaryReader(
datumReader = datumReader,
input = null,
bytes = bytes,
factory = DecoderFactory.get(),
).read()
}

/**
* Reads a [GenericRecord] from the given [bytes].
*
* This method is a convenience function that is useful when you want to read a single record.
* If you wish to read multiple records, create a [BinaryReader] using a [BinaryReaderFactory].
* This will also allow customization of the [DecoderFactory] and schema evolution.
*
* The given [input] stream will be closed after this function returns.
*
* This variant is slower than using a byte array. If you already have
* the bytes available, that should be preferred.
*/
fun fromBytes(schema: Schema, input: InputStream): GenericRecord {
val datumReader = GenericDatumReader<GenericRecord>(/* writer = */ schema, /* reader = */ schema)
return BinaryReader(datumReader, input, null, DecoderFactory.get()).use { it.read() }
}
}

/**
* Creates an [BinaryReader] that reads from the given [InputStream].
Expand All @@ -36,52 +71,20 @@ class BinaryReaderFactory(
* the bytes available, that should be preferred.
*/
fun reader(input: InputStream): BinaryReader {
return BinaryReader(datumReader, input, null, factory)
return BinaryReader(datum, input, null, factory)
}

/**
* Creates an [BinaryReader] that reads from the given [ByteArray].
*
* Pass in a [Codec] if the input is compressed.
*/
fun reader(bytes: ByteArray, codec: Codec? = null): BinaryReader {
fun reader(bytes: ByteArray): BinaryReader {
return BinaryReader(
datumReader = datumReader,
datumReader = datum,
input = null,
bytes = codec?.decompress(ByteBuffer.wrap(bytes))?.array() ?: bytes,
bytes = bytes,
factory = factory
)
}

/**
* Reads avro encoded bytes from the given [bytes] to a [GenericRecord].
* This method is a convenience function that is useful when you want to read a single record.
* If you wish to read multiple records, create a [BinaryWriter] using [reader].
*
* Pass in a [Codec] if the input is compressed.
*/
fun read(bytes: ByteArray, codec: Codec? = null): GenericRecord {
return BinaryReader(
datumReader = datumReader,
input = null,
bytes = codec?.decompress(ByteBuffer.wrap(bytes))?.array() ?: bytes,
factory = factory
).read()
}

/**
* Reads avro encoded bytes from the given [input] stream to a [GenericRecord].
* This method is a convenience function that is useful when you want to read a single record.
* If you wish to read multiple records, create a [BinaryWriter] using [reader].
*
* The given [input] stream will be closed after this function returns.
*
* This variant is slower than using a byte array. If you already have
* the bytes available, that should be preferred.
*/
fun read(input: InputStream): GenericRecord {
return BinaryReader(datumReader, input, null, factory).use { it.read() }
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package com.sksamuel.centurion.avro.io

import org.apache.avro.Schema
import org.apache.avro.file.Codec
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DatumWriter
import org.apache.avro.io.EncoderFactory
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.nio.ByteBuffer

///**
// * An [AvroWriter] will write [GenericRecord]s to an output stream.
Expand All @@ -23,8 +21,9 @@ import java.nio.ByteBuffer
// */

/**
* Creates an [BinaryWriterFactory] for a given schema which can then be used
* to create [BinaryWriter]s. All writers created from this factory share a thread safe [DatumWriter].
* Creates a [BinaryWriterFactory] for a given schema which can then be used to create [BinaryWriter]s.
*
* All writers created from this factory share a thread safe [DatumWriter].
*
* Pass in a pre-created [EncoderFactory] if you wish to configure buffer size.
*/
Expand All @@ -34,99 +33,78 @@ class BinaryWriterFactory(
) {

/**
* Creates an [BinaryWriterFactory] with the default [EncoderFactory].
* Creates a [BinaryWriterFactory] with the default [EncoderFactory].
*/
constructor(schema: Schema) : this(schema, EncoderFactory.get())

companion object {

/**
* Creates an avro encoded byte array from the given [record].
* This method is a convenience function that is useful when you want to write a single record.
*
* Pass in a [Codec] to compress output.
* This method is a convenience function that is useful when you want to write a single record
* in a single method call.
*
* For better performance, consider creating a [BinaryWriterFactory] which will use
* a shared [GenericDatumWriter] and allows customizing the [EncoderFactory].
*/
fun write(record: GenericRecord, codec: Codec? = null): ByteArray {
val datumWriter = GenericDatumWriter<GenericRecord>(record.schema)
fun toBytes(record: GenericRecord): ByteArray {
val baos = ByteArrayOutputStream()
toBytes(record, baos)
return baos.toByteArray()
}

val writer = BinaryWriter(datumWriter, ByteArrayOutputStream(), EncoderFactory.get())
writer.write(record)
writer.close()
return if (codec == null) writer.bytes() else {
val compressed = codec.compress(ByteBuffer.wrap(writer.bytes()))
val b = ByteArray(compressed.remaining())
compressed.get(b)
b
}
/**
* Writes avro encoded bytes to the given [output] stream from the given [record].
*
* This method is a convenience function that is useful when you want to write a single record
* in a single method call.
*
* The given [output] stream will be closed after this function returns.
*
* For better performance, consider creating a [BinaryWriterFactory] which will use
* a shared [GenericDatumWriter] and allows customizing the [EncoderFactory].
*/
fun toBytes(record: GenericRecord, output: OutputStream) {
val datumWriter = GenericDatumWriter<GenericRecord>(record.schema)
BinaryWriter(datumWriter, output, EncoderFactory.get()).use { it.write(record) }
}
}

private val datumWriter = GenericDatumWriter<GenericRecord>(schema)

/**
* Creates an [BinaryWriter] that writes to the given [OutputStream].
* Creates a [BinaryWriter] that writes to the given [OutputStream].
* Calling close on the created writer will close this stream and ensure data is flushed.
*/
fun writer(output: OutputStream): BinaryWriter {
return BinaryWriter(datumWriter, output, factory)
}

/**
* Creates an [BinaryWriter] that uses a [ByteArrayOutputStream].
* Creates a [BinaryWriter] that uses a [ByteArrayOutputStream].
* Once records have been written, users can call bytes() to retrieve the [ByteArray].
*/
fun writer(): BinaryWriter {
return BinaryWriter(datumWriter, ByteArrayOutputStream(), factory)
}

/**
* Creates an avro encoded byte array from the given [record].
* This method is a convenience function that is useful when you want to write a single record.
* If you wish to write multiple records, create a [BinaryWriter] using [writer].
*
* Pass in a [Codec] to compress output.
*/
fun write(record: GenericRecord, codec: Codec? = null): ByteArray {
val writer = BinaryWriter(datumWriter, ByteArrayOutputStream(), factory)
writer.write(record)
writer.close()
return if (codec == null) writer.bytes() else {
val compressed = codec.compress(ByteBuffer.wrap(writer.bytes()))
val b = ByteArray(compressed.remaining())
compressed.get(b)
b
}
}

/**
* Writes avro encoded bytes to the given [output] stream from the given [record].
* This method is a convenience function that is useful when you want to write a single record.
* If you wish to write multiple records, create a [BinaryWriter] using [writer].
*
* The given [output] stream will be closed after this function returns.
*/
fun write(record: GenericRecord, output: OutputStream) {
BinaryWriter(datumWriter, output, factory).use { it.write(record) }
}
}

/**
* An [BinaryWriter] is a non-thread safe, one time use, writer to a given stream.
* A [BinaryWriter] is a non-thread safe, one time use, writer to a given stream.
*
* Call [close] when all records have been written to ensure data is flushed to the underlying stream.
*/
class BinaryWriter(
private val datumWriter: DatumWriter<GenericRecord>,
private val datum: DatumWriter<GenericRecord>,
private val output: OutputStream,
factory: EncoderFactory,
) : AutoCloseable {

private val encoder = factory.binaryEncoder(output, null)

fun write(record: GenericRecord): BinaryWriter {
datumWriter.write(record, encoder)
datum.write(record, encoder)
return this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,29 @@ import java.util.concurrent.ConcurrentHashMap
import kotlin.reflect.KClass

/**
* A [CachedReflectionSerdeFactory] will create a [Serde] once for a given type via delegation
* to a [ReflectionSerdeFactory] and return that cached [Serde] upon future invocations.
* A [CachedReflectionSerdeFactory] will create a [SpecificSerde] once for a given type via delegation
* to a [ReflectionSerdeFactory] and return that cached [SpecificSerde] upon future invocations.
*
* This instance is thread safe.
*/
object CachedReflectionSerdeFactory {

private val cache = ConcurrentHashMap<KClass<*>, Serde<*>>()
private val cache = ConcurrentHashMap<KClass<*>, SpecificSerde<*>>()

/**
* Creates or returns a [Serde] for the given [kclass].
* Creates or returns a [SpecificSerde] for the given [kclass].
*/
fun <T : Any> create(
kclass: KClass<T>,
options: SerdeOptions = SerdeOptions()
): Serde<T> {
return cache.getOrPut(kclass) { ReflectionSerdeFactory.create(kclass, options) } as Serde<T>
): SpecificSerde<T> {
return cache.getOrPut(kclass) { ReflectionSerdeFactory.create(kclass, options) } as SpecificSerde<T>
}

/**
* Creates or returns a [Serde] from the given type parameter [T].
* Creates or returns a [SpecificSerde] from the given type parameter [T].
*/
inline fun <reified T : Any> create(options: SerdeOptions = SerdeOptions()): Serde<T> {
inline fun <reified T : Any> create(options: SerdeOptions = SerdeOptions()): SpecificSerde<T> {
return create(T::class, options)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.sksamuel.centurion.avro.io

import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.file.DataFileReader
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.file.SeekableInput
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DecoderFactory

/**
 * Creates a [DataReaderFactory] for a given schema which can then be used to create [DataReader]s.
 *
 * All readers created from this factory share a thread safe [DatumReader] for efficiency.
 *
 * Pass in a pre-created [DecoderFactory] if you wish to configure buffer size.
 */
class DataReaderFactory(
   reader: Schema,
   writer: Schema,
   private val factory: DecoderFactory,
   // NOTE(review): codecFactory is currently unused by this factory; retained in the
   // constructor so the public interface is unchanged — confirm whether it should be
   // wired into reader creation or removed.
   private val codecFactory: CodecFactory,
) {

   // GenericDatumReader's constructor takes (writer, reader): the writer schema is the one
   // the data was serialized with, and the reader schema is the one records are resolved to.
   // The arguments were previously passed as (reader, writer), silently swapping the two —
   // matches the sibling BinaryReaderFactory which passes them in the correct order.
   private val datum = GenericDatumReader<GenericRecord>(/* writer = */ writer, /* reader = */ reader)

   /**
    * Creates a [DataReader] that reads from the given [bytes].
    */
   fun reader(bytes: ByteArray): DataReader {
      return DataReader(SeekableByteArrayInput(bytes), datum)
   }
}

/**
 * A [DataReader] is a non-thread safe, one time use, reader over a [SeekableInput]
 * containing Avro data-file encoded records.
 *
 * Call [close] when finished to release both the underlying file reader and the input.
 */
class DataReader(private val input: SeekableInput, datum: DatumReader<GenericRecord>) : AutoCloseable {

   // DataFileReader handles the container format (header, blocks, sync markers),
   // delegating per-record decoding to the supplied datum reader.
   private val fileReader = DataFileReader.openReader(input, datum)

   /** Returns true if at least one more record is available. */
   fun hasNext(): Boolean = fileReader.hasNext()

   /** Returns the next record in the stream. */
   fun read(): GenericRecord = fileReader.next()

   /** Returns an iterator over the remaining records. */
   fun iterator() = fileReader.iterator()

   override fun close() {
      fileReader.close()
      input.close()
   }
}

0 comments on commit 1c45b96

Please sign in to comment.