Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Put receiver dropped packets into ring buffer #609

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

ForeverASilver
Copy link
Contributor

PR for #217

Added ring buffer for dropped packets, and pass them into new receiver session as they are created.

Added --max-session-packets and --max-sessions arguments to help determine maximum size for ring buffer.

@gavv gavv added ready for review Pull request can be reviewed contribution A pull-request by someone else except maintainers labels Oct 11, 2023
Copy link
Member

@gavv gavv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for PR!

@@ -10,6 +10,7 @@
#include "roc_address/socket_addr_to_str.h"
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_core/parse_duration.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Unneeded include

Comment on lines 181 to 314
if (!rtcp_session_->is_valid()) {
drop_packet_(packet);
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We enter this branch if there were error initializing RTCP, e.g. allocation failure. If RTCP session failed to initialize, it is never fixed later, so there is no need to save packets for later.

It seems that we don't need to save control packets (route_control_packet_). We should bother only about transport packets (route_transport_packet_).

Comment on lines 190 to 325
bool ReceiverSessionGroup::can_create_session_(const packet::PacketPtr& packet) {
if (packet->flags() & packet::Packet::FlagRepair) {
roc_log(LogDebug, "session group: ignoring repair packet for unknown session");
drop_packet_(packet);
return false;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks surprising that method called "can_create_session_" (likes it's checking something) has such side-effects (saving packets).

Let's move saving packet from here to appropriate branch in route_transport_packet_().

Comment on lines 200 to 201
void ReceiverSessionGroup::drop_packet_(const packet::PacketPtr& packet_ptr) {
dropped_packet_buffer_.push_back(*packet_ptr);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this method actually does not drop packet, but saves it, the name "drop_packet_" looks a bit confusing.

I suggest to name this feature "pre-buffering", meaning that we're buffering packets in advance for session that was not created yet.

Then, we can rename things to something like:

  • drop_packet_ => enqueue_prebuf_packet_
  • handle_buffer_ => dequeue_prebuf_packets_
  • dropped_packet_buffer_ => prebuf_packets_

...or something like this.

Comment on lines 206 to 320
if (max_size == 0) {
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a dangerous behavior. It means that if max_sessions or max_session_packets is zero, we can accumulate pre-buffered packets indefinitely. If user hasn't set one of the limits, and some peer sends us some ill-formed packets (intentionally or not), and we never create session for those packets, we will consume more and more memory until we eat everything available on system.

Given it second thought, it seems that using max_session_packets * max_sessions formula here is not so good idea. I suggest to change approach a bit: instead of max_session_packets and max_sessions, introduce prebuf_len setting, measured in nanoseconds (core::nanoseconds_t).

Instead of limiting number of packets, it will limit the age of packets. When the age of the oldest (i.e. the first) packet in buffer becomes larger than prebuf_len, that packet is removed from the buffer.

We have a good default value for prebuf_len: if the user did not set it, it will be equal to target_latency - i.e. we will accumulate as much packets as needed for session start. We can add prebuf_len to ReceiverSessionConfig. We also need to add deduce_prebuf_len(), that sets prebuf_len to target_latency, and is called if the user did not provide --prebuf-len explicitly (similar to deduce_resampler_backend()).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To compute the age of a packet, we'll need to add receive timestamps to packets. Then, the age can be computed as now - receive_timestamp.

It should be easy:

@gavv
Copy link
Member

gavv commented Oct 12, 2023

We also need to cover this feature in pipeline tests. (We cover all important aspects like this).

Pipeline tests for receiver are here: https://github.com/roc-streaming/roc-toolkit/blob/develop/src/tests/roc_pipeline/test_receiver_source.cpp

Those tests work as follows:

  • create pipeline::ReceiverSource
  • using test::PacketWriter, prepare and write packets to receiver, simulating delivery from network
  • using test::FrameReader, read and inspect audio frames from receiver, simulating audio playback thread

Basically, receiver converts sequence of network packets into sequence of audio frames. Each tests prepares its own sequence of packets and checks what audio receiver produces for it. For example, a test may reorder packets and ensure that receiver restores correct order. Or a test can generate packets for multiple sessions and ensure that receiver will mix them together into one stream.

The "Tests" section in the issue should give an idea what kind of tests would be fine for this feature. Let me know if you have questions.

@gavv gavv added needs revision Pull request should be revised by its author and removed ready for review Pull request can be reviewed labels Oct 12, 2023
@ForeverASilver
Copy link
Contributor Author

@gavv
any other pointers you could give for making this test?
so far im a bit stuck as to how to go about this

@gavv
Copy link
Member

gavv commented Oct 14, 2023

Yeah, our pipeline tests are quite complicated. However, over time they prevent A LOT of regressions, so it's worth it.


Currently, the main use-case for pre-bufferring is preserving repair packets delivered before first source packet. So we should use this use-case for the test.

There are three types of packets: source, repair, and control. Source packets contain audio data. Repair (FEC) packets contain redundancy data that may be used to restore lost source packets, if needed. Please read this page first: https://roc-streaming.org/toolkit/docs/internals/fec.html

Without pre-bufferring, all repair packets received before first source packet are dropped, because we can create session only based on source packet. With pre-buffering, those repair packet are saved and then "delivered" right after delivering the first source packet.

Hence, the test should simulate situation when repair packets go before source packets. And it should pass if pre-buffering works fine and fail if it doesn't.

Repair packets are needed to restore losses. Hence, we need to simulate situation when repair packets go before source packets AND some source packets are lost. In this case, if pre-bufferring works fine, receiver will successfully recover losses, otherwise it won't.

Test will produce sequence of source and repair packets, pass it to receiver, and inspect audio produced by receiver. If receiver produces correct audio despite losses, the test passes.


Tests for receiver pipeline (test_receiver_source.cpp) use test::PacketWriter to produce source packets and test::ControlWriter to produce control packets. Currently this group of tests does not use repair packets, so we'll need to add it.

Currently, test::PacketWriter generates sequence of packets using packet factory (to allocate packet), encoder (to fill packet with samples), and composer (to serialize packet into bytes).

Now we need to teach test::PacketWriter to use fec::Writer in addition. fec::Writer works rather simple. It implements IWriter interface, and in its constructor you pass it next writer. Then you write source packets to fec::Writer, and it just writes all these packets to the next writer, and in addition it generates repair packets and also writes them to next writer. In other words, you pass a sequence of source packets to fec::Writer, it it produces a sequence of source and repair packets.

So, we should add a mode for test::PacketWriter when it uses fec::Writer to produce repair packets in addition to the source packets. In this mode, it'll need to use different composers for source and repair packets. The flow will become the following: first use packet factory (to allocate source packet), then encoder (to fill source packet with samples), then fec::Writer (to generate repair packets), then composers (to serialize source and repair packets into bytes).

Example of using fec::Writer in tests, including creating and using composers, can be found here: https://github.com/roc-streaming/roc-toolkit/blob/develop/src/tests/roc_fec/test_writer_reader.cpp


The sequence of source and repair packets produced by fec::Writer will look like:

[S-1.1] [S-1.2] [S-1.3] ... [R-1.1] [R-1.2] [R-1.3] ...
[S-2.1] [S-2.2] [S-2.3] ... [R-2.1] [R-2.2] [R-2.3] ...
...

Where S-X.Y is source packet number Y of FEC block number X, and R-X.Y is repair packet. You can see that source and repair packets are divided into blocks. Repair packets usually come in the end of the block.

For the purpose of our test, we can alter the sequence of packets:

  • reorder them, so that repair packets go in the beginning of the first block instead of in the end
  • drop one or a few source packets in the middle of the first block

Then we should check that audio produced by receiver is expected sequence, using test::FrameReader, same way as we do in all other tests. If receiver isn't able to restore losses, audio will have zeros in places of those packets, and the test will fail.

@gavv
Copy link
Member

gavv commented Oct 14, 2023

Short summary of the above:

  • teach test::PacketWriter to produce repair packets using fec::Writer and proper composers for source and repair packets

  • add new test to test_receiver_source.cpp that does the following:

    • tells test::PacketWriter to produce source and repair packets
    • alters sequence of packets produced by test::PacketWriter:
      • reorder packets, so that repair packets go in the beginning of the first block instead of in the end
      • drops one or a few source packets in the middle of the first block
    • passes resulting packet sequence to receiver pipeline
    • reads and checks audio from receiver pipeline using test::FrameReader

If pre-bufferring works, test::FrameReader will see complete audio stream. If pre-bufferring does not work, test::FrameReader will see unexpected zeros where lost packets were not restored, and will fail.

@gavv
Copy link
Member

gavv commented Oct 14, 2023

A word on how we can alter the sequence of packets. test::PacketWriter writes produced packets to IWriter. Usually we configure it to write packets directly in receiver pipeline (into its endpoint writers).

Instead of that, our test can configure it to write packets to packet::Queue. Then test can read packets from the queue and write them to pipeline in different order.

@gavv
Copy link
Member

gavv commented Oct 14, 2023

All this sounds quite big, but I think the actual implementation shouldn't be that complicated once you'll get familiar with the context. In test_receiver_source.cpp we have many relatively simple and short 50-lines tests that still would need many words to describe them from scratch :)

If you'll have any more questions, feel free to ask here on in chat. And thanks for looking into this.

@gavv
Copy link
Member

gavv commented Oct 14, 2023

Added hacktoberfest-accepted just in case you need it.

@ForeverASilver
Copy link
Contributor Author

@gavv
looked at this on and off for the last few days trying to see what i could come up with while looking at test_writer_reader.cpp

so far ive gotten a packet writer set up with an LDPC source composer and an fec writer, but when it comes to writing the packets using PacketWriter, i get an error from fec writer saying unexpected non-fec packet and am not sure how to deal with it.

I would have assumed the fec data would have been set by a composer at some point before being sent off to the fec writer.

Any help would be greatly appreciated!

@gavv
Copy link
Member

gavv commented Oct 17, 2023

You can push your code somewhere (this PR or other branch) and I'll take a look.

@ForeverASilver
Copy link
Contributor Author

@gavv
pushed the commit whenever you have a chance to take a look

@gavv
Copy link
Member

gavv commented Nov 2, 2023

Hi, sorry for delay, I was occupied on other projects last weeks.

Actually you were very close, you configured everything correctly. The reason of the panic is quite subtle.

If you look here:

packet::PacketPtr new_packet_(size_t samples_per_packet,

you can see that before passing packet to destination writer, PacketWriter creates a copy of the packet from the buffer of the original packet, and uses that copy.

When we receive packet from network, it has only buffer with data. However as packet goes through the pipeline, it gets populated with various meta-information, like flags, parsed fields, etc. However, in pipeline tests, we're simulating how receiver handles packets received from network, and so we should pass to the receiver packets without all these meta-information. So we "strip" it by making a copy of packet that has only buffer with data.

However, fec::Writer lives in the middle of the pipeline, and expected that packet will already have all necessary meta-information. And you were passing to it packets from PacketWriter, which are already stripped. Hence you got a panic.

The solution is to move fec::Writer inside test::PacketWriter. PacketWriter should create packet, pass it to fec::Writer, and only then strip packets and pass them to destination writer.

I've implemented this approach and pushed to develop: 3f4d3ed

PacketWriter now has two constructors, one to work without FEC, and another to work with FEC. Currently all tests use version without FEC, but you can use the new version in your test.

I also refactored PacketWriter and add comments to make the behavior I described more obvious from code.

Here is the updated version of your tests that compiles and runs (though I didn't try to verify its logic, just fixed errors):

TEST(receiver_source, packet_buffer) {
    packet::PacketPtr source_packets[20];

    enum { Rate = SampleRate, Chans = Chans_Stereo, MaxPackets = 10 };

    init(Rate, Chans, Rate, Chans);

    ReceiverConfig config = make_config();
    config.default_session.prebuf_len = 0;
    ReceiverSource receiver(config, format_map, packet_factory, byte_buffer_factory,
                            sample_buffer_factory, arena);
    CHECK(receiver.is_valid());

    ReceiverSlot* slot = create_slot(receiver);
    CHECK(slot);

    packet::IWriter* source_endpoint_writer =
        create_endpoint(slot, address::Iface_AudioSource, address::Proto_RTP_LDPC_Source);
    CHECK(source_endpoint_writer);

    packet::IWriter* repair_endpoint_writer =
        create_endpoint(slot, address::Iface_AudioRepair, address::Proto_LDPC_Repair);
    CHECK(repair_endpoint_writer);

    fec::WriterConfig fec_config;

    test::PacketWriter packet_writer(
        arena, *source_endpoint_writer, *repair_endpoint_writer, format_map,
        packet_factory, byte_buffer_factory, src1, dst1, dst2, PayloadType_Ch2,
        packet::FEC_LDPC_Staircase, fec_config);

    // setup reader
    test::FrameReader frame_reader(receiver, sample_buffer_factory);

    packet_writer.write_packets(fec_config.n_source_packets, SamplesPerPacket,
                                output_sample_spec);

    receiver.refresh(frame_reader.refresh_ts());
    frame_reader.read_nonzero_samples(SamplesPerFrame, output_sample_spec);
    UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions());
}

A few more notes:

  • you need to create two endpoints on receiver, one for source and another for repair packets
  • in FEC mode, test::PacketWriter has two destination writers (source and repair); the test can use packet::Queue as destination writers and then modify packet sequence before passing them to endpoints (e.g. drop some packets or reorder)
  • it's better to switch from ldpc to rs8m FEC scheme because rs8m is considered "default" and ldpc may become optional in future (may be disabled at build-time)

@ForeverASilver
Copy link
Contributor Author

Thanks for the update and explanation! Ill update my tests once i get the chance to

@@ -286,6 +286,7 @@ void UdpReceiverPort::recv_cb_(uv_udp_t* handle,

pp->udp()->src_addr = src_addr;
pp->udp()->dst_addr = self.config_.bind_address;
pp->udp()->recieve_timestamp = core::timestamp(core::ClockMonotonic);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For timestamping audio frames we use core::ClockUnix

cc @gavv

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, I think recieve_timestamp should use the same.

@gavv gavv force-pushed the ring_buffer branch 2 times, most recently from 93c939f to 83aa7be Compare November 3, 2023 23:49
@gavv
Copy link
Member

gavv commented Nov 3, 2023

Sorry, I accidentally pushed to PR's branch, I returned it back to the same commit.

Copy link

github-actions bot commented Nov 6, 2023

☔ The latest upstream change (presumably these) made this pull request unmergeable. Please resolve the merge conflicts.

@github-actions github-actions bot added the needs rebase Pull request has conflicts and should be rebased label Nov 6, 2023
@gavv
Copy link
Member

gavv commented Nov 6, 2023

Based on discussion here, I've added some docs:

I also added new packet flag (f7e3a92) that will make panic that happened here a bit more clear (it will state that prepare() was not called on a packet passed to fec writer, and the role of prepare() is now documented in the page I linked above)

@github-actions github-actions bot removed the needs rebase Pull request has conflicts and should be rebased label Nov 12, 2023
Copy link

🤖 The latest upstream change made this pull request unmergeable. Please resolve the merge conflicts.

@github-actions github-actions bot added the needs rebase Pull request has conflicts and should be rebased label Nov 19, 2023
Copy link

🤖 The latest upstream change made this pull request unmergeable. Please resolve the merge conflicts.

@gavv
Copy link
Member

gavv commented Nov 19, 2023

Hi, I'm preparing 0.3.0 release and have rebased develop on master (this workflow is described here).

Please reset develop in your fork to up-to-date version and rebase your PR on that. (You can use git rebase --onto or just cherry-pick your commits.)

@github-actions github-actions bot removed the needs rebase Pull request has conflicts and should be rebased label Dec 22, 2023
@gavv
Copy link
Member

gavv commented Dec 22, 2023

Hey, I've rebased the patch on develop to keep PR alive.

@github-actions github-actions bot added the needs rebase Pull request has conflicts and should be rebased label Dec 29, 2023
Copy link

🤖 The latest upstream change made this pull request unmergeable. Please resolve the merge conflicts.

@github-actions github-actions bot removed the needs rebase Pull request has conflicts and should be rebased label May 30, 2024
@gavv
Copy link
Member

gavv commented May 30, 2024

Rebased on fresh develop (didn't touch logic).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
contribution A pull-request by someone else except maintainers hacktoberfest-accepted needs revision Pull request should be revised by its author
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants