Skip to content
This repository has been archived by the owner on Nov 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #15 from runabove/limits
Browse files Browse the repository at this point in the history
Limits
  • Loading branch information
d33d33 committed Jan 12, 2017
2 parents e57fe50 + 160f65d commit 437e70f
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "beamium"
version = "1.1.1"
version = "1.2.0"
authors = [ "d33d33 <kevin.georges@corp.ovh.com>" ]

build = "build.rs"
Expand Down
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ sources: # Sources definitions (Optional)
Beamium can have none to many Warp10 endpoints. A *sink* is defined as follow:
``` yaml
sinks: # Sinks definitions (Optional)
source1: # Sink name (Required)
url: https://warp.io/api/v0/update # Warp10 endpoint (Required)
token: mywarp10token # Warp10 write token (Required)
token-header: X-Custom-Token # Warp10 token header name (Optional, default: X-Warp10-Token)
selector: metrics.* # Regex used to filter metrics (Optional, default: None)
source1: # Sink name (Required)
url: https://warp.io/api/v0/update # Warp10 endpoint (Required)
token: mywarp10token # Warp10 write token (Required)
token-header: X-Custom-Token # Warp10 token header name (Optional, default: X-Warp10-Token)
selector: metrics.* # Regex used to filter metrics (Optional, default: None)
ttl: 3600 # Discard file older than ttl (seconds) (Optional, default: 3600)
size: 1073741824 # Discard old file if sink size is greater (Optional, default: 1073741824)
```

#### Labels
Expand Down
24 changes: 24 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct Sink {
pub token: String,
pub token_header: String,
pub selector: Option<regex::Regex>,
pub ttl: u64,
pub size: u64,
}

#[derive(Debug)]
Expand Down Expand Up @@ -268,12 +270,34 @@ fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), Co
.ok_or(format!("sinks.{}.selector is invalid", name))))))
};

let ttl = if v["ttl"].is_badvalue() {
3600
} else {
let ttl = try!(v["ttl"]
.as_i64()
.ok_or(format!("sinks.{}.ttl should be a number", name)));
try!(cast::u64(ttl)
.map_err(|_| format!("sinks.{}.ttl should be a positive number", name)))
};

let size = if v["size"].is_badvalue() {
1073741824
} else {
let size = try!(v["size"]
.as_i64()
.ok_or(format!("sinks.{}.size should be a number", name)));
try!(cast::u64(size)
.map_err(|_| format!("sinks.{}.size should be a positive number", name)))
};

config.sinks.push(Sink {
name: String::from(name),
url: String::from(url),
token: String::from(token),
token_header: String::from(token_header),
selector: selector,
ttl: ttl,
size: size,
})
}
}
Expand Down
77 changes: 65 additions & 12 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::error::Error;
use std::ffi::OsStr;
use std::path::PathBuf;
use hyper;
use std::os::unix::fs::MetadataExt;

use config;

Expand All @@ -30,6 +31,11 @@ pub fn sink(sink: &config::Sink, parameters: &config::Parameters, sigint: Arc<At
Ok(_) => info!("post success"),
}

let res = cappe(sink, parameters);
if res.is_err() {
error!("cappe fail: {}", res.unwrap_err());
}

let elapsed = (time::now_utc() - start).num_milliseconds() as u64;
let sleep_time = if elapsed > parameters.scan_period {
REST_TIME
Expand All @@ -45,31 +51,24 @@ pub fn sink(sink: &config::Sink, parameters: &config::Parameters, sigint: Arc<At
}
}

/// Send send sink metrics to Warp10.
/// Send sink metrics to Warp10.
fn send(sink: &config::Sink, parameters: &config::Parameters) -> Result<(), Box<Error>> {
debug!("post {}", &sink.url);

loop {
let entries = try!(fs::read_dir(&parameters.sink_dir));
let entries = try!(files(&parameters.sink_dir, &sink.name));
let mut files = Vec::with_capacity(parameters.batch_count as usize);
let mut metrics = String::new();

// Load metrics
let mut batch_size = 0;
for (i, entry) in entries.enumerate() {
let entry = try!(entry);
let file_name = String::from(entry.file_name().to_str().unwrap_or(""));
// Look only for metrics files of the sink
if entry.path().extension() != Some(OsStr::new("metrics")) ||
!file_name.starts_with(&sink.name) {
continue;
}

for (i, entry) in entries.iter().enumerate() {
// Split metrics in capped batch
if i > parameters.batch_count as usize || batch_size > parameters.batch_size as usize {
break;
}

debug!("open sink file {}", format!("{:?}", entry.path()));
debug!("open sink file {:?}", entry.path());
let file = match read(entry.path()) {
Err(_) => continue,
Ok(v) => v,
Expand Down Expand Up @@ -112,6 +111,36 @@ fn send(sink: &config::Sink, parameters: &config::Parameters) -> Result<(), Box<
Ok(())
}

fn cappe(sink: &config::Sink, parameters: &config::Parameters) -> Result<(), Box<Error>> {
let entries = try!(files(&parameters.sink_dir, &sink.name));
let mut sinks_size: u64 = 0;

for entry in &entries {
let meta = try!(entry.metadata());

let modified = meta.modified();

if modified.is_ok() {
let modified = modified.unwrap();
let age = modified.elapsed().unwrap_or(Duration::new(0, 0));

if age.as_secs() > sink.ttl {
warn!("skip file {:?}", entry.path());
try!(fs::remove_file(entry.path()));
continue;
}
}

sinks_size += meta.size();
if sinks_size > sink.size {
warn!("skip file {:?}", entry.path());
try!(fs::remove_file(entry.path()));
}
}

Ok(())
}

/// Read a file as String.
fn read(path: PathBuf) -> Result<String, Box<Error>> {
let mut file = try!(File::open(path));
Expand All @@ -121,3 +150,27 @@ fn read(path: PathBuf) -> Result<String, Box<Error>> {

Ok(content)
}

fn files(dir: &str, sink_name: &str) -> Result<Vec<fs::DirEntry>, Box<Error>> {
let mut entries: Vec<fs::DirEntry> = try!(fs::read_dir(dir)).filter_map(|entry| {
if entry.is_err() {
return None;
}
let entry = entry.unwrap();
if entry.path().extension() != Some(OsStr::new("metrics")) {
return None;
}

let file_name = String::from(entry.file_name().to_str().unwrap_or(""));

if !file_name.starts_with(sink_name) {
return None;
}

Some(entry)
}).collect();

entries.sort_by(|a, b| b.file_name().cmp(&a.file_name()));

Ok(entries)
}

0 comments on commit 437e70f

Please sign in to comment.