Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added another FACE example with two topics #4607

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 45 additions & 0 deletions DevGuideExamples/FACE/two-topic/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
cmake_minimum_required(VERSION 3.3...3.27)
AlexFalzone marked this conversation as resolved.
Show resolved Hide resolved
project(opendds_stock CXX)
AlexFalzone marked this conversation as resolved.
Show resolved Hide resolved
enable_testing()

find_package(OpenDDS REQUIRED)
include(opendds_testing)

# Make sure the MPC-generated headers are gone so the CMake build will use the
# right ones. This is not needed in a real project.
# file(GLOB headers "*.h")
# list(LENGTH headers header_count)
# if(header_count GREATER 0)
# file(REMOVE ${headers})
# endif()
AlexFalzone marked this conversation as resolved.
Show resolved Hide resolved

# IDL TypeSupport Library
add_library(stockquoter_idl)
AlexFalzone marked this conversation as resolved.
Show resolved Hide resolved
include_directories(
/opt/OpenDDS/
)
AlexFalzone marked this conversation as resolved.
Show resolved Hide resolved
opendds_target_sources(stockquoter_idl PUBLIC "StockQuoter.idl")
target_link_libraries(stockquoter_idl PUBLIC OpenDDS::Dcps)
AlexFalzone marked this conversation as resolved.
Show resolved Hide resolved


set(opendds_libs
OpenDDS::Dcps # Core OpenDDS Library
OpenDDS::InfoRepoDiscovery OpenDDS::Tcp # For run_test.pl
OpenDDS::Rtps OpenDDS::Rtps_Udp # For run_test.pl --rtps
AlexFalzone marked this conversation as resolved.
Show resolved Hide resolved
OpenDDS::FACE
stockquoter_idl
)

# Publisher
add_executable(publisher
Publisher.cpp
StockQuoter_TS.cpp
)
target_link_libraries(publisher ${opendds_libs})

# Subscriber
add_executable(subscriber
Subscriber.cpp
StockQuoter_TS.cpp
)
target_link_libraries(subscriber ${opendds_libs})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The testing is missing for both CMake and MPC. The Simple example isn't doing this though, so I guess it'd be hard to come up with that from scratch. https://github.com/OpenDDS/OpenDDS/blob/master/tests/FACE/Messenger/run_test.pl should do without the callback and subdirectory stuff. You'd also need to add the test to https://github.com/OpenDDS/OpenDDS/blob/master/tests/dcps_tests.lst. Again FACE messenger should do, but without the callback lines probably:

tests/FACE/Messenger/run_test.pl: !DCPS_MIN !WCHAR RTPS !NO_BUILT_IN_TOPICS
tests/FACE/Messenger/run_test.pl callback: !DCPS_MIN !WCHAR RTPS !NO_BUILT_IN_TOPICS
tests/FACE/Messenger/run_test.pl static: !DCPS_MIN !WCHAR RTPS !NO_BUILT_IN_TOPICS
tests/FACE/Messenger/run_test.pl static callback: !DCPS_MIN !WCHAR RTPS !NO_BUILT_IN_TOPICS

Let me know if you need further guidance.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if you need further guidance.

I honestly don't know where to start

123 changes: 123 additions & 0 deletions DevGuideExamples/FACE/two-topic/Publisher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@

#include "StockQuoterC.h"
#include "StockQuoter_TS.hpp"
#include "ace/Log_Msg.h"
#include "ace/OS_NS_time.h"
#include "ace/OS_NS_unistd.h"
#include "orbsvcs/Time_Utilities.h"
#include <FACE/TS.hpp>
#include <FACE/TS_common.hpp>
#include <ace/OS_NS_time.h>

#ifdef ACE_AS_STATIC_LIBS
#include "dds/DCPS/RTPS/RtpsDiscovery.h"
#include "dds/DCPS/transport/rtps_udp/RtpsUdp.h"
#endif

TimeBase::TimeT get_timestamp() {
TimeBase::TimeT retval;
ACE_timer_t t = ACE_OS::gethrtime();
ORBSVCS_Time::hrtime_to_TimeT(retval, t);
return retval;
}

// FUZZ: disable check_for_improper_main_declaration
int main(int, char *[]) {
// Initialize the TS interface
FACE::RETURN_CODE_TYPE status, status2;
FACE::TS::Initialize("face_config.ini", status);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG(
(LM_INFO, "Publisher: error initializing FACE, status = %d\n", status));
return static_cast<int>(status);
}

// Create the pub Exchangeconnection
FACE::CONNECTION_ID_TYPE connIdExchange;
FACE::CONNECTION_DIRECTION_TYPE dir = FACE::SOURCE;
FACE::MESSAGE_SIZE_TYPE max_msg_size;

