Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][feature] New proxy to use the multi bundles feature (part1) #716

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

gaoran10
Copy link
Collaborator

@gaoran10 gaoran10 commented Nov 14, 2022

Motivation

Currently, the AoP can't leverage the multi bundles feature, exchanges and queues belonging to one AMQP virtual host are owned by one namespace bundle, they must work on one broker. So the broker can't scale up simply to support increasing exchanges and queues, if users want to use all brokers of a cluster, they need to use different virtual hosts for different exchanges and queues, it's very hard to adjust existing applications and it's not friendly to use.

Modifications

  • support exchange declare
  • support queue declare
  • support queue bind
  • support queue unbind
  • support basic publish
  • support basic consume

Verifying this change

Add new test.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (yes)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (yes)

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@github-actions github-actions bot added the doc-required This pr needs a document label Nov 14, 2022
Add a new proxy to use multi bundles feature.

- support exchange declare
- support queue declare
- support queue bind
- support queue unbind
- support basic publish
- support basic consume
bindingsJson = JSON_MAPPER.writeValueAsString(this.bindings);
} catch (JsonProcessingException e) {
log.error("Failed to bind queue {} to exchange {}", queue, exchangeName, e);
return CompletableFuture.failedFuture(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This failedFuture method was introduced in jdk9, we should support jdk8.

bindingsJson = JSON_MAPPER.writeValueAsString(this.bindings);
} catch (JsonProcessingException e) {
log.error("Failed to unbind queue {} to exchange {}", queue, exchangeName, e);
return CompletableFuture.failedFuture(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

support jdk8.

try {
client = pulsar.getClient();
} catch (PulsarServerException e) {
return CompletableFuture.failedFuture(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

support jdk8.

data = new byte[message.getDataBuffer().readableBytes()];
message.getDataBuffer().readBytes(data);
for (Destination des : destinations) {
futures.add(sendMessage(des, data, props));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might loop.

public void deleteFailed(ManagedLedgerException exception, Object ctx) {
log.error("{} Failed to delete message at {}", exchange.getName(), ctx, exception);
}
}, entry.getPosition());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entry has been released, and the 190 line position variable should be used.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice cache, fixed

log.error("Deserialize entry dataBuffer failed. exchangeName: {}, skip it first.",
exchange.getName(), e);
PENDING_SIZE_UPDATER.decrementAndGet(this);
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entry is not released.

}

@Override
public void receiveExchangeDeleteOk() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client needs to be notified of the result.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method will be supported later.


@Override
public void receiveQueueDeleteOk(long messageCount) {
// nothing to do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client needs to be notified of the result.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method will be supported later.

})
.thenAccept(position -> {
if (log.isDebugEnabled()) {
log.debug("Publish message success, position {}", position.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open Publish confirm, BasicAckBody not handled.

@gaoran10 gaoran10 changed the title [feature] New proxy to use the multi bundles feature [feature] New proxy to use the multi bundles feature (part1) Nov 21, 2022
@gaoran10 gaoran10 changed the title [feature] New proxy to use the multi bundles feature (part1) [WIP][feature] New proxy to use the multi bundles feature (part1) Dec 13, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-required This pr needs a document
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants