Memory usage of SharedImagePool keeps growing #285

Open
KanaHayama opened this issue Jun 8, 2023 · 3 comments

@KanaHayama

Hello, I am writing a composite component. The memory usage of the shared image pool keeps growing while a Join operator receives no data on its primary stream.

The secondary stream of that Join operator is a video stream. I set all delivery policies to LatestMessage, so nothing should accumulate in the receivers' queues.
The growth stops as soon as a new data frame is available on the primary stream. This suggests that images on the secondary stream are being queued somewhere, so the shared image pool keeps requesting more memory from the operating system.

I do not know why this happens. Please help. Here is my code:

public sealed class ActionUnitDetector : Subpipeline, IProducer<IReadOnlyList<IReadOnlyDictionary<int, float>>> {

    private readonly Connector<IReadOnlyList<NormalizedLandmarkList>> _inConnector;
    private readonly Connector<Shared<Image>> _imageInConnector;
    private readonly Connector<IReadOnlyList<IReadOnlyDictionary<int, float>>> _outConnector;
    private readonly Connector<IReadOnlyList<Shared<Image>>> _alignedImagesOutConnector;

    public Receiver<IReadOnlyList<NormalizedLandmarkList>> DataIn => _inConnector.In;
    public Receiver<Shared<Image>> ImageIn => _imageInConnector.In;

    public Emitter<IReadOnlyList<IReadOnlyDictionary<int, float>>> Out => _outConnector.Out;
    public Emitter<IReadOnlyList<Shared<Image>>> AlignedImagesOut => _alignedImagesOutConnector.Out;

    public ActionUnitDetector(Pipeline pipeline) : base(pipeline, nameof(ActionUnitDetector), DeliveryPolicy.LatestMessage) {
        _inConnector = CreateInputConnectorFrom<IReadOnlyList<NormalizedLandmarkList>>(pipeline, nameof(DataIn));
        _imageInConnector = CreateInputConnectorFrom<Shared<Image>>(pipeline, nameof(ImageIn));

        _outConnector = CreateOutputConnectorTo<IReadOnlyList<IReadOnlyDictionary<int, float>>>(pipeline, nameof(Out));
        _alignedImagesOutConnector = CreateOutputConnectorTo<IReadOnlyList<Shared<Image>>>(pipeline, nameof(AlignedImagesOut));

        var convertedImage = _imageInConnector
            .Convert(PixelFormat.RGB_24bpp, DeliveryPolicy.LatestMessage);
        var aligner = new FaceImageAligner(this);
        _inConnector.Join(
                convertedImage,
                Reproducible.Exact<Shared<Image>>(),
                ValueTuple.Create,
                DeliveryPolicy.LatestMessage,
                DeliveryPolicy.LatestMessage
            )//TODO: shared image pool grows too large when no face is detected! Fuse() does not help; swapping streams does not help. Why?
            .PipeTo(aligner, DeliveryPolicy.LatestMessage);
        aligner.PipeTo(_alignedImagesOutConnector, DeliveryPolicy.LatestMessage);

        var inferenceRunner = new InferenceRunner(this);
        aligner.PipeTo(inferenceRunner, DeliveryPolicy.LatestMessage);
        inferenceRunner.PipeTo(_outConnector, DeliveryPolicy.LatestMessage);
    }
}

Thank you

@sandrist
Contributor

One approach for getting to the bottom of this is to use Pipeline Diagnostics. Create your pipeline with enableDiagnostics set to true, persist the Diagnostics stream to a store, and then visualize it in PsiStudio. You'll be able to visualize all stream connections to quickly pinpoint, e.g., if you're missing a LatestMessage delivery policy anywhere. You can also inspect all the delivery queue sizes to see exactly where things are filling up. Take a look and let us know if that reveals anything.
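As a rough sketch of the setup described above, the following shows one way to turn diagnostics on and persist them. The store name and path are placeholders, and the calls (`Pipeline.Create(enableDiagnostics: true)`, `PsiStore.Create`, `pipeline.Diagnostics.Write`) reflect recent \psi releases as best I recall them; older releases may name the store class differently, so treat this as an assumption to check against your version.

```csharp
using System;
using Microsoft.Psi;

public static class DiagnosticsSketch
{
    public static void Main()
    {
        // enableDiagnostics: true asks the runtime to collect pipeline diagnostics.
        using (var pipeline = Pipeline.Create(enableDiagnostics: true))
        {
            // Placeholder store name and folder; pick whatever suits your machine.
            var store = PsiStore.Create(pipeline, "DiagnosticsStore", @"C:\Data\DiagnosticsStore");

            // Persist the diagnostics stream so it can be opened in PsiStudio later.
            pipeline.Diagnostics.Write("Diagnostics", store);

            // ... construct the rest of the pipeline (e.g. the ActionUnitDetector) here ...

            pipeline.RunAsync();
            Console.WriteLine("Press Enter to stop.");
            Console.ReadLine();
        }
    }
}
```

Opening the resulting store in PsiStudio and visualizing the "Diagnostics" stream should then show the stream connections and the delivery queue sizes mentioned above.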

@KanaHayama
Author

Thank you. A missing LatestMessage policy might be the cause. I will reply with my findings.

@danbohus
Contributor

I think I have an idea of what might be going on. I suspect the images are getting queued inside the Join component. To perform synchronization, Join internally holds its own queues with the messages it receives on both the primary and secondary streams (you can take a look at the implementation here). These internal queues live inside the component and are separate from the normal psi delivery queue on each stream (which is what the delivery policy controls). They are necessary to synchronize correctly based on originating times.

Imagine you have images (your secondary stream) arriving with originating times 0, 1, 2, 3, ... 100, and the primary (clock, in your case) stream has originating times 0, 10, 20, 30, ... 100. After first pairing (0, 0), Join will internally queue the images with originating times 1, 2, 3, and so on until it receives the next clock message, which is the one with originating time 10. It has to hold on to messages 1, 2, 3, etc. because it does not know the originating time of the next clock message to arrive. In principle, the clock message with originating time 10 could even arrive later than "time 10" because of latencies.

Join has to queue and operate this way in order to synchronize correctly. It releases secondary messages from its internal queue only once it can prove that they will never be needed to synchronize with any other primary message.
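The buffering behaviour described above can be illustrated with a small stand-alone simulation. This is a simplified model, not the actual Join implementation: it assumes messages arrive in originating-time order, that a primary message arrives right after the secondary message with the same originating time, and that a buffered secondary message can be released once a primary message with an equal or later originating time has been seen. The class and method names are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class JoinQueueSimulation
{
    // Returns the peak number of secondary messages held in the (simulated)
    // internal buffer of an exact-match join.
    public static int PeakBufferedSecondaries(IEnumerable<int> secondaryTimes, IEnumerable<int> primaryTimes)
    {
        var primaries = new Queue<int>(primaryTimes.OrderBy(t => t));
        var buffered = new Queue<int>();
        int peak = 0;

        foreach (int s in secondaryTimes.OrderBy(t => t))
        {
            // Every secondary message is buffered until it is provably unneeded.
            buffered.Enqueue(s);
            peak = Math.Max(peak, buffered.Count);

            // Assumption: primary messages up to this originating time arrive now.
            while (primaries.Count > 0 && primaries.Peek() <= s)
            {
                int p = primaries.Dequeue();

                // Secondary messages at or before the primary's time can be released.
                while (buffered.Count > 0 && buffered.Peek() <= p)
                {
                    buffered.Dequeue();
                }
            }
        }

        return peak;
    }

    public static void Main()
    {
        var images = Enumerable.Range(0, 101).ToArray();               // 0, 1, ..., 100
        var sparse = Enumerable.Range(0, 11).Select(i => i * 10);      // 0, 10, ..., 100
        var stalled = new[] { 0 };                                     // primary stops after 0
        var dense = Enumerable.Range(0, 101);                          // one primary per image

        Console.WriteLine($"sparse primary:  peak {PeakBufferedSecondaries(images, sparse)}");
        Console.WriteLine($"stalled primary: peak {PeakBufferedSecondaries(images, stalled)}");
        Console.WriteLine($"dense primary:   peak {PeakBufferedSecondaries(images, dense)}");
    }
}
```

Under these assumptions the sparse primary peaks at 10 buffered images, the stalled primary at 100 (the buffer grows for as long as images keep arriving), and the dense primary at 1, which matches the memory growth reported in this issue.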

Now, as to how to change your code to address the growing memory usage: my suggestion would be to use a dense clock stream. (More generally, you usually want to join dense streams.) From your comments, it looks like your clock stream is a sparse stream, perhaps signaling that a face was detected. Instead, can you construct a dense boolean stream (false when no face, true when a face is detected), do the join, and then use a Where() operator on the resulting tuples to filter out the ones that do not correspond to a face?
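A minimal sketch of the dense-stream suggestion follows, using integers as stand-ins for video frames. The "face detected on every 10th frame" rule and all names are hypothetical; the operators (`Generators.Sequence`, `Select`, `Join` with `Reproducible.Exact`, `Where`, `Do`) are used as I understand them from the \psi documentation, so verify the overloads against your version.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Psi;

public static class DenseJoinSketch
{
    // Returns the frame indices that survive the join and the filter.
    public static List<int> Run()
    {
        var kept = new List<int>();
        using (var pipeline = Pipeline.Create())
        {
            // Stand-in for the video stream: 100 integer "frames", 10 ms apart.
            var frames = Generators.Sequence(pipeline, 0, i => i + 1, 100, TimeSpan.FromMilliseconds(10));

            // Dense detection stream: one bool per frame, with the same originating
            // time as the frame it was computed from.
            // Hypothetical rule: a face is "detected" on every 10th frame.
            var faceDetected = frames.Select(i => i % 10 == 0);

            faceDetected
                .Join(frames, Reproducible.Exact<int>()) // every frame finds its match promptly
                .Where(t => t.Item1)                     // keep only tuples where a face was detected
                .Do(t => kept.Add(t.Item2));

            pipeline.Run();
        }

        return kept;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Because the primary stream carries a message for every frame, Join can release each buffered frame almost immediately instead of holding frames until the next detection arrives. In the component from this issue, the equivalent would be a landmarks stream that also emits an (empty) result when no face is found.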

Another alternative, which loses the exact-match synchronization based on originating times, is the Pair operator, or a Fuse with the Available.LastOrDefault interpolator. You can read more about it here, if you look for the explanations of Pair and Available.LastOrDefault. In this case the secondary messages are not queued internally; only the last secondary message is remembered. However, that also means the pairing is no longer guaranteed to be synchronized on originating times, because the results depend on the wall-clock arrival time (and hence latency) of the messages.
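For comparison, here is a sketch of the Pair alternative with a sparse primary stream, again with integers standing in for frames. The names and the "every 10th frame" rule are hypothetical, and the `Pair` and `Fuse` overloads are written as I recall them from the \psi documentation. Which frame ends up paired with each detection depends on arrival timing, so the output is not reproducible from run to run.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Psi;

public static class PairSketch
{
    // Returns (detection frame, paired frame) tuples produced by Pair.
    public static List<(int Detection, int Frame)> Run()
    {
        var pairs = new List<(int Detection, int Frame)>();
        using (var pipeline = Pipeline.Create())
        {
            // Stand-in for the video stream: 100 integer "frames", 10 ms apart.
            var frames = Generators.Sequence(pipeline, 0, i => i + 1, 100, TimeSpan.FromMilliseconds(10));

            // Sparse primary stream: only every 10th frame yields a "detection" (hypothetical rule).
            var detections = frames.Where(i => i % 10 == 0);

            // Pair remembers only the most recent secondary message, so nothing accumulates.
            detections
                .Pair(frames)
                .Do(t => pairs.Add((t.Item1, t.Item2)));

            // The Fuse variant mentioned above would look like this (assumed overload):
            // detections.Fuse(frames, Available.LastOrDefault<int>());

            pipeline.Run();
        }

        return pairs;
    }

    public static void Main()
    {
        foreach (var (detection, frame) in Run())
        {
            Console.WriteLine($"detection {detection} paired with frame {frame}");
        }
    }
}
```

The trade-off is the one described above: memory stays bounded, but a detection may be paired with a frame whose originating time differs from its own.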

Hope this helps, but let us know what you find.
