Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using ESP8266 as MQTT broker and datalogger #77

Open
theRosyProject opened this issue Apr 1, 2024 · 5 comments
Open

Using ESP8266 as MQTT broker and datalogger #77

theRosyProject opened this issue Apr 1, 2024 · 5 comments

Comments

@theRosyProject
Copy link

Thank you for developing this library. I've been using it to program ESP8266 devices with ESPHome for sensor data collection (including temperature, relative humidity, and air quality) and to transmit the data as MQTT payloads to an ESP8266 MQTT broker that also serves as the WiFi access point. This setup works flawlessly. However, I am now looking to enhance it by processing and storing the incoming messages on an SD card. Indeed, while I can handle the data in Python on my ubuntu system by reading the messages (mosquito_sub) and storing them in a dataframe, I am also interested in implementing this functionality directly on the ESP8266, saving measurements in an SD card and eventually using the sd.h library which I've used in the past. This would involve accessing the payload and storing it as a character array within the loop function. Since I am not very familiar with C, I am uncertain whether a function already exists in your library that I could use for this purpose or if I need to modify the onData() function in the uMQTTBroker.cpp file(?). Any guidance or suggestions you can provide would be greatly appreciated. Thank you for your support and again, great library! Best regards.

@theRosyProject
Copy link
Author

theRosyProject commented Apr 2, 2024

I was able to achieve the desired result of accessing the last published payload in the serial output. I've used a mixed trial and error approach with the help of ChatGPT, with the following modification of the uMQTTBroker.h and uMQTTBroker.cpp files. Attaching the final version of the files

@theRosyProject
Copy link
Author

#include "uMQTTBroker.h"
#include "espconn.h"

uMQTTBroker *uMQTTBroker::TheBroker;

// Get access to last payload F.Dallo
char uMQTTBroker::lastPayload[uMQTTBroker::MAX_PAYLOAD_SIZE] = {0};

const char* uMQTTBroker::getLastPayload() {
    return lastPayload;
}

void uMQTTBroker::clearLastPayload() {
    lastPayload[0] = '\0';
}

// end F.Dallo

bool uMQTTBroker::_onConnect(struct espconn *pesp_conn, uint16_t client_count) {
IPAddress connAddr(pesp_conn->proto.tcp->remote_ip[0], pesp_conn->proto.tcp->remote_ip[1],
		   pesp_conn->proto.tcp->remote_ip[2], pesp_conn->proto.tcp->remote_ip[3]);

return TheBroker->onConnect(connAddr, client_count);
}

void uMQTTBroker::_onDisconnect(struct espconn *pesp_conn, const char *client_id) {
IPAddress connAddr(pesp_conn->proto.tcp->remote_ip[0], pesp_conn->proto.tcp->remote_ip[1],
		   pesp_conn->proto.tcp->remote_ip[2], pesp_conn->proto.tcp->remote_ip[3]);

    TheBroker->onDisconnect(connAddr, (String)client_id);
}

bool uMQTTBroker::_onAuth(const char* username, const char *password, const char* client_id, struct espconn *pesp_conn) {
return TheBroker->onAuth((String)username, (String)password, (String)client_id);
}

void uMQTTBroker::_onData(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t length) {
char topic_str[topic_len+1];

os_memcpy(topic_str, topic, topic_len);
topic_str[topic_len] = '\0';

// Get access to last payload F.Dallo
length = (length < MAX_PAYLOAD_SIZE) ? length : MAX_PAYLOAD_SIZE - 1;
memcpy(lastPayload, data, length);
lastPayload[length] = '\0';
//
TheBroker->onData((String)topic_str, data, length);

}

uMQTTBroker::uMQTTBroker(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics) {
TheBroker = this;
_portno = portno;
_max_subscriptions = max_subscriptions;
_max_retained_topics = max_retained_topics;

MQTT_server_onConnect(_onConnect);
MQTT_server_onDisconnect(_onDisconnect);
MQTT_server_onAuth(_onAuth);
MQTT_server_onData(_onData);
}

void uMQTTBroker::init() {
MQTT_server_start(_portno, _max_subscriptions, _max_retained_topics);
}

bool uMQTTBroker::onConnect(IPAddress addr, uint16_t client_count) {
return true;
}

void uMQTTBroker::onDisconnect(IPAddress addr, String client_id) {
return;
}

bool uMQTTBroker::onAuth(String username, String password, String client_id) {
return true;
}

void uMQTTBroker::onData(String topic, const char *data, uint32_t length) {
}

bool uMQTTBroker::publish(String topic, uint8_t* data, uint16_t data_length, uint8_t qos, uint8_t retain) {
return MQTT_local_publish((uint8_t*)topic.c_str(), data, data_length, qos, retain);
}

uint16_t uMQTTBroker::getClientCount() {
return MQTT_server_countClientCon();
}

