Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Less copying, more batch optimisation #820

Merged
merged 2 commits into from Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 12 additions & 6 deletions src/probe_modules/module_bacnet.c
Expand Up @@ -37,7 +37,16 @@ static inline uint8_t get_invoke_id(uint32_t *validation)
return (uint8_t)((validation[1] >> 24) & 0xFF);
}

int bacnet_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, void **arg)
int bacnet_init_perthread(void **arg)
{
uint32_t seed = aesrand_getword(zconf.aes);
aesrand_t *aes = aesrand_init_from_seed(seed);
*arg = aes;

return EXIT_SUCCESS;
}

int bacnet_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, UNUSED void *arg)
{
memset(buf, 0, MAX_PACKET_SIZE);
struct ether_header *eth_header = (struct ether_header *)buf;
Expand Down Expand Up @@ -67,10 +76,6 @@ int bacnet_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, void **arg)
bnp->apdu.server_choice = 0x0c;
memcpy(body, bacnet_body, BACNET_BODY_LEN);

uint32_t seed = aesrand_getword(zconf.aes);
aesrand_t *aes = aesrand_init_from_seed(seed);
*arg = aes;

return EXIT_SUCCESS;
}

Expand Down Expand Up @@ -184,8 +189,9 @@ probe_module_t module_bacnet = {.name = "bacnet",
.pcap_filter = "udp || icmp",
.pcap_snaplen = 1500,
.port_args = 1,
.thread_initialize = &bacnet_init_perthread,
.global_initialize = &bacnet_global_initialize,
.thread_initialize = &bacnet_init_perthread,
.prepare_packet = &bacnet_prepare_packet,
.make_packet = &bacnet_make_packet,
.print_packet = &udp_print_packet,
.validate_packet = &bacnet_validate_packet,
Expand Down
8 changes: 4 additions & 4 deletions src/probe_modules/module_dns.c
Expand Up @@ -735,8 +735,8 @@ static int dns_global_cleanup(UNUSED struct state_conf *zconf,
return EXIT_SUCCESS;
}

int dns_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void **arg_ptr)
int dns_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void *arg_ptr)
{
memset(buf, 0, MAX_PACKET_SIZE);

Expand Down Expand Up @@ -902,7 +902,7 @@ void dns_add_null_fs(fieldset_t *fs)

void dns_process_packet(const u_char *packet, uint32_t len, fieldset_t *fs,
uint32_t *validation,
__attribute__((unused)) struct timespec ts)
UNUSED struct timespec ts)
{
struct ip *ip_hdr = (struct ip *)&packet[sizeof(struct ether_header)];
if (ip_hdr->ip_p == IPPROTO_UDP) {
Expand Down Expand Up @@ -1111,8 +1111,8 @@ probe_module_t module_dns = {
.pcap_filter = "udp || icmp",
.pcap_snaplen = PCAP_SNAPLEN,
.port_args = 1,
.thread_initialize = &dns_init_perthread,
.global_initialize = &dns_global_initialize,
.prepare_packet = &dns_prepare_packet,
.make_packet = &dns_make_packet,
.print_packet = &dns_print_packet,
.validate_packet = &dns_validate_packet,
Expand Down
6 changes: 3 additions & 3 deletions src/probe_modules/module_icmp_echo.c
Expand Up @@ -137,8 +137,8 @@ int icmp_global_cleanup(__attribute__((unused)) struct state_conf *zconf,
return EXIT_SUCCESS;
}

static int icmp_echo_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void **arg_ptr)
static int icmp_echo_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void *arg_ptr)
{
memset(buf, 0, MAX_PACKET_SIZE);

Expand Down Expand Up @@ -330,7 +330,7 @@ probe_module_t module_icmp_echo = {
.port_args = 0,
.global_initialize = &icmp_global_initialize,
.close = &icmp_global_cleanup,
.thread_initialize = &icmp_echo_init_perthread,
.prepare_packet = &icmp_echo_prepare_packet,
.make_packet = &icmp_echo_make_packet,
.print_packet = &icmp_echo_print_packet,
.process_packet = &icmp_echo_process_packet,
Expand Down
6 changes: 3 additions & 3 deletions src/probe_modules/module_icmp_echo_time.c
Expand Up @@ -34,8 +34,8 @@ struct icmp_payload_for_rtt {
ipaddr_n_t dst;
};

static int icmp_echo_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void **arg_ptr)
static int icmp_echo_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void *arg_ptr)
{
memset(buf, 0, MAX_PACKET_SIZE);

Expand Down Expand Up @@ -259,7 +259,7 @@ probe_module_t module_icmp_echo_time = {
.pcap_filter = "icmp and icmp[0]!=8",
.pcap_snaplen = 96,
.port_args = 0,
.thread_initialize = &icmp_echo_init_perthread,
.prepare_packet = &icmp_echo_prepare_packet,
.make_packet = &icmp_echo_make_packet,
.print_packet = &icmp_echo_print_packet,
.process_packet = &icmp_echo_process_packet,
Expand Down
6 changes: 3 additions & 3 deletions src/probe_modules/module_ipip.c
Expand Up @@ -133,8 +133,8 @@ int ipip_global_cleanup(UNUSED struct state_conf *zconf,
return EXIT_SUCCESS;
}

int ipip_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void **arg_ptr)
int ipip_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void *arg_ptr)
{
memset(buf, 0, MAX_PACKET_SIZE);
struct ether_header *eth_header = (struct ether_header *)buf;
Expand Down Expand Up @@ -403,8 +403,8 @@ probe_module_t module_ipip = {
.pcap_filter = "udp || icmp",
.pcap_snaplen = 1500,
.port_args = 1,
.thread_initialize = &ipip_init_perthread,
.global_initialize = &ipip_global_initialize,
.prepare_packet = &ipip_prepare_packet,
.make_packet = &ipip_make_packet,
.print_packet = &ipip_print_packet,
.validate_packet = &ipip_validate_packet,
Expand Down
18 changes: 12 additions & 6 deletions src/probe_modules/module_ntp.c
Expand Up @@ -154,7 +154,16 @@ void ntp_process_packet(const u_char *packet, UNUSED uint32_t len,
}
}

int ntp_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, void **arg)
int ntp_init_perthread(void **arg)
{
uint32_t seed = aesrand_getword(zconf.aes);
aesrand_t *aes = aesrand_init_from_seed(seed);
*arg = aes;

return EXIT_SUCCESS;
}

int ntp_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, UNUSED void *arg)
{
memset(buf, 0, MAX_PACKET_SIZE);
struct ether_header *eth_header = (struct ether_header *)buf;
Expand All @@ -176,10 +185,6 @@ int ntp_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, void **arg)
sizeof(struct udphdr) + sizeof(struct ntphdr);
module_ntp.max_packet_length = header_len;

