Skip to content

Commit

Permalink
Smaller MAX_PACKET_SIZE and better batch cache locality (#816)
Browse files Browse the repository at this point in the history
  • Loading branch information
droe committed Mar 10, 2024
1 parent afbeb0b commit 7dce05c
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/probe_modules/module_upnp.c
Expand Up @@ -62,7 +62,7 @@ int upnp_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw,
MAX_PACKET_SIZE);

assert(MAX_PACKET_SIZE - ((char *)payload - (char *)buf) >
(int)strlen(upnp_query));
strlen(upnp_query));
strcpy(payload, upnp_query);

return EXIT_SUCCESS;
Expand Down
3 changes: 1 addition & 2 deletions src/probe_modules/packet.h
Expand Up @@ -18,12 +18,11 @@
#include "../../lib/blocklist.h"
#include "../../lib/pbm.h"
#include "../state.h"
#include "../send.h"

#ifndef PACKET_H
#define PACKET_H

#define MAX_PACKET_SIZE 4096

#define ICMP_UNREACH_HEADER_SIZE 8

#define PACKET_VALID 1
Expand Down
12 changes: 6 additions & 6 deletions src/recv-netmap.c
Expand Up @@ -50,9 +50,9 @@ send_packet(make_packet_func_t mkpkt, void const *arg)
sock.nm.tx_ring_fd = zconf.nm.nm_fd;

