You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
I am using one consumer to consume multiple topics with different Avro schemas. I created a deserializer, but because there is no non-generic IDeserializer, I had to resort to all kinds of weird hacks.
A non-generic I(Async)Deserializer would reduce this code:
/// <summary>
/// Deserializes Avro messages from several topics with a single consumer by
/// routing each message to an <c>AvroDeserializer&lt;T&gt;</c> selected by topic name.
/// </summary>
/// <remarks>
/// The topic-to-type mapping lives in <see cref="Configuration"/>. Because the
/// closed generic deserializer type is only known at run time, the call to its
/// <c>Deserialize</c> method is bound through a compiled expression tree. That
/// delegate is built once per topic and cached, so reflection and expression
/// compilation are not repeated for every message.
/// </remarks>
public class TopicSpecificAvroDeserializer : IDeserializer<ISpecificRecord>
{
    /// <summary>
    /// Shape of the cached call into a closed generic deserializer; the result is
    /// returned as <see cref="object"/> because the record type differs per topic.
    /// </summary>
    private delegate object DeserializeDelegate(ReadOnlySpan<byte> data, bool isNull, SerializationContext context);

    /// <summary>Maps each supported topic name to the record type its messages deserialize to.</summary>
    private static readonly IDictionary<string, Type> Configuration = new Dictionary<string, Type>
    {
        { "topic.A", typeof(TopicA) },
        { "topic.B", typeof(TopicB) },
    };

    /// <summary>Compiled deserialize calls, keyed by topic name.</summary>
    private readonly ConcurrentDictionary<string, DeserializeDelegate> _deserializers =
        new ConcurrentDictionary<string, DeserializeDelegate>();

    private readonly ISchemaRegistryClient _schemaRegistryClient;

    /// <summary>Creates the deserializer.</summary>
    /// <param name="schemaRegistryClient">Client passed to every per-topic Avro deserializer.</param>
    /// <exception cref="ArgumentNullException"><paramref name="schemaRegistryClient"/> is null.</exception>
    public TopicSpecificAvroDeserializer(ISchemaRegistryClient schemaRegistryClient)
    {
        _schemaRegistryClient = schemaRegistryClient ?? throw new ArgumentNullException(nameof(schemaRegistryClient));
    }

    /// <summary>
    /// Deserializes <paramref name="data"/> using the deserializer registered for
    /// <c>context.Topic</c>.
    /// </summary>
    /// <param name="data">Raw message bytes.</param>
    /// <param name="isNull">Whether the message value is null; forwarded unchanged.</param>
    /// <param name="context">Serialization context; its topic selects the record type.</param>
    /// <returns>The deserialized record, cast to <see cref="ISpecificRecord"/>.</returns>
    /// <exception cref="InvalidOperationException">
    /// The topic has no entry in <see cref="Configuration"/>, or the per-topic
    /// deserializer could not be created or bound.
    /// </exception>
    public ISpecificRecord Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        // The factory runs only when the topic is not cached yet; every later
        // message for that topic is a dictionary lookup plus a delegate call.
        var deserialize = _deserializers.GetOrAdd(context.Topic, CreateDeserializeDelegate);
        return (ISpecificRecord)deserialize(data, isNull, context);
    }

    /// <summary>
    /// Builds the deserializer for <paramref name="topic"/> and compiles a delegate
    /// bound to its <c>Deserialize</c> method.
    /// </summary>
    private DeserializeDelegate CreateDeserializeDelegate(string topic)
    {
        var deserializer = CreateDeserializer(topic);

        // Look the method up by its exact parameter list so that an additional
        // overload cannot make the lookup ambiguous.
        var parameterTypes = new[] { typeof(ReadOnlySpan<byte>), typeof(bool), typeof(SerializationContext) };
        var method = deserializer.GetType().GetMethod(nameof(IDeserializer<object>.Deserialize), parameterTypes);
        if (method == null)
        {
            throw new InvalidOperationException($"Failed to find method {nameof(IDeserializer<object>.Deserialize)} on {deserializer.GetType()}");
        }

        var instance = Expression.Constant(deserializer);
        var dataParameter = Expression.Parameter(typeof(ReadOnlySpan<byte>), "data");
        var isNullParameter = Expression.Parameter(typeof(bool), "isNull");
        var contextParameter = Expression.Parameter(typeof(SerializationContext), "context");
        var call = Expression.Call(instance, method, dataParameter, isNullParameter, contextParameter);
        var expression = Expression.Lambda<DeserializeDelegate>(call, dataParameter, isNullParameter, contextParameter);
        return expression.Compile();
    }

    /// <summary>
    /// Creates an <c>AvroDeserializer&lt;T&gt;</c> for the record type configured for
    /// <paramref name="topic"/> and wraps it in a <c>SyncOverAsyncDeserializer&lt;T&gt;</c>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The topic is not configured, or either instance could not be created.
    /// </exception>
    private object CreateDeserializer(string topic)
    {
        if (!Configuration.TryGetValue(topic, out var type))
        {
            throw new InvalidOperationException($"Missing message type for topic {topic}");
        }

        // The second constructor argument is passed as null, as in the original call.
        var deserializerType = typeof(AvroDeserializer<>).MakeGenericType(type);
        var deserializerInstance = Activator.CreateInstance(deserializerType, _schemaRegistryClient, null);
        if (deserializerInstance == null)
        {
            throw new InvalidOperationException($"Failed to create instance of {typeof(AvroDeserializer<>)} with generic argument {type}");
        }

        var syncDeserializerType = typeof(SyncOverAsyncDeserializer<>).MakeGenericType(type);
        var syncDeserializerInstance = Activator.CreateInstance(syncDeserializerType, deserializerInstance);
        if (syncDeserializerInstance == null)
        {
            throw new InvalidOperationException($"Failed to create instance of {typeof(SyncOverAsyncDeserializer<>)} with generic argument {type}");
        }

        return syncDeserializerInstance;
    }
}
to this code
/// <summary>
/// Deserializes Avro messages from several topics with a single consumer by
/// routing each message to an <c>AvroDeserializer&lt;T&gt;</c> selected by topic name.
/// </summary>
/// <remarks>
/// This version is written against the proposed non-generic <c>IDeserializer</c>
/// and <c>IAsyncDeserializer</c> interfaces. With them, the per-topic deserializer
/// can be stored and invoked directly, with no reflection-based call.
/// </remarks>
public class TopicSpecificAvroDeserializer : IDeserializer<ISpecificRecord>
{
    /// <summary>Maps each supported topic name to the record type its messages deserialize to.</summary>
    private static readonly IDictionary<string, Type> Configuration = new Dictionary<string, Type>
    {
        { "topic.A", typeof(TopicA) },
        { "topic.B", typeof(TopicB) },
    };

