Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-core concurrent packet processing #2234

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
112 changes: 75 additions & 37 deletions osdep/BSDEthernetTap.cpp
Expand Up @@ -39,7 +39,9 @@
#include <net/if_dl.h>
#include <net/if_media.h>
#include <net/route.h>
#include <pthread_np.h>

#include <sched.h>
#include <string>
#include <map>
#include <set>
Expand All @@ -53,6 +55,7 @@
#include "BSDEthernetTap.hpp"

#define ZT_BASE32_CHARS "0123456789abcdefghijklmnopqrstuv"
#define ZT_TAP_BUF_SIZE (1024 * 16)

// ff:ff:ff:ff:ff:ff with no ADI
static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0);
Expand All @@ -61,6 +64,7 @@ namespace ZeroTier {

BSDEthernetTap::BSDEthernetTap(
const char *homePath,
unsigned int concurrency,
const MAC &mac,
unsigned int mtu,
unsigned int metric,
Expand All @@ -69,6 +73,7 @@ BSDEthernetTap::BSDEthernetTap(
void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int),
void *arg) :
_handler(handler),
_concurrency(concurrency),
_arg(arg),
_nwid(nwid),
_mtu(mtu),
Expand Down Expand Up @@ -195,11 +200,9 @@ BSDEthernetTap::BSDEthernetTap(
BSDEthernetTap::~BSDEthernetTap()
{
::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit
Thread::join(_thread);
::close(_fd);
::close(_shutdownSignalPipe[0]);
::close(_shutdownSignalPipe[1]);

long cpid = (long)vfork();
if (cpid == 0) {
#ifdef ZT_TRACE
Expand All @@ -211,6 +214,10 @@ BSDEthernetTap::~BSDEthernetTap()
int exitcode = -1;
::waitpid(cpid,&exitcode,0);
}
Thread::join(_thread);
for (std::thread &t : _rxThreads) {
t.join();
}
}

void BSDEthernetTap::setEnabled(bool en)
Expand Down Expand Up @@ -418,53 +425,84 @@ void BSDEthernetTap::setMtu(unsigned int mtu)
void BSDEthernetTap::threadMain()
throw()
{
fd_set readfds,nullfds;
MAC to,from;
int n,nfds,r;
char getBuf[ZT_MAX_MTU + 64];
bool _enablePinning = false;
char* envvar = std::getenv("ZT_CPU_PINNING");
if (envvar) {
int tmp = atoi(envvar);
if (tmp > 0) {
_enablePinning = true;
}
}

// Wait for a moment after startup -- wait for Network to finish
// constructing itself.
Thread::sleep(500);

FD_ZERO(&readfds);
FD_ZERO(&nullfds);
nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;
for (unsigned int i = 0; i < _concurrency; ++i) {
_rxThreads.push_back(std::thread([this, i, _enablePinning] {

if (_enablePinning) {
int pinCore = i % _concurrency;
fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore);
pthread_t self = pthread_self();
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(pinCore, &cpuset);
//int rc = sched_setaffinity(self, sizeof(cpu_set_t), &cpuset);
int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
if (rc != 0)
{
fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno));
exit(1);
}
}

r = 0;
for(;;) {
FD_SET(_shutdownSignalPipe[0],&readfds);
FD_SET(_fd,&readfds);
select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
uint8_t b[ZT_TAP_BUF_SIZE];
MAC to, from;
fd_set readfds, nullfds;
int n, nfds, r;

if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread
break;
FD_ZERO(&readfds);
FD_ZERO(&nullfds);
nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;

r = 0;

if (FD_ISSET(_fd,&readfds)) {
n = (int)::read(_fd,getBuf + r,sizeof(getBuf) - r);
if (n < 0) {
if ((errno != EINTR)&&(errno != ETIMEDOUT))
for(;;) {
FD_SET(_shutdownSignalPipe[0],&readfds);
FD_SET(_fd,&readfds);
select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);

if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread
break;
} else {
// Some tap drivers like to send the ethernet frame and the
// payload in two chunks, so handle that by accumulating
// data until we have at least a frame.
r += n;
if (r > 14) {
if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
r = _mtu + 14;

if (_enabled) {
to.setTo(getBuf,6);
from.setTo(getBuf + 6,6);
unsigned int etherType = ntohs(((const uint16_t *)getBuf)[6]);
_handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(getBuf + 14),r - 14);
}

r = 0;
if (FD_ISSET(_fd,&readfds)) {
n = (int)::read(_fd,b + r,sizeof(b) - r);
if (n < 0) {
if ((errno != EINTR)&&(errno != ETIMEDOUT))
break;
} else {
// Some tap drivers like to send the ethernet frame and the
// payload in two chunks, so handle that by accumulating
// data until we have at least a frame.
r += n;
if (r > 14) {
if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
r = _mtu + 14;

if (_enabled) {
to.setTo(b,6);
from.setTo(b + 6,6);
unsigned int etherType = ntohs(((const uint16_t *)b)[6]);
_handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(b + 14),r - 14);
}

r = 0;
}
}
}
}
}
}));
}
}