bool uMQTTBroker::getClientId(uint16_t index, String &client_id) {
    const char *c = MQTT_server_getClientId(index);
    if (c == NULL)
        return false;
    client_id = c;
    return true;
}

bool uMQTTBroker::getClientAddr(uint16_t index, IPAddress& addr) {
    const struct espconn* pesp_conn = MQTT_server_getClientPcon(index);
    if (pesp_conn == NULL)
        return false;
    addr = pesp_conn->proto.tcp->remote_ip;
    return true;
}

bool uMQTTBroker::publish(String topic, String data, uint8_t qos, uint8_t retain) {
return MQTT_local_publish((uint8_t*)topic.c_str(), (uint8_t*)data.c_str(), data.length(), qos, retain);
}

bool uMQTTBroker::subscribe(String topic, uint8_t qos) {
return MQTT_local_subscribe((uint8_t*)topic.c_str(), qos);
}

bool uMQTTBroker::unsubscribe(String topic) {
return MQTT_local_unsubscribe((uint8_t*)topic.c_str());
}

void uMQTTBroker::cleanupClientConnections() {
MQTT_server_cleanupClientCons();
}

@theRosyProject
Copy link
Author

#ifndef MQTT_SERVER_H
#define MQTT_SERVER_H

#include "user_interface.h"
#include "IPAddress.h"
#include "string.h"

#ifndef ipv4_addr_t
#define ipv4_addr_t ip_addr_t
#endif

extern "C" {

// Interface for starting the broker

bool MQTT_server_start(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics);

// Callbacks for message reception, username/password authentication, and client connection

typedef void (*MqttDataCallback)(uint32_t args, const char topic, uint32_t topic_len, const char *data, uint32_t lengh);
typedef bool (MqttAuthCallback)(const char username, const char *password, const char *client_id, struct espconn *pesp_conn);
typedef bool (*MqttConnectCallback)(struct espconn *pesp_conn, uint16_t client_count);
typedef void (*MqttDisconnectCallback)(struct espconn *pesp_conn, const char *client_id);

void MQTT_server_onData(MqttDataCallback dataCb);
void MQTT_server_onAuth(MqttAuthCallback authCb);
void MQTT_server_onConnect(MqttConnectCallback connectCb);
void MQTT_server_onDisconnect(MqttDisconnectCallback disconnectCb);

// Interface for local pub/sub interaction with the broker

bool MQTT_local_publish(uint8_t* topic, uint8_t* data, uint16_t data_length, uint8_t qos, uint8_t retain);
bool MQTT_local_subscribe(uint8_t* topic, uint8_t qos);
bool MQTT_local_unsubscribe(uint8_t* topic);

// Interface to cleanup after STA disconnect

void MQTT_server_cleanupClientCons();

// Interface for persistence of retained topics
// Topics can be serialized to a buffer and reinitialized later after reboot
// Application is responsible for saving and restoring that buffer (i.e. to/from flash)

void clear_retainedtopics();
int serialize_retainedtopics(char *buf, int len);
bool deserialize_retainedtopics(char *buf, int len);

// Interface for getting some infos on the currently connected clients
// MQTT_server_getClientId() and MQTT_server_getClientPcon() return NULL on invalid indices

uint16_t MQTT_server_countClientCon();
const char* MQTT_server_getClientId(uint16_t index);
const struct espconn* MQTT_server_getClientPcon(uint16_t index);
}

class uMQTTBroker
{
private:
static uMQTTBroker *TheBroker;
uint16_t _portno;
uint16_t _max_subscriptions;
uint16_t _max_retained_topics;

static bool _onConnect(struct espconn *pesp_conn, uint16_t client_count);
static void _onDisconnect(struct espconn *pesp_conn, const char *client_id);
static bool _onAuth(const char* username, const char *password,  const char *client_id, struct espconn *pesp_conn);
static void _onData(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t length);

// Get access to last payload F.Dallo
static const int MAX_PAYLOAD_SIZE = 512;
static char lastPayload[MAX_PAYLOAD_SIZE];

public:
uMQTTBroker(uint16_t portno=1883, uint16_t max_subscriptions=30, uint16_t max_retained_topics=30);

void init();

// Callbacks on client actions

virtual bool onConnect(IPAddress addr, uint16_t client_count);
virtual void onDisconnect(IPAddress addr, String client_id);
virtual bool onAuth(String username, String password, String client_id);
virtual void onData(String topic, const char *data, uint32_t length);

// Infos on currently connected clients

virtual uint16_t getClientCount();
virtual bool getClientId(uint16_t index, String &client_id);
virtual bool getClientAddr(uint16_t index, IPAddress& addr);

// Interaction with the local broker

virtual bool publish(String topic, uint8_t* data, uint16_t data_length, uint8_t qos=0, uint8_t retain=0);
virtual bool publish(String topic, String data, uint8_t qos=0, uint8_t retain=0);
virtual bool subscribe(String topic, uint8_t qos=0);
virtual bool unsubscribe(String topic);

// Get access to last payload F.Dallo
static const char* getLastPayload();
static void clearLastPayload();

// Cleanup all clients on Wifi connection loss

void cleanupClientConnections();

};