    /// <summary>Per-topic deserializers, created on first use of each topic.</summary>
    private readonly ConcurrentDictionary<string, IDeserializer> _deserializers =
        new ConcurrentDictionary<string, IDeserializer>();

    private readonly ISchemaRegistryClient _schemaRegistryClient;

    /// <summary>Creates the deserializer.</summary>
    /// <param name="schemaRegistryClient">Client passed to every per-topic Avro deserializer.</param>
    /// <exception cref="ArgumentNullException"><paramref name="schemaRegistryClient"/> is null.</exception>
    public TopicSpecificAvroDeserializer(ISchemaRegistryClient schemaRegistryClient)
    {
        _schemaRegistryClient = schemaRegistryClient ?? throw new ArgumentNullException(nameof(schemaRegistryClient));
    }

    /// <summary>
    /// Deserializes <paramref name="data"/> using the deserializer registered for
    /// <c>context.Topic</c>.
    /// </summary>
    /// <param name="data">Raw message bytes.</param>
    /// <param name="isNull">Whether the message value is null; forwarded unchanged.</param>
    /// <param name="context">Serialization context; its topic selects the record type.</param>
    /// <returns>The deserialized record, cast to <see cref="ISpecificRecord"/>.</returns>
    /// <exception cref="InvalidOperationException">
    /// The topic has no entry in <see cref="Configuration"/>, or its deserializer
    /// could not be created.
    /// </exception>
    public ISpecificRecord Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        var deserializer = _deserializers.GetOrAdd(context.Topic, CreateDeserializer);
        return (ISpecificRecord)deserializer.Deserialize(data, isNull, context);
    }

    /// <summary>
    /// Creates an <c>AvroDeserializer&lt;T&gt;</c> for the record type configured for
    /// <paramref name="topic"/> and returns the result of calling
    /// <c>AsSyncOverAsync()</c> on it.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The topic is not configured, or the deserializer instance could not be created.
    /// </exception>
    private IDeserializer CreateDeserializer(string topic)
    {
        if (!Configuration.TryGetValue(topic, out var type))
        {
            throw new InvalidOperationException($"Missing message type for topic {topic}");
        }

        // The second constructor argument is passed as null, as in the original call.
        var deserializerType = typeof(AvroDeserializer<>).MakeGenericType(type);
        var deserializerInstance = (IAsyncDeserializer)Activator.CreateInstance(deserializerType, _schemaRegistryClient, null);
        if (deserializerInstance == null)
        {
            throw new InvalidOperationException($"Failed to create instance of {typeof(AvroDeserializer<>)} with generic argument {type}");
        }

        return deserializerInstance.AsSyncOverAsync();
    }
}
The text was updated successfully, but these errors were encountered:
Description
I am using one consumer to consume multiple topics with different Avro schemas. I created a deserializer, but because there is no non-generic IDeserializer, I had to resort to all kinds of weird hacks.
A non-generic I(Async)Deserializer would reduce this code
to this code
The text was updated successfully, but these errors were encountered: