Skip to content

Commit

Permalink
[fix][broker] Fix issue of field 'topic' is not set when handle GetSc…
Browse files Browse the repository at this point in the history
…hema request (#22377)

(cherry picked from commit d8903da)
  • Loading branch information
coderzc authored and lhotari committed Mar 28, 2024
1 parent 2e13fba commit 5a1fa0c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2389,9 +2389,10 @@ remoteAddress, new String(commandGetSchema.getSchemaVersion()),
schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
}

final String topic = commandGetSchema.getTopic();
String schemaName;
try {
schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName();
schemaName = TopicName.get(topic).getSchemaName();
} catch (Throwable t) {
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
return;
Expand All @@ -2400,7 +2401,7 @@ remoteAddress, new String(commandGetSchema.getSchemaVersion()),
schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
if (schemaAndMetadata == null) {
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
String.format("Topic not found or no-schema %s", topic));
} else {
commandSender.sendGetSchemaResponse(requestId,
SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.EqualsAndHashCode;
Expand All @@ -69,6 +70,8 @@
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
Expand Down Expand Up @@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
public void setup() throws Exception {
isTcpLookup = true;
super.internalSetup();

// Setup namespaces
Expand All @@ -106,6 +110,7 @@ public void setup() throws Exception {
.allowedClusters(Collections.singleton(CLUSTER_NAME))
.build();
admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
}

@AfterMethod(alwaysRun = true)
Expand All @@ -130,6 +135,34 @@ public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{
pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
}

@Test
public void testGetSchemaWithPatternTopic() throws Exception {
final String topicPrefix = "persistent://public/my-ns/test-getSchema";

int topicNums = 10;
for (int i = 0; i < topicNums; i++) {
String topic = topicPrefix + "-" + i;
admin.topics().createNonPartitionedTopic(topic);
}

Pattern pattern = Pattern.compile(topicPrefix + "-.*");
@Cleanup
Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.topicsPattern(pattern)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Shared)
.subscribe();

List<ConsumerImpl<GenericRecord>> consumers =
((MultiTopicsConsumerImpl<GenericRecord>) consumer).getConsumers();
Assert.assertEquals(topicNums, consumers.size());

for (int i = 0; i < topicNums; i++) {
String topic = topicPrefix + "-" + i;
admin.topics().delete(topic, true);
}
}

@Test
public void testMultiTopicSetSchemaProvider() throws Exception {
final String tenant = PUBLIC_TENANT;
Expand Down

0 comments on commit 5a1fa0c

Please sign in to comment.