// dynsub.c (forked from eclipse-cyclonedds/cyclonedds)
// Copyright(c) 2022 to 2023 ZettaScale Technology and others
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
// v. 1.0 which is available at
// http://www.eclipse.org/org/documents/edl-v10.php.
//
// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include "dds/dds.h"
#include "dynsub.h"
// Interpreting the data of an arbitrary topic requires interpreting the type object that describes the data.
// The type object type is defined by the XTypes specification (https://www.omg.org/spec/DDS-XTypes/) and it
// comes in two forms: MinimalTypeObject and CompleteTypeObject. Only the latter includes field names, so
// that's what we need.
//
// Cyclone DDS includes a copy of the IDL as well as the corresponding type definitions in C as generated by
// IDLC. So instead of including yet another copy, we simply refer to those. These files are not (yet?)
// part of the stable API of Cyclone DDS, so updates to Cyclone may change the locations or the names
// of the relevant header files.
//
// The API uses `dds_typeobj_t` and `dds_typeinfo_t` that are opaque types but really amount to the
// corresponding XTypes objects: DDS_XTypes_TypeObject and DDS_XTypes_TypeInformation. Rather than casting
// pointers like we do here, they should be defined in a slightly different way so that they are not really
// opaque. For now, this'll have to do.
#include "dds/ddsi/ddsi_xt_typeinfo.h"
// For convenience, the DDS participant is global: main() creates it (and deletes it again on exit), and
// both main() and get_topic_and_typeobj() pass it to the DDS calls that create their entities.
static dds_entity_t participant;
// Helper function to wait for a DCPSPublication to show up with the desired topic name, then calls
// dds_find_topic to create a topic for that data writer's type up the retrieves the type object.
static dds_return_t get_topic_and_typeobj (const char *topic_name, dds_duration_t timeout, dds_entity_t *topic, DDS_XTypes_TypeObject **xtypeobj)
{
const dds_entity_t waitset = dds_create_waitset (participant);
const dds_entity_t dcpspublication_reader = dds_create_reader (participant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL);
const dds_entity_t dcpspublication_readcond = dds_create_readcondition (dcpspublication_reader, DDS_ANY_STATE);
(void) dds_waitset_attach (waitset, dcpspublication_readcond, 0);
const dds_time_t abstimeout = (timeout == DDS_INFINITY) ? DDS_NEVER : dds_time () + timeout;
dds_return_t ret = DDS_RETCODE_OK;
*xtypeobj = NULL;
while (*xtypeobj == NULL && dds_waitset_wait_until (waitset, NULL, 0, abstimeout) > 0)
{
void *epraw = NULL;
dds_sample_info_t si;
if (dds_take (dcpspublication_reader, &epraw, &si, 1, 1) <= 0)
continue;
dds_builtintopic_endpoint_t *ep = epraw;
const dds_typeinfo_t *typeinfo = NULL;
// We are only interested in DCPSPublications where the topic name matches and that carry type information
// (a non-XTypes capable DDS would not provide type information) because without that information there is
// no way we can do anything interesting with it.
if (strcmp (ep->topic_name, topic_name) == 0 && dds_builtintopic_get_endpoint_type_info (ep, &typeinfo) == 0 && typeinfo)
{
// Using dds_find_topic allows us to "clone" the topic definition including the topic QoS, but it does
// require that topic discovery is enabled in the configuration. The advantage of using dds_find_topic
// is that it creates a topic with the same name, type *and QoS*. That distinction only matters if
// topic is discovery is enabled and/or if the topic has a durability kind of of transient or persistent:
// - using a different topic QoS might result in an incompatible QoS notification if topic discovery is
// enabled (everything would still work).
// - transient/persistent data behaviour is defined in terms of the topic QoS actually really matters
//
// So we try to use dds_find_topic, and if that fails, try to go the other route using the writer's QoS
// as an approximation of the topic QoS.
if ((*topic = dds_find_topic (DDS_FIND_SCOPE_GLOBAL, participant, ep->topic_name, typeinfo, DDS_SECS (2))) < 0)
{
fprintf (stderr, "dds_find_topic: %s ... continuing on the assumptions that topic discovery is disabled\n", dds_strretcode (*topic));
dds_topic_descriptor_t *descriptor;
if ((ret = dds_create_topic_descriptor(DDS_FIND_SCOPE_GLOBAL, participant, typeinfo, DDS_SECS (10), &descriptor)) < 0)
{
fprintf (stderr, "dds_create_topic_descriptor: %s\n", dds_strretcode (ret));
dds_return_loan (dcpspublication_reader, &epraw, 1);
goto error;
}
if ((*topic = dds_create_topic (participant, descriptor, ep->topic_name, ep->qos, NULL)) < 0)
{
fprintf (stderr, "dds_create_topic_descriptor: %s (be sure to enable topic discovery in the configuration)\n", dds_strretcode (*topic));
dds_delete_topic_descriptor (descriptor);
dds_return_loan (dcpspublication_reader, &epraw, 1);
goto error;
}
dds_delete_topic_descriptor (descriptor);
}
// The topic suffices for creating a reader, but we also need the TypeObject to make sense of the data
if ((*xtypeobj = load_type_with_deps (participant, typeinfo)) == NULL)
{
fprintf (stderr, "loading type with all dependencies failed\n");
dds_return_loan (dcpspublication_reader, &epraw, 1);
goto error;
}
}
dds_return_loan (dcpspublication_reader, &epraw, 1);
}
if (*xtypeobj)
{
{
struct ppc ppc;
ppc_init (&ppc);
ppc_print_to (&ppc, &(*xtypeobj)->_u.complete);
}
// If we got the type object, populate the type cache
size_t align, size;
build_typecache_to (&(*xtypeobj)->_u.complete, &align, &size);
fflush (stdout);
struct typeinfo templ = { .key = { .key = (uintptr_t) *xtypeobj } } , *info;
if ((info = type_cache_lookup (&templ)) != NULL)
{
assert (info->release == NULL);
info->release = *xtypeobj;
}
else
{
// not sure whether this is at all possible
info = malloc (sizeof (*info));
assert (info);
*info = (struct typeinfo){ .key = { .key = (uintptr_t) *xtypeobj }, .typeobj = &(*xtypeobj)->_u.complete, .release = *xtypeobj, .align = align, .size = size };
type_cache_add (info);
}
}
error:
dds_delete (dcpspublication_reader);
dds_delete (waitset);
return (*xtypeobj != NULL) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT;
}
// Entry point: waits for a writer of the topic named on the command line to be discovered, creates a
// reader for that topic and prints every sample it receives.
//
// usage: <program> topicname
//
// Returns 2 if the arguments are wrong and 1 if any DDS operation fails (a diagnostic is printed to
// stderr in both cases).  The receive loop only terminates on an error.
int main (int argc, char **argv)
{
  dds_return_t ret = 0;
  dds_entity_t topic = 0;
  if (argc != 2)
  {
    // argv[0] need not be available if argc is 0
    fprintf (stderr, "usage: %s topicname\n", (argc > 0) ? argv[0] : "dynsub");
    return 2;
  }

  participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
  if (participant < 0)
  {
    fprintf (stderr, "dds_create_participant: %s\n", dds_strretcode (participant));
    return 1;
  }

  // The one magic step: get a topic and type object ...
  DDS_XTypes_TypeObject *xtypeobj = NULL;
  type_cache_init ();
  if ((ret = get_topic_and_typeobj (argv[1], DDS_SECS (10), &topic, &xtypeobj)) < 0)
  {
    fprintf (stderr, "get_topic_and_typeobj: %s\n", dds_strretcode (ret));
    goto error;
  }

  // ... given those, we can create a reader just like we do normally ...
  const dds_entity_t reader = dds_create_reader (participant, topic, NULL, NULL);
  if (reader < 0)
  {
    fprintf (stderr, "dds_create_reader: %s\n", dds_strretcode (reader));
    ret = reader;
    goto error;
  }

  // ... and create a waitset that allows us to wait for any incoming data ...
  const dds_entity_t waitset = dds_create_waitset (participant);
  if (waitset < 0)
  {
    fprintf (stderr, "dds_create_waitset: %s\n", dds_strretcode (waitset));
    ret = waitset;
    goto error;
  }
  const dds_entity_t readcond = dds_create_readcondition (reader, DDS_ANY_STATE);
  if (readcond < 0)
  {
    fprintf (stderr, "dds_create_readcondition: %s\n", dds_strretcode (readcond));
    ret = readcond;
    goto error;
  }
  if ((ret = dds_waitset_attach (waitset, readcond, 0)) < 0)
  {
    fprintf (stderr, "dds_waitset_attach: %s\n", dds_strretcode (ret));
    goto error;
  }

  while (1)
  {
    if ((ret = dds_waitset_wait (waitset, NULL, 0, DDS_INFINITY)) < 0)
    {
      fprintf (stderr, "dds_waitset_wait: %s\n", dds_strretcode (ret));
      goto error;
    }

    void *raw = NULL;
    dds_sample_info_t si;
    if ((ret = dds_take (reader, &raw, &si, 1, 1)) < 0)
    {
      fprintf (stderr, "dds_take: %s\n", dds_strretcode (ret));
      goto error;
    }
    if (ret == 0)
      continue; // nothing to take after all, go back to waiting

    // ... that we then print
    print_sample (si.valid_data, raw, &xtypeobj->_u.complete);
    if ((ret = dds_return_loan (reader, &raw, 1)) < 0)
    {
      fprintf (stderr, "dds_return_loan: %s\n", dds_strretcode (ret));
      goto error;
    }
  }

error:
  // Reader, waitset and topic are created under the participant and are not deleted individually
  type_cache_free ();
  dds_delete (participant);
  return ret < 0;
}