Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rule engine: Kafka consumer group per partition #10728

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void init() {
.queueKey(new QueueKey(ServiceType.TB_CORE))
.config(CoreQueueConfig.of(consumerPerPartition, (int) pollInterval))
.msgPackProcessor(this::processMsgs)
.consumerCreator(config -> queueFactory.createToCoreMsgConsumer())
.consumerCreator((config, partitionId) -> queueFactory.createToCoreMsgConsumer())
.consumerExecutor(consumersExecutor)
.scheduler(scheduler)
.taskExecutor(mgmtExecutor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

@Slf4j
Expand All @@ -51,7 +51,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
@Getter
protected C config;
protected final MsgPackProcessor<M, C> msgPackProcessor;
protected final Function<C, TbQueueConsumer<M>> consumerCreator;
protected final BiFunction<C, Integer, TbQueueConsumer<M>> consumerCreator;
protected final ExecutorService consumerExecutor;
protected final ScheduledExecutorService scheduler;
protected final ExecutorService taskExecutor;
Expand All @@ -67,7 +67,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
@Builder
public MainQueueConsumerManager(QueueKey queueKey, C config,
MsgPackProcessor<M, C> msgPackProcessor,
Function<C, TbQueueConsumer<M>> consumerCreator,
BiFunction<C, Integer, TbQueueConsumer<M>> consumerCreator,
ExecutorService consumerExecutor,
ScheduledExecutorService scheduler,
ExecutorService taskExecutor) {
Expand Down Expand Up @@ -273,8 +273,9 @@ public void updatePartitions(Set<TopicPartitionInfo> partitions) {
removedPartitions.forEach((tpi) -> consumers.remove(tpi).awaitCompletion());

addedPartitions.forEach((tpi) -> {
String key = queueKey + "-" + tpi.getPartition().orElse(-1);
TbQueueConsumerTask<M> consumer = new TbQueueConsumerTask<>(key, consumerCreator.apply(config));
Integer partitionId = tpi.getPartition().orElse(-1);
String key = queueKey + "-" + partitionId;
TbQueueConsumerTask<M> consumer = new TbQueueConsumerTask<>(key, () -> consumerCreator.apply(config, partitionId));
consumers.put(tpi, consumer);
consumer.subscribe(Set.of(tpi));
launchConsumer(consumer);
Expand Down Expand Up @@ -303,7 +304,7 @@ public void updatePartitions(Set<TopicPartitionInfo> partitions) {
}

if (consumer == null) {
consumer = new TbQueueConsumerTask<>(queueKey, consumerCreator.apply(config));
consumer = new TbQueueConsumerTask<>(queueKey, () -> consumerCreator.apply(config, null)); // no partitionId passed
}
consumer.subscribe(partitions);
if (!consumer.isRunning()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,57 @@
package org.thingsboard.server.service.queue.ruleengine;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@RequiredArgsConstructor
@Slf4j
public class TbQueueConsumerTask<M extends TbQueueMsg> {

@Getter
private final Object key;
@Getter
private final TbQueueConsumer<M> consumer;
private volatile TbQueueConsumer<M> consumer;
private volatile Supplier<TbQueueConsumer<M>> consumerSupplier;

@Setter
private Future<?> task;

/**
 * Creates a task whose underlying consumer is not built yet; the given supplier is
 * stored and only invoked when the consumer is first requested via {@link #getConsumer()}.
 *
 * @param key              identifier of this task, used in log messages
 * @param consumerSupplier factory that produces the queue consumer on demand
 */
public TbQueueConsumerTask(Object key, Supplier<TbQueueConsumer<M>> consumerSupplier) {
    this.consumerSupplier = consumerSupplier;
    // No consumer exists until getConsumer() is called for the first time.
    this.consumer = null;
    this.key = key;
}

/**
 * Returns the queue consumer, creating it from the stored supplier on first access.
 * Creation happens inside {@code synchronized (this)}; once a non-null consumer has
 * been produced, the supplier reference is cleared.
 *
 * @return the (possibly just created) consumer, never {@code null}
 * @throws NullPointerException if the supplier is missing or yields {@code null}
 */
public TbQueueConsumer<M> getConsumer() {
    TbQueueConsumer<M> existing = consumer;
    if (existing != null) {
        // Already initialized: no locking required.
        return existing;
    }
    synchronized (this) {
        // Re-check under the lock: another thread may have created it meanwhile.
        if (consumer != null) {
            return consumer;
        }
        Objects.requireNonNull(consumerSupplier, "consumerSupplier for key [" + key + "] is null");
        consumer = consumerSupplier.get();
        Objects.requireNonNull(consumer, "consumer for key [" + key + "] is null");
        consumerSupplier = null;
        return consumer;
    }
}

public void subscribe(Set<TopicPartitionInfo> partitions) {
log.trace("[{}] Subscribing to partitions: {}", key, partitions);
consumer.subscribe(partitions);
getConsumer().subscribe(partitions);
}

public void initiateStop() {
log.debug("[{}] Initiating stop", key);
consumer.stop();
getConsumer().stop();
}

public void awaitCompletion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils;
Expand Down Expand Up @@ -55,6 +55,8 @@
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.provider.KafkaMonolithQueueFactory;
import org.thingsboard.server.queue.provider.KafkaTbRuleEngineQueueFactory;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory;
Expand All @@ -79,10 +81,12 @@
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willCallRealMethod;
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
Expand Down Expand Up @@ -114,6 +118,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
private PartitionService partitionService;
@Mock
private TbQueueProducerProvider producerProvider;
@Mock
private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
@Mock
private TbQueueAdmin queueAdmin;
Expand Down Expand Up @@ -148,7 +153,7 @@ actorContext, statsFactory, spy(new TbRuleEngineSubmitStrategyFactory()),
log.trace("totalProcessedMsgs = {}", totalProcessedMsgs);
return null;
}).when(actorContext).tell(any());
ruleEngineMsgProducer = mock(TbQueueProducer.class);

when(producerProvider.getRuleEngineMsgProducer()).thenReturn(ruleEngineMsgProducer);
consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer"));
mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(3, "tb-rule-engine-mgmt");
Expand Down Expand Up @@ -180,7 +185,7 @@ actorContext, statsFactory, spy(new TbRuleEngineSubmitStrategyFactory()),
}
consumers.add(consumer);
return consumer;
}).when(queueFactory).createToRuleEngineMsgConsumer(any());
}).when(queueFactory).createToRuleEngineMsgConsumer(any(), any());

QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue);
consumerManager = TbRuleEngineQueueConsumerManager.create()
Expand Down Expand Up @@ -210,6 +215,20 @@ public void afterEach() {
}
}

@ParameterizedTest
@ValueSource(classes = {KafkaMonolithQueueFactory.class, KafkaTbRuleEngineQueueFactory.class})
public void testUnsupported_createToRuleEngineMsgConsumer_KafkaTbRuleEngineQueueFactory(Class<TbRuleEngineQueueFactory> factoryClass) {
    // The consumer manager is not what is being tested here; it is initialized
    // only because afterEach needs it in order to pass.
    queue.setConsumerPerPartition(false);
    consumerManager.init(queue);

    // Mock the Kafka factory but let the single-argument overload run its real code.
    TbRuleEngineQueueFactory kafkaFactory = mock(factoryClass);
    willCallRealMethod().given(kafkaFactory).createToRuleEngineMsgConsumer(any());

    // The overload without a partitionId must be rejected.
    assertThatThrownBy(() -> kafkaFactory.createToRuleEngineMsgConsumer(mock(Queue.class)))
            .isInstanceOf(UnsupportedOperationException.class);
}

@Test
public void testInit_consumerPerPartition() {
queue.setConsumerPerPartition(true);
Expand Down Expand Up @@ -247,7 +266,8 @@ public void testPartitionsUpdate_singleConsumer() {

Set<TopicPartitionInfo> partitions = Collections.emptySet();
consumerManager.update(partitions);
verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any());
verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any(), any());
verify(queueFactory, never()).createToRuleEngineMsgConsumer(any());

partitions = createTpis(1);
consumerManager.update(partitions);
Expand Down Expand Up @@ -278,7 +298,8 @@ public void testPartitionsUpdate_consumerPerPartition() {
consumerManager.init(queue);

consumerManager.update(Collections.emptySet());
verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any());
verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any(), any());
verify(queueFactory, never()).createToRuleEngineMsgConsumer(any());

consumerManager.update(createTpis(1));
TestConsumer consumer1 = getConsumer(1);
Expand Down Expand Up @@ -420,7 +441,8 @@ public void testStop() {
consumerManager.update(createTpis(1));
TestConsumer consumer = getConsumer(1);
verifySubscribedAndLaunched(consumer, 1);
verify(queueFactory, times(1)).createToRuleEngineMsgConsumer(any());
verify(queueFactory, times(1)).createToRuleEngineMsgConsumer(any(), any());
verify(queueFactory, never()).createToRuleEngineMsgConsumer(any());

consumerManager.stop();
consumerManager.update(createTpis(1, 2, 3, 4)); // to check that no new tasks after stop are processed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,17 @@ public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantI
/**
 * Prepends the configured prefix to a topic name.
 *
 * @param topic the bare topic name
 * @return {@code topic} unchanged when the prefix is blank, otherwise {@code prefix + "." + topic}
 */
public String buildTopicName(String topic) {
    if (prefix.isBlank()) {
        return topic;
    }
    return prefix + "." + topic;
}
}

/**
 * Builds a consumer group id of the form
 * {@code <servicePrefix><queueName>[-isolated-<tenantId>]-consumer[-<partitionId>]}
 * and passes it through {@link #buildTopicName(String)}.
 *
 * @param servicePrefix prefix identifying the service (callers pass it verbatim, e.g. with a trailing dash)
 * @param tenantId      tenant owning the queue; the {@code -isolated-} part is added only for non-system tenants
 * @param queueName     name of the queue
 * @param partitionId   partition the group is dedicated to, or {@code null} for a group without a partition suffix
 * @return the resulting consumer group id
 */
