Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Image analysis job #307

Merged
merged 21 commits into from May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 45 additions & 2 deletions apps/server/src/routers/api/v1/media.rs
Expand Up @@ -3,13 +3,13 @@ use std::path::PathBuf;
use axum::{
extract::{DefaultBodyLimit, Multipart, Path, State},
middleware::from_extractor_with_state,
routing::{get, put},
routing::{get, post, put},
Json, Router,
};
use axum_extra::extract::Query;
use prisma_client_rust::{
and,
chrono::{Duration, Utc},
chrono::{DateTime, Duration, Utc},
operator::{self, or},
or, raw, Direction, PrismaValue,
};
Expand All @@ -23,6 +23,7 @@ use stump_core::{
CountQueryReturn,
},
filesystem::{
analyze_media_job::{AnalyzeMediaJob, AnalyzeMediaJobVariant},
get_unknown_thumnail,
image::{
generate_thumbnail, place_thumbnail, remove_thumbnails, ImageFormat,
Expand Down Expand Up @@ -81,6 +82,7 @@ pub(crate) fn mount(app_state: AppState) -> Router<AppState> {
// TODO: configurable max file size
.layer(DefaultBodyLimit::max(20 * 1024 * 1024)), // 20MB
)
.route("/analyze", post(start_media_analysis))
.route("/page/:page", get(get_media_page))
.route(
"/progress",
Expand Down Expand Up @@ -1193,6 +1195,7 @@ async fn replace_media_thumbnail(

// Note: I chose to *safely* attempt the removal as to not block the upload, however after some
// user testing I'd like to see if this becomes a problem. We'll see!
// TODO - What was the outcome of this testing?
JMicheli marked this conversation as resolved.
Show resolved Hide resolved
remove_thumbnails(&[book_id.clone()], ctx.config.get_thumbnails_dir())
.unwrap_or_else(|e| {
tracing::error!(
Expand All @@ -1209,6 +1212,46 @@ async fn replace_media_thumbnail(
)))
}

/// Response body returned when a media analysis job has been enqueued.
#[derive(Default, Deserialize, Serialize)]
pub struct MediaAnalysisStarted {
	// Timestamp string for when the analysis was started, if known.
	// NOTE(review): the handler currently fills this with a placeholder — confirm
	// the intended format (RFC 3339?) before clients depend on it.
	started_at: Option<String>,
}

JMicheli marked this conversation as resolved.
Show resolved Hide resolved
#[utoipa::path(
post,
path = "/api/v1/media/:id/analyze",
tag = "media",
params(
("id" = String, Path, description = "The ID of the media to analyze")
),
responses(
(status = 200, description = "Successfully started media analysis"),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden"),
(status = 404, description = "Media not found"),
(status = 500, description = "Internal server error"),
)
)]
async fn start_media_analysis(
Path(id): Path<String>,
State(ctx): State<AppState>,
session: Session,
) -> APIResult<Json<MediaAnalysisStarted>> {
tracing::warn!("Recieved start_media_analysis request for id: {}", id);
// TODO enforce permissions

// Start analysis job
ctx.enqueue_job(AnalyzeMediaJob::new(
AnalyzeMediaJobVariant::AnalyzeSingleItem(id),
))
.unwrap();

APIResult::Ok(Json(MediaAnalysisStarted {
JMicheli marked this conversation as resolved.
Show resolved Hide resolved
// TODO fix this
started_at: Some("Now".to_string()),
}))
}

#[utoipa::path(
put,
path = "/api/v1/media/:id/progress/:page",
Expand Down
1 change: 1 addition & 0 deletions core/src/filesystem/image/thumbnail/generation_job.rs
Expand Up @@ -137,6 +137,7 @@ impl JobExt for ThumbnailGenerationJob {
ThumbnailGenerationJobVariant::MediaGroup(media_ids) => media_ids.clone(),
};

// TODO Should find a way to keep the same ThumbnailManager around for the whole job execution
let manager = ThumbnailManager::new(ctx.config.clone())
.map_err(|e| JobError::TaskFailed(e.to_string()))?;

Expand Down
1 change: 0 additions & 1 deletion core/src/filesystem/image/thumbnail/mod.rs
Expand Up @@ -68,7 +68,6 @@ pub fn generate_thumbnail(
Ok(thumbnail_path)
}

// TODO: does this need to return a result?
pub fn generate_thumbnails(
media: &[Media],
options: ImageProcessorOptions,
Expand Down
184 changes: 184 additions & 0 deletions core/src/filesystem/media/analyze_media_job.rs
@@ -0,0 +1,184 @@
use std::collections::VecDeque;

use serde::{Deserialize, Serialize};
use specta::Type;

use crate::{
filesystem::media::process::get_page_count,
job::{
error::JobError, JobExecuteLog, JobExt, JobOutputExt, JobTaskOutput, WorkerCtx,
WorkingState, WrappedJob,
},
prisma::{media, series},
};

/// Selects the scope of an analyze-media run: one item, every item in a
/// library or series, or an explicit group of media IDs.
// Debug is derived so the variant can appear in logs/diagnostics alongside the
// other job types in this module (AnalyzeMediaTask already derives Debug).
#[derive(Clone, Debug)]
pub enum AnalyzeMediaJobVariant {
	/// Analyze the single media item with the given ID.
	AnalyzeSingleItem(String),
	/// Analyze every media item belonging to the library with the given ID.
	AnalyzeLibrary(String),
	/// Analyze every media item belonging to the series with the given ID.
	AnalyzeSeries(String),
	/// Analyze exactly the media items whose IDs are listed.
	AnalyzeMediaGroup(Vec<String>),
}
JMicheli marked this conversation as resolved.
Show resolved Hide resolved

/// A unit of work within an [AnalyzeMediaJob]; each task handles one media item.
#[derive(Serialize, Deserialize, Debug)]
pub enum AnalyzeMediaTask {
	/// Analyze the media item with the given ID.
	AnalyzeImage(String),
}

/// Accumulated output of an [AnalyzeMediaJob], merged across tasks and
/// reported when the job completes.
#[derive(Clone, Serialize, Deserialize, Default, Debug, Type)]
pub struct AnalyzeMediaOutput {
	/// The number of images analyzed
	images_analyzed: u64,
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could likely add more state to this, e.g. media_updated, but this is a good start and I'm fine leaving as-is if that is what you decide


impl JobOutputExt for AnalyzeMediaOutput {
	/// Fold a partial output produced by one task into the running totals.
	fn update(&mut self, updated: Self) {
		let Self { images_analyzed } = updated;
		self.images_analyzed += images_analyzed;
	}
}

/// A job that analyzes a media item and updates the database
/// with information from the analysis.
#[derive(Clone)]
pub struct AnalyzeMediaJob {
	// Selects which media item(s) this job will analyze.
	pub variant: AnalyzeMediaJobVariant,
}

impl AnalyzeMediaJob {
	/// Create a new [AnalyzeMediaJob] covering the media selected by `variant`,
	/// wrapped for submission to the job system.
	pub fn new(variant: AnalyzeMediaJobVariant) -> Box<WrappedJob<AnalyzeMediaJob>> {
		let job = Self { variant };
		WrappedJob::new(job)
	}
}

#[async_trait::async_trait]
impl JobExt for AnalyzeMediaJob {
const NAME: &'static str = "analyze_media";

type Output = AnalyzeMediaOutput;
type Task = AnalyzeMediaTask;

fn description(&self) -> Option<String> {
match &self.variant {
AnalyzeMediaJobVariant::AnalyzeSingleItem(id) => {
Some(format!("Analyze media item with id: {}", id))
},
AnalyzeMediaJobVariant::AnalyzeLibrary(id) => {
Some(format!("Analyze library with id: {}", id))
},
AnalyzeMediaJobVariant::AnalyzeSeries(id) => {
Some(format!("Analyze series with id: {}", id))
},
AnalyzeMediaJobVariant::AnalyzeMediaGroup(ids) => {
Some(format!("Analyze media group with ids: {:?}", ids))
},
}
}

async fn init(
&mut self,
ctx: &WorkerCtx,
) -> Result<WorkingState<Self::Output, Self::Task>, JobError> {
tracing::warn!("In AnalyzeMediaJob init()");
let output = Self::Output::default();

// We match over the job variant to build a list of tasks to process
let mut tasks = VecDeque::new();
JMicheli marked this conversation as resolved.
Show resolved Hide resolved
match &self.variant {
// Single item is easy
AnalyzeMediaJobVariant::AnalyzeSingleItem(id) => {
tasks.push_front(AnalyzeMediaTask::AnalyzeImage(id.clone()))
},
// For libraries we need a list of ids
AnalyzeMediaJobVariant::AnalyzeLibrary(id) => {
let library_media = ctx
.db
.media()
.find_many(vec![media::series::is(vec![series::library_id::equals(
Some(id.clone()),
)])])
.exec()
.await
.map_err(|e| JobError::InitFailed(e.to_string()))?;
JMicheli marked this conversation as resolved.
Show resolved Hide resolved

for media in library_media {
tasks.push_front(AnalyzeMediaTask::AnalyzeImage(media.id))
}
},
// We also need a list for series
AnalyzeMediaJobVariant::AnalyzeSeries(id) => {
let series_media = ctx
.db
.media()
.find_many(vec![media::series_id::equals(Some(id.clone()))])
.exec()
.await
.map_err(|e| JobError::InitFailed(e.to_string()))?;

for media in series_media {
tasks.push_front(AnalyzeMediaTask::AnalyzeImage(media.id))
}
},
// Media groups already include a vector of ids
AnalyzeMediaJobVariant::AnalyzeMediaGroup(ids) => {
for id in ids {
tasks.push_front(AnalyzeMediaTask::AnalyzeImage(id.clone()))
}
},
};

Ok(WorkingState {
output: Some(output),
tasks,
completed_tasks: 0,
logs: vec![],
})
}

async fn execute_task(
&self,
ctx: &WorkerCtx,
task: Self::Task,
) -> Result<JobTaskOutput<Self>, JobError> {
let output = Self::Output::default();

match task {
AnalyzeMediaTask::AnalyzeImage(id) => {
// Get media by id from the database
let media = ctx
.db
.media()
.find_unique(media::id::equals(id.clone()))
.exec()
.await
.map_err(|e| JobError::TaskFailed(e.to_string()))?;

// Error if media item unavailable
if media.is_none() {
return Err(JobError::TaskFailed(format!(
"Unable to find media item with id: {}",
id
)));
}
JMicheli marked this conversation as resolved.
Show resolved Hide resolved

// Get page count using file processing
let path = media.unwrap().path;
let page_count = get_page_count(&path, &ctx.config)?;

// Update media item in database
let media = ctx
.db
.media()
.update(media::id::equals(id), vec![media::pages::set(page_count)])
.exec()
.await?;
},
}

Ok(JobTaskOutput {
output,
subtasks: vec![],
logs: vec![],
})
}
}
5 changes: 5 additions & 0 deletions core/src/filesystem/media/epub.rs
Expand Up @@ -97,6 +97,11 @@ impl FileProcessor for EpubProcessor {
}
}

/// Get the number of pages in the epub at `path`, as reported by the epub reader.
fn get_page_count(path: &str, _: &StumpConfig) -> Result<i32, FileError> {
	let document = Self::open(path)?;
	let page_count = document.get_num_pages() as i32;
	Ok(page_count)
}

fn get_page_content_types(
path: &str,
pages: Vec<i32>,
Expand Down
1 change: 1 addition & 0 deletions core/src/filesystem/media/mod.rs
@@ -1,3 +1,4 @@
pub mod analyze_media_job;
mod builder;
mod common;
pub(crate) mod epub;
Expand Down
7 changes: 7 additions & 0 deletions core/src/filesystem/media/pdf.rs
Expand Up @@ -118,6 +118,13 @@ impl FileProcessor for PdfProcessor {
}
}

/// Get the number of pages in the PDF at `path` by loading it with pdfium.
fn get_page_count(path: &str, config: &StumpConfig) -> Result<i32, FileError> {
	// A pdfium renderer instance is required to inspect the document.
	let renderer = PdfProcessor::renderer(&config.pdfium_path)?;
	let document = renderer.load_pdf_from_file(path, None)?;
	let page_count = document.pages().len();
	Ok(page_count as i32)
}

fn get_page_content_types(
_: &str,
pages: Vec<i32>,
Expand Down
19 changes: 19 additions & 0 deletions core/src/filesystem/media/process.rs
Expand Up @@ -69,6 +69,9 @@ pub trait FileProcessor {
config: &StumpConfig,
) -> Result<(ContentType, Vec<u8>), FileError>;

/// Get the number of pages in the file.
fn get_page_count(path: &str, config: &StumpConfig) -> Result<i32, FileError>;

/// Get the content types of a list of pages of the file. This should determine content
/// types by actually testing the bytes for each page.
fn get_page_content_types(
Expand Down Expand Up @@ -160,6 +163,22 @@ pub fn get_page(
}
}

/// Get the number of pages in the file at `path`, dispatching to the processor
/// that matches the file's detected MIME type.
///
/// Returns [FileError::UnsupportedFileType] when no processor handles the type.
pub fn get_page_count(path: &str, config: &StumpConfig) -> Result<i32, FileError> {
	let mime = ContentType::from_file(path).mime_type();

	match mime.as_str() {
		// Zip-based archives, including comic book zips (cbz)
		"application/zip" | "application/vnd.comicbook+zip" => {
			ZipProcessor::get_page_count(path, config)
		},
		// Rar-based archives, including comic book rars (cbr)
		"application/vnd.rar" | "application/vnd.comicbook-rar" => {
			RarProcessor::get_page_count(path, config)
		},
		"application/epub+zip" => EpubProcessor::get_page_count(path, config),
		"application/pdf" => PdfProcessor::get_page_count(path, config),
		_ => Err(FileError::UnsupportedFileType(path.to_string())),
	}
}

pub fn get_content_types_for_pages(
path: &str,
pages: Vec<i32>,
Expand Down
24 changes: 24 additions & 0 deletions core/src/filesystem/media/rar.rs
Expand Up @@ -208,6 +208,30 @@ impl FileProcessor for RarProcessor {
Ok((content_type, bytes.ok_or(FileError::NoImageError)?))
}

/// Get the number of pages in the rar archive at `path` by counting the file
/// entries whose extension marks them as a page image.
fn get_page_count(path: &str, _: &StumpConfig) -> Result<i32, FileError> {
	let archive = RarProcessor::open_for_listing(path)?;

	// Count valid page entries directly instead of collecting them into a
	// Vec only to take its length (avoids an unnecessary allocation).
	// NOTE(review): only jpg/jpeg/png are recognized here — confirm whether
	// other image formats (e.g. webp, gif) should also count as pages.
	let page_count = archive
		.into_iter()
		.filter_map(|entry| entry.ok())
		.filter(|entry| {
			if !entry.is_file() {
				return false;
			}
			let filename = entry.filename.as_path().to_string_lossy().to_lowercase();
			filename.ends_with(".jpg")
				|| filename.ends_with(".jpeg")
				|| filename.ends_with(".png")
		})
		.count();

	Ok(page_count as i32)
}

fn get_page_content_types(
path: &str,
pages: Vec<i32>,
Expand Down