Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracepoint for publish/subscribe serialized message #485

Merged
merged 9 commits into from Apr 5, 2024
30 changes: 29 additions & 1 deletion rmw_cyclonedds_cpp/src/rmw_node.cpp
Expand Up @@ -2024,9 +2024,13 @@ extern "C" rmw_ret_t rmw_publish_serialized_message(
serialized_message, "serialized message handle is null",
return RMW_RET_INVALID_ARGUMENT);
auto pub = static_cast<CddsPublisher *>(publisher->data);
const dds_time_t tstamp = dds_time();
TRACETOOLS_TRACEPOINT(rmw_publish, (const void *)publisher, serialized_message, tstamp);

struct ddsi_serdata * d = serdata_rmw_from_serialized_message(
pub->sertype, serialized_message->buffer, serialized_message->buffer_length);
d->timestamp.v = tstamp;
d->statusinfo = 0;

#ifdef DDS_HAS_SHM
// publishing a serialized message when SHM is available
Expand All @@ -2040,7 +2044,7 @@ extern "C" rmw_ret_t rmw_publish_serialized_message(
}
#endif

const bool ok = (dds_writecdr(pub->enth, d) >= 0);
const bool ok = (dds_forwardcdr(pub->enth, d) >= 0);
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
return ok ? RMW_RET_OK : RMW_RET_ERROR;
}

Expand Down Expand Up @@ -3522,6 +3526,12 @@ static rmw_ret_t rmw_take_ser_int(
serialized_message->buffer_length = size;
ddsi_serdata_unref(d);
*taken = true;
TRACETOOLS_TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(serialized_message),
(message_info ? message_info->source_timestamp : 0LL),
*taken);
return RMW_RET_OK;
} else if (iox_header->shm_data_state == IOX_CHUNK_CONTAINS_RAW_DATA) {
if (rmw_serialize(d->iox_chunk, &sub->type_supports, serialized_message) != RMW_RET_OK) {
Expand All @@ -3532,6 +3542,12 @@ static rmw_ret_t rmw_take_ser_int(
}
ddsi_serdata_unref(d);
*taken = true;
TRACETOOLS_TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(serialized_message),
(message_info ? message_info->source_timestamp : 0LL),
*taken);
return RMW_RET_OK;
} else {
RMW_SET_ERROR_MSG("The recieved sample over SHM is not initialized");
Expand All @@ -3553,12 +3569,24 @@ static rmw_ret_t rmw_take_ser_int(
serialized_message->buffer_length = size;
ddsi_serdata_unref(d);
*taken = true;
TRACETOOLS_TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(serialized_message),
(message_info ? message_info->source_timestamp : 0LL),
*taken);
return RMW_RET_OK;
}
}
ddsi_serdata_unref(d);
}
*taken = false;
TRACETOOLS_TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(serialized_message),
0LL,
*taken);
return RMW_RET_OK;
}

Expand Down