FACE::TS::Create_Connection("pub", FACE::PUB_SUB, connIdExchange, dir,
max_msg_size, FACE::INF_TIME_VALUE, status);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO, "Publisher: error creating connections, status = %d\n",
status));
return static_cast<int>(status);
}

FACE::CONNECTION_ID_TYPE connIdQuote;
FACE::CONNECTION_DIRECTION_TYPE dirQuote = FACE::SOURCE;
FACE::TS::Create_Connection("pubquote", FACE::PUB_SUB, connIdQuote, dirQuote,
max_msg_size, FACE::INF_TIME_VALUE, status2);
if (status2 != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO, "Publisher: error creating connections, status = %d\n ",
status));
return static_cast<int>(status2);
}

// Message to send
for (size_t i = 0; i < 10; i++) {
ACE_OS::sleep(1);

StockQuoter::Quote msg;
msg.ticker = "AAPL";
msg.exchange = "NASDAQ";
msg.full_name = "Apple Inc.";
msg.value = 100.0 + i;
msg.timestamp = get_timestamp();
FACE::TRANSACTION_ID_TYPE txnQuote;
FACE::TRANSACTION_ID_TYPE txnExchange;

StockQuoter::ExchangeEvent event;
event.exchange = "NASDAQ";
event.event = StockQuoter::TRADING_OPENED;
event.timestamp = get_timestamp();

// Send the event
FACE::TS::Send_Message(connIdExchange, FACE::INF_TIME_VALUE, txnExchange,
event, max_msg_size, status);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO,
"Publisher: error sending EVENT message, status = %d\n",
status));
return static_cast<int>(status);
}

ACE_OS::sleep(1);

// Send the message
FACE::TS::Send_Message(connIdQuote, FACE::INF_TIME_VALUE, txnQuote, msg,
max_msg_size, status2);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO,
"Publisher: error sending QUOTE message, status = %d\n",
status));
return static_cast<int>(status);
}

ACE_DEBUG((LM_INFO, "Publisher: sent message\n"));
}

// Give message time to be processed before exiting
ACE_OS::sleep(15);

// Destroy the pub connection
FACE::TS::Destroy_Connection(connIdExchange, status);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO,
"Publisher: error destroying connections, status = %d\n",
status));
return static_cast<int>(status);
}

FACE::TS::Destroy_Connection(connIdQuote, status);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO,
"Publisher: error destroying connections, status = %d\n",
status));
return static_cast<int>(status);
}

return EXIT_SUCCESS;
}
// FUZZ: enable check_for_improper_main_declaration
25 changes: 25 additions & 0 deletions DevGuideExamples/FACE/two-topic/StockQuoter.idl
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma FACE IDL_VERSION 3.1
#include "orbsvcs/TimeBase.idl"
module StockQuoter
{
@topic
struct Quote {
@key string ticker;
string exchange;
string full_name;
double value;
TimeBase::TimeT timestamp;
};

enum ExchangeEventType { TRADING_OPENED,
TRADING_CLOSED,
TRADING_SUSPENDED,
TRADING_RESUMED };
@topic
struct ExchangeEvent {
@key string exchange;
ExchangeEventType event;
TimeBase::TimeT timestamp;
};
#pragma FACE ts_definitions Message
};
AlexFalzone marked this conversation as resolved.
Show resolved Hide resolved
116 changes: 116 additions & 0 deletions DevGuideExamples/FACE/two-topic/Subscriber.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#include "StockQuoterC.h"
#include "StockQuoter_TS.hpp"
#include "ace/Log_Msg.h"
#include <FACE/TS_common.hpp>
#include <FACE/common.hpp>
#include <csignal>

#ifdef ACE_AS_STATIC_LIBS
#include "dds/DCPS/RTPS/RtpsDiscovery.h"
#include "dds/DCPS/transport/rtps_udp/RtpsUdp.h"
#endif

// Global flag to indicate if termination signal was received
volatile sig_atomic_t termination_flag = 0;

// Signal handler function
void signal_handler(int signal) {
// Set termination flag to true
termination_flag = 1;
}

// FUZZ: disable check_for_improper_main_declaration
int main(int, char *[]) {

// Set up signal handlers
std::signal(SIGINT, signal_handler); // Ctrl+C
std::signal(SIGTERM, signal_handler); // Termination signal

// Initialize the TS interface
FACE::RETURN_CODE_TYPE status;
FACE::TS::Initialize("face_config.ini", status);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO, "Subscriber: error initializing FACE, status = %d\n",
status));
return static_cast<int>(status);
}

// Create the sub connection
FACE::CONNECTION_ID_TYPE connIdQuote;
FACE::CONNECTION_ID_TYPE connIdExchange;
FACE::CONNECTION_DIRECTION_TYPE dir = FACE::DESTINATION;
FACE::CONNECTION_DIRECTION_TYPE dirQuote = FACE::DESTINATION;
FACE::MESSAGE_SIZE_TYPE max_msg_size;
FACE::TS::Create_Connection("subquote", FACE::PUB_SUB, connIdQuote, dirQuote,
max_msg_size, FACE::INF_TIME_VALUE, status);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO, "Subscriber: no error\n", status));
}

FACE::TS::Create_Connection("sub", FACE::PUB_SUB, connIdExchange, dir,
max_msg_size, FACE::INF_TIME_VALUE, status);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO, "Subscriber: error creating connections, status = %d\n",
status));
return static_cast<int>(status);
}

// Receive a message
while (true) {
ACE_OS::sleep(1);

ACE_DEBUG((LM_INFO, "Subscriber: waiting for message\n"));

StockQuoter::Quote msg;
FACE::TRANSACTION_ID_TYPE txnExchange;
FACE::TRANSACTION_ID_TYPE txnQuote;

StockQuoter::ExchangeEvent event;
FACE::TS::Receive_Message(connIdExchange, FACE::INF_TIME_VALUE, txnExchange,
event, max_msg_size, status);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO,
"Subscriber: error receiving EVENT message, status = %d\n",
status));
}

ACE_OS::sleep(2);

FACE::TS::Receive_Message(connIdQuote, FACE::INF_TIME_VALUE, txnQuote, msg,
max_msg_size, status);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO,
"Subscriber: error receiving QUOTE message, status = %d\n",
status));
return static_cast<int>(status);
}

// print all the fields of msg
ACE_DEBUG((LM_INFO, "Subscriber: received message\n"));
ACE_DEBUG((LM_INFO, " exchange: %s\n", msg.exchange.in()));
ACE_DEBUG((LM_INFO, " value: %f\n", msg.value));

if (termination_flag) {
break;
}
}

// Destroy the sub connection
FACE::TS::Destroy_Connection(connIdQuote, status);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO,
"Subscriber: error destroying connections, status = %d\n",
status));
return static_cast<int>(status);
}

FACE::TS::Destroy_Connection(connIdExchange, status);
if (status != FACE::RC_NO_ERROR) {
ACE_DEBUG((LM_INFO,
"Subscriber: error destroying connections, status = %d\n",
status));
return static_cast<int>(status);
}

return EXIT_SUCCESS;
}
AlexFalzone marked this conversation as resolved.
Show resolved Hide resolved
12 changes: 12 additions & 0 deletions DevGuideExamples/FACE/two-topic/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: '2'
services:
subscriber:
image: ghcr.io/opendds/opendds:latest-release
command: ["./build/subscriber", "-DCPSConfigFile", "rtps.ini" ]
volumes:
- $PWD:/opt/workspace
publisher:
image: ghcr.io/opendds/opendds:latest-release
command: ["./build/publisher", "-DCPSConfigFile", "rtps.ini" ]
volumes:
- $PWD:/opt/workspace
61 changes: 61 additions & 0 deletions DevGuideExamples/FACE/two-topic/face_config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
[common]
DCPSGlobalTransportConfig=$file

[domain/3]
DiscoveryConfig=uni_rtps

[rtps_discovery/uni_rtps]
SedpMulticast=0
ResendPeriod=2

[transport/the_rtps_transport]
transport_type=rtps_udp
use_multicast=0

[topic/Quote]
platform_view_guid=103
type_name=StockQuoter::Quote
max_message_size=50000

[topic/ExchangeEvent]
platform_view_guid=103
type_name=StockQuoter::ExchangeEvent
max_message_size=50000

; EXCHANGE PUB
[connection/pub]
id=1
domain=3
direction=source
topic=ExchangeEvent
datawriterqos=durable_writer

; EXCHANGE SUB
[connection/sub]
id=2
domain=3
direction=destination
topic=ExchangeEvent
datareaderqos=durable_reader

; QUOTE PUB
[connection/pubquote]
id=3
domain=3
direction=source
topic=Quote
datawriterqos=durable_writer

; QUOTE SUB
[connection/subquote]
id=4
domain=3
direction=destination
topic=Quote
datareaderqos=durable_reader

[datawriterqos/durable_writer]
durability.kind=TRANSIENT_LOCAL

[datareaderqos/durable_reader]
durability.kind=TRANSIENT_LOCAL
6 changes: 6 additions & 0 deletions DevGuideExamples/FACE/two-topic/rtps.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[common]
AlexFalzone marked this conversation as resolved.
Show resolved Hide resolved
DCPSDefaultDiscovery=DEFAULT_RTPS
DCPSGlobalTransportConfig=$file

[transport/the_rtps_transport]
transport_type=rtps_udp