uint32_t seed = aesrand_getword(zconf.aes);
aesrand_t *aes = aesrand_init_from_seed(seed);
*arg = aes;

return EXIT_SUCCESS;
}

Expand Down Expand Up @@ -240,8 +245,9 @@ probe_module_t module_ntp = {.name = "ntp",
.pcap_filter = "udp || icmp",
.pcap_snaplen = 1500,
.port_args = 1,
.thread_initialize = &ntp_init_perthread,
.global_initialize = &ntp_global_initialize,
.thread_initialize = &ntp_init_perthread,
.prepare_packet = &ntp_prepare_packet,
.make_packet = &udp_make_packet,
.print_packet = &ntp_print_packet,
.validate_packet = &ntp_validate_packet,
Expand Down
6 changes: 3 additions & 3 deletions src/probe_modules/module_tcp_synackscan.c
Expand Up @@ -34,8 +34,8 @@ static int synackscan_global_initialize(struct state_conf *state)
return EXIT_SUCCESS;
}

static int synackscan_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void **arg_ptr)
static int synackscan_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void *arg_ptr)
{
memset(buf, 0, MAX_PACKET_SIZE);
struct ether_header *eth_header = (struct ether_header *)buf;
Expand Down Expand Up @@ -211,7 +211,7 @@ probe_module_t module_tcp_synackscan = {
.pcap_snaplen = 96,
.port_args = 1,
.global_initialize = &synackscan_global_initialize,
.thread_initialize = &synackscan_init_perthread,
.prepare_packet = &synackscan_prepare_packet,
.make_packet = &synackscan_make_packet,
.print_packet = &synscan_print_packet,
.process_packet = &synackscan_process_packet,
Expand Down
6 changes: 3 additions & 3 deletions src/probe_modules/module_tcp_synscan.c
Expand Up @@ -74,8 +74,8 @@ static int synscan_global_initialize(struct state_conf *state)
return EXIT_SUCCESS;
}

static int synscan_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void **arg_ptr)
static int synscan_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void *arg_ptr)
{
struct ether_header *eth_header = (struct ether_header *)buf;
make_eth_header(eth_header, src, gw);
Expand Down Expand Up @@ -259,7 +259,7 @@ probe_module_t module_tcp_synscan = {
.pcap_snaplen = 96,
.port_args = 1,
.global_initialize = &synscan_global_initialize,
.thread_initialize = &synscan_init_perthread,
.prepare_packet = &synscan_prepare_packet,
.make_packet = &synscan_make_packet,
.print_packet = &synscan_print_packet,
.process_packet = &synscan_process_packet,
Expand Down
20 changes: 13 additions & 7 deletions src/probe_modules/module_udp.c
Expand Up @@ -241,7 +241,17 @@ int udp_global_cleanup(UNUSED struct state_conf *zconf,
return EXIT_SUCCESS;
}

int udp_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, void **arg_ptr)
int udp_init_perthread(void **arg_ptr)
{
// Seed our random number generator with the global generator
uint32_t seed = aesrand_getword(zconf.aes);
aesrand_t *aes = aesrand_init_from_seed(seed);
*arg_ptr = aes;

return EXIT_SUCCESS;
}

int udp_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, UNUSED void *arg_ptr)
{
memset(buf, 0, MAX_PACKET_SIZE);
struct ether_header *eth_header = (struct ether_header *)buf;
Expand All @@ -260,11 +270,6 @@ int udp_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, void **arg_ptr)
memcpy(payload, udp_fixed_payload, udp_fixed_payload_len);
}

