Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit jobs to 5 and wrap in exception handler to release #4366

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 35 additions & 13 deletions src/invidious/jobs.cr
@@ -1,40 +1,62 @@
module Invidious::Jobs
# This line defines an empty array named JOBS to hold objects of type BaseJob.
JOBS = [] of BaseJob

# Automatically generate a structure that wraps the various
# jobs' configs, so that the following YAML config can be used:
#
# jobs:
# job_name:
# enabled: true
# some_property: "value"
#
Comment on lines -4 to -11
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't touch comments for code that is not related to your PR!

# SEMAPHORE is a Channel that allows up to 5 items (jobs) to be processed concurrently.
# This is a way to limit the maximum number of jobs running at the same time to 5.
SEMAPHORE = ::Channel(Nil).new(5)

# The `macro finished` block is executed once the module definition is complete.
macro finished
# Define a struct named JobsConfig inside the module.
# This struct is for storing configuration for different jobs.
struct JobsConfig
include YAML::Serializable
include YAML::Serializable # Allows serialization and deserialization from YAML.

# This loop iterates over all subclasses of BaseJob.
{% for sc in BaseJob.subclasses %}
# Voodoo macro to transform `Some::Module::CustomJob` to `custom`
# Generate a getter method for each job. The job's class name is transformed
# from something like `Some::Module::CustomJob` to a simpler form `custom`.
{% class_name = sc.id.split("::").last.id.gsub(/Job$/, "").underscore %}

getter {{ class_name }} = {{ sc.name }}::Config.new
{% end %}

# Define an empty initializer.
def initialize
end
end
end

# This class method allows registration of a job to the JOBS array.
def self.register(job : BaseJob)
JOBS << job
end

# This class method starts all registered jobs.
def self.start_all
# Iterate over each job in the JOBS array.
JOBS.each do |job|
# Don't run the main rountine if the job is disabled by config
next if job.disabled?
# Send a nil value to the SEMAPHORE channel. This is like acquiring a "slot".
# If the SEMAPHORE is full (5 jobs running), this line will block until a slot is free.
SEMAPHORE.send(nil)

# Start a new concurrent fiber (lightweight thread) for the job.
spawn do
begin
# If the job is disabled in its configuration, skip the execution.
next if job.disabled?

spawn { job.begin }
# Start the job by calling its 'begin' method.
job.begin
rescue ex
# If an exception occurs, log the error.
Log.error { "Job failed: #{ex.message}" }
ensure
# After the job is finished or if an error occurred, release the "slot" in the semaphore.
SEMAPHORE.receive
end
end
end
end
end