[changelog] Let bootstrapping changelog consumer consume after-image (#966)

Make the bootstrapping consumer consume after-image data from the version topic, instead of before/after-image data from the change-capture (cc) topic. Fixed a few small bugs along the way, and cleaned up the integration tests.
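
For context, a minimal sketch of what this means for consumers, using a simplified, hypothetical stand-in for the real Venice type (the actual ChangeEvent lives in the Venice client library; the accessor names here are assumptions): a change-capture event carries both sides of an update, while an after-image event from the version topic carries only the new value, so the previous value is null.

// Simplified stand-in for the Venice ChangeEvent type; accessor names are assumed.
final class ChangeEvent<V> {
  private final V previousValue;
  private final V currentValue;

  ChangeEvent(V previousValue, V currentValue) {
    this.previousValue = previousValue;
    this.currentValue = currentValue;
  }

  V getPreviousValue() { return previousValue; }
  V getCurrentValue() { return currentValue; }
}

public class AfterImageSketch {
  public static void main(String[] args) {
    // From the change-capture (cc) topic: both sides of the update are present.
    ChangeEvent<String> beforeAfter = new ChangeEvent<>("oldValue", "newValue");
    // From the version topic: after-image only; the previous value is null,
    // matching the new ChangeEvent<>(null, ...) call sites in the diff below.
    ChangeEvent<String> afterImage = new ChangeEvent<>(null, "newValue");
    System.out.println(beforeAfter.getPreviousValue() + " -> " + beforeAfter.getCurrentValue());
    System.out.println(afterImage.getPreviousValue() + " -> " + afterImage.getCurrentValue());
  }
}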
Bugs fixed (see the sketch after this list):

- seekToCheckpoint needs to be blocked on with get(); otherwise the consumer will jump to non-deterministic offsets and yield wrong results.
- When persisting after-image values to local storage, we should extract the bytes from the ByteBuffer instead of converting it to an array directly, since the backing array can be larger than the readable region.
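
A runnable sketch of both pitfalls (the helper and class names here are hypothetical; the actual fix uses ByteUtils.extractByteArray and a blocking get() on the future returned by seekToCheckpoint, as the diff below shows):

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

public class BootstrapFixSketch {
  // Stand-in for ByteUtils.extractByteArray (assumed behavior): copy only the
  // readable position..limit window instead of exposing the whole backing array.
  static byte[] extractByteArray(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes); // duplicate() leaves the caller's position untouched
    return bytes;
  }

  // Hypothetical async seek, mirroring the CompletableFuture-returning API in the diff.
  static CompletableFuture<Void> seekToCheckpoint() {
    return CompletableFuture.runAsync(() -> { /* subscribe at the checkpointed offsets */ });
  }

  public static void main(String[] args) throws Exception {
    // Bug 2: a decompressed ByteBuffer often wraps an oversized backing array,
    // so array() returns stale trailing bytes that must not be persisted.
    ByteBuffer decompressed = ByteBuffer.wrap(new byte[] { 0, 0, 1, 2, 3, 9, 9, 9 }, 2, 3);
    System.out.println(decompressed.array().length);           // 8: whole backing array, wrong bytes
    System.out.println(extractByteArray(decompressed).length); // 3: just the readable slice {1, 2, 3}

    // Bug 1: seeking is asynchronous; block on the future before polling,
    // otherwise the consumer may start from a non-deterministic offset.
    CompletableFuture<Void> seek = seekToCheckpoint();
    seek.get(); // wait for the seek to complete before catching up
  }
}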
sixpluszero committed Apr 30, 2024
1 parent 1bd3b09 commit eade77a
Showing 15 changed files with 937 additions and 1,394 deletions.
@@ -187,6 +187,11 @@ public ChangelogClientConfig setRocksDBBlockCacheSizeInBytes(long rocksDBBlockCa
return this;
}

public ChangelogClientConfig setSpecificValue(Class<T> specificValue) {
this.innerClientConfig.setSpecificValueClass(specificValue);
return this;
}

public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(ChangelogClientConfig<V> config) {
ChangelogClientConfig<V> newConfig = new ChangelogClientConfig<V>().setStoreName(config.getStoreName())
.setLocalD2ZkHosts(config.getLocalD2ZkHosts())
@@ -40,10 +40,10 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.views.ChangeCaptureView;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -63,7 +63,7 @@
import org.apache.logging.log4j.Logger;


class InternalLocalBootstrappingVeniceChangelogConsumer<K, V> extends VeniceChangelogConsumerImpl<K, V>
class InternalLocalBootstrappingVeniceChangelogConsumer<K, V> extends VeniceAfterImageConsumerImpl<K, V>
implements BootstrappingVeniceChangelogConsumer<K, V> {
private static final Logger LOGGER = LogManager.getLogger(InternalLocalBootstrappingVeniceChangelogConsumer.class);
private static final String CHANGE_CAPTURE_COORDINATE = "ChangeCaptureCoordinatePosition";
@@ -229,7 +229,7 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
.getByKeyPrefix(state.getKey(), null, new BytesStreamingCallback() {
@Override
public void onRecordReceived(byte[] key, byte[] value) {
onRecordReceivedForStorage(key, value, state.getKey(), resultSet);
onRecordReceivedFromStorage(key, value, state.getKey(), resultSet);
}

@Override
@@ -283,7 +283,7 @@ private void syncOffset(int partitionId, BootstrapState bootstrapState) {
}

@VisibleForTesting
void onRecordReceivedForStorage(
void onRecordReceivedFromStorage(
byte[] key,
byte[] value,
int partition,
@@ -293,7 +293,6 @@ void onRecordReceivedForStorage(
// a user
// schema for deserialization
ValueRecord valueRecord = ValueRecord.parseAndCreate(value);

// Create a change event to wrap the record we pulled from disk and deserialize the record
ChangeEvent<V> changeEvent = new ChangeEvent<>(
null,
@@ -340,18 +339,15 @@ VeniceConcurrentHashMap<Integer, BootstrapState> getBootstrapStateMap() {
}

/**
* Polls change capture client and persist the results to local disk. Also updates the bootstrapStateMap with latest offsets
* and if the client has caught up or not.
* Polls change capture client and persist the results to local disk. Also updates the bootstrapStateMap with latest
* offsets and if the client has caught up or not.
*
* @param timeoutInMs timeout on Poll
* @param topicSuffix internal topic suffix
* @return
*/
private Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pollAndCatchup(
long timeoutInMs,
String topicSuffix) {
private void pollAndCatchup(long timeoutInMs, String topicSuffix) {
Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> polledResults =
super.internalPoll(timeoutInMs, topicSuffix);
super.internalPoll(timeoutInMs, topicSuffix, true);
for (PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> record: polledResults) {
BootstrapState currentPartitionState = bootstrapStateMap.get(record.getPartition());
currentPartitionState.currentPubSubPosition = record.getOffset();
@@ -365,7 +361,6 @@ private Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pol
}
}
}
return polledResults;
}

@Override
@@ -376,7 +371,7 @@ protected <T> T processRecordBytes(
ByteBuffer value,
PubSubTopicPartition partition,
int readerSchemaId,
long recordOffset) throws IOException {
long recordOffset) {
if (deserializedValue instanceof RecordChangeEvent) {
RecordChangeEvent recordChangeEvent = (RecordChangeEvent) deserializedValue;
if (recordChangeEvent.currentValue == null) {
Expand All @@ -391,11 +386,9 @@ protected <T> T processRecordBytes(
.serialize());
}
} else {
byte[] valueBytes = ByteUtils.extractByteArray(decompressedBytes);
storageService.getStorageEngine(localStateTopicName)
.put(
partition.getPartitionNumber(),
key,
ValueRecord.create(readerSchemaId, decompressedBytes.array()).serialize());
.put(partition.getPartitionNumber(), key, ValueRecord.create(readerSchemaId, valueBytes).serialize());
}

// Update currentPubSubPosition for a partition
@@ -436,7 +429,10 @@ public CompletableFuture<Void> seekWithBootStrap(Set<Integer> partitions) {
VeniceChangeCoordinate localCheckpoint;
try {
if (StringUtils.isEmpty(offsetString)) {
LOGGER.info("No local checkpoint found for partition: {}", partition);
LOGGER.info(
"No local checkpoint found for partition: {}, will initialize checkpoint to offset: {}",
partition,
offsetRecord.getLocalVersionTopicOffset());
localCheckpoint = new VeniceChangeCoordinate(
getTopicPartition(partition).getPubSubTopic().getName(),
new ApacheKafkaOffsetPosition(offsetRecord.getLocalVersionTopicOffset()),
@@ -470,15 +466,25 @@
}
}

// Seek to the current position so we can catch up from there to target
seekToCheckpoint(
bootstrapStateMap.values().stream().map(state -> state.currentPubSubPosition).collect(Collectors.toSet()));
// Seek to the current position, so we can catch up from there to target
try {
seekToCheckpoint(
bootstrapStateMap.values().stream().map(state -> state.currentPubSubPosition).collect(Collectors.toSet()))
.get();
} catch (Exception e) {
throw new VeniceException("Caught exception when seeking to bootstrap", e);
}

// Poll until we've caught up completely for all subscribed partitions.
while (bootstrapStateMap.entrySet()
.stream()
.anyMatch(s -> s.getValue().bootstrapState.equals(PollState.CATCHING_UP))) {
pollAndCatchup(5000L, ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX);
/**
* TODO: For now we change to support after-image only use case for bootstrapping changelog consumer.
* We will subscribe to version topic for now. If there is support for different use case in the future, we need
* to further tweak it based on config.
*/
pollAndCatchup(5000L, "");
}

LOGGER.info("Bootstrap completed!");
@@ -86,14 +86,11 @@ protected VeniceChangeCoordinate(String topic, PubSubPosition pubSubPosition, In
public static String convertVeniceChangeCoordinateToStringAndEncode(VeniceChangeCoordinate veniceChangeCoordinate)
throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream);
try {
try (ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
veniceChangeCoordinate.writeExternal(outputStream);
outputStream.flush();
byte[] data = byteArrayOutputStream.toByteArray();
return Base64.getEncoder().encodeToString(data);
} finally {
outputStream.close();
}
}

@@ -101,10 +101,12 @@ public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangel
*/
public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangelogConsumer(
String storeName,
String consumerId) {
String consumerId,
Class clazz) {
return (BootstrappingVeniceChangelogConsumer<K, V>) storeClientMap
.computeIfAbsent(suffixConsumerIdToStore(storeName, consumerId), name -> {
ChangelogClientConfig newStoreChangelogClientConfig = getNewStoreChangelogClientConfig(storeName);
ChangelogClientConfig newStoreChangelogClientConfig =
getNewStoreChangelogClientConfig(storeName).setSpecificValue(clazz);
String viewClass = getViewClass(newStoreChangelogClientConfig, storeName);
String consumerName =
suffixConsumerIdToStore(storeName + "-" + viewClass.getClass().getSimpleName(), consumerId);
@@ -117,6 +119,12 @@ public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangel
});
}

public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangelogConsumer(
String storeName,
String consumerId) {
return getBootstrappingChangelogConsumer(storeName, consumerId, null);
}

private ChangelogClientConfig getNewStoreChangelogClientConfig(String storeName) {
ChangelogClientConfig newStoreChangelogClientConfig =
ChangelogClientConfig.cloneConfig(globalChangelogClientConfig).setStoreName(storeName);
@@ -375,11 +375,9 @@ public CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> chec

private void pubSubConsumerSeek(PubSubTopicPartition topicPartition, Long offset) {
// Offset the seek to next operation inside venice pub sub consumer adapter subscription logic.
if (offset == OffsetRecord.LOWEST_OFFSET) {
pubSubConsumer.subscribe(topicPartition, OffsetRecord.LOWEST_OFFSET);
} else {
pubSubConsumer.subscribe(topicPartition, offset - 1);
}
long targetOffset = offset == OffsetRecord.LOWEST_OFFSET ? OffsetRecord.LOWEST_OFFSET : offset - 1;
pubSubConsumer.subscribe(topicPartition, targetOffset);
LOGGER.info("Topic partition: {} consumer seek to offset: {}", topicPartition, targetOffset);
}

@Override
@@ -507,7 +505,8 @@ public Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll

protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> internalPoll(
long timeoutInMs,
String topicSuffix) {
String topicSuffix,
boolean includeControlMessage) {
List<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubMessages = new ArrayList<>();
Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> messagesMap;
synchronized (pubSubConsumer) {
@@ -528,6 +527,18 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
message.getValue().getProducerMetadata().getMessageTimestamp())) {
break;
}
if (includeControlMessage) {
pubSubMessages.add(
new ImmutableChangeCapturePubSubMessage<>(
null,
null,
message.getTopicPartition(),
message.getOffset(),
0,
0,
false));
}

} else {
Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubMessage =
convertPubSubMessageToPubSubChangeEventMessage(message, pubSubTopicPartition);
@@ -541,6 +552,12 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
return pubSubMessages;
}

protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> internalPoll(
long timeoutInMs,
String topicSuffix) {
return internalPoll(timeoutInMs, topicSuffix, false);
}

/**
* Handle control message from the given topic. Returns true if a topic switch should occur and records should be returned
*
@@ -659,7 +676,6 @@ protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> con
// it's waiting for more input. In this case, just return an empty optional for now.
return Optional.empty();
}

try {
assembledObject = processRecordBytes(
compressor.decompress(put.getPutValue()),
@@ -672,7 +688,6 @@ protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> con
} catch (Exception ex) {
throw new VeniceException(ex);
}

if (assembledObject instanceof RecordChangeEvent) {
recordChangeEvent = (RecordChangeEvent) assembledObject;
replicationCheckpoint = recordChangeEvent.replicationCheckpointVector;
@@ -203,8 +203,8 @@ public void testStart() throws ExecutionException, InterruptedException {
PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1));
PubSubTopic changeCaptureTopic =
pubSubTopicRepository.getTopic(versionTopic.getName() + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX);
PubSubTopicPartition topicPartition_0 = new PubSubTopicPartitionImpl(changeCaptureTopic, 0);
PubSubTopicPartition topicPartition_1 = new PubSubTopicPartitionImpl(changeCaptureTopic, 1);
PubSubTopicPartition topicPartition_0 = new PubSubTopicPartitionImpl(versionTopic, 0);
PubSubTopicPartition topicPartition_1 = new PubSubTopicPartitionImpl(versionTopic, 1);
Set<PubSubTopicPartition> assignments = ImmutableSet.of(topicPartition_0, topicPartition_1);
doReturn(assignments).when(pubSubConsumer).getAssignment();
doReturn(0L).when(pubSubConsumer).getLatestOffset(topicPartition_0);
@@ -251,7 +251,7 @@ public void testStart() throws ExecutionException, InterruptedException {
ValueBytes valueBytes = new ValueBytes();
valueBytes.schemaId = TEST_SCHEMA_ID;
valueBytes.value = ByteBuffer.wrap(valueSerializer.serialize(TEST_NEW_VALUE_1));
bootstrappingVeniceChangelogConsumer.onRecordReceivedForStorage(
bootstrappingVeniceChangelogConsumer.onRecordReceivedFromStorage(
testRecord.getKey().getKey(),
ValueRecord.create(TEST_SCHEMA_ID, valueBytes.value.array()).serialize(),
0,
@@ -282,7 +282,7 @@ public void testStart() throws ExecutionException, InterruptedException {
valueBytes.schemaId = TEST_SCHEMA_ID;
valueBytes.value = ByteBuffer.wrap(valueSerializer.serialize(TEST_NEW_VALUE_2));

bootstrappingVeniceChangelogConsumer.onRecordReceivedForStorage(
bootstrappingVeniceChangelogConsumer.onRecordReceivedFromStorage(
testRecord.getKey().getKey(),
ValueRecord.create(TEST_SCHEMA_ID, valueBytes.value.array()).serialize(),
1,
1 change: 1 addition & 0 deletions gradle/spotbugs/exclude.xml
@@ -252,6 +252,7 @@
<Class name="com.linkedin.venice.fastclient.schema.TestValueSchema"/>
<Class name="com.linkedin.venice.utils.TestMockTime"/>
<Class name="com.linkedin.davinci.ingestion.IsolatedIngestionBackend"/>
<Class name="com.linkedin.venice.endToEnd.TestChangelogValue"/>
</Or>
</Match>
<Match>
4 changes: 3 additions & 1 deletion internal/venice-test-common/build.gradle
@@ -130,7 +130,9 @@ def integrationTestBuckets = [
"C": [
"com.linkedin.venice.endToEnd.ActiveActive*"],
"D": [
"com.linkedin.venice.endToEnd.TestActiveActive*"],
"com.linkedin.venice.endToEnd.TestActiveActive*",
"com.linkedin.venice.endToEnd.TestChangelogConsumer",
"com.linkedin.venice.endToEnd.TestBootstrappingChangelogConsumer"],
"E": [
"com.linkedin.venice.helix.*",
"com.linkedin.venice.helixrebalance.*"],
