-
Hi, I am a bit stuck trying to get something to work. I would greatly appreciate some help! The scenario is the following:
However:
This is causing the pipeline created in […]. Here are some snippets of the code which I think are important: The code that opens the store, reads the webcam stream and pipes it to the emotions detector: await dataset.CreateDerivedPartitionAsync((pipeline, importer, exporter) =>
{
// Replay the recorded webcam frames, draw each one, and forward them to the emotions detector.
IProducer<Shared<Image>> webcamStream = importer.OpenStream<Shared<Image>>(ParticipantPipeline.WEBCAM_STREAM_NAME);
EmotionsDetector emotionsDetector = new EmotionsDetector(pipeline);
webcamStream.Do(frame => this.DrawFrame(frame)).PipeTo(emotionsDetector);

// Keep only the face with the largest bounding-box area. Results that contain no faces
// are skipped, because taking the first element of an empty list would throw.
IProducer<DetectedFace> largestFace = emotionsDetector.Out
    .Where(faces => faces != null && faces.Count > 0)
    .Select(faces => faces.OrderByDescending(face => face.Box.Width * face.Box.Height).First());
largestFace.Select(f => f.Box).Write(StoreNamesJurriaan.FACE_RECTANGLES_STREAM_NAME, exporter);

// Persist each emotion score of the largest face as its own stream in the derived store.
IProducer<Emotions> emotions = largestFace.Select(f => f.Emotions);
emotions.Select(e => e.Angry).Write(StoreNamesJurriaan.ANGRY_STREAM_NAME, exporter);
emotions.Select(e => e.Disgust).Write(StoreNamesJurriaan.DISGUST_STREAM_NAME, exporter);
emotions.Select(e => e.Fear).Write(StoreNamesJurriaan.FEAR_STREAM_NAME, exporter);
emotions.Select(e => e.Happy).Write(StoreNamesJurriaan.HAPPY_STREAM_NAME, exporter);
emotions.Select(e => e.Neutral).Write(StoreNamesJurriaan.NEUTRAL_STREAM_NAME, exporter);
emotions.Select(e => e.Sad).Write(StoreNamesJurriaan.SAD_STREAM_NAME, exporter);
emotions.Select(e => e.Surprise).Write(StoreNamesJurriaan.SURPRISE_STREAM_NAME, exporter);
},
StoreNamesJurriaan.DETECTED_FACES_STORE_NAME,
false,
StoreNamesJurriaan.DETECTED_FACES_STORE_NAME,
detectedFacesStoreWithVersionPath,
this.asFastAsPossible == true ? ReplayDescriptor.ReplayAll : ReplayDescriptor.ReplayAllRealTime,
null,
false,
this.replayProgress); The public class EmotionsDetector : IConsumer<Shared<Image>>, IProducer<List<DetectedFace>>, ISourceComponent, IDisposable
{
// Publishes outgoing frame buffers on the "webcamFrames" topic (created in the constructor).
NetMQWriter<byte[]> frameWriter;
// Receives face-detection results on the "faces" topic (created in the constructor).
NetMQSource<dynamic> faceDetectionSource;
// Connector whose receiver is exposed as In and whose output feeds the frame writer.
Connector<Shared<Image>> inFrames;
/// <summary>
/// Wires the detector into <paramref name="pipeline"/>: incoming frames are JPEG-encoded and
/// published on the "webcamFrames" topic, and messages arriving on the "faces" topic are
/// converted into lists of <see cref="DetectedFace"/> and exposed through <see cref="Out"/>.
/// </summary>
/// <param name="pipeline">The pipeline that hosts the writer, the source and the connector.</param>
public EmotionsDetector(Pipeline pipeline)
{
    // Outbound side: frames -> JPEG (quality 90) -> raw buffer -> NetMQ writer.
    this.frameWriter = new NetMQWriter<byte[]>(pipeline, "webcamFrames", "tcp://127.0.0.1:12345", MessagePackFormat.Instance);

    this.inFrames = pipeline.CreateConnector<Shared<Image>>(nameof(this.inFrames));
    this.In = this.inFrames.In;

    var jpegBuffers = this.inFrames.Out
        .EncodeJpeg(90)
        .Select(frame => frame.Resource.GetBuffer());
    jpegBuffers.PipeTo(this.frameWriter);

    // Inbound side: detection results come back as dynamically-typed MessagePack payloads.
    this.faceDetectionSource = new NetMQSource<dynamic>(pipeline, "faces", "tcp://127.0.0.1:12346", MessagePackFormat.Instance);

    // Running count of converted faces, used only for the console trace below.
    int receivedFaceCount = 0;

    // Converts one face entry (with "box" and "emotions" members) into a DetectedFace.
    Func<dynamic, DetectedFace> toDetectedFace = faceMsg =>
    {
        dynamic box = faceMsg["box"];
        Rectangle faceBox = new Rectangle(box[0], box[1], box[2], box[3]);

        dynamic scores = faceMsg["emotions"];
        Emotions faceEmotions = new Emotions
        {
            Angry = scores["angry"],
            Disgust = scores["disgust"],
            Fear = scores["fear"],
            Happy = scores["happy"],
            Neutral = scores["neutral"],
            Sad = scores["sad"],
            Surprise = scores["surprise"]
        };

        DetectedFace face = new DetectedFace
        {
            Box = faceBox,
            Emotions = faceEmotions
        };
        Console.WriteLine(receivedFaceCount++ + ": EmotionsDetector: " + face);
        return face;
    };

    // Each incoming message is treated as a sequence of face entries and converted eagerly.
    this.Out = this.faceDetectionSource
        .Select((faces, envelope) => ((IEnumerable<dynamic>)faces).Select(toDetectedFace).ToList())
        .Out;
}
/// <summary>Gets the emitter of detected-face lists; it is assigned in the constructor from the converted "faces" source.</summary>
public Emitter<List<DetectedFace>> Out { get; private set; }
/// <summary>Gets the receiver for incoming frames; it is the receiver of the internal frame connector.</summary>
public Receiver<Shared<Image>> In { get; }
/// <summary>
/// Logs the disposal and releases the NetMQ writer and source if they were created.
/// </summary>
public void Dispose()
{
    Console.WriteLine("Disposing EmotionsDetector");

    if (this.frameWriter != null)
    {
        this.frameWriter.Dispose();
    }

    if (this.faceDetectionSource != null)
    {
        this.faceDetectionSource.Dispose();
    }
}
/// <summary>
/// Reports <see cref="DateTime.MaxValue"/> as this component's completion time.
/// </summary>
/// <param name="notifyCompletionTime">Callback that receives the completion time.</param>
public void Start(Action<DateTime> notifyCompletionTime) => notifyCompletionTime(DateTime.MaxValue);
/// <summary>
/// Forwards the stop request to the face-detection source, then disposes this component.
/// </summary>
/// <param name="finalOriginatingTime">Final originating time, passed through to the source.</param>
/// <param name="notifyCompleted">Completion callback, passed through to the source.</param>
public void Stop(DateTime finalOriginatingTime, Action notifyCompleted)
{
    // The source is handed the completion callback; this method does not invoke it itself.
    this.faceDetectionSource.Stop(finalOriginatingTime, notifyCompleted);

    // Release the NetMQ writer and source right after the stop request.
    this.Dispose();
}
} Any help is greatly appreciated! |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 2 replies
-
I think that what you’re running into is a common scenario and is pointing out that we need to improve the infrastructure. You’re correct that the pipeline is shutting down while results are still pending from the Python side. What we really need is a mechanism by which to signal the Python side when a \psi stream closes (data store exhausted) and similarly a mechanism for Python to signal when a ZeroMQ stream is complete and make `NetMQSource` finite. We will open a work item on our end to look into this. Here is a workaround in the meantime: Firstly, the pipeline is shutting down when the store is complete because the `Importer` component internally proposes a replay time of the extents of the store. Proposing a left-bounded (open-ended) replay time avoids this: pipeline.ProposeReplayTime(TimeInterval.LeftBounded(DateTime.UtcNow)); Next, we need to create a finite source that will “hold open” the pipeline. For example, to force the pipeline to continue running for 60 seconds: Generators.Repeat(pipeline, true, 60, TimeSpan.FromSeconds(1)); This assumes that we know that processing will require less than 60 seconds. Another solution may be to wait for a period of wall-clock time (say, 10 seconds) after the last message received from the Python side: var lastReceivedWallClockTime = DateTime.MaxValue;
this.faceDetectionSource.Do((_, e) => lastReceivedWallClockTime = DateTime.UtcNow);
// Lazily yields true for as long as fewer than 10 seconds of wall-clock time have
// passed since lastReceivedWallClockTime; the sequence ends once that period has elapsed.
IEnumerable<bool> WaitForQuiescence()
{
    while (true)
    {
        TimeSpan sinceLastMessage = DateTime.UtcNow - lastReceivedWallClockTime;
        if (sinceLastMessage >= TimeSpan.FromSeconds(10))
        {
            yield break;
        }

        yield return true;
    }
}
Generators.Sequence(pipeline, WaitForQuiescence(), TimeSpan.FromSeconds(1)); Or we could do what you suggested and wait for matching originating times coming back from Python. This assumes that no messages will be dropped and that Python will return a result for every frame: var lastSentOriginatingTime = DateTime.MaxValue;
this.inFrames.Do((_, e) => lastSentOriginatingTime = e.OriginatingTime);
var lastReceivedOriginatingTime = DateTime.MinValue;
this.faceDetectionSource.Do((_, e) => lastReceivedOriginatingTime = e.OriginatingTime);
// Lazily yields true while the last received originating time is still earlier than
// the last sent one; the sequence ends as soon as the received time has caught up.
IEnumerable<bool> WaitForPending()
{
    while (true)
    {
        bool caughtUp = lastReceivedOriginatingTime >= lastSentOriginatingTime;
        if (caughtUp)
        {
            yield break;
        }

        yield return true;
    }
}
Generators.Sequence(pipeline, WaitForPending(), TimeSpan.FromSeconds(1)); Additionally, to explain why the Hopefully this helps to unblock you! |
Beta Was this translation helpful? Give feedback.
I think that what you’re running into is a common scenario and is pointing out that we need to improve the infrastructure. You’re correct that the pipeline is shutting down while results are still pending from the Python side. What we really need is a mechanism by which to signal the Python side when a \psi stream closes (data store exhausted) and similarly a mechanism for Python to signal when a ZeroMQ stream is complete and make `NetMQSource` finite. We will open a work item on our end to look into this. Here is a workaround in the meantime: Firstly, the pipeline is shutting down when the store is complete because the `Importer` component internally proposes a replay time of the extents of…