diff --git a/Node/Workers/FlowWorker.cs b/Node/Workers/FlowWorker.cs index ea382e875..4e460e6ce 100644 --- a/Node/Workers/FlowWorker.cs +++ b/Node/Workers/FlowWorker.cs @@ -8,6 +8,7 @@ using FileFlows.ServerShared.Workers; using FileFlows.Shared.Models; using Jint.Native.Json; +using System.Collections.Concurrent; namespace FileFlows.Node.Workers; @@ -15,7 +16,7 @@ namespace FileFlows.Node.Workers; /// /// A flow worker executes a flow and start a flow runner /// -public class FlowWorker : Worker +public class FlowWorker : Worker, IWorkerThatUsesTempDirectories { /// /// A unique identifier to identify the flow worker @@ -25,7 +26,13 @@ public class FlowWorker : Worker public readonly Guid Uid = Guid.NewGuid(); private readonly string _configKeyDefault = Guid.NewGuid().ToString(); - + + public bool IsTempDirectoryInUse(string directory) + { + var tempDirectoryInUse = ExecutingRunners.Keys.Any(t => directory.EndsWith(t.ToString(), StringComparison.OrdinalIgnoreCase)); + return tempDirectoryInUse; + } + /// /// Gets if the config encryption key /// @@ -78,7 +85,10 @@ private bool GetConfigNoEncrypt(ProcessingNode node) private static FlowWorker? Instance; private readonly Mutex mutex = new Mutex(); - private readonly List ExecutingRunners = new (); + /// + /// Concurrent because we have the TempFileCleaner running in parallel + /// + private readonly ConcurrentDictionary ExecutingRunners = new (); private const int DEFAULT_INTERVAL = 10; @@ -506,7 +516,7 @@ private void AddExecutingRunner(Guid uid) mutex.WaitOne(); try { - ExecutingRunners.Add(uid); + ExecutingRunners.TryAdd(uid, null); } finally { @@ -525,8 +535,10 @@ private void RemoveExecutingRunner(Guid uid) mutex.WaitOne(); try { - if (ExecutingRunners.Contains(uid)) - ExecutingRunners.Remove(uid); + if (ExecutingRunners.TryRemove(uid, out var _)) + { + + } else { Logger.Instance?.ILog("Executing runner not in list: " + uid +" => " + string.Join(",", ExecutingRunners.Select(x => x.ToString()))); @@ -742,5 +754,4 @@ private async Task UpdateConfiguration(ProcessingNode node) return true; } - } diff --git a/ServerShared/IWorkerThatUsesTempDirectories.cs b/ServerShared/IWorkerThatUsesTempDirectories.cs new file mode 100644 index 000000000..48dbdb8b9 --- /dev/null +++ b/ServerShared/IWorkerThatUsesTempDirectories.cs @@ -0,0 +1,7 @@ +namespace FileFlows.ServerShared +{ + public interface IWorkerThatUsesTempDirectories + { + public bool IsTempDirectoryInUse(string directory); + } +} diff --git a/ServerShared/Workers/TempFileCleaner.cs b/ServerShared/Workers/TempFileCleaner.cs index 32136fcd3..428f2eaea 100644 --- a/ServerShared/Workers/TempFileCleaner.cs +++ b/ServerShared/Workers/TempFileCleaner.cs @@ -8,14 +8,17 @@ namespace FileFlows.ServerShared.Workers; public class TempFileCleaner:Worker { private string nodeAddress; - + private readonly IWorkerThatUsesTempDirectories workerThatUsesTempDirectories; + /// /// Constructs a temp file cleaner /// The name of the node + /// The worker that uses temp directories, this is used to verify that a temp directory should not be removed while still in use" /// - public TempFileCleaner(string nodeAddress) : base(ScheduleType.Daily, 5) + public TempFileCleaner(string nodeAddress, IWorkerThatUsesTempDirectories workerThatUsesTempDirectories) : base(ScheduleType.Daily, 5) { this.nodeAddress = nodeAddress; + this.workerThatUsesTempDirectories = workerThatUsesTempDirectories; Trigger(); } @@ -37,6 +40,9 @@ protected sealed override void Execute() Logger.Instance?.ILog("About to clean temporary directory: " + tempDir.FullName); foreach (var dir in tempDir.GetDirectories()) { + if (workerThatUsesTempDirectories.IsTempDirectoryInUse(dir.Name)) + continue; + if (dir.CreationTimeUtc < DateTime.UtcNow.AddDays(-1)) { try