Skip to content
This repository has been archived by the owner on Nov 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #16 from runabove/labels
Browse files Browse the repository at this point in the history
Labels
  • Loading branch information
d33d33 committed Jan 24, 2017
2 parents 437e70f + e5eddd8 commit caf43c5
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "beamium"
version = "1.2.0"
version = "1.2.1"
authors = [ "d33d33 <kevin.georges@corp.ovh.com>" ]

build = "build.rs"
Expand Down
13 changes: 7 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,23 @@ fn main() {
// Spawn sources
info!("spawning sources");
for source in config.sources {
let (labels, parameters, sigint) =
(config.labels.clone(), config.parameters.clone(), sigint.clone());
let (parameters, sigint) = (config.parameters.clone(), sigint.clone());
handles.push(thread::spawn(move || {
slog_scope::scope(slog_scope::logger().new(o!("source" => source.name.clone())),
|| source::source(&source, &labels, &parameters, sigint));
|| source::source(&source, &parameters, sigint));
}));
}

// Spawn router
info!("spawning router");
{
let (sinks, parameters, sigint) =
(config.sinks.clone(), config.parameters.clone(), sigint.clone());
let (sinks, labels, parameters, sigint) = (config.sinks.clone(),
config.labels.clone(),
config.parameters.clone(),
sigint.clone());
handles.push(thread::spawn(move || {
slog_scope::scope(slog_scope::logger().new(o!()),
|| router::router(&sinks, &parameters, sigint));
|| router::router(&sinks, &labels, &parameters, sigint));
}));
}

Expand Down
81 changes: 62 additions & 19 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use time;
use std::cmp;
use std::collections::HashMap;
use std::io::prelude::*;
use std::fs;
use std::fs::File;
Expand All @@ -21,12 +22,20 @@ const REST_TIME: u64 = 10;

