This repository has been archived by the owner on Aug 11, 2023. It is now read-only.

ServiceResponseHandler uses unsafe queue for returning service responses to caller #261

Open
msmcconnell opened this issue Jan 24, 2018 · 6 comments

@msmcconnell

The ServiceResponseHandler uses a queue to hand ROS service responses back to callers. Because each incoming response is simply paired with the next listener polled from the queue, responses can become mismatched if a node contains multi-threaded components making the same service call. If the first request does not complete before the second, the responses are swapped. A mapping from requests to listeners is needed so that each caller receives only its own response.
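
The swap described above can be reproduced without threads. The following minimal sketch is not rosjava code: `QueueMismatchDemo`, the callers A and B, and the response strings are hypothetical. It assumes the two responses arrive in the opposite order from the one in which their listeners were queued, which is one way the mismatch could arise.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these names come from rosjava.
final class QueueMismatchDemo {

  /** Replays one unlucky ordering and returns what each listener received. */
  static List<String> simulate() {
    final List<String> log = new ArrayList<>();
    final Queue<Consumer<String>> responseListeners = new ArrayDeque<>();

    // Callers A and B queue their listeners in this order.
    responseListeners.add(response -> log.add("A got " + response));
    responseListeners.add(response -> log.add("B got " + response));

    // Assumption: the responses come back in the opposite order (B first).
    final String[] arrivals = {"response-to-B", "response-to-A"};

    // Like messageReceived(), pair each arrival with the next polled listener.
    for (String response : arrivals) {
      responseListeners.poll().accept(response);
    }
    return log;
  }

  public static void main(String[] args) {
    for (String line : simulate()) {
      System.out.println(line);
    }
  }
}
```

In this replay each listener ends up with the other caller's response, because the queue records only registration order and carries no information about which request a response belongs to.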

Relevant variable: Line 41
private final Queue<ServiceResponseListener<ResponseType>> responseListeners;

Relevant method: Line 52

  @Override
  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    final ServiceResponseListener<ResponseType> listener = responseListeners.poll();
    Preconditions.checkNotNull(listener, "No listener for incoming service response.");
    final ServiceServerResponse response = (ServiceServerResponse) e.getMessage();
    final ChannelBuffer buffer = response.getMessage();
    executorService.execute(new Runnable() {
      @Override
      public void run() {
        if (response.getErrorCode() == 1) {
          listener.onSuccess(deserializer.deserialize(buffer));
        } else {
          String message = Charset.forName("US-ASCII").decode(buffer.toByteBuffer()).toString();
          listener.onFailure(new RemoteException(StatusCode.ERROR, message));
        }
      }
    });
  }
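
One shape the suggested mapping could take is sketched below. This is only an illustration under a significant assumption: that each response can be tagged with the identifier of the request it answers, which this issue does not establish for the ROS service protocol. `CorrelatedDispatcher`, `register`, and `dispatch` are hypothetical names, not part of rosjava.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch: listeners are looked up by request id instead of
// being polled from a FIFO queue.
final class CorrelatedDispatcher<R> {
  private final Map<Long, Consumer<R>> pending = new ConcurrentHashMap<>();
  private final AtomicLong nextId = new AtomicLong();

  /** Stores the listener and returns the id the request would have to carry. */
  long register(Consumer<R> listener) {
    final long id = nextId.getAndIncrement();
    pending.put(id, listener);
    return id;
  }

  /** Hands the response to the listener registered under the same id. */
  void dispatch(long id, R response) {
    final Consumer<R> listener = pending.remove(id);
    if (listener == null) {
      throw new IllegalStateException("No listener for response id " + id);
    }
    listener.accept(response);
  }
}
```

With a lookup keyed this way, the order in which responses arrive no longer decides which caller receives them. If the protocol cannot carry an identifier, an alternative would be to make queuing the listener and writing the request a single atomic step, so that queue order always matches wire order.
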
@jubeira

jubeira commented Jan 24, 2018

Hi @msmcconnell, thanks for reporting this.
Do you have a code sample that reproduces the failure? That would help us verify whether any fix we apply actually works.
Do you propose a particular solution for the problem?

@msmcconnell
Author

I have an integration test between two nodes where one of them is multi-threaded. I would need to remove a small dependency on my own code but it wouldn't be too hard. I can provide the code sample here. I'll try to think of a solution.

@msmcconnell
Author

Here is a code sample for an integration test I used to verify that this was happening.
If this is run as a unit test with gradle the user should see some mismatched thread numbers and a failure from the service call.
Test Class

import cav_srvs.GetDriversWithCapabilities;
import cav_srvs.GetDriversWithCapabilitiesRequest;
import cav_srvs.GetDriversWithCapabilitiesResponse;
import gov.dot.fhwa.saxton.carma.rosutils.RosTest;
import org.ros.message.MessageFactory;
import org.ros.namespace.GraphName;
import org.ros.node.AbstractNodeMain;
import org.ros.node.ConnectedNode;
import org.ros.node.service.*;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.ros.node.Node;
import org.ros.node.NodeConfiguration;
import org.junit.Test;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Class for running a multi-threaded service request
 */
public class ServiceTest extends RosTest {

  /**
   * Test if a multi-threaded node can safely call services from another node
   */
  @Test public void testServiceAvailability() throws Exception {
    final NodeConfiguration secondConfig = NodeConfiguration.newPrivate(rosCore.getUri());
    final MessageFactory messageFactory = secondConfig.getTopicMessageFactory();
    final String SERVICE_NAME = "test_service";
    final int NUM_THREADS = 10;
    // Each client thread makes exactly one call, so expect one countdown per thread
    final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS);

    // Create the anonymous node that hosts the service server
    AbstractNodeMain serverNode = new AbstractNodeMain() {
      @Override public GraphName getDefaultNodeName() {
        return GraphName.of("server_node");
      }

      @Override public void onStart(final ConnectedNode connectedNode) {

        // Setup Server
        ServiceServer<GetDriversWithCapabilitiesRequest, GetDriversWithCapabilitiesResponse> testServer =
         connectedNode.newServiceServer(SERVICE_NAME, GetDriversWithCapabilities._TYPE,
          new ServiceResponseBuilder<GetDriversWithCapabilitiesRequest, GetDriversWithCapabilitiesResponse>() {
  
            @Override
            public void build(GetDriversWithCapabilitiesRequest req, GetDriversWithCapabilitiesResponse res) {
              res.setDriverData(req.getCapabilities());
            }
          });
      }
    };

    // Create the anonymous client node that calls the service from several threads
    AbstractNodeMain clientNode = new AbstractNodeMain() {
      List<ClientThread> threads = new LinkedList<>();
      @Override public GraphName getDefaultNodeName() {
        return GraphName.of("client_node");
      }

      @Override public void onStart(final ConnectedNode connectedNode) {

        // Create the service client and start one caller thread per request

        ServiceClient<GetDriversWithCapabilitiesRequest, GetDriversWithCapabilitiesResponse> serviceClient;
        try {
          serviceClient = connectedNode.newServiceClient(SERVICE_NAME, GetDriversWithCapabilities._TYPE);
          for(int i = 0; i < NUM_THREADS; i++) {
            ClientThread newThread = new ClientThread("Thread " + i, countDownLatch, messageFactory,
            SERVICE_NAME, serviceClient, connectedNode.getLog());
            newThread.start();
            threads.add(newThread);
          }
        } catch (org.ros.exception.ServiceNotFoundException e) {
          fail("Couldn't find service " + SERVICE_NAME);
        }
      }

      @Override
      public void onShutdown(Node node) {
        for (ClientThread thread: threads) {
          thread.interrupt();
        }
      }
    };

    // Start the server node
    nodeMainExecutor.execute(serverNode, nodeConfiguration);
    // Give time for service to be available
    Thread.sleep(1000);
    // Start the multi-threaded client node
    nodeMainExecutor.execute(clientNode, secondConfig);
    assertTrue(countDownLatch.await(20, TimeUnit.SECONDS)); // Check if service calls were successful
    // // Shutdown nodes
    // nodeMainExecutor.shutdownNodeMain(clientNode);
    // // Shutting down the server from this test results in an exception on printing the service address
    // nodeMainExecutor.shutdownNodeMain(serverNode);
    // Stack trace is automatically logged
    // ROS is shutdown automatically in cleanup from ROS Test
  }
  
}

Thread Class

import cav_srvs.GetDriversWithCapabilities;
import cav_srvs.GetDriversWithCapabilitiesRequest;
import cav_srvs.GetDriversWithCapabilitiesResponse;
import org.ros.exception.RemoteException;
import org.ros.message.MessageFactory;
import org.ros.node.service.*;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Class for making service requests
 */
public class ClientThread extends Thread {
  final String requestStr;
  final CountDownLatch countDownLatch;
  final MessageFactory messageFactory;
  final ServiceClient<GetDriversWithCapabilitiesRequest, GetDriversWithCapabilitiesResponse> serviceClient;
  final String service;
  final boolean[] done = new boolean[1];
  final Log log;

  ClientThread(String requestStr, CountDownLatch countDownLatch, MessageFactory messageFactory,
   String service, ServiceClient<GetDriversWithCapabilitiesRequest, GetDriversWithCapabilitiesResponse> serviceClient, Log log) {
    this.requestStr = requestStr;
    this.countDownLatch = countDownLatch;
    this.messageFactory = messageFactory;
    this.service = service;
    this.serviceClient = serviceClient;
    this.done[0] = true;
    this.log = log;
  }

  @Override
  public void run(){  
    // Build ros messages
    if (done[0]) {
      final List<String> caps = new LinkedList<>();
      caps.add(requestStr);
      done[0] = false;
      // Declared final because the anonymous listener below reads it
      final GetDriversWithCapabilitiesRequest req =
          messageFactory.newFromType(GetDriversWithCapabilitiesRequest._TYPE);
    
      req.setCapabilities(caps);

      serviceClient.call(req, new ServiceResponseListener<GetDriversWithCapabilitiesResponse>() {
        @Override public void onSuccess(GetDriversWithCapabilitiesResponse response) {
          log.info("\n\n Request: " + req.getCapabilities().get(0) + " Result: " + response.getDriverData().get(0) + " \n\n");
          assertTrue(response.getDriverData().get(0).equals(req.getCapabilities().get(0)));
          countDownLatch.countDown();
          done[0] = true;
        }

        @Override
        public void onFailure(RemoteException e) {
          fail("Service request failed for request " + requestStr);
        }
      });
    }
  }  
}

The service used in this case looks like this.

string[] capabilities
---
string[] driver_data

@jubeira

jubeira commented Jan 25, 2018

Thanks @msmcconnell, this looks useful!
If you find a fix for it please submit a PR. I will try to test this when I have some time.

@msmcconnell
Author

I'll look into making a fix. Which branch is the preferred one to develop on? The README says master but it is listed as obsolete/master and the last PR was onto kinetic. Does that mean I should branch directly off kinetic?

@jubeira

jubeira commented Jan 25, 2018

Good catch; that readme should be updated as well.
We have been targeting everything at Kinetic for a while now, so please use it as your starting point and target your PR there.
