[FEATURE] Apache Pulsar Connector #170

Open
tspannhw opened this issue Nov 17, 2022 · 3 comments

@tspannhw

Is your feature request related to a problem? Please describe.
Add Pulsar as a source, with all the features of the Kafka connector. Pulsar also supports AMQP, MQTT, WebSockets, and Kafka as protocols.

Describe the solution you'd like
Add Pulsar support the same way Kafka is supported.

Describe alternatives you've considered
We could use Pulsar's other protocols, but the native protocol is the fastest and easiest option.

Additional context
Contact me for assistance. https://github.com/tspannhw/SpeakerProfile

github-actions bot added the needs triage (New issue, needs triage) label on Nov 17, 2022
awmatheson added the type:feature (New feature request) label on Nov 28, 2022
@awmatheson
Contributor

awmatheson commented Dec 9, 2022

👋 @tspannhw. Thanks for opening this! An Apache Pulsar connector would be great eventually, and we will have better examples of how to add connectors to Bytewax in the future. For now, people getting started can use the Kafka protocol to take advantage of KafkaInput; I made an example with this working here. However, some functionality may not work out of the box, recovery for instance.

You can also use the Pulsar Python client directly in the input or output code. For the input, state recovery and offset management would then have to be handled in the input code itself.
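Managing offsets in the input code, as suggested above, could look roughly like the sketch below. It assumes a partition shaped like Bytewax's stateful source partitions (a `next_batch()` method that returns a list of items, and a `snapshot()` method whose return value is handed back as `resume_state` after a restart); it does not import Bytewax, and exact class names and signatures differ between Bytewax versions. To keep it runnable without a broker, `FakeConsumer` is a hypothetical in-memory stand-in for a Pulsar consumer, and plain integer offsets stand in for Pulsar message IDs. With the real `pulsar` client you would more likely snapshot a serialized `MessageId` and use it to reposition the consumer on resume.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class FakeConsumer:
    """Hypothetical stand-in for a Pulsar consumer (not the real client API)."""

    messages: List[bytes]
    position: int = 0  # index of the next message to deliver

    def seek(self, position: int) -> None:
        self.position = position

    def receive_batch(self, max_items: int) -> List[Tuple[int, bytes]]:
        end = min(self.position + max_items, len(self.messages))
        batch = [(i, self.messages[i]) for i in range(self.position, end)]
        self.position = end
        return batch


class PulsarPartitionSketch:
    """Shaped like a Bytewax stateful source partition (next_batch/snapshot).

    The snapshot is the offset of the next unread message, so a restart
    resumes right after the last message emitted before that snapshot.
    """

    def __init__(self, consumer: FakeConsumer, resume_state: Optional[int], batch_size: int = 2):
        self._consumer = consumer
        self._batch_size = batch_size
        self._next_offset = resume_state if resume_state is not None else 0
        self._consumer.seek(self._next_offset)  # reposition on (re)start

    def next_batch(self) -> List[str]:
        batch = self._consumer.receive_batch(self._batch_size)
        if batch:
            # Remember where to resume: one past the last message sent downstream.
            self._next_offset = batch[-1][0] + 1
        return [payload.decode("utf-8") for _, payload in batch]

    def snapshot(self) -> int:
        return self._next_offset


msgs = [b"a", b"b", b"c", b"d", b"e"]
part = PulsarPartitionSketch(FakeConsumer(msgs), resume_state=None)
print(part.next_batch())  # ['a', 'b']
state = part.snapshot()   # 2
# Simulated restart: a fresh consumer plus the saved state.
restarted = PulsarPartitionSketch(FakeConsumer(msgs), resume_state=state)
print(restarted.next_batch())  # ['c', 'd']
```

Anything emitted after the last snapshot is read again after a restart, so this approach gives at-least-once delivery rather than exactly-once.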

awmatheson added the solution provided label and removed the needs triage (New issue, needs triage) label on Dec 9, 2022
@spacid

spacid commented Feb 28, 2024

Hello, we're thinking about using a stream processing framework like Bytewax, but we would need to connect to Apache Pulsar. Is there any update on a native Apache Pulsar connector, or documentation on how to implement such a connector?

@jhanninen
Contributor

If anyone needs inspiration for implementing their own custom Pulsar connector, here's our proof of concept (PoC):
https://github.com/HSLdevcom/ajoaikadata/blob/main/src/connectors/pulsar.py

The strategy I used there was to create Bytewax messages as dicts, like this:

from typing import NotRequired, TypedDict  # NotRequired requires Python 3.11+

class BytewaxMsg(TypedDict):
    data: dict                    # the actual message content
    msgs: NotRequired[list[str]]  # the list of Pulsar msg ids for acking
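A minimal sketch of how such messages might flow, assuming the sink acknowledges the Pulsar IDs only after the data has been written. `to_bytewax_msg`, `write_and_ack`, and the `write`/`ack` callbacks are hypothetical and not taken from the PoC; with the real client, `ack` would wrap something like `Consumer.acknowledge()`, and mapping the string IDs back to Pulsar `MessageId` objects is not shown. The `TypedDict` here uses `total=False` instead of `NotRequired` so that it also runs on Python versions older than 3.11.

```python
import json
from typing import Callable, List, TypedDict


class _BytewaxMsgBase(TypedDict):
    data: dict  # the actual message content


class BytewaxMsg(_BytewaxMsgBase, total=False):
    msgs: List[str]  # optional: Pulsar message IDs for acking


def to_bytewax_msg(payload: bytes, msg_id: str) -> BytewaxMsg:
    """Wrap one JSON-encoded Pulsar payload together with its message ID."""
    return {"data": json.loads(payload), "msgs": [msg_id]}


def write_and_ack(
    item: BytewaxMsg,
    write: Callable[[dict], None],
    ack: Callable[[str], None],
) -> None:
    """Hypothetical sink step: persist the data first, then ack its source IDs."""
    write(item["data"])
    # Acking only after a successful write means a crash in between leads to
    # redelivery (at-least-once) instead of a lost message.
    for msg_id in item.get("msgs", []):
        ack(msg_id)


written: List[dict] = []
acked: List[str] = []
write_and_ack(to_bytewax_msg(b'{"speed": 12}', "id-1"), written.append, acked.append)
print(written, acked)  # [{'speed': 12}] ['id-1']
```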

Disclaimer:

  • This connector has not been used for a while, so it may be broken (it was last run with Bytewax 0.17; the code would need updating for 0.19).
  • It has only been quickly tested in a local environment, so the implementation is not guaranteed to handle all messages correctly or to be optimised for performance.
  • It contains some use-case-specific tricks, such as the ability to filter messages or to group multiple messages together and ack them once.
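The grouping and filtering from the last point can also be sketched without a broker. This is not the PoC's code: `merge_group`, the `records` key, and the `keep` predicate are hypothetical. The design point it illustrates is that a filtered-out message still needs to be acknowledged, because Pulsar may redeliver unacknowledged messages (for example after an ack timeout or a consumer restart), so its ID is returned for immediate acking instead of being silently dropped.

```python
from typing import Callable, Iterable, List, Tuple, TypedDict


class _BytewaxMsgBase(TypedDict):
    data: dict


class BytewaxMsg(_BytewaxMsgBase, total=False):
    msgs: List[str]  # Pulsar message IDs still waiting to be acked


def merge_group(
    group: Iterable[BytewaxMsg],
    keep: Callable[[dict], bool],
) -> Tuple[BytewaxMsg, List[str]]:
    """Merge a group into one message; return (merged, ids_to_ack_now)."""
    records: List[dict] = []
    pending: List[str] = []  # acked together once the merged message is written
    dropped: List[str] = []  # filtered out: nothing downstream will ack these
    for msg in group:
        ids = msg.get("msgs", [])
        if keep(msg["data"]):
            records.append(msg["data"])
            pending.extend(ids)
        else:
            dropped.extend(ids)
    merged: BytewaxMsg = {"data": {"records": records}, "msgs": pending}
    return merged, dropped


group: List[BytewaxMsg] = [
    {"data": {"v": 1}, "msgs": ["a"]},
    {"data": {"v": -1}, "msgs": ["b"]},
    {"data": {"v": 3}, "msgs": ["c"]},
]
merged, to_ack_now = merge_group(group, keep=lambda d: d["v"] > 0)
print(merged)      # {'data': {'records': [{'v': 1}, {'v': 3}]}, 'msgs': ['a', 'c']}
print(to_ack_now)  # ['b']
```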
