-
Notifications
You must be signed in to change notification settings - Fork 11
/
builder.py
38 lines (32 loc) · 975 Bytes
/
builder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from ..sink import SharedSinkProvider
from .sink import (
KafkaRouterSink,
KafkaSerializerSink,
KafkaTransportSink
)
from ..core import Scales
from ..loadbalancer import HeapBalancerSink
from ..resurrector import ResurrectorSink
class _KafkaIface(object):
    """Interface stub describing the client API; passed to ``Scales.NewBuilder``."""

    def Put(self, topic, payloads=[], acks=1):
        """Declare the ``Put`` call; this stub does nothing and returns ``None``.

        Args:
            topic: presumably the Kafka topic name -- confirm against the sinks.
            payloads: defaults to an empty list. The shared default is never
                mutated by this stub.
            acks: defaults to 1. NOTE(review): looks like the number of
                required acknowledgements -- confirm against the sinks.
        """
class Kafka(object):
    """Factory helpers for building a Kafka client on top of ``Scales``."""

    @staticmethod
    def _get_sink_key(properties):
        """Return the ``(host, port, label)`` key for a set of sink properties.

        Args:
            properties: mapping with an ``'endpoint'`` entry (an object
                exposing ``host`` and ``port``) and a ``'label'`` entry.

        Returns:
            A 3-tuple ``(endpoint.host, endpoint.port, label)``.

        Raises:
            KeyError: if ``'endpoint'`` or ``'label'`` is missing.
        """
        # Both lookups happen before any attribute access.
        endpoint = properties['endpoint']
        sink_label = properties['label']
        return (endpoint.host, endpoint.port, sink_label)

    @staticmethod
    def NewBuilder():
        """Return a ``Scales`` builder for ``_KafkaIface`` with the Kafka sinks.

        Sinks are registered in this order: router, heap balancer,
        serializer, shared-sink provider (keyed by ``_get_sink_key``),
        resurrector, transport.
        """
        return (
            Scales.NewBuilder(_KafkaIface)
            .WithSink(KafkaRouterSink.Builder())
            .WithSink(HeapBalancerSink.Builder())
            .WithSink(KafkaSerializerSink.Builder())
            .WithSink(SharedSinkProvider(Kafka._get_sink_key))
            .WithSink(ResurrectorSink.Builder())
            .WithSink(KafkaTransportSink.Builder())
        )

    @staticmethod
    def NewClient(leaders):
        """Build a client from ``NewBuilder()`` with its URI set to ``leaders``.

        Args:
            leaders: value passed straight to ``SetUri``; presumably the
                address(es) of the Kafka brokers -- confirm against callers.
        """
        return Kafka.NewBuilder().SetUri(leaders).Build()