Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Port to Tokio 1, Actix 0.11, and RedBPF 1.3 (unreleased) #307

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
690 changes: 107 additions & 583 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 7 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ingraind"
version = "1.0.0"
version = "1.1.0"
authors = ["Peter Parkanyi <peter@redsift.io>"]
license = "GPL-3.0"
description = "eBPF-based system monitoring agent"
Expand All @@ -9,25 +9,24 @@ build = "build.rs"
edition = "2018"

[build-dependencies]
cargo-bpf = { version = "^1.2", default-features = false, features = ["build"] }
cargo-bpf = { path = "../redbpf/cargo-bpf" , default-features = false, features = ["build"] }

[build-dependencies.capnpc]
version = "^0.14"
optional = true

[dependencies]
actix = "0.10"
actix = "0.11"
futures = "0.3"
tokio = { version = "0.2", features = ["udp", "time", "stream"]}
tokio = { version = "1.5", features = ["net", "signal", "macros"]}
bytes = "1.0"
mio = "0.6"

failure = "0.1"
lazy_static = "1.1.0"
lazy_static = "^1.1.0"

lazy-socket = "0.3"
redbpf = "^1.2"
redbpf-probes = "^1.2"
redbpf = { path = "../redbpf/redbpf" }
redbpf-probes = { path = "../redbpf/redbpf-probes" }

serde = "^1.0"
serde_derive = "^1.0"
Expand Down
8 changes: 0 additions & 8 deletions ingraind-probes/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions ingraind-probes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ edition = '2018'
[dependencies]
cty = "0.2"
memoffset = "0.6.1"
redbpf-macros = "^1.2"
redbpf-probes = "^1.2"
redbpf-macros = { path = "../../redbpf/redbpf-macros" }
redbpf-probes = { path = "../../redbpf/redbpf-probes" }
unroll = "0.1"

[features]
Expand Down
10 changes: 5 additions & 5 deletions ingraind-probes/src/file/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ program!(0xFFFFFFFE, "GPL");
const S_IFMT: u16 = 0o00170000;
const S_IFREG: u16 = 0o0100000;

#[map("actionlist")]
static mut actionlist: HashMap<u64, u8> = HashMap::with_max_entries(10240);

#[map("files")]
static mut files: HashMap<u64, *const file> = HashMap::with_max_entries(10240);
static mut files: HashMap<u64, *const file> = HashMap::with_max_entries(1024);

#[map("rw")]
static mut rw: PerfMap<FileAccess> = PerfMap::with_max_entries(1024);

#[map("actionlist")]
static mut actionlist: HashMap<u64, u8> = HashMap::with_max_entries(1024);

