Skip to content

Commit

Permalink
Batch user notifications together
Browse files Browse the repository at this point in the history
  • Loading branch information
999eagle committed Mar 16, 2024
1 parent 99a5e9c commit 78768f1
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 33 deletions.
5 changes: 3 additions & 2 deletions src/invidious.cr
Expand Up @@ -180,8 +180,9 @@ if CONFIG.popular_enabled
Invidious::Jobs.register Invidious::Jobs::PullPopularVideosJob.new(PG_DB)
end

CONNECTION_CHANNEL = ::Channel({Bool, ::Channel(PQ::Notification)}).new(32)
Invidious::Jobs.register Invidious::Jobs::NotificationJob.new(CONNECTION_CHANNEL, CONFIG.database_url)
NOTIFICATION_CHANNEL = ::Channel(VideoNotification).new(32)
CONNECTION_CHANNEL = ::Channel({Bool, ::Channel(PQ::Notification)}).new(32)
Invidious::Jobs.register Invidious::Jobs::NotificationJob.new(NOTIFICATION_CHANNEL, CONNECTION_CHANNEL, CONFIG.database_url)

Invidious::Jobs.register Invidious::Jobs::ClearExpiredItemsJob.new

Expand Down
12 changes: 2 additions & 10 deletions src/invidious/channels/channels.cr
Expand Up @@ -249,11 +249,7 @@ def fetch_channel(ucid, pull_all_videos : Bool)

if was_insert
LOGGER.trace("fetch_channel: #{ucid} : video #{video_id} : Inserted, updating subscriptions")
if CONFIG.enable_user_notifications
Invidious::Database::Users.add_notification(video)
else
Invidious::Database::Users.feed_needs_update(video)
end
NOTIFICATION_CHANNEL.send(VideoNotification.from_video(video))
else
LOGGER.trace("fetch_channel: #{ucid} : video #{video_id} : Updated")
end
Expand Down Expand Up @@ -285,11 +281,7 @@ def fetch_channel(ucid, pull_all_videos : Bool)
if Time.utc - video.published > 1.minute
was_insert = Invidious::Database::ChannelVideos.insert(video)
if was_insert
if CONFIG.enable_user_notifications
Invidious::Database::Users.add_notification(video)
else
Invidious::Database::Users.feed_needs_update(video)
end
NOTIFICATION_CHANNEL.send(VideoNotification.from_video(video))
end
end
end
Expand Down
10 changes: 5 additions & 5 deletions src/invidious/database/users.cr
Expand Up @@ -119,15 +119,15 @@ module Invidious::Database::Users
# Update (notifs)
# -------------------

def add_notification(video : ChannelVideo)
def add_multiple_notifications(channel_id : String, video_ids : Array(String))
request = <<-SQL
UPDATE users
SET notifications = array_append(notifications, $1),
SET notifications = array_cat(notifications, $1),
feed_needs_update = true
WHERE $2 = ANY(subscriptions)
SQL

PG_DB.exec(request, video.id, video.ucid)
PG_DB.exec(request, video_ids, channel_id)
end

def remove_notification(user : User, vid : String)
Expand All @@ -154,14 +154,14 @@ module Invidious::Database::Users
# Update (misc)
# -------------------

def feed_needs_update(video : ChannelVideo)
def feed_needs_update(channel_id : String)
request = <<-SQL
UPDATE users
SET feed_needs_update = true
WHERE $1 = ANY(subscriptions)
SQL

PG_DB.exec(request, video.ucid)
PG_DB.exec(request, channel_id)
end

def update_preferences(user : User)
Expand Down
78 changes: 77 additions & 1 deletion src/invidious/jobs/notification_job.cr
@@ -1,15 +1,91 @@
struct VideoNotification
getter video_id : String
getter channel_id : String
getter published : Time

def_hash @channel_id, @video_id

def ==(other)
video_id == other.video_id
end

def self.from_video(video : ChannelVideo) : self
VideoNotification.new(video.id, video.ucid, video.published)
end

def initialize(@video_id, @channel_id, @published)
end

def clone : VideoNotification
VideoNotification.new(video_id.clone, channel_id.clone, published.clone)
end
end

class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
private getter notification_channel : ::Channel(VideoNotification)
private getter connection_channel : ::Channel({Bool, ::Channel(PQ::Notification)})
private getter pg_url : URI

def initialize(@connection_channel, @pg_url)
def initialize(@notification_channel, @connection_channel, @pg_url)
end

def begin
connections = [] of ::Channel(PQ::Notification)

PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) }

# hash of channels to their videos (id+published) that need notifying
to_notify = Hash(String, Set(VideoNotification)).new(->(hash : Hash(String, Set(VideoNotification)), key : String) { hash[key] = Set(VideoNotification).new })

# fiber to locally cache all incoming notifications (from pubsub webhooks and refresh channels job)
spawn do
begin
loop do
notification = notification_channel.receive
to_notify[notification.channel_id] << notification
end
end
end
# fiber to regularly persist all cached notifications
spawn do
loop do
begin
LOGGER.debug("NotificationJob: waking up")
cloned = to_notify.clone
to_notify.clear

cloned.each do |channel_id, notifications|
if notifications.empty?
next
end

LOGGER.info("NotificationJob: updating channel #{channel_id} with #{notifications.size} notifications")
if CONFIG.enable_user_notifications
video_ids = notifications.map { |n| n.video_id }
Invidious::Database::Users.add_multiple_notifications(channel_id, video_ids)
notifications.each do |n|
# Deliver notifications to `/api/v1/auth/notifications`
payload = {
"topic" => n.channel_id,
"videoId" => n.video_id,
"published" => n.published.to_unix,
}.to_json
PG_DB.exec("NOTIFY notifications, E'#{payload}'")
end
else
Invidious::Database::Users.feed_needs_update(channel_id)
end
end

LOGGER.trace("NotificationJob: Done, sleeping")
rescue ex
LOGGER.error("NotificationJob: #{ex.message}")
end
sleep 1.minute
Fiber.yield
end
end

loop do
action, connection = connection_channel.receive

Expand Down
16 changes: 1 addition & 15 deletions src/invidious/routes/feeds.cr
Expand Up @@ -425,16 +425,6 @@ module Invidious::Routes::Feeds
next # skip this video since it raised an exception (e.g. it is a scheduled live event)
end

if CONFIG.enable_user_notifications
# Deliver notifications to `/api/v1/auth/notifications`
payload = {
"topic" => video.ucid,
"videoId" => video.id,
"published" => published.to_unix,
}.to_json
PG_DB.exec("NOTIFY notifications, E'#{payload}'")
end

video = ChannelVideo.new({
id: id,
title: video.title,
Expand All @@ -450,11 +440,7 @@ module Invidious::Routes::Feeds

was_insert = Invidious::Database::ChannelVideos.insert(video, with_premiere_timestamp: true)
if was_insert
if CONFIG.enable_user_notifications
Invidious::Database::Users.add_notification(video)
else
Invidious::Database::Users.feed_needs_update(video)
end
NOTIFICATION_CHANNEL.send(VideoNotification.from_video(video))
end
end
end
Expand Down

0 comments on commit 78768f1

Please sign in to comment.