batch_t *batch = create_packet_batch(1);
batch->lens[0] = (int)mkpkt((uint8_t *)batch->packets, arg);
assert(batch->lens[0] <= MAX_PACKET_SIZE);
batch->ips[0] = 0; // unused by netmap
batch->packets[0].ip = 0; // unused by netmap
batch->packets[0].len = mkpkt(batch->packets[0].buf, arg);
assert(batch->packets[0].len <= MAX_PACKET_SIZE);
batch->len = 1;
if (send_batch_internal(sock, batch) != 1) {
log_fatal("recv-netmap", "Failed to send packet: %d: %s", errno, strerror(errno));
Expand All @@ -67,9 +67,9 @@ static void
submit_packet(make_packet_func_t mkpkt, void const *arg)
{
batch_t *batch = create_packet_batch(1);
batch->lens[0] = (int)mkpkt((uint8_t *)batch->packets, arg);
assert(batch->lens[0] <= MAX_PACKET_SIZE);
batch->ips[0] = 0; // unused by netmap
batch->packets[0].ip = 0; // unused by netmap
batch->packets[0].len = mkpkt(batch->packets[0].buf, arg);
assert(batch->packets[0].len <= MAX_PACKET_SIZE);
batch->len = 1;
submit_batch_internal(batch); // consumes batch
}
Expand Down
4 changes: 2 additions & 2 deletions src/send-bsd.c
Expand Up @@ -87,7 +87,7 @@ send_batch(sock_t sock, batch_t* batch, int retries)
int rc = 0;
for (int packet_num = 0; packet_num < batch->len; packet_num++) {
for (int retry_ct = 0; retry_ct < retries; retry_ct++) {
rc = send_packet(sock, ((uint8_t *)batch->packets) + (packet_num * MAX_PACKET_SIZE), batch->lens[packet_num], retry_ct);
rc = send_packet(sock, batch->packets[packet_num].buf, batch->packets[packet_num].len, retry_ct);
if (rc >= 0) {
packets_sent++;
break;
Expand All @@ -96,7 +96,7 @@ send_batch(sock_t sock, batch_t* batch, int retries)
if (rc < 0) {
// packet couldn't be sent in retries number of attempts
struct in_addr addr;
addr.s_addr = batch->ips[packet_num];
addr.s_addr = batch->packets[packet_num].ip;
char addr_str_buf[INET_ADDRSTRLEN];
const char *addr_str =
inet_ntop(
Expand Down
6 changes: 3 additions & 3 deletions src/send-linux.c
Expand Up @@ -77,8 +77,8 @@ int send_batch(sock_t sock, batch_t* batch, int retries) {

for (int i = 0; i < batch->len; ++i) {
struct iovec *iov = &iovs[i];
iov->iov_base = ((void *)batch->packets) + (i * MAX_PACKET_SIZE);
iov->iov_len = batch->lens[i];
iov->iov_base = batch->packets[i].buf;
iov->iov_len = batch->packets[i].len;
struct msghdr *msg = &msgs[i];
memset(msg, 0, sizeof(struct msghdr));
// based on https://github.com/torvalds/linux/blob/master/net/socket.c#L2180
Expand All @@ -87,7 +87,7 @@ int send_batch(sock_t sock, batch_t* batch, int retries) {
msg->msg_iov = iov;
msg->msg_iovlen = 1;
msgvec[i].msg_hdr = *msg;
msgvec[i].msg_len = batch->lens[i];
msgvec[i].msg_len = batch->packets[i].len;
}
// set up per-retry variables, so we can only re-submit what didn't send successfully
struct mmsghdr* current_msg_vec = msgvec;
Expand Down
9 changes: 3 additions & 6 deletions src/send-netmap.c
Expand Up @@ -89,12 +89,9 @@ send_batch_internal(sock_t sock, batch_t *batch)
return -1;
}

void *src_buf = (void *)((uint8_t *)batch->packets + i * MAX_PACKET_SIZE);
int len = batch->lens[i];
assert((uint32_t)len <= ring->nr_buf_size);

void *dst_buf = NETMAP_BUF(ring, ring->slot[ring->cur].buf_idx);
memcpy(dst_buf, src_buf, len);
uint32_t len = batch->packets[i].len;
assert(len <= ring->nr_buf_size);
memcpy(NETMAP_BUF(ring, ring->slot[ring->cur].buf_idx), batch->packets[i].buf, len);
ring->slot[ring->cur].len = len;
ring->head = ring->cur = nm_ring_next(ring, ring->cur);
}
Expand Down
32 changes: 11 additions & 21 deletions src/send.c
Expand Up @@ -26,6 +26,7 @@
#include "../lib/blocklist.h"
#include "../lib/lockfd.h"
#include "../lib/pbm.h"
#include "../lib/xalloc.h"

#include "send-internal.h"
#include "aesrand.h"
Expand Down Expand Up @@ -402,9 +403,9 @@ int send_run(sock_t st, shard_t *s)
// this is an additional memcpy (packet created in buf, buf -> batch)
// but when I modified the TCP SYN module to write packet to batch directly, there wasn't any noticeable speedup.
// Using this approach for readability/minimal changes
memcpy(((uint8_t *)batch->packets) + (batch->len * MAX_PACKET_SIZE), contents, length);
batch->lens[batch->len] = length;
batch->ips[batch->len] = current_ip;
batch->packets[batch->len].ip = current_ip;
batch->packets[batch->len].len = length;
memcpy(batch->packets[batch->len].buf, contents, length);
batch->len++;
if (batch->len == batch->capacity) {
// batch is full, sending
Expand Down Expand Up @@ -464,27 +465,16 @@ int send_run(sock_t st, shard_t *s)
return EXIT_SUCCESS;
}

batch_t* create_packet_batch(uint16_t capacity) {
// calculate how many bytes are needed for each component of a batch
int size_of_packet_array = MAX_PACKET_SIZE * capacity;
int size_of_ips_array = sizeof(uint32_t) * capacity;
int size_of_lens_array = sizeof(int) * capacity;

// allocating batch and associated data structures in single calloc for cache locality
void* batch_and_batch_arrs = calloc(sizeof(batch_t) + size_of_packet_array + size_of_ips_array + size_of_lens_array, sizeof(char));
// chunk off parts of batch
batch_t* batch = batch_and_batch_arrs;
batch->packets = (char *)batch + sizeof(batch_t);
batch->ips = (uint32_t *)(batch->packets + size_of_packet_array);
batch->lens = (int *) ((char *)batch->ips + size_of_ips_array);

// Allocate a batch able to hold `capacity` packets.
// Header and packet array live in one xmalloc'd region so that
// batch metadata and packet data stay adjacent for cache locality.
// Caller releases the whole batch with free_packet_batch().
batch_t *create_packet_batch(uint16_t capacity) {
	size_t total = sizeof(batch_t) + (size_t)capacity * sizeof(struct batch_packet);
	batch_t *b = (batch_t *)xmalloc(total);
	// packet array starts immediately after the batch header
	b->packets = (struct batch_packet *)(b + 1);
	b->len = 0;
	b->capacity = capacity;
	return b;
}

void free_packet_batch(batch_t* batch) {
// batch was created with a single calloc, so this will free all the component arrays too
free(batch);
// Release a batch created by create_packet_batch().
void free_packet_batch(batch_t *batch) {
	// The header and its trailing packet array were allocated with a
	// single xmalloc, so one xfree releases the entire region.
	xfree(batch);
}
17 changes: 14 additions & 3 deletions src/send.h
Expand Up @@ -15,10 +15,21 @@
iterator_t *send_init(void);
int send_run(sock_t, shard_t *);

// Fit two packets with metadata into one 4k page.
// 2k seems like more than enough with typical MTU of
// 1500, and we don't want to cause IP fragmentation.
#define MAX_PACKET_SIZE (2048 - 2 * sizeof(uint32_t))

// Metadata and initial packet bytes are adjacent,
// for cache locality esp. with short packets.
struct batch_packet {
	uint32_t ip;  // target IPv4 address; used for error reporting when a send fails
	uint32_t len; // number of valid bytes in buf
	uint8_t buf[MAX_PACKET_SIZE]; // raw packet bytes to transmit
};

typedef struct {
char* packets;
uint32_t* ips;
int* lens;
struct batch_packet *packets;
uint16_t len;
uint16_t capacity;
}batch_t;
Expand Down
1 change: 0 additions & 1 deletion src/state.h
Expand Up @@ -23,7 +23,6 @@
#include "filter.h"
#include "types.h"

#define MAX_PACKET_SIZE 4096
#define MAC_ADDR_LEN_BYTES 6

#define DEDUP_METHOD_DEFAULT 0
Expand Down

0 comments on commit 7dce05c

Please sign in to comment.