#[kprobe("vfs_read")]
pub fn trace_read_entry(regs: Registers) {
let tid = bpf_get_current_pid_tgid();
Expand Down Expand Up @@ -125,7 +125,7 @@ fn dentry_to_path(mut dentry: *mut dentry, path_list: &mut PathList) -> Option<I
let read = unsafe {
bpf_probe_read_str(
segment.name.as_mut_ptr() as *mut _,
PATH_SEGMENT_LEN as i32,
PATH_SEGMENT_LEN as u32,
name.name as *const _,
)
};
Expand Down
3 changes: 1 addition & 2 deletions src/aggregations/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl Buffer {
.map(|s| s * 1000)
.unwrap_or(config.interval_ms);
let flush_period = Duration::from_millis(ms);
Actor::start_in_arbiter(&actix::Arbiter::new(), move |_| Buffer {
Actor::start_in_arbiter(&actix::Arbiter::new().handle(), move |_| Buffer {
aggregator: Aggregator::new(config.enable_histograms),
upstream,
flush_handle: SpawnHandle::default(),
Expand Down Expand Up @@ -338,7 +338,6 @@ impl Aggregator {
pub fn uniques(&self, key: &MeasurementKey) -> Option<usize> {
self.sets.get(key).map(|am| am.value.len())
}

}

#[cfg(test)]
Expand Down
20 changes: 13 additions & 7 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,22 @@ impl Backend {
match self {
#[cfg(feature = "s3-backend")]
Backend::S3 => {
Actor::start_in_arbiter(&actix::Arbiter::new(), |_| s3::S3::new()).recipient()
Actor::start_in_arbiter(&actix::Arbiter::new().handle(), |_| s3::S3::new())
.recipient()
}
#[cfg(feature = "statsd-backend")]
Backend::StatsD(config) => {
Actor::start_in_arbiter(&actix::Arbiter::new(), |_| statsd::Statsd::new(config))
.recipient()
Actor::start_in_arbiter(&actix::Arbiter::new().handle(), |_| {
statsd::Statsd::new(config)
})
.recipient()
}
#[cfg(feature = "http-backend")]
Backend::HTTP(config) => {
Actor::start_in_arbiter(&actix::Arbiter::new(), |_| http::HTTP::new(config))
.recipient()
Actor::start_in_arbiter(&actix::Arbiter::new().handle(), |_| {
http::HTTP::new(config)
})
.recipient()
}
Backend::Console => console::Console.start().recipient(),
}
Expand All @@ -116,11 +121,12 @@ pub enum ProbeActor {
EBPF(EBPFActor),
StatsD(grains::statsd::Statsd),
Osquery(osquery::Osquery),
Test(grains::test::TestProbe)
Test(grains::test::TestProbe),
}

impl ProbeActor {
pub fn start(self, io: &Arbiter) {
pub fn start(self, arbiter: &Arbiter) {
let io = &arbiter.handle();
match self {
ProbeActor::EBPF(a) => {
Actor::start_in_arbiter(io, |_| a);
Expand Down
67 changes: 19 additions & 48 deletions src/grains/ebpf_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,29 @@ use crate::grains::EventCallback;

use futures::prelude::*;
use lazy_socket::raw::Socket;
use mio::unix::EventedFd;
use mio::{Evented, PollOpt, Ready, Token};
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::slice;
use std::task::{Context, Poll};
use tokio::io::PollEvented;
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;

use redbpf::PerfMap;

pub struct GrainIo(RawFd);

impl Evented for GrainIo {
fn register(
&self,
poll: &mio::Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
EventedFd(&self.0).register(poll, token, interest, opts)
}

fn reregister(
&self,
poll: &mio::Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
EventedFd(&self.0).reregister(poll, token, interest, opts)
}

fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
EventedFd(&self.0).deregister(poll)
}
}

pub type GrainIo = AsyncFd<RawFd>;
pub type MessageStream = dyn Stream<Item = Vec<Message>> + Unpin;
pub type MessageStreams = Vec<Box<MessageStream>>;

pub struct PerfMessageStream {
poll: PollEvented<GrainIo>,
poll: GrainIo,
map: PerfMap,
name: String,
callback: EventCallback,
}

impl PerfMessageStream {
pub fn new(name: String, map: PerfMap, callback: EventCallback) -> Self {
let io = GrainIo(map.fd);
let poll = PollEvented::new(io).unwrap();
let poll = GrainIo::with_interest(map.fd, Interest::READABLE).unwrap();
PerfMessageStream {
poll,
map,
Expand Down Expand Up @@ -96,27 +66,28 @@ impl Stream for PerfMessageStream {
type Item = Vec<Message>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let ready = Ready::readable();
if let Poll::Pending = self.poll.poll_read_ready(cx, ready) {
if let Poll::Pending = self.poll.poll_read_ready(cx) {
return Poll::Pending;
}

let messages = self.read_messages();
self.poll.clear_read_ready(cx, ready).unwrap();
Poll::Ready(Some(messages))
if let Poll::Ready(Ok(mut readguard)) = self.poll.poll_read_ready(cx) {
readguard.clear_ready();
}

return Poll::Ready(Some(messages));
}
}

pub struct SocketMessageStream {
poll: PollEvented<GrainIo>,
poll: GrainIo,
socket: Socket,
callback: EventCallback,
}

impl SocketMessageStream {
pub fn new(_name: &str, socket: Socket, callback: EventCallback) -> Self {
let io = GrainIo(socket.as_raw_fd());
let poll = PollEvented::new(io).unwrap();
let poll = GrainIo::with_interest(socket.as_raw_fd(), Interest::READABLE).unwrap();
SocketMessageStream {
poll,
socket,
Expand Down Expand Up @@ -148,13 +119,13 @@ impl Stream for SocketMessageStream {
type Item = Vec<Message>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let ready = Ready::readable();
if let Poll::Pending = self.poll.poll_read_ready(cx, ready) {
return Poll::Pending;
if let Poll::Ready(Ok(mut readguard)) = self.poll.poll_read_ready(cx) {
let messages = self.read_messages();

readguard.clear_ready();
return Poll::Ready(Some(messages));
}

let messages = self.read_messages();
self.poll.clear_read_ready(cx, ready).unwrap();
Poll::Ready(Some(messages))
Poll::Pending
}
}
2 changes: 1 addition & 1 deletion src/grains/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl Actor for Statsd {

fn started(&mut self, ctx: &mut Self::Context) {
info!("statsd daemon started {}", self.bind_address);
let mut socket =
let socket =
UdpSocket::from_std(std::net::UdpSocket::bind(self.bind_address).unwrap()).unwrap();
let stream = stream::poll_fn(move |ctx| {
let mut buf = [0u8; 65527];
Expand Down