Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ActiveRecord adapter #340

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open

Implement ActiveRecord adapter #340

wants to merge 3 commits into from

Conversation

intale
Copy link

@intale intale commented Jul 17, 2023

Changes

  • implement ActiveRecord adapter. It can be configured as follows
MessageBus.configure(backend: :active_record, pubsub_redis_url: 'redis://localhost:6379/1')

Read more about it in readme

  • Refactor MessageBus::Backends::Redis#publish to no longer rely on lua script

Breaking changes

  • Change the implementation of MessageBus::Message#encode and MessageBus::Message.decode. Previously it was using custom format to encode/decode messages. In new implementation it uses json for this purpose. This change affects on messages which is currently queued for publishing via pub/sub implementation. But for redis adapter it also affects the backlog. So, if you use redis adapter - it is recommended to reset the whole db by running MessageBus.reset! right before applying new changes. For the pg and in memory adapter it is just enough to apply new changes.
  • Change the behaviour of client_message_filter. Previously it was accepting only one message argument(MessageBus::Message). In new implementation it accepts two arguments - params(Hash) and message(MessageBus::Message). Query params(params which go in the long polling request url) are now parsed and passed into client_message_filter proc for better manipulation over message filters. Example:
MessageBus.register_client_message_filter('/test') do |params, message|
  (Time.now.to_i - message.data['published_at']) <= params['age'].to_i
end

So, if you currently have something like this:

MessageBus.register_client_message_filter('/test') do |message|
  (Time.now.to_i - message.data[:published_at]) <= 20
end

you should change it to this:

MessageBus.register_client_message_filter('/test') do |_env, message|
  (Time.now.to_i - message.data['published_at']) <= 20
end

See docs for more info

@intale
Copy link
Author

intale commented Jul 18, 2023

@tgxworld @SamSaffron @benlangfeld hey guys. Could someone review this PR, please?

@SamSaffron
Copy link
Member

I am sorry, but I am not too happy about this change. This is an enormous breaking change, I am not seeing the value behind it? Can you explain why we would want to go down this path?

Moving away from LUA has performance impact.

Feels very much like a "shuffling stuff around" change, this is not widening message bus support and giving it a real new backend (like sudden support for mysql or some other provider)

@jeremyevans any thoughts here?

@jeremyevans
Copy link
Collaborator

I would guess the reason for this is you want the storage in PostgreSQL and the pub/sub in redis, but I'm not sure why you would want that. I understand the LISTEN/NOTIFY issue, but I don't understand why is the redis backend not sufficient in that case? Why do you want to separate the storage and pub/sub? Any separation seems to increase the likelihood that the storage and pub/sub could get out of sync, such as rolling back a containing PostgreSQL transaction (removing the storage but keeping the pub/sub).

Even if you do want storage in PostgreSQL and pub/sub in redis, if this is specific to PostgreSQL anyway, why involve ActiveRecord at all? You could design a backend that directly uses both redis and pg without involving ActiveRecord. What value does ActiveRecord add here? Why is it worth adding new test dependencies?

I don't think we should consider accepting this unless it can be implemented in a backwards compatible manner. Any backwards incompatible changes should require a detailed justification, and ideally in separate pull requests before this is considered. Gratuitous backwards incompatible changes like making the new block argument to MessageBus.register_client_message_filter the first argument instead of the second seem to indicate that backwards compatibility was not treated with due priority. Changing the method at all seems unnecessary, since you could add a new method for the case where you want to pass params, and turn existing calls into MessageBus.register_client_message_filter to call this new method internally.

@intale
Copy link
Author

intale commented Jul 24, 2023

@SamSaffron

This is an enormous breaking change, I am not seeing the value behind it?

The intention was to provide easy way to manipulate queries by introducing AR and by introducing jsonb column that stores data. During the development it appeared it is not that easy to pass custom condition into a AR client class to manipulate queries due to unsuitable architecture of the library. Eventually it was decided to go with AR adapter anyway - it is more convenient to deal with AR model - from the perspective of the understanding of the code and from the perspective of the integration.

Moving away from LUA has performance impact.

