Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to Workaround Bad Address in tcp Transport #3215

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions dds/DCPS/AssociationData.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct AssociationData {
, remote_durable_(false)
{}

static ACE_INET_Addr get_remote_address(const TransportBLOB& remote)
static ACE_INET_Addr get_remote_address(const TransportBLOB& remote, AddrSet* attempted = 0)
{
ACE_INET_Addr remote_address;
NetworkAddress network_order_address;
Expand All @@ -49,7 +49,7 @@ struct AssociationData {
ACE_TEXT("(%P|%t) ERROR: AssociationData::get_remote_address")
ACE_TEXT(" failed to de-serialize the NetworkAddress\n")));
} else {
network_order_address.to_addr(remote_address);
network_order_address.to_addr(remote_address, attempted);
}

return remote_address;
Expand Down
62 changes: 35 additions & 27 deletions dds/DCPS/transport/framework/NetworkAddress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,50 +523,57 @@ ACE_INET_Addr tie_breaker(const T& addrs, const String& name)

}

ACE_INET_Addr choose_single_coherent_address(const OPENDDS_VECTOR(ACE_INET_Addr)& addresses, bool prefer_loopback, const String& name)
namespace {
void insert_into_addr_set(
const char* name, AddrSet& set, AddrSet* attempted, const ACE_INET_Addr& addr)
{
if (attempted && attempted->count(addr)) {
VDBG((LM_DEBUG, "(%P|%t) choose_single_coherent_address(list): "
"Will NOT consider %C address %C because it was already attempted\n",
name, LogAddr(addr).c_str()));
} else {
VDBG((LM_DEBUG, "(%P|%t) choose_single_coherent_address(list): "
"Considering Address %C - ADDING TO %C LIST\n", LogAddr(addr).c_str(), name));
set.insert(addr);
}
}
}

ACE_INET_Addr choose_single_coherent_address(
const OPENDDS_VECTOR(ACE_INET_Addr)& addresses, bool prefer_loopback,
const String& name, AddrSet* attempted)
{
#ifdef ACE_HAS_IPV6
OPENDDS_SET(ACE_INET_Addr) set6_loopback;
OPENDDS_SET(ACE_INET_Addr) set6_linklocal;
OPENDDS_SET(ACE_INET_Addr) set6_mapped_v4;
OPENDDS_SET(ACE_INET_Addr) set6;
AddrSet set6_loopback;
AddrSet set6_linklocal;
AddrSet set6_mapped_v4;
AddrSet set6;
#endif // ACE_HAS_IPV6
OPENDDS_SET(ACE_INET_Addr) set4_loopback;
OPENDDS_SET(ACE_INET_Addr) set4;
AddrSet set4_loopback;
AddrSet set4;

for (OPENDDS_VECTOR(ACE_INET_Addr)::const_iterator it = addresses.begin(); it != addresses.end(); ++it) {
#ifdef ACE_HAS_IPV6
if (it->get_type() == AF_INET6 && !it->is_multicast()) {
if (it->is_loopback()) {
VDBG((LM_DEBUG, "(%P|%t) choose_single_coherent_address(list) - "
"Considering Address %C - ADDING TO IPv6 LOOPBACK LIST\n", LogAddr(*it).c_str()));
set6_loopback.insert(*it);
insert_into_addr_set("IPv6 LOOPBACK", set6_loopback, attempted, *it);
} else if (it->is_ipv4_mapped_ipv6() || it->is_ipv4_compat_ipv6()) {
#ifndef IPV6_V6ONLY
VDBG((LM_DEBUG, "(%P|%t) choose_single_coherent_address(list) - "
"Considering Address %C - ADDING TO IPv6 MAPPED / COMPATIBLE IPv4 LIST\n", LogAddr(*it).c_str()));
insert_into_addr_set("IPv6 MAPPED / COMPATIBLE IPv4", set6_mapped_v4, attempted, *it);
set6_mapped_v4.insert(*it);
#endif // ! IPV6_V6ONLY
} else if (it->is_linklocal()) {
VDBG((LM_DEBUG, "(%P|%t) choose_single_coherent_address(list) - "
"Considering Address %C - ADDING TO IPv6 LINK-LOCAL LIST\n", LogAddr(*it).c_str()));
set6_linklocal.insert(*it);
insert_into_addr_set("IPv6 LINK-LOCAL", set6_linklocal, attempted, *it);
} else {
VDBG((LM_DEBUG, "(%P|%t) choose_single_coherent_address(list) - "
"Considering Address %C - ADDING TO IPv6 NORMAL LIST\n", LogAddr(*it).c_str()));
set6.insert(*it);
insert_into_addr_set("IPv6 NORMAL", set6, attempted, *it);
}
}
#endif // ACE_HAS_IPV6
if (it->get_type() == AF_INET && !it->is_multicast()) {
if (it->is_loopback()) {
VDBG((LM_DEBUG, "(%P|%t) choose_single_coherent_address(list) - "
"Considering Address %C - ADDING TO IPv4 LOOPBACK LIST\n", LogAddr(*it).c_str()));
set4_loopback.insert(*it);
insert_into_addr_set("IPv4 LOOPBACK", set4_loopback, attempted, *it);
} else {
VDBG((LM_DEBUG, "(%P|%t) choose_single_coherent_address(list) - "
"Considering Address %C - ADDING TO IPv4 NORMAL LIST\n", LogAddr(*it).c_str()));
set4.insert(*it);
insert_into_addr_set("IPv4 NORMAL", set4, attempted, *it);
}
}
}
Expand Down Expand Up @@ -617,7 +624,8 @@ ACE_INET_Addr choose_single_coherent_address(const OPENDDS_VECTOR(ACE_INET_Addr)
return ACE_INET_Addr();
}

ACE_INET_Addr choose_single_coherent_address(const String& address, bool prefer_loopback, bool allow_ipv4_fallback)
ACE_INET_Addr choose_single_coherent_address(
const String& address, bool prefer_loopback, bool allow_ipv4_fallback, AddrSet* attempted)
{
ACE_INET_Addr result;

Expand Down Expand Up @@ -752,7 +760,7 @@ ACE_INET_Addr choose_single_coherent_address(const String& address, bool prefer_

#ifdef ACE_WIN32
static ACE_Thread_Mutex addr_cache_map_mutex_;
typedef std::pair<SystemTimePoint, OPENDDS_SET(ACE_INET_Addr)> AddrCachePair;
typedef std::pair<SystemTimePoint, AddrSet> AddrCachePair;
typedef OPENDDS_MAP(String, AddrCachePair) AddrCacheMap;
static AddrCacheMap addr_cache_map_;
ACE_Guard<ACE_Thread_Mutex> g(addr_cache_map_mutex_);
Expand Down Expand Up @@ -805,7 +813,7 @@ ACE_INET_Addr choose_single_coherent_address(const String& address, bool prefer_

ACE_OS::freeaddrinfo(res);

return choose_single_coherent_address(addresses, prefer_loopback, host_name);
return choose_single_coherent_address(addresses, prefer_loopback, host_name, attempted);
}

int locator_to_address(ACE_INET_Addr& dest,
Expand Down
12 changes: 9 additions & 3 deletions dds/DCPS/transport/framework/NetworkAddress.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct HostnameInfo {

typedef OPENDDS_VECTOR(HostnameInfo) HostnameInfoVector;

typedef OPENDDS_SET(ACE_INET_Addr) AddrSet;

/**
* @struct NetworkAddress
*
Expand All @@ -52,7 +54,7 @@ struct OpenDDS_Dcps_Export NetworkAddress {

/// Accessor to populate the provided ACE_INET_Addr object from the
/// address string received through transport.
void to_addr(ACE_INET_Addr& addr) const;
void to_addr(ACE_INET_Addr& addr, AddrSet* attempted = 0) const;

/// Reserve byte for some feature supports in the future.
/// e.g. version support.
Expand Down Expand Up @@ -90,13 +92,17 @@ extern OpenDDS_Dcps_Export
bool open_appropriate_socket_type(ACE_SOCK_Dgram& socket, const ACE_INET_Addr& local_address, int* proto_family = 0);

extern OpenDDS_Dcps_Export
ACE_INET_Addr choose_single_coherent_address(const OPENDDS_VECTOR(ACE_INET_Addr)& addrs, bool prefer_loopback = true, const String& name = String());
ACE_INET_Addr choose_single_coherent_address(
const OPENDDS_VECTOR(ACE_INET_Addr)& addrs, bool prefer_loopback = true,
const String& name = String(), AddrSet* attempted = 0);

extern OpenDDS_Dcps_Export
ACE_INET_Addr choose_single_coherent_address(const ACE_INET_Addr& addr, bool prefer_loopback = true);

extern OpenDDS_Dcps_Export
ACE_INET_Addr choose_single_coherent_address(const String& hostname, bool prefer_loopback = true, bool allow_ipv4_fallback = true);
ACE_INET_Addr choose_single_coherent_address(
const String& address, bool prefer_loopback = true,
bool allow_ipv4_fallback = true, AddrSet* attempted = 0);

inline void assign(DDS::OctetArray16& dest,
ACE_CDR::ULong ipv4addr_be)
Expand Down
5 changes: 3 additions & 2 deletions dds/DCPS/transport/framework/NetworkAddress.inl
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ OpenDDS::DCPS::NetworkAddress::~NetworkAddress()
}

ACE_INLINE
void OpenDDS::DCPS::NetworkAddress::to_addr(ACE_INET_Addr& addr) const
void OpenDDS::DCPS::NetworkAddress::to_addr(ACE_INET_Addr& addr, AddrSet* attempted) const
{
DBG_ENTRY_LVL("NetworkAddress","to_addr",6);
addr = choose_single_coherent_address(addr_.c_str(), true /*prefer_loopback*/);
addr = choose_single_coherent_address(
addr_.c_str(), true /*prefer_loopback*/, true /*allow_ipv4_fallback*/, attempted);
VDBG_LVL((LM_DEBUG, "(%P|%t) NetworkAddress::to_addr() - Resolving address for %C to be %C\n", addr_.c_str(), addr.get_host_addr()), 2);
}

Expand Down
20 changes: 16 additions & 4 deletions dds/DCPS/transport/framework/TransportImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,23 @@ class OpenDDS_Dcps_Export TransportImpl : public RcObject {
};

struct AcceptConnectResult {
enum Status { ACR_SUCCESS, ACR_FAILED };
explicit AcceptConnectResult(Status ok = ACR_FAILED)
: success_(ok == ACR_SUCCESS), link_() {}
enum Status { ACR_SUCCESS, ACR_FAILED, ACR_CONNECT_FAILED };

explicit AcceptConnectResult(Status status = ACR_FAILED)
: status_(status)
, success_(status == ACR_SUCCESS)
, link_()
{
}

AcceptConnectResult(const DataLink_rch& link)
: success_(link), link_(link) {}
: status_(link ? ACR_SUCCESS : ACR_FAILED)
, success_(status_ == ACR_SUCCESS)
, link_(link)
{
}

Status status_;
/// If false, the accept or connect has failed and link_ is ignored.
bool success_;
/// If success_ is true, link_ may either be null or have a valid DataLink.
Expand Down
37 changes: 27 additions & 10 deletions dds/DCPS/transport/tcp/TcpTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,27 +61,44 @@ TcpTransport::config() const
PriorityKey
TcpTransport::blob_to_key(const TransportBLOB& remote,
Priority priority,
bool active)
bool active,
AddrSet* attempted)
{
const ACE_INET_Addr remote_address = AssociationData::get_remote_address(remote);
const ACE_INET_Addr remote_address = AssociationData::get_remote_address(remote, attempted);
const bool is_loopback = remote_address == config().local_address();
return PriorityKey(priority, remote_address, is_loopback, active);
}

TransportImpl::AcceptConnectResult
TcpTransport::connect_datalink(const RemoteTransport& remote,
const ConnectionAttribs& attribs,
const TransportClient_rch& client)
TransportImpl::AcceptConnectResult TcpTransport::connect_datalink(
const RemoteTransport& remote,
const ConnectionAttribs& attribs,
const TransportClient_rch& client)
{
DBG_ENTRY_LVL("TcpTransport", "connect_datalink", 6);

if (is_shut_down()) {
return AcceptConnectResult();
}

const PriorityKey key =
blob_to_key(remote.blob_, attribs.priority_, true /*active*/);
TransportImpl::AcceptConnectResult res(AcceptConnectResult::ACR_CONNECT_FAILED);
AddrSet attempted;
while (res.status_ == AcceptConnectResult::ACR_CONNECT_FAILED) {
const PriorityKey key =
blob_to_key(remote.blob_, attribs.priority_, true /*active*/, &attempted);
res = connect_datalink_i(remote, attribs, client, key);
attempted.insert(key.address());
}

return res;
}


TransportImpl::AcceptConnectResult TcpTransport::connect_datalink_i(
const RemoteTransport& remote,
const ConnectionAttribs& attribs,
const TransportClient_rch& client,
const PriorityKey& key)
{
VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink PriorityKey "
"prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
key.priority(), LogAddr(key.address()).c_str(), key.is_loopback(),
Expand All @@ -100,7 +117,7 @@ TcpTransport::connect_datalink(const RemoteTransport& remote,
}

link = make_rch<TcpDataLink>(key.address(), ref(*this), attribs.priority_,
key.is_loopback(), true /*active*/);
key.is_loopback(), true /*active*/);
VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink create new link[%@]\n", link.in()), 0);
if (links_.bind(key, link) != 0 /*OK*/) {
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
Expand Down Expand Up @@ -147,7 +164,7 @@ TcpTransport::connect_datalink(const RemoteTransport& remote,
}
link->invoke_on_start_callbacks(false);

return AcceptConnectResult();
return AcceptConnectResult(AcceptConnectResult::ACR_CONNECT_FAILED);
}

if (ret == 0) {
Expand Down
10 changes: 9 additions & 1 deletion dds/DCPS/transport/tcp/TcpTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <dds/DCPS/ReactorTask_rch.h>
#include <dds/DCPS/transport/framework/PriorityKey.h>
#include <dds/DCPS/transport/framework/TransportImpl.h>
#include <dds/DCPS/transport/framework/NetworkAddress.h>
#include <dds/DCPS/TimeTypes.h>

#include <ace/INET_Addr.h>
Expand Down Expand Up @@ -62,6 +63,12 @@ class OpenDDS_Tcp_Export TcpTransport
const ConnectionAttribs& attribs,
const TransportClient_rch& client);

virtual AcceptConnectResult connect_datalink_i(
const RemoteTransport& remote,
const ConnectionAttribs& attribs,
const TransportClient_rch& client,
const PriorityKey& key);

virtual AcceptConnectResult accept_datalink(const RemoteTransport& remote,
const ConnectionAttribs& attribs,
const TransportClient_rch& client);
Expand Down Expand Up @@ -109,7 +116,8 @@ class OpenDDS_Tcp_Export TcpTransport

PriorityKey blob_to_key(const TransportBLOB& remote,
Priority priority,
bool active);
bool active,
AddrSet* attempted = 0);

/// Map Type: (key) PriorityKey to (value) TcpDataLink_rch
typedef ACE_Hash_Map_Manager_Ex
Expand Down