Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide non-generic ISerializer, IAsyncSerializer, IDeserializer, IAsyncDeserializer #2186

Open
horato opened this issue Feb 6, 2024 · 0 comments

Comments

@horato
Copy link

horato commented Feb 6, 2024

Description

I am using one consumer to consume multiple topics with different Avro schemas. I created a deserializer, but due to the lack of a non-generic IDeserializer, I had to resort to all kinds of weird hacks.

A non-generic I(Async)Deserializer would reduce this code

public class TopicSpecificAvroDeserializer : IDeserializer<ISpecificRecord>
{
	private readonly ConcurrentDictionary<string, object> _deserializers = new ConcurrentDictionary<string, object>();
	private readonly ISchemaRegistryClient _schemaRegistryClient;

	private static readonly IDictionary<string, Type> Configuration = new Dictionary<string, Type>
	{
		{ "topic.A", typeof(TopicA) },
		{ "topic.B", typeof(TopicB) },
	};

	private delegate object DeserializeDelegate(ReadOnlySpan<byte> data, bool isNull, SerializationContext context);

	/// <summary>
	/// Stores the schema registry client that is later handed to each per-topic Avro deserializer.
	/// </summary>
	/// <param name="schemaRegistryClient">Schema registry client; must not be null.</param>
	/// <exception cref="ArgumentNullException">Thrown when <paramref name="schemaRegistryClient"/> is null.</exception>
	public TopicSpecificAvroDeserializer(ISchemaRegistryClient schemaRegistryClient)
	{
		if (schemaRegistryClient == null)
			throw new ArgumentNullException(nameof(schemaRegistryClient));

		_schemaRegistryClient = schemaRegistryClient;
	}

	/// <summary>
	/// Deserializes a message using the deserializer registered for the message's topic.
	/// The deserializer is looked up in the per-topic cache and created on first use.
	/// </summary>
	/// <param name="data">Raw message bytes.</param>
	/// <param name="isNull">Null flag, forwarded unchanged to the topic deserializer.</param>
	/// <param name="context">Serialization context; its <c>Topic</c> selects the deserializer.</param>
	/// <returns>The value produced by the topic-specific deserializer.</returns>
	public ISpecificRecord Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
		=> CallDeserialize(_deserializers.GetOrAdd(context.Topic, CreateDeserializer), data, isNull, context);
	
	// Compiled invokers keyed by deserializer instance. Building and compiling an expression tree
	// is expensive, so it is done once per deserializer instead of once per consumed message.
	private readonly ConcurrentDictionary<object, DeserializeDelegate> _deserializeCalls = new ConcurrentDictionary<object, DeserializeDelegate>();

	/// <summary>
	/// Invokes the <c>Deserialize</c> method of an untyped deserializer instance and casts the
	/// result to <see cref="ISpecificRecord"/>.
	/// </summary>
	/// <param name="deserializer">Deserializer instance exposing a public <c>Deserialize(data, isNull, context)</c> method.</param>
	/// <param name="data">Raw message bytes.</param>
	/// <param name="isNull">Null flag, forwarded unchanged.</param>
	/// <param name="context">Serialization context, forwarded unchanged.</param>
	/// <returns>The deserialized record.</returns>
	/// <exception cref="InvalidOperationException">Thrown when the deserializer type has no public <c>Deserialize</c> method.</exception>
	private ISpecificRecord CallDeserialize(object deserializer, ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
	{
		// Reuse the delegate compiled for this deserializer; compile it only on first use.
		var call = _deserializeCalls.GetOrAdd(deserializer, CompileDeserializeCall);
		return (ISpecificRecord)call(data, isNull, context);
	}

	/// <summary>
	/// Builds and compiles a delegate that calls <c>Deserialize</c> on the given deserializer instance.
	/// </summary>
	private static DeserializeDelegate CompileDeserializeCall(object deserializer)
	{
		var instance = Expression.Constant(deserializer);
		var dataParameter = Expression.Parameter(typeof(ReadOnlySpan<byte>), "data");
		var isNullParameter = Expression.Parameter(typeof(bool), "isNull");
		var contextParameter = Expression.Parameter(typeof(SerializationContext), "context");

		// The generic argument of the deserializer is only known at runtime, so the method is resolved by name.
		var method = deserializer.GetType().GetMethod(nameof(IDeserializer<object>.Deserialize))
			?? throw new InvalidOperationException($"Type {deserializer.GetType()} does not expose a public {nameof(IDeserializer<object>.Deserialize)} method");

		var call = Expression.Call(instance, method, dataParameter, isNullParameter, contextParameter);
		return Expression.Lambda<DeserializeDelegate>(call, dataParameter, isNullParameter, contextParameter).Compile();
	}

	/// <summary>
	/// Builds the deserializer for a topic: an <c>AvroDeserializer&lt;T&gt;</c> for the message type
	/// configured for that topic, wrapped in a <c>SyncOverAsyncDeserializer&lt;T&gt;</c>.
	/// </summary>
	/// <param name="topic">Topic name used to look up the message type in <c>Configuration</c>.</param>
	/// <returns>The wrapping <c>SyncOverAsyncDeserializer&lt;T&gt;</c> instance, typed as <see cref="object"/>.</returns>
	/// <exception cref="InvalidOperationException">
	/// Thrown when no message type is configured for the topic, or when either instance could not be created.
	/// </exception>
	private object CreateDeserializer(string topic)
	{
		if (!Configuration.TryGetValue(topic, out var messageType))
			throw new InvalidOperationException($"Missing message type for topic {topic}");

		// The message type is only known at runtime, so both generic types are closed and instantiated reflectively.
		var avroDeserializer = Activator.CreateInstance(typeof(AvroDeserializer<>).MakeGenericType(messageType), _schemaRegistryClient, null)
			?? throw new InvalidOperationException($"Failed to create instance of {typeof(AvroDeserializer<>)} with generic argument {messageType}");

		return Activator.CreateInstance(typeof(SyncOverAsyncDeserializer<>).MakeGenericType(messageType), avroDeserializer)
			?? throw new InvalidOperationException($"Failed to create instance of {typeof(SyncOverAsyncDeserializer<>)} with generic argument {messageType}");
	}

to this code

public class TopicSpecificAvroDeserializer : IDeserializer<ISpecificRecord>
{
	private readonly ConcurrentDictionary<string, IDeserializer> _deserializers = new ConcurrentDictionary<string, IDeserializer>();
	private readonly ISchemaRegistryClient _schemaRegistryClient;

	private static readonly IDictionary<string, Type> Configuration = new Dictionary<string, Type>
	{
		{ "topic.A", typeof(TopicA) },
		{ "topic.B", typeof(TopicB) },
	};
	
	/// <summary>
	/// Stores the schema registry client that is later handed to each per-topic Avro deserializer.
	/// </summary>
	/// <param name="schemaRegistryClient">Schema registry client; must not be null.</param>
	/// <exception cref="ArgumentNullException">Thrown when <paramref name="schemaRegistryClient"/> is null.</exception>
	public TopicSpecificAvroDeserializer(ISchemaRegistryClient schemaRegistryClient)
	{
		if (schemaRegistryClient == null)
			throw new ArgumentNullException(nameof(schemaRegistryClient));

		_schemaRegistryClient = schemaRegistryClient;
	}

	/// <summary>
	/// Deserializes a message using the deserializer registered for the message's topic.
	/// The deserializer is looked up in the per-topic cache and created on first use.
	/// </summary>
	/// <param name="data">Raw message bytes.</param>
	/// <param name="isNull">Null flag, forwarded unchanged to the topic deserializer.</param>
	/// <param name="context">Serialization context; its <c>Topic</c> selects the deserializer.</param>
	/// <returns>The topic deserializer's result, cast to <see cref="ISpecificRecord"/>.</returns>
	public ISpecificRecord Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
		=> (ISpecificRecord)_deserializers.GetOrAdd(context.Topic, CreateDeserializer).Deserialize(data, isNull, context);

	/// <summary>
	/// Builds the deserializer for a topic: an <c>AvroDeserializer&lt;T&gt;</c> for the message type
	/// configured for that topic, converted with <c>AsSyncOverAsync()</c>.
	/// </summary>
	/// <param name="topic">Topic name used to look up the message type in <c>Configuration</c>.</param>
	/// <returns>The result of <c>AsSyncOverAsync()</c> on the created Avro deserializer.</returns>
	/// <exception cref="InvalidOperationException">
	/// Thrown when no message type is configured for the topic, or when the instance could not be created.
	/// </exception>
	private IDeserializer CreateDeserializer(string topic)
	{
		if (!Configuration.TryGetValue(topic, out var messageType))
			throw new InvalidOperationException($"Missing message type for topic {topic}");

		// The message type is only known at runtime, so the generic type is closed and instantiated reflectively.
		var avroDeserializer = (IAsyncDeserializer)Activator.CreateInstance(typeof(AvroDeserializer<>).MakeGenericType(messageType), _schemaRegistryClient, null)
			?? throw new InvalidOperationException($"Failed to create instance of {typeof(AvroDeserializer<>)} with generic argument {messageType}");

		return avroDeserializer.AsSyncOverAsync();
	}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant