Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[changelog] Let bootstrapping changelog consumer consumes after-image #966

Merged
merged 15 commits into from
Apr 30, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ public ChangelogClientConfig setRocksDBBlockCacheSizeInBytes(long rocksDBBlockCa
return this;
}

public ChangelogClientConfig setSpecificValue(Class<T> specificValue) {
this.innerClientConfig.setSpecificValueClass(specificValue);
return this;
}

public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(ChangelogClientConfig<V> config) {
ChangelogClientConfig<V> newConfig = new ChangelogClientConfig<V>().setStoreName(config.getStoreName())
.setLocalD2ZkHosts(config.getLocalD2ZkHosts())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.views.ChangeCaptureView;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -63,7 +63,7 @@
import org.apache.logging.log4j.Logger;


class InternalLocalBootstrappingVeniceChangelogConsumer<K, V> extends VeniceChangelogConsumerImpl<K, V>
class InternalLocalBootstrappingVeniceChangelogConsumer<K, V> extends VeniceAfterImageConsumerImpl<K, V>
implements BootstrappingVeniceChangelogConsumer<K, V> {
private static final Logger LOGGER = LogManager.getLogger(InternalLocalBootstrappingVeniceChangelogConsumer.class);
private static final String CHANGE_CAPTURE_COORDINATE = "ChangeCaptureCoordinatePosition";
Expand Down Expand Up @@ -229,7 +229,7 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
.getByKeyPrefix(state.getKey(), null, new BytesStreamingCallback() {
@Override
public void onRecordReceived(byte[] key, byte[] value) {
onRecordReceivedForStorage(key, value, state.getKey(), resultSet);
onRecordReceivedFromStorage(key, value, state.getKey(), resultSet);
}

@Override
Expand Down Expand Up @@ -283,7 +283,7 @@ private void syncOffset(int partitionId, BootstrapState bootstrapState) {
}

@VisibleForTesting
void onRecordReceivedForStorage(
void onRecordReceivedFromStorage(
byte[] key,
byte[] value,
int partition,
Expand All @@ -293,7 +293,6 @@ void onRecordReceivedForStorage(
// a user
// schema for deserialization
ValueRecord valueRecord = ValueRecord.parseAndCreate(value);

// Create a change event to wrap the record we pulled from disk and deserialize the record
ChangeEvent<V> changeEvent = new ChangeEvent<>(
null,
Expand Down Expand Up @@ -340,18 +339,15 @@ VeniceConcurrentHashMap<Integer, BootstrapState> getBootstrapStateMap() {
}

/**
* Polls change capture client and persist the results to local disk. Also updates the bootstrapStateMap with latest offsets
* and if the client has caught up or not.
* Polls change capture client and persist the results to local disk. Also updates the bootstrapStateMap with latest
* offsets and if the client has caught up or not.
*
* @param timeoutInMs timeout on Poll
* @param topicSuffix internal topic suffix
* @return
*/
private Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pollAndCatchup(
long timeoutInMs,
String topicSuffix) {
private void pollAndCatchup(long timeoutInMs, String topicSuffix) {
Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> polledResults =
super.internalPoll(timeoutInMs, topicSuffix);
super.internalPoll(timeoutInMs, topicSuffix, true);
for (PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> record: polledResults) {
BootstrapState currentPartitionState = bootstrapStateMap.get(record.getPartition());
currentPartitionState.currentPubSubPosition = record.getOffset();
Expand All @@ -365,7 +361,6 @@ private Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pol
}
}
}
return polledResults;
}

@Override
Expand All @@ -376,7 +371,7 @@ protected <T> T processRecordBytes(
ByteBuffer value,
PubSubTopicPartition partition,
int readerSchemaId,
long recordOffset) throws IOException {
long recordOffset) {
if (deserializedValue instanceof RecordChangeEvent) {
RecordChangeEvent recordChangeEvent = (RecordChangeEvent) deserializedValue;
if (recordChangeEvent.currentValue == null) {
Expand All @@ -391,11 +386,9 @@ protected <T> T processRecordBytes(
.serialize());
}
} else {
byte[] valueBytes = ByteUtils.extractByteArray(decompressedBytes);
storageService.getStorageEngine(localStateTopicName)
.put(
partition.getPartitionNumber(),
key,
ValueRecord.create(readerSchemaId, decompressedBytes.array()).serialize());
.put(partition.getPartitionNumber(), key, ValueRecord.create(readerSchemaId, valueBytes).serialize());
}

// Update currentPubSubPosition for a partition
Expand Down Expand Up @@ -436,7 +429,10 @@ public CompletableFuture<Void> seekWithBootStrap(Set<Integer> partitions) {
VeniceChangeCoordinate localCheckpoint;
try {
if (StringUtils.isEmpty(offsetString)) {
LOGGER.info("No local checkpoint found for partition: {}", partition);
LOGGER.info(
"No local checkpoint found for partition: {}, will initialize checkpoint to offset: {}",
partition,
offsetRecord.getLocalVersionTopicOffset());
localCheckpoint = new VeniceChangeCoordinate(
getTopicPartition(partition).getPubSubTopic().getName(),
new ApacheKafkaOffsetPosition(offsetRecord.getLocalVersionTopicOffset()),
Expand Down Expand Up @@ -470,15 +466,25 @@ public CompletableFuture<Void> seekWithBootStrap(Set<Integer> partitions) {
}
}

// Seek to the current position so we can catch up from there to target
seekToCheckpoint(
bootstrapStateMap.values().stream().map(state -> state.currentPubSubPosition).collect(Collectors.toSet()));
// Seek to the current position, so we can catch up from there to target
try {
seekToCheckpoint(
bootstrapStateMap.values().stream().map(state -> state.currentPubSubPosition).collect(Collectors.toSet()))
.get();
} catch (Exception e) {
throw new VeniceException("Caught exception when seeking to bootstrap", e);
}

// Poll until we've caught up completely for all subscribed partitions.
while (bootstrapStateMap.entrySet()
.stream()
.anyMatch(s -> s.getValue().bootstrapState.equals(PollState.CATCHING_UP))) {
pollAndCatchup(5000L, ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX);
/**
* TODO: For now we change to support after-image only use case for bootstrapping changelog consumer.
* We will subscribe to version topic for now. If there is support for different use case in the future, we need
* to further tweak it based on config.
*/
pollAndCatchup(5000L, "");
sixpluszero marked this conversation as resolved.
Show resolved Hide resolved
}

LOGGER.info("Bootstrap completed!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,11 @@ protected VeniceChangeCoordinate(String topic, PubSubPosition pubSubPosition, In
public static String convertVeniceChangeCoordinateToStringAndEncode(VeniceChangeCoordinate veniceChangeCoordinate)
throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream);
try {
try (ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
veniceChangeCoordinate.writeExternal(outputStream);
outputStream.flush();
byte[] data = byteArrayOutputStream.toByteArray();
return Base64.getEncoder().encodeToString(data);
} finally {
outputStream.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,12 @@ public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangel
*/
public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangelogConsumer(
String storeName,
String consumerId) {
String consumerId,
Class clazz) {
return (BootstrappingVeniceChangelogConsumer<K, V>) storeClientMap
.computeIfAbsent(suffixConsumerIdToStore(storeName, consumerId), name -> {
ChangelogClientConfig newStoreChangelogClientConfig = getNewStoreChangelogClientConfig(storeName);
ChangelogClientConfig newStoreChangelogClientConfig =
getNewStoreChangelogClientConfig(storeName).setSpecificValue(clazz);
String viewClass = getViewClass(newStoreChangelogClientConfig, storeName);
String consumerName =
suffixConsumerIdToStore(storeName + "-" + viewClass.getClass().getSimpleName(), consumerId);
Expand All @@ -116,6 +118,12 @@ public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangel
});
}

public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangelogConsumer(
String storeName,
String consumerId) {
return getBootstrappingChangelogConsumer(storeName, consumerId, null);
}

private ChangelogClientConfig getNewStoreChangelogClientConfig(String storeName) {
ChangelogClientConfig newStoreChangelogClientConfig =
ChangelogClientConfig.cloneConfig(globalChangelogClientConfig).setStoreName(storeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsume

protected ThinClientMetaStoreBasedRepository storeRepository;

protected final AbstractAvroChunkingAdapter<RecordChangeEvent> recordChangeEventChunkingAdapter =
new SpecificRecordChunkingAdapter<>();

protected final AbstractAvroChunkingAdapter<V> userEventChunkingAdapter;

protected final SchemaReader schemaReader;
Expand Down Expand Up @@ -356,11 +353,9 @@ public CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> chec

private void pubSubConsumerSeek(PubSubTopicPartition topicPartition, Long offset) {
// Offset the seek to next operation inside venice pub sub consumer adapter subscription logic.
if (offset == OffsetRecord.LOWEST_OFFSET) {
pubSubConsumer.subscribe(topicPartition, OffsetRecord.LOWEST_OFFSET);
} else {
pubSubConsumer.subscribe(topicPartition, offset - 1);
}
long targetOffset = offset == OffsetRecord.LOWEST_OFFSET ? OffsetRecord.LOWEST_OFFSET : offset - 1;
pubSubConsumer.subscribe(topicPartition, targetOffset);
LOGGER.info("Topic partition: {} consumer seek to offset: {}", topicPartition, targetOffset);
}

@Override
Expand Down Expand Up @@ -487,7 +482,8 @@ public Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll

protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> internalPoll(
long timeoutInMs,
String topicSuffix) {
String topicSuffix,
boolean includeControlMessage) {
List<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubMessages = new ArrayList<>();
Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> messagesMap;
synchronized (pubSubConsumer) {
Expand All @@ -503,6 +499,18 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
if (handleControlMessage(controlMessage, pubSubTopicPartition, topicSuffix)) {
break;
}
if (includeControlMessage) {
pubSubMessages.add(
new ImmutableChangeCapturePubSubMessage<>(
null,
null,
message.getTopicPartition(),
message.getOffset(),
0,
0,
false));
}

} else {
Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubMessage =
convertPubSubMessageToPubSubChangeEventMessage(message, pubSubTopicPartition);
Expand All @@ -513,6 +521,12 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
return pubSubMessages;
}

protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> internalPoll(
long timeoutInMs,
String topicSuffix) {
return internalPoll(timeoutInMs, topicSuffix, false);
}

/**
* Handle control message from the given topic. Returns true if a topic switch should occur and records should be returned
*
Expand Down Expand Up @@ -625,7 +639,6 @@ protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> con
// it's waiting for more input. In this case, just return an empty optional for now.
return Optional.empty();
}

try {
assembledObject = processRecordBytes(
compressor.decompress(put.getPutValue()),
Expand All @@ -638,7 +651,6 @@ protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> con
} catch (Exception ex) {
throw new VeniceException(ex);
}

if (assembledObject instanceof RecordChangeEvent) {
recordChangeEvent = (RecordChangeEvent) assembledObject;
replicationCheckpoint = recordChangeEvent.replicationCheckpointVector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ public void testStart() throws ExecutionException, InterruptedException {
PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1));
PubSubTopic changeCaptureTopic =
pubSubTopicRepository.getTopic(versionTopic.getName() + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX);
PubSubTopicPartition topicPartition_0 = new PubSubTopicPartitionImpl(changeCaptureTopic, 0);
PubSubTopicPartition topicPartition_1 = new PubSubTopicPartitionImpl(changeCaptureTopic, 1);
PubSubTopicPartition topicPartition_0 = new PubSubTopicPartitionImpl(versionTopic, 0);
PubSubTopicPartition topicPartition_1 = new PubSubTopicPartitionImpl(versionTopic, 1);
Set<PubSubTopicPartition> assignments = ImmutableSet.of(topicPartition_0, topicPartition_1);
doReturn(assignments).when(pubSubConsumer).getAssignment();
doReturn(0L).when(pubSubConsumer).getLatestOffset(topicPartition_0);
Expand Down Expand Up @@ -251,7 +251,7 @@ public void testStart() throws ExecutionException, InterruptedException {
ValueBytes valueBytes = new ValueBytes();
valueBytes.schemaId = TEST_SCHEMA_ID;
valueBytes.value = ByteBuffer.wrap(valueSerializer.serialize(TEST_NEW_VALUE_1));
bootstrappingVeniceChangelogConsumer.onRecordReceivedForStorage(
bootstrappingVeniceChangelogConsumer.onRecordReceivedFromStorage(
testRecord.getKey().getKey(),
ValueRecord.create(TEST_SCHEMA_ID, valueBytes.value.array()).serialize(),
0,
Expand Down Expand Up @@ -282,7 +282,7 @@ public void testStart() throws ExecutionException, InterruptedException {
valueBytes.schemaId = TEST_SCHEMA_ID;
valueBytes.value = ByteBuffer.wrap(valueSerializer.serialize(TEST_NEW_VALUE_2));

bootstrappingVeniceChangelogConsumer.onRecordReceivedForStorage(
bootstrappingVeniceChangelogConsumer.onRecordReceivedFromStorage(
testRecord.getKey().getKey(),
ValueRecord.create(TEST_SCHEMA_ID, valueBytes.value.array()).serialize(),
1,
Expand Down
1 change: 1 addition & 0 deletions gradle/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@
<Class name="com.linkedin.venice.fastclient.schema.TestValueSchema"/>
<Class name="com.linkedin.venice.utils.TestMockTime"/>
<Class name="com.linkedin.davinci.ingestion.IsolatedIngestionBackend"/>
<Class name="com.linkedin.venice.endToEnd.TestChangelogValue"/>
</Or>
</Match>
<Match>
Expand Down
4 changes: 3 additions & 1 deletion internal/venice-test-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ def integrationTestBuckets = [
"C": [
"com.linkedin.venice.endToEnd.ActiveActive*"],
"D": [
"com.linkedin.venice.endToEnd.TestActiveActive*"],
"com.linkedin.venice.endToEnd.TestActiveActive*",
"com.linkedin.venice.endToEnd.TestChangelogConsumer",
"com.linkedin.venice.endToEnd.TestBootstrappingChangelogConsumer"],
"E": [
"com.linkedin.venice.helix.*",
"com.linkedin.venice.helixrebalance.*"],
Expand Down