diff --git a/src/probe_modules/module_bacnet.c b/src/probe_modules/module_bacnet.c index 06534fb98..3d747b117 100644 --- a/src/probe_modules/module_bacnet.c +++ b/src/probe_modules/module_bacnet.c @@ -37,7 +37,16 @@ static inline uint8_t get_invoke_id(uint32_t *validation) return (uint8_t)((validation[1] >> 24) & 0xFF); } -int bacnet_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, void **arg) +int bacnet_init_perthread(void **arg) +{ + uint32_t seed = aesrand_getword(zconf.aes); + aesrand_t *aes = aesrand_init_from_seed(seed); + *arg = aes; + + return EXIT_SUCCESS; +} + +int bacnet_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, UNUSED void *arg) { memset(buf, 0, MAX_PACKET_SIZE); struct ether_header *eth_header = (struct ether_header *)buf; @@ -67,10 +76,6 @@ int bacnet_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, void **arg) bnp->apdu.server_choice = 0x0c; memcpy(body, bacnet_body, BACNET_BODY_LEN); - uint32_t seed = aesrand_getword(zconf.aes); - aesrand_t *aes = aesrand_init_from_seed(seed); - *arg = aes; - return EXIT_SUCCESS; } @@ -184,8 +189,9 @@ probe_module_t module_bacnet = {.name = "bacnet", .pcap_filter = "udp || icmp", .pcap_snaplen = 1500, .port_args = 1, - .thread_initialize = &bacnet_init_perthread, .global_initialize = &bacnet_global_initialize, + .thread_initialize = &bacnet_init_perthread, + .prepare_packet = &bacnet_prepare_packet, .make_packet = &bacnet_make_packet, .print_packet = &udp_print_packet, .validate_packet = &bacnet_validate_packet, diff --git a/src/probe_modules/module_dns.c b/src/probe_modules/module_dns.c index 43f244ac5..1d60b9d1e 100644 --- a/src/probe_modules/module_dns.c +++ b/src/probe_modules/module_dns.c @@ -735,8 +735,8 @@ static int dns_global_cleanup(UNUSED struct state_conf *zconf, return EXIT_SUCCESS; } -int dns_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, - UNUSED void **arg_ptr) +int dns_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, + UNUSED void *arg_ptr) { 
memset(buf, 0, MAX_PACKET_SIZE); @@ -902,7 +902,7 @@ void dns_add_null_fs(fieldset_t *fs) void dns_process_packet(const u_char *packet, uint32_t len, fieldset_t *fs, uint32_t *validation, - __attribute__((unused)) struct timespec ts) + UNUSED struct timespec ts) { struct ip *ip_hdr = (struct ip *)&packet[sizeof(struct ether_header)]; if (ip_hdr->ip_p == IPPROTO_UDP) { @@ -1111,8 +1111,8 @@ probe_module_t module_dns = { .pcap_filter = "udp || icmp", .pcap_snaplen = PCAP_SNAPLEN, .port_args = 1, - .thread_initialize = &dns_init_perthread, .global_initialize = &dns_global_initialize, + .prepare_packet = &dns_prepare_packet, .make_packet = &dns_make_packet, .print_packet = &dns_print_packet, .validate_packet = &dns_validate_packet, diff --git a/src/probe_modules/module_icmp_echo.c b/src/probe_modules/module_icmp_echo.c index aa51a916c..a7f8f7533 100644 --- a/src/probe_modules/module_icmp_echo.c +++ b/src/probe_modules/module_icmp_echo.c @@ -137,8 +137,8 @@ int icmp_global_cleanup(__attribute__((unused)) struct state_conf *zconf, return EXIT_SUCCESS; } -static int icmp_echo_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, - UNUSED void **arg_ptr) +static int icmp_echo_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, + UNUSED void *arg_ptr) { memset(buf, 0, MAX_PACKET_SIZE); @@ -330,7 +330,7 @@ probe_module_t module_icmp_echo = { .port_args = 0, .global_initialize = &icmp_global_initialize, .close = &icmp_global_cleanup, - .thread_initialize = &icmp_echo_init_perthread, + .prepare_packet = &icmp_echo_prepare_packet, .make_packet = &icmp_echo_make_packet, .print_packet = &icmp_echo_print_packet, .process_packet = &icmp_echo_process_packet, diff --git a/src/probe_modules/module_icmp_echo_time.c b/src/probe_modules/module_icmp_echo_time.c index dcac00c47..a18b6bc58 100644 --- a/src/probe_modules/module_icmp_echo_time.c +++ b/src/probe_modules/module_icmp_echo_time.c @@ -34,8 +34,8 @@ struct icmp_payload_for_rtt { ipaddr_n_t dst; }; -static int 
icmp_echo_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, - UNUSED void **arg_ptr) +static int icmp_echo_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, + UNUSED void *arg_ptr) { memset(buf, 0, MAX_PACKET_SIZE); @@ -259,7 +259,7 @@ probe_module_t module_icmp_echo_time = { .pcap_filter = "icmp and icmp[0]!=8", .pcap_snaplen = 96, .port_args = 0, - .thread_initialize = &icmp_echo_init_perthread, + .prepare_packet = &icmp_echo_prepare_packet, .make_packet = &icmp_echo_make_packet, .print_packet = &icmp_echo_print_packet, .process_packet = &icmp_echo_process_packet, diff --git a/src/probe_modules/module_ipip.c b/src/probe_modules/module_ipip.c index fa082f96d..8251e24e8 100644 --- a/src/probe_modules/module_ipip.c +++ b/src/probe_modules/module_ipip.c @@ -133,8 +133,8 @@ int ipip_global_cleanup(UNUSED struct state_conf *zconf, return EXIT_SUCCESS; } -int ipip_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, - UNUSED void **arg_ptr) +int ipip_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, + UNUSED void *arg_ptr) { memset(buf, 0, MAX_PACKET_SIZE); struct ether_header *eth_header = (struct ether_header *)buf; @@ -403,8 +403,8 @@ probe_module_t module_ipip = { .pcap_filter = "udp || icmp", .pcap_snaplen = 1500, .port_args = 1, - .thread_initialize = &ipip_init_perthread, .global_initialize = &ipip_global_initialize, + .prepare_packet = &ipip_prepare_packet, .make_packet = &ipip_make_packet, .print_packet = &ipip_print_packet, .validate_packet = &ipip_validate_packet, diff --git a/src/probe_modules/module_ntp.c b/src/probe_modules/module_ntp.c index a7ccc1d8c..34d813914 100644 --- a/src/probe_modules/module_ntp.c +++ b/src/probe_modules/module_ntp.c @@ -154,7 +154,16 @@ void ntp_process_packet(const u_char *packet, UNUSED uint32_t len, } } -int ntp_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, void **arg) +int ntp_init_perthread(void **arg) +{ + uint32_t seed = aesrand_getword(zconf.aes); + aesrand_t *aes = 
aesrand_init_from_seed(seed); + *arg = aes; + + return EXIT_SUCCESS; +} + +int ntp_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, UNUSED void *arg) { memset(buf, 0, MAX_PACKET_SIZE); struct ether_header *eth_header = (struct ether_header *)buf; @@ -176,10 +185,6 @@ int ntp_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, void **arg) sizeof(struct udphdr) + sizeof(struct ntphdr); module_ntp.max_packet_length = header_len; - uint32_t seed = aesrand_getword(zconf.aes); - aesrand_t *aes = aesrand_init_from_seed(seed); - *arg = aes; - return EXIT_SUCCESS; } @@ -240,8 +245,9 @@ probe_module_t module_ntp = {.name = "ntp", .pcap_filter = "udp || icmp", .pcap_snaplen = 1500, .port_args = 1, - .thread_initialize = &ntp_init_perthread, .global_initialize = &ntp_global_initialize, + .thread_initialize = &ntp_init_perthread, + .prepare_packet = &ntp_prepare_packet, .make_packet = &udp_make_packet, .print_packet = &ntp_print_packet, .validate_packet = &ntp_validate_packet, diff --git a/src/probe_modules/module_tcp_synackscan.c b/src/probe_modules/module_tcp_synackscan.c index cc43334c5..78734468e 100644 --- a/src/probe_modules/module_tcp_synackscan.c +++ b/src/probe_modules/module_tcp_synackscan.c @@ -34,8 +34,8 @@ static int synackscan_global_initialize(struct state_conf *state) return EXIT_SUCCESS; } -static int synackscan_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, - UNUSED void **arg_ptr) +static int synackscan_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, + UNUSED void *arg_ptr) { memset(buf, 0, MAX_PACKET_SIZE); struct ether_header *eth_header = (struct ether_header *)buf; @@ -211,7 +211,7 @@ probe_module_t module_tcp_synackscan = { .pcap_snaplen = 96, .port_args = 1, .global_initialize = &synackscan_global_initialize, - .thread_initialize = &synackscan_init_perthread, + .prepare_packet = &synackscan_prepare_packet, .make_packet = &synackscan_make_packet, .print_packet = &synscan_print_packet, .process_packet = 
&synackscan_process_packet, diff --git a/src/probe_modules/module_tcp_synscan.c b/src/probe_modules/module_tcp_synscan.c index 2222b55d8..b9311e787 100644 --- a/src/probe_modules/module_tcp_synscan.c +++ b/src/probe_modules/module_tcp_synscan.c @@ -74,8 +74,8 @@ static int synscan_global_initialize(struct state_conf *state) return EXIT_SUCCESS; } -static int synscan_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, - UNUSED void **arg_ptr) +static int synscan_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, + UNUSED void *arg_ptr) { struct ether_header *eth_header = (struct ether_header *)buf; make_eth_header(eth_header, src, gw); @@ -259,7 +259,7 @@ probe_module_t module_tcp_synscan = { .pcap_snaplen = 96, .port_args = 1, .global_initialize = &synscan_global_initialize, - .thread_initialize = &synscan_init_perthread, + .prepare_packet = &synscan_prepare_packet, .make_packet = &synscan_make_packet, .print_packet = &synscan_print_packet, .process_packet = &synscan_process_packet, diff --git a/src/probe_modules/module_udp.c b/src/probe_modules/module_udp.c index bee16174a..f0a6f202e 100644 --- a/src/probe_modules/module_udp.c +++ b/src/probe_modules/module_udp.c @@ -241,7 +241,17 @@ int udp_global_cleanup(UNUSED struct state_conf *zconf, return EXIT_SUCCESS; } -int udp_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, void **arg_ptr) +int udp_init_perthread(void **arg_ptr) +{ + // Seed our random number generator with the global generator + uint32_t seed = aesrand_getword(zconf.aes); + aesrand_t *aes = aesrand_init_from_seed(seed); + *arg_ptr = aes; + + return EXIT_SUCCESS; +} + +int udp_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, UNUSED void *arg_ptr) { memset(buf, 0, MAX_PACKET_SIZE); struct ether_header *eth_header = (struct ether_header *)buf; @@ -260,11 +270,6 @@ int udp_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, void **arg_ptr) memcpy(payload, udp_fixed_payload, udp_fixed_payload_len); } - // Seed our random 
number generator with the global generator - uint32_t seed = aesrand_getword(zconf.aes); - aesrand_t *aes = aesrand_init_from_seed(seed); - *arg_ptr = aes; - return EXIT_SUCCESS; } @@ -857,8 +862,9 @@ probe_module_t module_udp = { .pcap_snaplen = MAX_UDP_PAYLOAD_LEN + 20 + 24, // Ether Header, IP Header with Options .port_args = 1, - .thread_initialize = &udp_init_perthread, .global_initialize = &udp_global_initialize, + .thread_initialize = &udp_init_perthread, + .prepare_packet = &udp_prepare_packet, .make_packet = &udp_make_packet, // can be overridden to udp_make_templated_packet by udp_global_initalize .print_packet = &udp_print_packet, diff --git a/src/probe_modules/module_upnp.c b/src/probe_modules/module_upnp.c index c778e6377..b07e318ff 100644 --- a/src/probe_modules/module_upnp.c +++ b/src/probe_modules/module_upnp.c @@ -39,8 +39,8 @@ int upnp_global_initialize(struct state_conf *state) return EXIT_SUCCESS; } -int upnp_init_perthread(void *buf, macaddr_t *src, macaddr_t *gw, - UNUSED void **arg_ptr) +int upnp_prepare_packet(void *buf, macaddr_t *src, macaddr_t *gw, + UNUSED void *arg_ptr) { memset(buf, 0, MAX_PACKET_SIZE); struct ether_header *eth_header = (struct ether_header *)buf; @@ -255,7 +255,7 @@ probe_module_t module_upnp = { .pcap_snaplen = 2048, .port_args = 1, .global_initialize = &upnp_global_initialize, - .thread_initialize = &upnp_init_perthread, + .prepare_packet = &upnp_prepare_packet, .make_packet = &udp_make_packet, .print_packet = &udp_print_packet, .process_packet = &upnp_process_packet, diff --git a/src/probe_modules/probe_modules.h b/src/probe_modules/probe_modules.h index 2b393a268..ecfd43ede 100644 --- a/src/probe_modules/probe_modules.h +++ b/src/probe_modules/probe_modules.h @@ -30,14 +30,28 @@ typedef struct probe_response_type { typedef int (*probe_global_init_cb)(struct state_conf *); -typedef int (*probe_thread_init_cb)(void *packetbuf, macaddr_t *src_mac, - macaddr_t *gw_mac, void **arg_ptr); +// Called once per send thread 
to initialize state. +typedef int (*probe_thread_init_cb)(void **arg_ptr); + +// The make_packet callback is passed a buffer pointing at an ethernet header. +// The buffer is MAX_PACKET_SIZE bytes. There are 1..n packet buffers in use +// per send thread. For each buffer, prepare_packet is called once before the +// first call to make_packet. +// +// The probe module is expected to write all the constant packet contents +// in prepare_packet, and only write the fields that need to be updated for +// each packet being sent in make_packet. +// +typedef int (*probe_prepare_packet_cb)(void *packetbuf, macaddr_t *src_mac, + macaddr_t *gw_mac, void *arg); // The make_packet callback is passed a buffer pointing at an ethernet header. // The buffer is MAX_PACKET_SIZE bytes. The callback must update the value // pointed at by buf_len with the actual length of the packet. The contents of -// the buffer will match the previous packet sent. Every invocation of -// make_packet contains a unique (src_ip, probe_num) tuple. +// the buffer will match a previously sent packet by this send thread, so +// content not overwritten by make_packet can be relied upon to be intact. +// Beyond that, the probe module should not make any assumptions about buffers. +// Every invocation of make_packet contains a unique (src_ip, probe_num) tuple. // // The probe module is responsible for populating the IP header. 
The src_ip, // dst_ip, and ttl are provided by the framework and must be set on the IP @@ -84,6 +98,7 @@ typedef struct probe_module { probe_global_init_cb global_initialize; probe_thread_init_cb thread_initialize; + probe_prepare_packet_cb prepare_packet; probe_make_packet_cb make_packet; probe_print_packet_cb print_packet; probe_validate_packet_cb validate_packet; diff --git a/src/recv-netmap.c b/src/recv-netmap.c index 3dba490a8..1b497e9e9 100644 --- a/src/recv-netmap.c +++ b/src/recv-netmap.c @@ -50,7 +50,6 @@ send_packet(make_packet_func_t mkpkt, void const *arg) sock.nm.tx_ring_fd = zconf.nm.nm_fd; batch_t *batch = create_packet_batch(1); - batch->packets[0].ip = 0; // unused by netmap batch->packets[0].len = mkpkt(batch->packets[0].buf, arg); assert(batch->packets[0].len <= MAX_PACKET_SIZE); batch->len = 1; @@ -67,7 +66,6 @@ static void submit_packet(make_packet_func_t mkpkt, void const *arg) { batch_t *batch = create_packet_batch(1); - batch->packets[0].ip = 0; // unused by netmap batch->packets[0].len = mkpkt(batch->packets[0].buf, arg); assert(batch->packets[0].len <= MAX_PACKET_SIZE); batch->len = 1; diff --git a/src/send-bsd.c b/src/send-bsd.c index af2631dce..4186f0e71 100644 --- a/src/send-bsd.c +++ b/src/send-bsd.c @@ -34,9 +34,11 @@ send_run_init(UNUSED sock_t sock) } static int -send_packet(sock_t sock, void *buf, int len, UNUSED uint32_t retry_ct) +send_packet(sock_t sock, uint8_t *buf, int len, UNUSED uint32_t retry_ct) { if (zconf.send_ip_pkts) { + buf += sizeof(struct ether_header); + len -= (int)sizeof(struct ether_header); /* keep len in step with buf: drop the Ethernet header bytes, as send-linux.c does */ struct ip *iph = (struct ip *)buf; #if defined(__APPLE__) || (defined(__FreeBSD__) && __FreeBSD_version < 1100030) @@ -95,12 +97,11 @@ send_batch(sock_t sock, batch_t* batch, int retries) } if (rc < 0) { // packet couldn't be sent in retries number of attempts - struct in_addr addr; - addr.s_addr = batch->packets[packet_num].ip; + struct ip *iph = (struct ip *)(batch->packets[packet_num].buf + sizeof(struct ether_header)); char addr_str_buf[INET_ADDRSTRLEN]; const
char *addr_str = inet_ntop( - AF_INET, &addr, + AF_INET, &iph->ip_dst, addr_str_buf, INET_ADDRSTRLEN); if (addr_str != NULL) { diff --git a/src/send-linux.c b/src/send-linux.c index 1b502d0fa..def7c2199 100644 --- a/src/send-linux.c +++ b/src/send-linux.c @@ -75,10 +75,14 @@ int send_batch(sock_t sock, batch_t* batch, int retries) { struct msghdr msgs[batch->capacity]; struct iovec iovs[batch->capacity]; + size_t buf_offset = 0; + if (zconf.send_ip_pkts) { + buf_offset = sizeof(struct ether_header); + } for (int i = 0; i < batch->len; ++i) { struct iovec *iov = &iovs[i]; - iov->iov_base = batch->packets[i].buf; - iov->iov_len = batch->packets[i].len; + iov->iov_base = batch->packets[i].buf + buf_offset; + iov->iov_len = batch->packets[i].len - buf_offset; struct msghdr *msg = &msgs[i]; memset(msg, 0, sizeof(struct msghdr)); // based on https://github.com/torvalds/linux/blob/master/net/socket.c#L2180 @@ -87,7 +91,7 @@ int send_batch(sock_t sock, batch_t* batch, int retries) { msg->msg_iov = iov; msg->msg_iovlen = 1; msgvec[i].msg_hdr = *msg; - msgvec[i].msg_len = batch->packets[i].len; + msgvec[i].msg_len = batch->packets[i].len - buf_offset; } // set up per-retry variables, so we can only re-submit what didn't send successfully struct mmsghdr* current_msg_vec = msgvec; diff --git a/src/send.c b/src/send.c index 81cb106e2..f60022bdf 100644 --- a/src/send.c +++ b/src/send.c @@ -201,8 +201,6 @@ int send_run(sock_t st, shard_t *s) { log_debug("send", "send thread started"); pthread_mutex_lock(&send_mutex); - // Allocate a buffer to hold the outgoing packet - char buf[MAX_PACKET_SIZE]; // allocate batch batch_t* batch = create_packet_batch(zconf.batch); @@ -211,6 +209,7 @@ int send_run(sock_t st, shard_t *s) pthread_mutex_unlock(&send_mutex); return EXIT_FAILURE; } + // MAC address length in characters char mac_buf[(ETHER_ADDR_LEN * 2) + (ETHER_ADDR_LEN - 1) + 1]; char *p = mac_buf; @@ -224,13 +223,27 @@ int send_run(sock_t st, shard_t *s) } } log_debug("send", "source 
MAC address %s", mac_buf); - void *probe_data; + + void *probe_data = NULL; if (zconf.probe_module->thread_initialize) { - zconf.probe_module->thread_initialize( - buf, zconf.hw_mac, zconf.gw_mac, &probe_data); + int rv = zconf.probe_module->thread_initialize(&probe_data); + if (rv != EXIT_SUCCESS) { + pthread_mutex_unlock(&send_mutex); + log_fatal("send", "Send thread initialization for probe module failed: %d", rv); + } } pthread_mutex_unlock(&send_mutex); + if (zconf.probe_module->prepare_packet) { + for (size_t i = 0; i < batch->capacity; i++) { + int rv = zconf.probe_module->prepare_packet( + batch->packets[i].buf, zconf.hw_mac, zconf.gw_mac, probe_data); + if (rv != EXIT_SUCCESS) { + log_fatal("send", "Probe module failed to prepare packet: %d", rv); + } + } + } + // adaptive timing to hit target rate uint64_t count = 0; uint64_t last_count = count; @@ -373,10 +386,11 @@ int send_run(sock_t st, shard_t *s) htons(current_port), (uint8_t *)validation); uint8_t ttl = zconf.probe_ttl; + size_t length = 0; zconf.probe_module->make_packet( - buf, &length, src_ip, current_ip, - htons(current_port), ttl, validation, i, + batch->packets[batch->len].buf, &length, + src_ip, current_ip, htons(current_port), ttl, validation, i, // Grab last 2 bytes of validation for ip_id (uint16_t)(validation[size_of_validation - 1] & 0xFFFF), probe_data); @@ -387,25 +401,14 @@ int send_run(sock_t st, shard_t *s) s->thread_id, length, MAX_PACKET_SIZE); } + batch->packets[batch->len].len = (uint32_t)length; + if (zconf.dryrun) { lock_file(stdout); zconf.probe_module->print_packet(stdout, - buf); + batch->packets[batch->len].buf); unlock_file(stdout); } else { - void *contents = - buf + - zconf.send_ip_pkts * - sizeof(struct ether_header); - length -= (zconf.send_ip_pkts * - sizeof(struct ether_header)); - // add packet to batch and update metadata - // this is an additional memcpy (packet created in buf, buf -> batch) - // but when I modified the TCP SYN module to write packet to batch
directly, there wasn't any noticeable speedup. - // Using this approach for readability/minimal changes - batch->packets[batch->len].ip = current_ip; - batch->packets[batch->len].len = length; - memcpy(batch->packets[batch->len].buf, contents, length); batch->len++; if (batch->len == batch->capacity) { // batch is full, sending diff --git a/src/send.h b/src/send.h index 430d500f8..39b3cb437 100644 --- a/src/send.h +++ b/src/send.h @@ -12,22 +12,32 @@ #include "iterator.h" #include "socket.h" +#include <assert.h> + iterator_t *send_init(void); int send_run(sock_t, shard_t *); // Fit two packets with metadata into one 4k page. // 2k seems like more than enough with typical MTU of // 1500, and we don't want to cause IP fragmentation. -#define MAX_PACKET_SIZE (2048 - 2 * sizeof(uint32_t)) +#define MAX_PACKET_SIZE (2048 - sizeof(uint32_t) - 2 * sizeof(uint8_t)) // Metadata and initial packet bytes are adjacent, // for cache locality esp. with short packets. +// buf is aligned such that the end of the Ethernet +// header and beginning of the IP header will align +// to a 32 bit boundary, such that reading/writing +// IP addresses and other 32 bit header fields is +// properly aligned. struct batch_packet { - uint32_t ip; uint32_t len; + uint8_t unused[2]; uint8_t buf[MAX_PACKET_SIZE]; }; +static_assert((offsetof(struct batch_packet, buf) + sizeof(struct ether_header)) % sizeof(uint32_t) == 0, + "buf is aligned such that IP header is 32-bit aligned"); + typedef struct { struct batch_packet *packets; uint16_t len;