public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) {
    StringBuilder groupId = new StringBuilder();
    groupId.append(servicePrefix).append(queueName);
    if (!tenantId.isSysTenantId()) {
        groupId.append("-isolated-").append(tenantId);
    }
    groupId.append("-consumer");
    groupId.append(suffix(partitionId));
    return this.buildTopicName(groupId.toString());
}

/**
 * Returns the partition suffix for a group id: empty for {@code null},
 * otherwise a dash followed by the partition id.
 */
String suffix(Integer partitionId) {
    if (partitionId == null) {
        return "";
    }
    return "-" + partitionId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
*/
package org.thingsboard.server.queue.kafka;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.util.PropertyUtils;
Expand All @@ -27,6 +31,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Created by ashvayka on 24.09.18.
Expand Down Expand Up @@ -120,4 +126,53 @@ public CreateTopicsResult createTopic(NewTopic topic) {
public void destroy() {
}

/**
 * Synchronizes offsets from a shared ("fat") consumer group to a single-partition consumer group.
 * This is best effort: any failure is logged as a warning and not propagated to the caller.
 * Migrating back from a single-partition consumer group to the fat group is not supported.
 * TODO: the best possible approach is to synchronize the offsets as part of saving the Queue
 * parameters, with all consumers stopped.
 *
 * @param fatGroupId  id of the group whose committed offsets are the source
 * @param newGroupId  id of the single-partition group that receives the offset
 * @param partitionId partition handled by the new group
 */
public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) {
    try {
        syncOffsetsUnsafe(fatGroupId, newGroupId, partitionId);
    } catch (InterruptedException e) {
        // Keep the interruption visible to the calling thread instead of silently consuming it.
        Thread.currentThread().interrupt();
        log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e);
    } catch (Exception e) {
        log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e);
    }
}

/**
 * Looks up the offsets committed by {@code fatGroupId}, picks the first entry whose topic name
 * ends with {@code "." + partitionId}, and writes that offset to {@code newGroupId} unless the
 * new group already has an offset that is greater than or equal to it.
 * Does nothing when {@code partitionId} is {@code null} or the old group has no offsets.
 * Each admin call waits at most 10 seconds.
 *
 * @param fatGroupId  id of the group whose committed offsets are the source
 * @param newGroupId  id of the group whose offset is altered
 * @param partitionId partition id used to select the topic by its name suffix
 * @throws ExecutionException   if an admin client call fails
 * @throws InterruptedException if the thread is interrupted while waiting for an admin call
 * @throws TimeoutException     if an admin call does not complete within 10 seconds
 */
void syncOffsetsUnsafe(String fatGroupId, String newGroupId, Integer partitionId) throws ExecutionException, InterruptedException, TimeoutException {
    log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId);
    if (partitionId == null) {
        return;
    }
    Map<TopicPartition, OffsetAndMetadata> oldOffsets =
            settings.getAdminClient().listConsumerGroupOffsets(fatGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
    if (oldOffsets.isEmpty()) {
        return;
    }

    // Loop-invariant: the topic name suffix that identifies the requested partition.
    final String topicSuffix = "." + partitionId;

    for (var consumerOffset : oldOffsets.entrySet()) {
        var tp = consumerOffset.getKey();
        if (!tp.topic().endsWith(topicSuffix)) {
            continue;
        }
        var om = consumerOffset.getValue();
        if (om == null) {
            // The admin client reports a null value when the group has no committed offset
            // for this partition; there is nothing to copy, and Map.of would reject null.
            log.info("[{}] no committed offset in the old group {}, nothing to sync", tp, fatGroupId);
            continue;
        }
        Map<TopicPartition, OffsetAndMetadata> newOffsets =
                settings.getAdminClient().listConsumerGroupOffsets(newGroupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);

        var existingOffset = newOffsets.get(tp);
        if (existingOffset == null) {
            log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets);
        } else if (existingOffset.offset() >= om.offset()) {
            // The new group is already at or ahead of the old one: never move it backwards.
            log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset());
            break;
        } else {
            log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset());
        }
        settings.getAdminClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(10, TimeUnit.SECONDS);
        log.info("[{}] altered new consumer groupId {}", tp, newGroupId);
        // Only the first matching topic is synchronized.
        break;
    }

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbKafkaConsumerStatsService consumerStatsService;

private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
private final TbKafkaAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorRequestAdmin;
private final TbQueueAdmin jsExecutorResponseAdmin;
private final TbQueueAdmin transportApiRequestAdmin;
Expand Down Expand Up @@ -187,18 +187,29 @@ public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMs

/**
 * Overload without a partition id; this factory does not support it and always throws.
 *
 * @param configuration the queue configuration (unused)
 * @throws UnsupportedOperationException always
 */
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration) {
    final String reason = "Rule engine consumer should use a partitionId";
    throw new UnsupportedOperationException(reason);
}

@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) {
String queueName = configuration.getName();
String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId);

ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
groupId, partitionId);

TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(configuration.getTopic()));
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-isolated-" + configuration.getTenantId())) + "-consumer"));
consumerBuilder.groupId(groupId);
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build();
}


@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
Expand Down