Expand Down
4 changes: 4 additions & 0 deletions osdep/BSDEthernetTap.hpp
Expand Up @@ -20,6 +20,7 @@
#include <string>
#include <vector>
#include <stdexcept>
#include <thread>

#include "../node/Constants.hpp"
#include "../node/MulticastGroup.hpp"
Expand All @@ -34,6 +35,7 @@ class BSDEthernetTap : public EthernetTap
public:
BSDEthernetTap(
const char *homePath,
unsigned int concurrency,
const MAC &mac,
unsigned int mtu,
unsigned int metric,
Expand Down Expand Up @@ -62,6 +64,7 @@ class BSDEthernetTap : public EthernetTap
private:
void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int);
void *_arg;
unsigned int _concurrency;
uint64_t _nwid;
Thread _thread;
std::string _dev;
Expand All @@ -73,6 +76,7 @@ class BSDEthernetTap : public EthernetTap
volatile bool _enabled;
mutable std::vector<InetAddress> _ifaddrs;
mutable uint64_t _lastIfAddrsUpdate;
std::vector<std::thread> _rxThreads;
};

} // namespace ZeroTier
Expand Down
15 changes: 8 additions & 7 deletions osdep/EthernetTap.cpp
Expand Up @@ -58,6 +58,7 @@ namespace ZeroTier {
std::shared_ptr<EthernetTap> EthernetTap::newInstance(
const char *tapDeviceType, // OS-specific, NULL for default
const char *homePath,
unsigned int concurrency,
const MAC &mac,
unsigned int mtu,
unsigned int metric,
Expand All @@ -83,16 +84,16 @@ std::shared_ptr<EthernetTap> EthernetTap::newInstance(
// The "feth" virtual Ethernet device type appeared in Darwin 17.x.x. Older versions
// (Sierra and earlier) must use the a kernel extension.
if (strtol(osrelease,(char **)0,10) < 17) {
return std::shared_ptr<EthernetTap>(new MacKextEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new MacKextEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
} else {
return std::shared_ptr<EthernetTap>(new MacEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new MacEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
}
}
}
#endif // __APPLE__

#ifdef __LINUX__
return std::shared_ptr<EthernetTap>(new LinuxEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new LinuxEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
#endif // __LINUX__

#ifdef __WINDOWS__
Expand Down Expand Up @@ -126,19 +127,19 @@ std::shared_ptr<EthernetTap> EthernetTap::newInstance(
_comInit = true;
}
}
return std::shared_ptr<EthernetTap>(new WindowsEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new WindowsEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
#endif // __WINDOWS__

#ifdef __FreeBSD__
return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
#endif // __FreeBSD__

#ifdef __NetBSD__
return std::shared_ptr<EthernetTap>(new NetBSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new NetBSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
#endif // __NetBSD__

#ifdef __OpenBSD__
return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
#endif // __OpenBSD__

#endif // ZT_SDK?
Expand Down
1 change: 1 addition & 0 deletions osdep/EthernetTap.hpp
Expand Up @@ -33,6 +33,7 @@ class EthernetTap
static std::shared_ptr<EthernetTap> newInstance(
const char *tapDeviceType, // OS-specific, NULL for default
const char *homePath,
unsigned int concurrency,
const MAC &mac,
unsigned int mtu,
unsigned int metric,
Expand Down