// Seed our random number generator with the global generator
uint32_t seed = aesrand_getword(zconf.aes);
aesrand_t *aes = aesrand_init_from_seed(seed);
*arg_ptr = aes;

return EXIT_SUCCESS;
}

Expand Down Expand Up @@ -857,8 +862,9 @@ probe_module_t module_udp = {
.pcap_snaplen =
MAX_UDP_PAYLOAD_LEN + 20 + 24, // Ether Header, IP Header with Options
.port_args = 1,
.thread_initialize = &udp_init_perthread,
.global_initialize = &udp_global_initialize,
.thread_initialize = &udp_init_perthread,
.prepare_packet = &udp_prepare_packet,
.make_packet =
&udp_make_packet, // can be overridden to udp_make_templated_packet by udp_global_initalize
.print_packet = &udp_print_packet,
Expand Down
6 changes: 3 additions & 3 deletions src/probe_modules/module_upnp.c
Expand Up @@ -39,8 +39,8 @@ int upnp_global_initialize(struct state_conf *state)
return EXIT_SUCCESS;
}

int upnp_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void **arg_ptr)
int upnp_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw,
UNUSED void *arg_ptr)
{
memset(buf, 0, MAX_PACKET_SIZE);
struct ether_header *eth_header = (struct ether_header *)buf;
Expand Down Expand Up @@ -255,7 +255,7 @@ probe_module_t module_upnp = {
.pcap_snaplen = 2048,
.port_args = 1,
.global_initialize = &upnp_global_initialize,
.thread_initialize = &upnp_init_perthread,
.prepare_packet = &upnp_prepare_packet,
.make_packet = &udp_make_packet,
.print_packet = &udp_print_packet,
.process_packet = &upnp_process_packet,
Expand Down
23 changes: 19 additions & 4 deletions src/probe_modules/probe_modules.h
Expand Up @@ -30,14 +30,28 @@ typedef struct probe_response_type {

typedef int (*probe_global_init_cb)(struct state_conf *);

typedef int (*probe_thread_init_cb)(void *packetbuf, macaddr_t *src_mac,
macaddr_t *gw_mac, void **arg_ptr);
// Called once per send thread to initialize state.
typedef int (*probe_thread_init_cb)(void **arg_ptr);

// The make_packet callback is passed a buffer pointing at an ethernet header.
// The buffer is MAX_PACKET_SIZE bytes. There are 1..n packet buffers in use
// per send thread. For each buffer, prepare_packet is called once before the
// first call to make_packet.
//
// The probe module is expected to write all the constant packet contents
// in prepare_packet, and only write the fields that need to be updated for
// each packet being sent in make_packet.
//
typedef int (*probe_prepare_packet_cb)(void *packetbuf, macaddr_t *src_mac,
macaddr_t *gw_mac, void *arg);

// The make_packet callback is passed a buffer pointing at an ethernet header.
// The buffer is MAX_PACKET_SIZE bytes. The callback must update the value
// pointed at by buf_len with the actual length of the packet. The contents of
// the buffer will match the previous packet sent. Every invocation of
// make_packet contains a unique (src_ip, probe_num) tuple.
// the buffer will match a previously sent packet by this send thread, so
// content not overwritten by make_packet can be relied upon to be intact.
// Beyond that, the probe module should not make any assumptions about buffers.
// Every invocation of make_packet contains a unique (src_ip, probe_num) tuple.
//
// The probe module is responsible for populating the IP header. The src_ip,
// dst_ip, and ttl are provided by the framework and must be set on the IP
Expand Down Expand Up @@ -84,6 +98,7 @@ typedef struct probe_module {

probe_global_init_cb global_initialize;
probe_thread_init_cb thread_initialize;
probe_prepare_packet_cb prepare_packet;
probe_make_packet_cb make_packet;
probe_print_packet_cb print_packet;
probe_validate_packet_cb validate_packet;
Expand Down
2 changes: 0 additions & 2 deletions src/recv-netmap.c
Expand Up @@ -50,7 +50,6 @@ send_packet(make_packet_func_t mkpkt, void const *arg)
sock.nm.tx_ring_fd = zconf.nm.nm_fd;

batch_t *batch = create_packet_batch(1);
batch->packets[0].ip = 0; // unused by netmap
batch->packets[0].len = mkpkt(batch->packets[0].buf, arg);
assert(batch->packets[0].len <= MAX_PACKET_SIZE);
batch->len = 1;
Expand All @@ -67,7 +66,6 @@ static void
submit_packet(make_packet_func_t mkpkt, void const *arg)
{
batch_t *batch = create_packet_batch(1);
batch->packets[0].ip = 0; // unused by netmap
batch->packets[0].len = mkpkt(batch->packets[0].buf, arg);
assert(batch->packets[0].len <= MAX_PACKET_SIZE);
batch->len = 1;
Expand Down
8 changes: 4 additions & 4 deletions src/send-bsd.c
Expand Up @@ -34,9 +34,10 @@ send_run_init(UNUSED sock_t sock)
}

static int
send_packet(sock_t sock, void *buf, int len, UNUSED uint32_t retry_ct)
send_packet(sock_t sock, uint8_t *buf, int len, UNUSED uint32_t retry_ct)
{
if (zconf.send_ip_pkts) {
buf += sizeof(struct ether_header);
struct ip *iph = (struct ip *)buf;

#if defined(__APPLE__) || (defined(__FreeBSD__) && __FreeBSD_version < 1100030)
Expand Down Expand Up @@ -95,12 +96,11 @@ send_batch(sock_t sock, batch_t* batch, int retries)
}
if (rc < 0) {
// packet couldn't be sent in retries number of attempts
struct in_addr addr;
addr.s_addr = batch->packets[packet_num].ip;
struct ip *iph = (struct ip *)(batch->packets[packet_num].buf + sizeof(struct ether_header));
char addr_str_buf[INET_ADDRSTRLEN];
const char *addr_str =
inet_ntop(
AF_INET, &addr,
AF_INET, &iph->ip_dst,
addr_str_buf,
INET_ADDRSTRLEN);
if (addr_str != NULL) {
Expand Down
10 changes: 7 additions & 3 deletions src/send-linux.c
Expand Up @@ -75,10 +75,14 @@ int send_batch(sock_t sock, batch_t* batch, int retries) {
struct msghdr msgs[batch->capacity];
struct iovec iovs[batch->capacity];

size_t buf_offset = 0;
if (zconf.send_ip_pkts) {
buf_offset = sizeof(struct ether_header);
}
for (int i = 0; i < batch->len; ++i) {
struct iovec *iov = &iovs[i];
iov->iov_base = batch->packets[i].buf;
iov->iov_len = batch->packets[i].len;
iov->iov_base = batch->packets[i].buf + buf_offset;
iov->iov_len = batch->packets[i].len - buf_offset;
struct msghdr *msg = &msgs[i];
memset(msg, 0, sizeof(struct msghdr));
// based on https://github.com/torvalds/linux/blob/master/net/socket.c#L2180
Expand All @@ -87,7 +91,7 @@ int send_batch(sock_t sock, batch_t* batch, int retries) {
msg->msg_iov = iov;
msg->msg_iovlen = 1;
msgvec[i].msg_hdr = *msg;
msgvec[i].msg_len = batch->packets[i].len;
msgvec[i].msg_len = batch->packets[i].len - buf_offset;
}
// set up per-retry variables, so we can only re-submit what didn't send successfully
struct mmsghdr* current_msg_vec = msgvec;
Expand Down