Certainly. But the difference is so irrelevant that it doesn't worse that overengineering using LUA. So, in this case simpler -> better. Besides, I got rid from that custom format of encoded MessageBus::Message. A script that was used for measuring(codebase was used from my repo from main branch, and redis_old is redis.rb from your repo from main branch; docker-compose.yml was used from my repo to run redis server):

require 'message_bus'
require 'message_bus/backends/redis'
require 'message_bus/backends/redis_old'
require 'benchmark'

new_client = MessageBus::Backends::Redis.new(url: 'redis://localhost:6479/1')
old_client = MessageBus::Backends::RedisOld.new(url: 'redis://localhost:6479/1')

newc_result = Benchmark.realtime do
  100_000.times do
    new_client.publish('new-channel', { foo: :bar })
  end
end

oldc_result = Benchmark.realtime do
  100_000.times do
    old_client.publish('new-channel', { foo: :bar })
  end
end

puts "New client speed: #{newc_result / 100_000.0} seconds per publish command"
puts "Old client speed: #{oldc_result / 100_000.0} seconds per publish command"

Results:

New client speed: 0.00020852351593000093 seconds per publish command
Old client speed: 3.347475096999915e-05 seconds per publish command

@jeremyevans

I would guess the reason for this is you want the storage in PostgreSQL and the pub/sub in redis, but I'm not sure why you would want that. I understand the LISTEN/NOTIFY issue, but I don't understand why is the redis backend not sufficient in that case?

To take the best from two worlds - from pg I would like to use jsonb to have a well structured data and from redis I want the ability to handle thousands of connections at a time.

Why do you want to separate the storage and pub/sub? Any separation seems to increase the likelihood that the storage and pub/sub could get out of sync, such as rolling back a containing PostgreSQL transaction (removing the storage but keeping the pub/sub).

Only redis implementation has atomic storage & publish. All other implementations(pg, redis, in-memory) - has it as two different operations, which is logical. Why would AR adapter work differently? Certainly - if someone would add "dangerous" logic into AR model that could cause rollbacks - then yes, but I don't think I am responsible for that mistakes.

Even if you do want storage in PostgreSQL and pub/sub in redis, if this is specific to PostgreSQL anyway, why involve ActiveRecord at all? You could design a backend that directly uses both redis and pg without involving ActiveRecord. What value does ActiveRecord add here?

It is easier to deal with, like debug, manipulate, etc.

Why is it worth adding new test dependencies?

Do you mean dotenv gem? For the convenience of the development of course. 🤷‍♂️

I don't think we should consider accepting this unless it can be implemented in a backwards compatible manner. Any backwards incompatible changes should require a detailed justification, and ideally in separate pull requests before this is considered. Gratuitous backwards incompatible changes like making the new block argument to MessageBus.register_client_message_filter the first argument instead of the second seem to indicate that backwards compatibility was not treated with due priority. Changing the method at all seems unnecessary, since you could add a new method for the case where you want to pass params, and turn existing calls into MessageBus.register_client_message_filter to call this new method internally.

Well, the only reason why I would need to make it backwards compatible is to target these changes to message_bus v4. And this is impossible due to improvements I did to redis adapter and MessageBus::Message encode/decode functional. So, these changes are targeted to message_bus v5 I would say.

Thanks for your feedback and for your time, guys. If after my explanations you still don't like it - simply close this PR, AR adapter support will keep living under yousty/message_bus fork. Those changes were implemented as a part of needs of Yousty team - we just wanted to share them with public ;)

@ncri
Copy link

ncri commented Jul 24, 2023

@SamSaffron @jeremyevans thanks for looking into this. Yes, as @intale explained, we made the changes because of needs we had for extending message bus, mainly:

  • Pub/sub issue Postgres: we cannot subscribe from a specific position in the messages table, so when restarting our listener, we cannot start where we left off
  • Avoid connection limit issues with postgres when polling for new messages
  • Add capability to filter messages based on polling parameters and custom message payload

We decided to use ActiveRecord, because it is convenient for us as we use Rails.

@SamSaffron
Copy link
Member

I hear you on all of this here but the change is way too big to consume as is, it mixes in too many things in one spot.

it is not ideal for you to run from a fork but we need to be much more careful about stability here, breaking changes should be an absolute last resort.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants