Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Cumulocity Device Availability #2866

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from

Conversation

rina23q
Copy link
Member

@rina23q rina23q commented May 8, 2024

Proposed changes

Todo:

  • Update all affected unit tests by this change (3764aa7)
  • Clarify two design decisions all regarding
  • Try the usecase: child metadata has the leadService field
  • Add dedicated unit tests for this feature
  • Add dedicated robot tests for this feature
  • Update the code according to the @health design
  • Doc, probably in different PR as this PR is already huge

"Lead Service" design decisions
The "lead service" is meant to which service status tedge-mapper-c8y looks up to judge if the device is "available" or not.

  • By default, using te/device/{device_name}/service/tedge-agent/status/health
  • Can be customerized by device registration message with @health key. For example,
    Topic: te/device/{device_name}//
{
  "@health": "can/be/any/custom/topic"
}
  • If the health status is not reported to the given topic@health, tedge-mapper-c8y logs it as info and not sending a heartbeat.
  • If the given topic is invalid as MQTT topic, tedge-mapper-c8y logs it as info and not sending a heartbeat.

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue

#2842

Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s)
  • I ran cargo fmt as mentioned in CODING_GUIDELINES
  • I used cargo clippy as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

@rina23q rina23q temporarily deployed to Test Pull Request May 10, 2024 18:15 — with GitHub Actions Inactive
Copy link

codecov bot commented May 10, 2024

Codecov Report

Attention: Patch coverage is 90.70968% with 72 lines in your changes are missing coverage. Please review.

Project coverage is 78.2%. Comparing base (644cd6f) to head (a802e6a).
Report is 19 commits behind head on main.

Current head a802e6a differs from pull request most recent head 4ea0722

Please upload reports for the commit 4ea0722 to get more accurate results.

Additional details and impacted files
Files Coverage Δ
crates/core/c8y_api/src/smartrest/inventory.rs 61.6% <100.0%> (+4.8%) ⬆️
crates/extensions/c8y_mapper_ext/src/lib.rs 88.8% <ø> (ø)
...es/extensions/c8y_mapper_ext/src/operations/mod.rs 92.5% <100.0%> (ø)
...s/extensions/c8y_mapper_ext/src/service_monitor.rs 96.4% <100.0%> (+<0.1%) ⬆️
...s/c8y_mapper_ext/src/operations/firmware_update.rs 90.5% <94.7%> (-0.2%) ⬇️
...nsions/c8y_mapper_ext/src/operations/log_upload.rs 89.9% <80.0%> (-0.5%) ⬇️
.../tedge_config/src/tedge_config_cli/tedge_config.rs 76.4% <72.7%> (-0.1%) ⬇️
...ates/extensions/c8y_mapper_ext/src/availability.rs 97.9% <97.9%> (ø)
crates/extensions/c8y_mapper_ext/src/config.rs 47.6% <44.4%> (-0.2%) ⬇️
crates/extensions/c8y_mapper_ext/src/converter.rs 84.0% <91.7%> (+0.3%) ⬆️
... and 2 more

... and 11 files with indirect coverage changes

Copy link
Contributor

github-actions bot commented May 10, 2024

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
443 0 3 443 100 1h9m43.004998s

If c8y.availability.enable is set to true, tedge-mapper-c8y publishes
a SmartREST '117' message to set c8y_RequiredAvailability fragment for
both main and child devices.

If c8y.availability.enable is set to true AND c8y.availability.period is
set to value > 0, tedge-mapper-c8y sends an empty inventory update
message periodically.

For the main device, the heartbeat timer starts during initialization.
When the timer fires, the health status of tedge-agent is checked. If
it is up, the heartbeat message will be sent.

For child devices, the heartbeat timer starts when the entity is
registered to the entity store. By default, like the main device case,
the heartbeat message will be sent when tedge-agent for the child device
is up. However, if `leadService` (EntityTopicId) is set to its metadata,
tedge-mapper-c8y looks up the status of the leadService.

Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
1. JSON deserialization should not include double-quotes for leadService
value.
2. get_child_lead_service_entity_topic_id now returns Option. If the
topic scheme is not default ("device" is missing in the first segment),
it returns None.

Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
@rina23q rina23q force-pushed the feature/2842/support-c8y-device-availability branch from 4f7f600 to 4521a52 Compare May 15, 2024 21:01
@rina23q rina23q temporarily deployed to Test Pull Request May 15, 2024 21:01 — with GitHub Actions Inactive
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
@rina23q rina23q temporarily deployed to Test Pull Request May 21, 2024 16:19 — with GitHub Actions Inactive
@rina23q rina23q marked this pull request as ready for review May 21, 2024 16:20
@rina23q rina23q requested review from jarhodes314 and a team as code owners May 21, 2024 16:20
@@ -979,10 +998,16 @@ impl CumulocityConverter {
channel: Channel,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
let mut registration_messages: Vec<MqttMessage> = vec![];
let mut converted_messages: Vec<MqttMessage> = vec![];
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All changes under this function try_convert_te_topics will be changed after merging #2884. Ignore the changes in this function for now.

# c8y has time lag to change the availability state
# that may cause false-negative result if we check c8y_Availability.status
# also putting 'sleep 2m' a couple of times is not good :/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about replacing Sleep commands with loops that periodically check the desired status. This approach can reduce wait times and improve test reliability. for Example something like:
Wait Until Keyword Succeeds 2m 10s Device Should Have Fragment Values c8y_Availability.status=AVAILABLE

Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
In runtime, if user updates entity metadata message to change the health
endpoint, there can be this situation: one timer exists for the old
health endpoint, and the other timer exists for the new endpoint. To
avoid the old timer fires and gets triggered again, converter needs to
know what is the map, which entity topic ID is interested in which
health endpoint now.

Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
@rina23q rina23q temporarily deployed to Test Pull Request May 22, 2024 16:44 — with GitHub Actions Inactive

/// Required interval, and period of the heartbeat only if enable is true
#[tedge_config(example = "60", default(value = 60_i16))]
period: i16,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
period: i16,
interval: i16,

A better term in the context of heartbeats. We could even be more explicit and call it heartbeat_interval.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also better to say this interval is expected to be a number of seconds and to use the tedge-config type Seconds.

@@ -230,6 +238,7 @@ pub struct CumulocityConverter {
pub auth_proxy: ProxyUrlGenerator,
pub uploader_sender: LoggingSender<IdUploadRequest>,
pub downloader_sender: LoggingSender<IdDownloadRequest>,
pub timer_sender: LoggingSender<SyncStart>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're using this timer for a different purpose and not for the alarm sync, for which that SyncStart alias was created, I'd suggest that you use a different alias for it, say Hearbeat or Ping or something better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think renaming SyncStart as you suggested here solves your point.

@@ -239,6 +248,11 @@ pub struct CumulocityConverter {
pub command_id: IdGenerator,
// Keep active command IDs to avoid creation of multiple commands for an operation
pub active_commands: HashSet<CmdId>,

// Keep the service statuses with topic name as a key for c8y_Availability monitoring
pub health_status: HashMap<TopicWithoutPrefix, HealthStatus>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the relevance of the topic "without prefix" here? Anything wrong if something like the EntityTopicId was used here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's different from EntityTopicId. EntityTopicId is restrict to 4 segments name like device/main/a/b, however, the requirement here is addressing custom topic without prefix.

For example,
Original topic: te/custom/anything, te/very/long/a/b/c/d/e/f

  • Invalid as EntityTopicId
  • Valid as the topic to monitor health (custom/anything, very/long/a/b/c/d/e/f)

@@ -967,6 +988,9 @@ impl CumulocityConverter {
) -> Result<Vec<MqttMessage>, ConversionError> {
debug!("Mapping message on topic: {}", message.topic.name);
trace!("Message content: {:?}", message.payload_str());

record_health_status(&self.mqtt_schema, &mut self.health_status, message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The placement of that function call here doesn't feel right. From the logic, it seems like it's only meant to be called on HealthStatus messages. In that case why not do this from the process_health_status_message handle rather than calling it on every Mqtt message as done here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because process_health_status_message can address only Channel::Health topics, meaning, the topic must have te/{valid_entity_topic_id}/status/health scheme. But the requirement is to accept any topic as health endpoint as long as it has te(can be custom root) prefix.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To support any random topic, that doesn't even have to be a thin-edge topic (not even with custom prefix and custom topic scheme), then we need to make the mapper subscribe to those random topics as well, so that it is aware of the health updates published to those topics. This would mean that we need to dynamically subscribe to those "open topics" as and when they are registered, which won't be easy as our MQTT actor currently doesn't support dynamic subscriptions. The other alternative would be for the mapper to subscribe to '#' and then process only the ones that are declared by the @health endpoints. Since the mapper has a persistent connection with the broker, it will unnecessarily increase the load on the broker to queue a lot of unnecessary data for the mapper, most of which won't even be processed.

So, my advice would be to restrict this feature to thin-edge health endpoints only for now (customers are still free to use custom prefixes and topic schemes), and expand it later to any random topics, once we add support for dynamic subscriptions. @reubenmiller Are you okay with that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No the @health value will still use the root_prefix (default being te/), so at least we need to subscribe for anything more than we are already doing.

The though process was to include the explict /status/health portion of the topic is more explicit, rather than having unexpected "status/health" added to the @health value.

Though I guess if someone chooses to uses something like hello/world/status (which will be interpreted as te/hello/world/status, then thin-edge.io won't receive any messages from this topic (as it does not match any of the existing local mqtt topic subscriptions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No the @health value will still use the root_prefix (default being te/), so at least we need to subscribe for anything more than we are already doing.

So, it is guaranteed that the @health value would be the health endpoint of some entity, right? It could be using a custom topic scheme, and even a custom prefix, as long as tedge is also configured to use the same prefix. In that case, things are better. The PR description stated that the value @health value can/be/any/custom/topic, that doesn't even conform to the above mentioned requirements. My concerns were about trying to be so open to any random topic. As long as it is a valid thin-edge health status topic, we're good.

The though process was to include the explict /status/health portion of the topic is more explicit, rather than having unexpected "status/health" added to the @health value.

I think there's some misunderstanding here. @rina23q implemented this contract with the assumption that we can't assume the @health value to be on the health/status channel, but rather can be any other random topic as well. If we're restricting the value to the the health/status channel, then all good. That's exactly what I was expecting as well.

Though I guess if someone chooses to uses something like hello/world/status (which will be interpreted as te/hello/world/status, then thin-edge.io won't receive any messages from this topic (as it does not match any of the existing local mqtt topic subscriptions?

Correct. Since the mapper currently subscribes specifically to te/+/+/+/+/<channel>/<data-type> topics, Something published to a 4-part topic like te/hello/world/status won't even be received, as thin-edge expects at least a 5-part topic.

TO SUMMARISE my current understanding is as follows: The requirement was not to support any random topic with any prefix and any number of topic levels (e.g: a/b/c/d/c/e/f/g/h or a/b/c), but only to expect valid health status messages on explicit health status channel itself, using any valid custom prefix and topic scheme (the same that is configured with tedge). Correct me if this understanding is wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the original requirement was to support any random topic (under the root_prefix). But since this opens up the problem that the mapper is not subscribing to a wildcard topic {root_prefix}/#, then you're right we won't get the notifications on the topic specified by @health.

Then let's change the requirement so that the @health should refer to an target entity (e.g. device/main/service/tedge-agent etc.)...then the agent implies that the /status/health topic suffix. This means the @health property would follow be of a similar format to the @parent field that is used in custom entity registration messages.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then let's change the requirement so that the @health should refer to an target entity (e.g. device/main/service/tedge-agent etc.)...then the agent implies that the /status/health topic suffix. This means the @health property would follow be of a similar format to the @parent field that is used in custom entity registration messages.

I agree that this is the way to go. And not because the former desire to support random topics was adding too many technical constraints on the design.

Having the @heath property be an entity identifier makes sense and even more sense than a random topic:

  • even if a child device is not running tedge-agent, this child device will have in any cases to define and populate te topics for its data and services: hence this is not restrictive
  • the @heath property is the identifier of the entity, more likely a service but not necessarily, used to monitor the health of the device: this makes sense from a user perspective
  • this is aligned to all the other proposals related to MQTT topics: easier to understand and implement.

@@ -1251,6 +1327,7 @@ impl CumulocityConverter {
&mut self,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
dbg!(&message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't forget to remove that.

Comment on lines +59 to +60
pub type SyncStart = SetTimeout<TimeoutKind>;
pub type SyncComplete = Timeout<TimeoutKind>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since Sync is just a kind of Timeout as per this updated scheme, I wouldn't call these SyncStart and SyncComplete, but rather use some some generic names like TimerStart and TimerComplete

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'll rename it


start_heartbeat_timer(
self.timer_sender.clone(),
interval,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the same interval for the timer that we've set as the c8y_RequiredAvailability interval would lead to a lot of missed heartbeats. For e.g, if the interval is set to 30 mins and if we sleep for 30 minutes here using the timer, by the time we process the heartbeat response from the timer and send a response to the cloud, it definitely would take more than 30 mins, by then the availability interval in the cloud would have timed out. So, IMO, we should send heartbeat intervals more often than what the configured values is.

use tracing::debug;
use tracing::info;

pub type TopicWithoutPrefix = String;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just use the EntityTopicId of the lead service, as the health channel info in them are just redundant?

*** Test Cases ***

c8y_RequiredAvailability is set by default
Execute Command ./bootstrap.sh
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Execute Command ./bootstrap.sh
Setup

Why not just that I wonder.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setup does a lot more than just bootstrapping a device. When using the docker adapter it will spawn a new container. The Test Setup is already creating the container (but with bootstrapping disabled), as the tests look to require fine-grain control over the initial connection.

pub fn get_lead_service_topic(entity: &EntityMetadata) -> Result<TopicWithoutPrefix, String> {
match entity.other.get("@health") {
Some(maybe_topic_name) => match maybe_topic_name.as_str() {
Some(topic_name) if Topic::new(topic_name).is_ok() => Ok(topic_name.to_string()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we should validate that the value is a valid entity topic id. I'd have validated that against the entity store as well, but may be that's too early, as the lead service may not be registered by then.

Copy link
Member Author

@rina23q rina23q May 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lead service can be any MQTT topic, that's why the type is not EntityTopicId and also no querying to entity store.

Comment on lines +135 to +136
/// The interval can be <=0, which means the device is in maintenance mode in the c8y context.
/// Details: https://cumulocity.com/docs/device-integration/fragment-library/#device-availability
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment should not be hidden, but made public in tedge config list --all --doc.

 availability: {
            /// Heartbeat interval in seconds. A value of 0 means the device is in maintenance mode.
            #[tedge_config(example = "60", default(value = 60_u64))]
            heartbeat_interval: Seconds,

use tracing::debug;
use tracing::info;

pub type TopicWithoutPrefix = String;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by without prefix?

  • Is this something like device/main///m/ ?
  • What's the purpose?
  • Is this because having an MqttSchema with the appropriate root prefix is a pain?

Copy link
Contributor

@didier-wenzek didier-wenzek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to avoid adding responsibilities to this already overly bloated CumulocityConverter.

Device availability should be a monitored by an actor that is fully independent of C8yMapperActor and CumulocityConverter. I would implement this feature in a fully independent extension.

The device availability monitor needs to subscribe for entity registration messages and heath topic for this entities (as defined by the @health" property of their registration). This actor doesn't need to share data with the c8y converter and have simply to store mapping as those proposed here (timer_map, status_map`) to publish heartbeat at a regular pace.

The only part which is c8y related are the topics and format for these heartbeats.

@rina23q rina23q marked this pull request as draft May 29, 2024 14:10
@rina23q
Copy link
Member Author

rina23q commented May 29, 2024

Move to draft status as I'm planning to re-write code from scratch. However, it's worth keeping unclosed until the whole work is done in order to keep the conversation history.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants