-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Cumulocity Device Availability #2866
base: main
Are you sure you want to change the base?
Support Cumulocity Device Availability #2866
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files
|
Robot Results
|
If c8y.availability.enable is set to true, tedge-mapper-c8y publishes a SmartREST '117' message to set c8y_RequiredAvailability fragment for both main and child devices. If c8y.availability.enable is set to true AND c8y.availability.period is set to value > 0, tedge-mapper-c8y sends an empty inventory update message periodically. For the main device, the heartbeat timer starts during initialization. When the timer fires, the health status of tedge-agent is checked. If it is up, the heartbeat message will be sent. For child devices, the heartbeat timer starts when the entity is registered to the entity store. By default, like the main device case, the heartbeat message will be sent when tedge-agent for the child device is up. However, if `leadService` (EntityTopicId) is set to its metadata, tedge-mapper-c8y looks up the status of the leadService. Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
1. JSON deserialization should not include double-quotes for leadService value. 2. get_child_lead_service_entity_topic_id now returns Option. If the topic scheme is not default ("device" is missing in the first segment), it returns None. Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
4f7f600
to
4521a52
Compare
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
@@ -979,10 +998,16 @@ impl CumulocityConverter { | |||
channel: Channel, | |||
message: &MqttMessage, | |||
) -> Result<Vec<MqttMessage>, ConversionError> { | |||
let mut registration_messages: Vec<MqttMessage> = vec![]; | |||
let mut converted_messages: Vec<MqttMessage> = vec![]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All changes under this function try_convert_te_topics
will be changed after merging #2884. Ignore the changes in this function for now.
# c8y has time lag to change the availability state | ||
# that may cause false-negative result if we check c8y_Availability.status | ||
# also putting 'sleep 2m' a couple of times is not good :/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about replacing Sleep commands with loops that periodically check the desired status. This approach can reduce wait times and improve test reliability. for Example something like:
Wait Until Keyword Succeeds 2m 10s Device Should Have Fragment Values c8y_Availability.status=AVAILABLE
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
In runtime, if user updates entity metadata message to change the health endpoint, there can be this situation: one timer exists for the old health endpoint, and the other timer exists for the new endpoint. To avoid the old timer fires and gets triggered again, converter needs to know what is the map, which entity topic ID is interested in which health endpoint now. Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
|
||
/// Required interval, and period of the heartbeat only if enable is true | ||
#[tedge_config(example = "60", default(value = 60_i16))] | ||
period: i16, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
period: i16, | |
interval: i16, |
A better term in the context of heartbeats. We could even be more explicit and call it heartbeat_interval
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also better to say this interval is expected to be a number of seconds and to use the tedge-config type Seconds
.
@@ -230,6 +238,7 @@ pub struct CumulocityConverter { | |||
pub auth_proxy: ProxyUrlGenerator, | |||
pub uploader_sender: LoggingSender<IdUploadRequest>, | |||
pub downloader_sender: LoggingSender<IdDownloadRequest>, | |||
pub timer_sender: LoggingSender<SyncStart>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you're using this timer for a different purpose and not for the alarm sync, for which that SyncStart
alias was created, I'd suggest that you use a different alias for it, say Hearbeat
or Ping
or something better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think renaming SyncStart
as you suggested here solves your point.
@@ -239,6 +248,11 @@ pub struct CumulocityConverter { | |||
pub command_id: IdGenerator, | |||
// Keep active command IDs to avoid creation of multiple commands for an operation | |||
pub active_commands: HashSet<CmdId>, | |||
|
|||
// Keep the service statuses with topic name as a key for c8y_Availability monitoring | |||
pub health_status: HashMap<TopicWithoutPrefix, HealthStatus>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the relevance of the topic "without prefix" here? Anything wrong if something like the EntityTopicId
was used here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's different from EntityTopicId
. EntityTopicId
is restrict to 4 segments name like device/main/a/b
, however, the requirement here is addressing custom topic without prefix.
For example,
Original topic: te/custom/anything
, te/very/long/a/b/c/d/e/f
- Invalid as EntityTopicId
- Valid as the topic to monitor health (
custom/anything
,very/long/a/b/c/d/e/f
)
@@ -967,6 +988,9 @@ impl CumulocityConverter { | |||
) -> Result<Vec<MqttMessage>, ConversionError> { | |||
debug!("Mapping message on topic: {}", message.topic.name); | |||
trace!("Message content: {:?}", message.payload_str()); | |||
|
|||
record_health_status(&self.mqtt_schema, &mut self.health_status, message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The placement of that function call here doesn't feel right. From the logic, it seems like it's only meant to be called on HealthStatus
messages. In that case why not do this from the process_health_status_message
handle rather than calling it on every Mqtt message as done here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because process_health_status_message
can address only Channel::Health
topics, meaning, the topic must have te/{valid_entity_topic_id}/status/health
scheme. But the requirement is to accept any topic as health endpoint as long as it has te
(can be custom root) prefix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To support any random topic, that doesn't even have to be a thin-edge topic (not even with custom prefix and custom topic scheme), then we need to make the mapper subscribe to those random topics as well, so that it is aware of the health updates published to those topics. This would mean that we need to dynamically subscribe to those "open topics" as and when they are registered, which won't be easy as our MQTT actor currently doesn't support dynamic subscriptions. The other alternative would be for the mapper to subscribe to '#' and then process only the ones that are declared by the @health
endpoints. Since the mapper has a persistent connection with the broker, it will unnecessarily increase the load on the broker to queue a lot of unnecessary data for the mapper, most of which won't even be processed.
So, my advice would be to restrict this feature to thin-edge health endpoints only for now (customers are still free to use custom prefixes and topic schemes), and expand it later to any random topics, once we add support for dynamic subscriptions. @reubenmiller Are you okay with that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No the @health
value will still use the root_prefix (default being te/
), so at least we need to subscribe for anything more than we are already doing.
The though process was to include the explict /status/health
portion of the topic is more explicit, rather than having unexpected "status/health" added to the @health
value.
Though I guess if someone chooses to uses something like hello/world/status
(which will be interpreted as te/hello/world/status
, then thin-edge.io won't receive any messages from this topic (as it does not match any of the existing local mqtt topic subscriptions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No the @health value will still use the root_prefix (default being te/), so at least we need to subscribe for anything more than we are already doing.
So, it is guaranteed that the @health
value would be the health endpoint of some entity, right? It could be using a custom topic scheme, and even a custom prefix, as long as tedge is also configured to use the same prefix. In that case, things are better. The PR description stated that the value @health
value can/be/any/custom/topic
, that doesn't even conform to the above mentioned requirements. My concerns were about trying to be so open to any random topic. As long as it is a valid thin-edge health status topic, we're good.
The though process was to include the explict /status/health portion of the topic is more explicit, rather than having unexpected "status/health" added to the @health value.
I think there's some misunderstanding here. @rina23q implemented this contract with the assumption that we can't assume the @health
value to be on the health/status
channel, but rather can be any other random topic as well. If we're restricting the value to the the health/status
channel, then all good. That's exactly what I was expecting as well.
Though I guess if someone chooses to uses something like hello/world/status (which will be interpreted as te/hello/world/status, then thin-edge.io won't receive any messages from this topic (as it does not match any of the existing local mqtt topic subscriptions?
Correct. Since the mapper currently subscribes specifically to te/+/+/+/+/<channel>/<data-type>
topics, Something published to a 4-part topic like te/hello/world/status
won't even be received, as thin-edge expects at least a 5-part topic.
TO SUMMARISE my current understanding is as follows: The requirement was not to support any random topic with any prefix and any number of topic levels (e.g: a/b/c/d/c/e/f/g/h
or a/b/c
), but only to expect valid health status messages on explicit health status channel itself, using any valid custom prefix and topic scheme (the same that is configured with tedge). Correct me if this understanding is wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes the original requirement was to support any random topic (under the root_prefix). But since this opens up the problem that the mapper is not subscribing to a wildcard topic {root_prefix}/#
, then you're right we won't get the notifications on the topic specified by @health
.
Then let's change the requirement so that the @health
should refer to an target entity (e.g. device/main/service/tedge-agent
etc.)...then the agent implies that the /status/health
topic suffix. This means the @health
property would follow be of a similar format to the @parent
field that is used in custom entity registration messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then let's change the requirement so that the
@health
should refer to an target entity (e.g.device/main/service/tedge-agent
etc.)...then the agent implies that the/status/health
topic suffix. This means the@health
property would follow be of a similar format to the@parent
field that is used in custom entity registration messages.
I agree that this is the way to go. And not because the former desire to support random topics was adding too many technical constraints on the design.
Having the @heath
property be an entity identifier makes sense and even more sense than a random topic:
- even if a child device is not running
tedge-agent
, this child device will have in any cases to define and populatete
topics for its data and services: hence this is not restrictive - the
@heath
property is the identifier of the entity, more likely a service but not necessarily, used to monitor the health of the device: this makes sense from a user perspective - this is aligned to all the other proposals related to MQTT topics: easier to understand and implement.
@@ -1251,6 +1327,7 @@ impl CumulocityConverter { | |||
&mut self, | |||
message: &MqttMessage, | |||
) -> Result<Vec<MqttMessage>, ConversionError> { | |||
dbg!(&message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't forget to remove that.
pub type SyncStart = SetTimeout<TimeoutKind>; | ||
pub type SyncComplete = Timeout<TimeoutKind>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since Sync
is just a kind of Timeout
as per this updated scheme, I wouldn't call these SyncStart
and SyncComplete
, but rather use some some generic names like TimerStart
and TimerComplete
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I'll rename it
|
||
start_heartbeat_timer( | ||
self.timer_sender.clone(), | ||
interval, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using the same interval for the timer that we've set as the c8y_RequiredAvailability
interval would lead to a lot of missed heartbeats. For e.g, if the interval is set to 30 mins and if we sleep for 30 minutes here using the timer, by the time we process the heartbeat response from the timer and send a response to the cloud, it definitely would take more than 30 mins, by then the availability interval in the cloud would have timed out. So, IMO, we should send heartbeat intervals more often than what the configured values is.
use tracing::debug; | ||
use tracing::info; | ||
|
||
pub type TopicWithoutPrefix = String; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just use the EntityTopicId
of the lead service, as the health channel info in them are just redundant?
*** Test Cases *** | ||
|
||
c8y_RequiredAvailability is set by default | ||
Execute Command ./bootstrap.sh |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Execute Command ./bootstrap.sh | |
Setup |
Why not just that I wonder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setup
does a lot more than just bootstrapping a device. When using the docker adapter it will spawn a new container. The Test Setup
is already creating the container (but with bootstrapping disabled), as the tests look to require fine-grain control over the initial connection.
pub fn get_lead_service_topic(entity: &EntityMetadata) -> Result<TopicWithoutPrefix, String> { | ||
match entity.other.get("@health") { | ||
Some(maybe_topic_name) => match maybe_topic_name.as_str() { | ||
Some(topic_name) if Topic::new(topic_name).is_ok() => Ok(topic_name.to_string()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel we should validate that the value is a valid entity topic id. I'd have validated that against the entity store as well, but may be that's too early, as the lead service may not be registered by then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lead service can be any MQTT topic, that's why the type is not EntityTopicId
and also no querying to entity store.
/// The interval can be <=0, which means the device is in maintenance mode in the c8y context. | ||
/// Details: https://cumulocity.com/docs/device-integration/fragment-library/#device-availability |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment should not be hidden, but made public in tedge config list --all --doc
.
availability: {
/// Heartbeat interval in seconds. A value of 0 means the device is in maintenance mode.
#[tedge_config(example = "60", default(value = 60_u64))]
heartbeat_interval: Seconds,
use tracing::debug; | ||
use tracing::info; | ||
|
||
pub type TopicWithoutPrefix = String; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by without prefix?
- Is this something like
device/main///m/
? - What's the purpose?
- Is this because having an
MqttSchema
with the appropriate root prefix is a pain?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to avoid adding responsibilities to this already overly bloated CumulocityConverter
.
Device availability should be a monitored by an actor that is fully independent of C8yMapperActor
and CumulocityConverter
. I would implement this feature in a fully independent extension.
The device availability monitor needs to subscribe for entity registration messages and heath topic for this entities (as defined by the @health" property of their registration). This actor doesn't need to share data with the c8y converter and have simply to store mapping as those proposed here (
timer_map,
status_map`) to publish heartbeat at a regular pace.
The only part which is c8y related are the topics and format for these heartbeats.
Move to draft status as I'm planning to re-write code from scratch. However, it's worth keeping unclosed until the whole work is done in order to keep the conversation history. |
Proposed changes
Todo:
leadService
field@health
design"Lead Service" design decisions
The "lead service" is meant to which service status
tedge-mapper-c8y
looks up to judge if the device is "available" or not.te/device/{device_name}/service/tedge-agent/status/health
@health
key. For example,Topic:
te/device/{device_name}//
@health
,tedge-mapper-c8y
logs it asinfo
and not sending a heartbeat.tedge-mapper-c8y
logs it asinfo
and not sending a heartbeat.Types of changes
Paste Link to the issue
#2842
Checklist
cargo fmt
as mentioned in CODING_GUIDELINEScargo clippy
as mentioned in CODING_GUIDELINESFurther comments