Skip to content

Commit

Permalink
Using tracing for logs
Browse files Browse the repository at this point in the history
  • Loading branch information
angristan committed Mar 30, 2024
1 parent 6dfa4d1 commit 31050fe
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 39 deletions.
116 changes: 115 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
aimeqtt = { git = "https://github.com/angristan/aimeqtt", rev = "fa01364" }
aimeqtt = { git = "https://github.com/angristan/aimeqtt", rev = "dcfce13" }
# aimeqtt = { path = "../aimeqtt" }
async-stream = "0.3.5"
bincode = "1.3.3"
futures-util = "0.3.30"
Expand All @@ -15,3 +16,5 @@
serde = { version = "1.0.197", features = ["derive"] }
serialport = "4.3.0"
tokio = { version = "1", features = ["rt-multi-thread"] }
tracing = "0.1"
tracing-subscriber = "0.3"
47 changes: 44 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,41 @@
use futures_util::pin_mut;
use futures_util::stream::StreamExt;
use rppal::gpio::Gpio;
use std::env;
use std::thread;
use std::time::Duration;
use tracing::{event, Level};

mod mqtt;
mod serial;
mod teleinfo;

const GPIO_PITINFO_GREEN_LED: u8 = 4;

#[tokio::main]
async fn main() {
let log_level: tracing::Level = match env::var("LOG_LEVEL") {
Ok(level) => match level.to_lowercase().as_str() {
"trace" => tracing::Level::TRACE,
"debug" => tracing::Level::DEBUG,
"info" => tracing::Level::INFO,
"warn" => tracing::Level::WARN,
"error" => tracing::Level::ERROR,
_ => tracing::Level::INFO,
},
Err(_) => tracing::Level::INFO,
};

let subscriber = tracing_subscriber::fmt()
.compact()
.with_file(true)
.with_line_number(true)
.with_thread_ids(true)
.with_target(true)
.with_max_level(log_level)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

let mqtt_host = env::var("MQTT_HOST").expect("$MQTT_HOST is not set");
let mqtt_port = match env::var("MQTT_PORT") {
Ok(port) => port
Expand All @@ -29,6 +57,7 @@ async fn main() {
}

let client = aimeqtt::client::new(aimeqtt_options).await;
event!(Level::DEBUG, "MQTT client created");

let serial_stream = serial::stream::serial_stream(serial_port);
pin_mut!(serial_stream);
Expand All @@ -41,9 +70,21 @@ async fn main() {
pin_mut!(teleinfo_parsed_frames_stream);

while let Some(value) = teleinfo_parsed_frames_stream.next().await {
println!("=====================");
println!("{:?}", value);
match mqtt::publish::publish_teleinfo(&client, &value).await {
Ok(_) => {
let mut pin = Gpio::new()
.unwrap()
.get(GPIO_PITINFO_GREEN_LED)
.unwrap()
.into_output();

mqtt::publish::publish_teleinfo(&client, value);
pin.set_high();
thread::sleep(Duration::from_millis(10));
pin.set_low();
}
Err(e) => {
event!(Level::ERROR, error = ?e, "Error while publishing teleinfo frame to MQTT");
}
}
}
}
38 changes: 8 additions & 30 deletions src/mqtt/publish.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,12 @@
use crate::teleinfo::parser::TeleinfoFrame;
use aimeqtt::client::Client;
use rppal::gpio::Gpio;
use std::thread;
use std::time::Duration;
use tokio::task;
use aimeqtt::client::{Client, ClientError};
use tracing::{event, instrument, Level};

const GPIO_PITINFO_GREEN_LED: u8 = 4;
#[instrument(skip(client))]
pub async fn publish_teleinfo(client: &Client, value: &TeleinfoFrame) -> Result<(), ClientError> {
event!(Level::INFO, "Publishing teleinfo frame to MQTT");

pub fn publish_teleinfo(client: &Client, value: TeleinfoFrame) {
let client_clone = (*client).clone();
task::spawn(async move {
let publish_res = client_clone
.publish(format!("teleinfo/{}", value.adco), value.to_string())
.await;

match publish_res {
Ok(_) => {
println!("Published MQTT message");

let mut pin = Gpio::new()
.unwrap()
.get(GPIO_PITINFO_GREEN_LED)
.unwrap()
.into_output();

pin.set_high();
thread::sleep(Duration::from_millis(10));
pin.set_low();
}
Err(e) => eprintln!("Error: {:?}", e),
}
});
client
.publish(format!("teleinfo/{}", value.adco), value.to_string())
.await
}
10 changes: 8 additions & 2 deletions src/serial/stream.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use async_stream::stream;
use futures_util::stream::Stream;
use std::{io, time::Duration};
use tracing::{event, instrument, Level};

#[instrument]
pub fn serial_stream(port_device: String) -> impl Stream<Item = Vec<u8>> {
stream! {
let port = serialport::new(port_device, 1200)
.timeout(Duration::from_millis(1000)).data_bits(serialport::DataBits::Seven)
.open();

event!(Level::INFO, "Opening serial port");

match port {
Ok(mut port) => {
let mut serial_buf: Vec<u8> = vec![0; 1000];
Expand All @@ -17,12 +21,14 @@ pub fn serial_stream(port_device: String) -> impl Stream<Item = Vec<u8>> {
yield serial_buf[..t].to_vec()
},
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => (),
Err(e) => eprintln!("{:?}", e),
Err(e) => {
event!(Level::ERROR, "Failed to read from serial port. Error: {:?}", e);
}
}
}
}
Err(e) => {
eprintln!("Failed to open port. Error: {}", e);
event!(Level::ERROR, "Failed to open port. Error: {:?}", e);
::std::process::exit(1);
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/teleinfo/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use super::parser::TeleinfoFrame;
use async_stream::stream;
use futures_util::stream::Stream;
use futures_util::stream::StreamExt;
use tracing::event;
use tracing::instrument;
use tracing::Level;

pub fn ascii_to_frames<S: Stream<Item = Vec<u8>>>(ascii_stream: S) -> impl Stream<Item = String> {
let mut ascii_stream = Box::pin(ascii_stream);
Expand All @@ -29,19 +32,21 @@ pub fn ascii_to_frames<S: Stream<Item = Vec<u8>>>(ascii_stream: S) -> impl Strea
}
}

#[instrument(skip(frame_stream))]
pub fn frame_to_teleinfo<S: Stream<Item = String>>(
frame_stream: S,
) -> impl Stream<Item = TeleinfoFrame> {
let mut frame_stream = Box::pin(frame_stream);
stream! {
while let Some(value) = frame_stream.next().await {
println!("value frame_stream: {:?}", value);
let teleinfo = parser::parse_teleinfo(&value);
match teleinfo {
Ok(teleinfo) => {
yield teleinfo;
}
Err(e) => eprintln!("{:?}", e),
Err(e) => {
event!(Level::ERROR, "Failed to parse teleinfo frame: {:?}", e);
}
}
}
}
Expand Down

0 comments on commit 31050fe

Please sign in to comment.