/// Router loop.
pub fn router(sinks: &Vec<config::Sink>,
labels: &HashMap<String, String>,
parameters: &config::Parameters,
sigint: Arc<AtomicBool>) {

let labels: String = labels.iter()
.fold(String::new(), |acc, (k, v)| {
let sep = if acc.is_empty() { "" } else { "," };
acc + sep + k + "=" + v
});

loop {
let start = time::now_utc();

match route(sinks, parameters) {
match route(sinks, parameters, &labels) {
Err(err) => error!("route fail: {}", err),
Ok(_) => info!("route success"),
}
Expand All @@ -47,12 +56,15 @@ pub fn router(sinks: &Vec<config::Sink>,
}

/// Route handle sources forwarding.
fn route(sinks: &Vec<config::Sink>, parameters: &config::Parameters) -> Result<(), Box<Error>> {
fn route(sinks: &Vec<config::Sink>,
parameters: &config::Parameters,
labels: &String)
-> Result<(), Box<Error>> {
debug!("route");
loop {
let entries = try!(fs::read_dir(&parameters.source_dir));
let mut files = Vec::with_capacity(parameters.batch_count as usize);
let mut metrics = Vec::with_capacity(parameters.batch_count as usize);
let mut metrics: Vec<String> = Vec::new();

// Load metrics
let mut batch_size = 0;
Expand All @@ -77,9 +89,42 @@ fn route(sinks: &Vec<config::Sink>, parameters: &config::Parameters) -> Result<(
Ok(v) => v,
};

for line in file.lines() {
if labels.is_empty() {
metrics.push(String::from(line));
continue;
}
let mut parts = line.splitn(2, "{");

let class = match parts.next() {
None => {
warn!("no_class");
continue;
}
Some(v) => v,
};
let class = String::from(class);
let plabels = match parts.next() {
None => {
warn!("no_labels");
continue;
}
Some(v) => v,
};
let plabels = String::from(plabels);

let slabels = labels.clone() +
if plabels.trim().starts_with("}") {
""
} else {
","
} + &plabels;

metrics.push(format!("{}{{{}", class, slabels))
}

files.push(entry.path());
batch_size += file.len();
metrics.push(file);
}

// Nothing to do
Expand All @@ -100,24 +145,22 @@ fn route(sinks: &Vec<config::Sink>, parameters: &config::Parameters) -> Result<(

// Write metrics
debug!("write sink files");
for m in metrics {
for line in m.lines() {
if line.is_empty() {
continue;
}
for line in metrics {
if line.is_empty() {
continue;
}

for (i, sink) in sinks.iter().enumerate() {
if sink.selector.is_some() {
let selector = sink.selector.as_ref().unwrap();
if line.split_whitespace()
.nth(1)
.map_or(false, |class| selector.is_match(class)) {
continue;
}
for (i, sink) in sinks.iter().enumerate() {
if sink.selector.is_some() {
let selector = sink.selector.as_ref().unwrap();
if line.split_whitespace()
.nth(1)
.map_or(false, |class| selector.is_match(class)) {
continue;
}
try!(sink_files[i].write(line.as_bytes()));
try!(sink_files[i].write(b"\n"));
}
try!(sink_files[i].write(line.as_bytes()));
try!(sink_files[i].write(b"\n"));
}
}

Expand Down
57 changes: 7 additions & 50 deletions src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use time;
use std::cmp;
use hyper;
use std::io::prelude::*;
use std::collections::HashMap;
use std::fs;
use std::fs::File;
use std::error::Error;
Expand All @@ -21,20 +20,11 @@ use config;
const REST_TIME: u64 = 10;

/// Source loop.
pub fn source(source: &config::Source,
labels: &HashMap<String, String>,
parameters: &config::Parameters,
sigint: Arc<AtomicBool>) {
let labels: String = labels.iter()
.fold(String::new(), |acc, (k, v)| {
let sep = if acc.is_empty() { "" } else { "," };
acc + sep + k + "=" + v
});

pub fn source(source: &config::Source, parameters: &config::Parameters, sigint: Arc<AtomicBool>) {
loop {
let start = time::now_utc();

match fetch(source, &labels, parameters) {
match fetch(source, parameters) {
Err(err) => error!("fetch fail: {}", err),
Ok(_) => info!("fetch success"),
}
Expand All @@ -55,10 +45,7 @@ pub fn source(source: &config::Source,
}

/// Fetch retrieve metrics from Prometheus.
fn fetch(source: &config::Source,
labels: &String,
parameters: &config::Parameters)
-> Result<(), Box<Error>> {
fn fetch(source: &config::Source, parameters: &config::Parameters) -> Result<(), Box<Error>> {
debug!("fetch {}", &source.url);

// Fetch metrics
Expand Down Expand Up @@ -87,17 +74,9 @@ fn fetch(source: &config::Source,

for line in body.lines() {
let line = match source.format {
config::SourceFormat::Sensision => {
match format_sensision(line.trim(), labels) {
Err(_) => {
warn!("bad row {}", &line);
continue;
}
Ok(v) => v,
}
},
config::SourceFormat::Sensision => String::from(line.trim()),
config::SourceFormat::Prometheus => {
match format_prometheus(line.trim(), labels, now) {
match format_prometheus(line.trim(), now) {
Err(_) => {
warn!("bad row {}", &line);
continue;
Expand Down Expand Up @@ -133,7 +112,7 @@ fn fetch(source: &config::Source,
}

/// Format Warp10 metrics from Prometheus one.
fn format_prometheus(line: &str, labels: &String, now: i64) -> Result<String, Box<Error>> {
fn format_prometheus(line: &str, now: i64) -> Result<String, Box<Error>> {
// Skip comments
if line.starts_with("#") {
return Ok(String::new());
Expand Down Expand Up @@ -161,7 +140,7 @@ fn format_prometheus(line: &str, labels: &String, now: i64) -> Result<String, Bo
let mut parts = class.splitn(2, "{");
let class = String::from(try!(parts.next().ok_or("no_class")));
let plabels = parts.next();
let mut slabels = if plabels.is_some() {
let slabels = if plabels.is_some() {
let mut labels = plabels.unwrap().split("\",")
.map(|v| v.replace("=", "%3D")) // escape
.map(|v| v.replace("%3D\"", "=")) // remove left double quote
Expand All @@ -184,29 +163,7 @@ fn format_prometheus(line: &str, labels: &String, now: i64) -> Result<String, Bo
String::new()
};

if !labels.is_empty() {
if !slabels.is_empty() {
slabels += ",";
}
slabels += labels;
}

let class = format!("{}{{{}}}", class, slabels);

Ok(format!("{}// {} {}", timestamp, class, value))
}

/// Format Warp10 metrics from Sensision one.
fn format_sensision(line: &str, labels: &String) -> Result<String, Box<Error>> {
if labels.is_empty() {
return Ok(String::from(line));
}
let mut parts = line.splitn(2, "{");

let class = String::from(try!(parts.next().ok_or("no_class")));
let plabels = String::from(try!(parts.next().ok_or("no_labels")));

let slabels = labels.clone() + if plabels.trim().starts_with("}") {""} else {","} + &plabels;

Ok(format!("{}{{{}", class, slabels))
}

0 comments on commit caf43c5

Please sign in to comment.