#endif /* MQTT_SERVER_H */

1 similar comment
@theRosyProject
Copy link
Author

#ifndef MQTT_SERVER_H
#define MQTT_SERVER_H

#include "user_interface.h"
#include "IPAddress.h"
#include "string.h"

#ifndef ipv4_addr_t
#define ipv4_addr_t ip_addr_t
#endif

extern "C" {

// Interface for starting the broker

bool MQTT_server_start(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics);

// Callbacks for message reception, username/password authentication, and client connection

typedef void (*MqttDataCallback)(uint32_t args, const char topic, uint32_t topic_len, const char *data, uint32_t lengh);
typedef bool (MqttAuthCallback)(const char username, const char *password, const char *client_id, struct espconn *pesp_conn);
typedef bool (*MqttConnectCallback)(struct espconn *pesp_conn, uint16_t client_count);
typedef void (*MqttDisconnectCallback)(struct espconn *pesp_conn, const char *client_id);

void MQTT_server_onData(MqttDataCallback dataCb);
void MQTT_server_onAuth(MqttAuthCallback authCb);
void MQTT_server_onConnect(MqttConnectCallback connectCb);
void MQTT_server_onDisconnect(MqttDisconnectCallback disconnectCb);

// Interface for local pub/sub interaction with the broker

bool MQTT_local_publish(uint8_t* topic, uint8_t* data, uint16_t data_length, uint8_t qos, uint8_t retain);
bool MQTT_local_subscribe(uint8_t* topic, uint8_t qos);
bool MQTT_local_unsubscribe(uint8_t* topic);

// Interface to cleanup after STA disconnect

void MQTT_server_cleanupClientCons();

// Interface for persistence of retained topics
// Topics can be serialized to a buffer and reinitialized later after reboot
// Application is responsible for saving and restoring that buffer (i.e. to/from flash)

void clear_retainedtopics();
int serialize_retainedtopics(char *buf, int len);
bool deserialize_retainedtopics(char *buf, int len);

// Interface for getting some infos on the currently connected clients
// MQTT_server_getClientId() and MQTT_server_getClientPcon() return NULL on invalid indices

uint16_t MQTT_server_countClientCon();
const char* MQTT_server_getClientId(uint16_t index);
const struct espconn* MQTT_server_getClientPcon(uint16_t index);
}

class uMQTTBroker
{
private:
static uMQTTBroker *TheBroker;
uint16_t _portno;
uint16_t _max_subscriptions;
uint16_t _max_retained_topics;

static bool _onConnect(struct espconn *pesp_conn, uint16_t client_count);
static void _onDisconnect(struct espconn *pesp_conn, const char *client_id);
static bool _onAuth(const char* username, const char *password,  const char *client_id, struct espconn *pesp_conn);
static void _onData(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t length);

// Get access to last payload F.Dallo
static const int MAX_PAYLOAD_SIZE = 512;
static char lastPayload[MAX_PAYLOAD_SIZE];

public:
uMQTTBroker(uint16_t portno=1883, uint16_t max_subscriptions=30, uint16_t max_retained_topics=30);

void init();

// Callbacks on client actions

virtual bool onConnect(IPAddress addr, uint16_t client_count);
virtual void onDisconnect(IPAddress addr, String client_id);
virtual bool onAuth(String username, String password, String client_id);
virtual void onData(String topic, const char *data, uint32_t length);

// Infos on currently connected clients

virtual uint16_t getClientCount();
virtual bool getClientId(uint16_t index, String &client_id);
virtual bool getClientAddr(uint16_t index, IPAddress& addr);

// Interaction with the local broker

virtual bool publish(String topic, uint8_t* data, uint16_t data_length, uint8_t qos=0, uint8_t retain=0);
virtual bool publish(String topic, String data, uint8_t qos=0, uint8_t retain=0);
virtual bool subscribe(String topic, uint8_t qos=0);
virtual bool unsubscribe(String topic);

// Get access to last payload F.Dallo
static const char* getLastPayload();
static void clearLastPayload();

// Cleanup all clients on Wifi connection loss

void cleanupClientConnections();

};

#endif /* MQTT_SERVER_H */

@theRosyProject
Copy link
Author

theRosyProject commented Apr 2, 2024

Then in the Arduino code setup:

myBroker.subscribe("windsled/thespecifictopicofinterest");

and loop:

// Process the last received payload
const char* payload = uMQTTBroker::getLastPayload();
if (payload[0] != '\0') { // Check if there's a new payload
Serial.println(payload); // Example: print payload
// Clear the payload after processing to avoid processing it multiple times
uMQTTBroker::clearLastPayload();
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant