diff --git a/Node/Workers/FlowWorker.cs b/Node/Workers/FlowWorker.cs
index ea382e875..4e460e6ce 100644
--- a/Node/Workers/FlowWorker.cs
+++ b/Node/Workers/FlowWorker.cs
@@ -8,6 +8,7 @@
using FileFlows.ServerShared.Workers;
using FileFlows.Shared.Models;
using Jint.Native.Json;
+using System.Collections.Concurrent;
namespace FileFlows.Node.Workers;
@@ -15,7 +16,7 @@ namespace FileFlows.Node.Workers;
///
/// A flow worker executes a flow and start a flow runner
///
-public class FlowWorker : Worker
+public class FlowWorker : Worker, IWorkerThatUsesTempDirectories
{
///
/// A unique identifier to identify the flow worker
@@ -25,7 +26,13 @@ public class FlowWorker : Worker
public readonly Guid Uid = Guid.NewGuid();
private readonly string _configKeyDefault = Guid.NewGuid().ToString();
-
+
+ public bool IsTempDirectoryInUse(string directory)
+ {
+ var tempDirectoryInUse = ExecutingRunners.Keys.Any(t => directory.EndsWith(t.ToString(), StringComparison.OrdinalIgnoreCase));
+ return tempDirectoryInUse;
+ }
+
///
/// Gets if the config encryption key
///
@@ -78,7 +85,10 @@ private bool GetConfigNoEncrypt(ProcessingNode node)
private static FlowWorker? Instance;
private readonly Mutex mutex = new Mutex();
- private readonly List ExecutingRunners = new ();
+ ///
+ /// Concurrent because we have the TempFileCleaner running in parallel
+ ///
+ private readonly ConcurrentDictionary ExecutingRunners = new ();
private const int DEFAULT_INTERVAL = 10;
@@ -506,7 +516,7 @@ private void AddExecutingRunner(Guid uid)
mutex.WaitOne();
try
{
- ExecutingRunners.Add(uid);
+ ExecutingRunners.TryAdd(uid, null);
}
finally
{
@@ -525,8 +535,10 @@ private void RemoveExecutingRunner(Guid uid)
mutex.WaitOne();
try
{
- if (ExecutingRunners.Contains(uid))
- ExecutingRunners.Remove(uid);
+ if (ExecutingRunners.TryRemove(uid, out var _))
+ {
+
+ }
else
{
Logger.Instance?.ILog("Executing runner not in list: " + uid +" => " + string.Join(",", ExecutingRunners.Select(x => x.ToString())));
@@ -742,5 +754,4 @@ private async Task UpdateConfiguration(ProcessingNode node)
return true;
}
-
}
diff --git a/ServerShared/IWorkerThatUsesTempDirectories.cs b/ServerShared/IWorkerThatUsesTempDirectories.cs
new file mode 100644
index 000000000..48dbdb8b9
--- /dev/null
+++ b/ServerShared/IWorkerThatUsesTempDirectories.cs
@@ -0,0 +1,7 @@
+namespace FileFlows.ServerShared
+{
+ public interface IWorkerThatUsesTempDirectories
+ {
+ public bool IsTempDirectoryInUse(string directory);
+ }
+}
diff --git a/ServerShared/Workers/TempFileCleaner.cs b/ServerShared/Workers/TempFileCleaner.cs
index 32136fcd3..428f2eaea 100644
--- a/ServerShared/Workers/TempFileCleaner.cs
+++ b/ServerShared/Workers/TempFileCleaner.cs
@@ -8,14 +8,17 @@ namespace FileFlows.ServerShared.Workers;
public class TempFileCleaner:Worker
{
private string nodeAddress;
-
+ private readonly IWorkerThatUsesTempDirectories workerThatUsesTempDirectories;
+
///
/// Constructs a temp file cleaner
/// The name of the node
+ /// The worker that uses temp directories, this is used to verify that a temp directory should not be removed while still in use"
///
- public TempFileCleaner(string nodeAddress) : base(ScheduleType.Daily, 5)
+ public TempFileCleaner(string nodeAddress, IWorkerThatUsesTempDirectories workerThatUsesTempDirectories) : base(ScheduleType.Daily, 5)
{
this.nodeAddress = nodeAddress;
+ this.workerThatUsesTempDirectories = workerThatUsesTempDirectories;
Trigger();
}
@@ -37,6 +40,9 @@ protected sealed override void Execute()
Logger.Instance?.ILog("About to clean temporary directory: " + tempDir.FullName);
foreach (var dir in tempDir.GetDirectories())
{
+ if (workerThatUsesTempDirectories.IsTempDirectoryInUse(dir.Name))
+ continue;
+
if (dir.CreationTimeUtc < DateTime.UtcNow.AddDays(-1))
{
try