diff --git a/src/main/java/com/morlunk/jumble/net/JumbleConnection.java b/src/main/java/com/morlunk/jumble/net/JumbleConnection.java index 6f54199..9194902 100644 --- a/src/main/java/com/morlunk/jumble/net/JumbleConnection.java +++ b/src/main/java/com/morlunk/jumble/net/JumbleConnection.java @@ -625,17 +625,12 @@ public void onTCPConnectionEstablished() { mConnected = true; // Attempt to start UDP thread once connected. - if(!shouldForceTCP()) { - try { - mUDP = new JumbleUDP(mCryptState); - mUDP.setUDPConnectionListener(this); - mUDP.connect(mHost, mPort); - } catch (ConnectException e) { - onUDPConnectionError(e); - } + if (!shouldForceTCP()) { + mUDP = new JumbleUDP(mCryptState, this, mMainHandler); + mUDP.connect(mHost, mPort); } - if(mListener != null) mListener.onConnectionEstablished(); + if (mListener != null) mListener.onConnectionEstablished(); } @Override diff --git a/src/main/java/com/morlunk/jumble/net/JumbleNetworkThread.java b/src/main/java/com/morlunk/jumble/net/JumbleNetworkThread.java index b58f0e5..9c89117 100644 --- a/src/main/java/com/morlunk/jumble/net/JumbleNetworkThread.java +++ b/src/main/java/com/morlunk/jumble/net/JumbleNetworkThread.java @@ -27,6 +27,7 @@ * Base class for TCP/UDP protocol implementations. * Provides a common threading model (single threaded queue for write) * Created by andrew on 25/03/14. + * @deprecated This shouldn't be needed. Redundant inheritance with limited shared code. */ public abstract class JumbleNetworkThread implements Runnable { diff --git a/src/main/java/com/morlunk/jumble/net/JumbleTCP.java b/src/main/java/com/morlunk/jumble/net/JumbleTCP.java index 764b2af..6ffbbbc 100644 --- a/src/main/java/com/morlunk/jumble/net/JumbleTCP.java +++ b/src/main/java/com/morlunk/jumble/net/JumbleTCP.java @@ -126,15 +126,6 @@ public void run() { }); } } - - if(mListener != null) { - executeOnMainThread(new Runnable() { - @Override - public void run() { - mListener.onTCPConnectionDisconnect(); - } - }); - } } catch (SocketException e) { error("Could not open a connection to the host", e); } catch (SSLHandshakeException e) { @@ -162,6 +153,13 @@ public void run() { e.printStackTrace(); } mRunning = false; + + executeOnMainThread(new Runnable() { + @Override + public void run() { + mListener.onTCPConnectionDisconnect(); + } + }); stopThreads(); } } @@ -215,6 +213,8 @@ public void run() { * Attempts to disconnect gracefully on the Tx thread. * Disconnects interrupt the socket listening on the Tx thread, suppressing any exceptions * caused by this request. Any remaining protobuf messages will be dispatched first. + * + * Suppresses all future errors on this connection. */ public void disconnect() { if (!mRunning) return; @@ -256,6 +256,18 @@ public void run() { }); } + /** + * Runnable that + */ + private static class OutboxConsumer implements Runnable { + + + @Override + public void run() { + + } + } + public interface TCPConnectionListener { public void onTCPConnectionEstablished(); public void onTLSHandshakeFailed(X509Certificate[] chain); diff --git a/src/main/java/com/morlunk/jumble/net/JumbleUDP.java b/src/main/java/com/morlunk/jumble/net/JumbleUDP.java index ab59c82..b593a7f 100644 --- a/src/main/java/com/morlunk/jumble/net/JumbleUDP.java +++ b/src/main/java/com/morlunk/jumble/net/JumbleUDP.java @@ -17,15 +17,22 @@ package com.morlunk.jumble.net; +import android.os.Handler; import android.util.Log; import com.morlunk.jumble.Constants; +import org.jetbrains.annotations.NotNull; + import java.io.IOException; import java.net.ConnectException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import javax.crypto.BadPaddingException; import javax.crypto.IllegalBlockSizeException; @@ -33,8 +40,11 @@ /** * Class to maintain and receive packets from the UDP connection to a Mumble server. + * Public interface is not thread safe. */ -public class JumbleUDP extends JumbleNetworkThread { +public class JumbleUDP implements Runnable { + private static final String TAG = "JumbleUDP"; + private static final int BUFFER_SIZE = 2048; private final CryptState mCryptState; @@ -45,23 +55,34 @@ public class JumbleUDP extends JumbleNetworkThread { private InetAddress mResolvedHost; private boolean mConnected; + /** Main datagram thread hosting this runnable. */ + private final Thread mDatagramThread; + + /** Handler to invoke listener callback invocations on. */ + private final Handler mCallbackHandler; + + /** Unbounded queue of outgoing packets to be sent. */ + private final BlockingQueue mSendQueue; + /** * Sets up a new UDP connection context. * @param cryptState Cryptographic state provider. + * @param listener Callback target. Messages will be posted on the callback handler given. + * @param callbackHandler Handler to post listener invocations on. */ - public JumbleUDP(CryptState cryptState) { + public JumbleUDP(@NotNull CryptState cryptState, @NotNull UDPConnectionListener listener, + @NotNull Handler callbackHandler) { mCryptState = cryptState; - } - - public void setUDPConnectionListener(UDPConnectionListener listener) { mListener = listener; + mCallbackHandler = callbackHandler; + mDatagramThread = new Thread(this); + mSendQueue = new LinkedBlockingQueue<>(); } - public void connect(String host, int port) throws ConnectException { - if(mConnected) throw new ConnectException("UDP connection already established!"); + public void connect(@NotNull String host, @NotNull int port) { mHost = host; mPort = port; - startThreads(); + mDatagramThread.start(); } public boolean isRunning() { @@ -70,39 +91,32 @@ public boolean isRunning() { @Override public void run() { + Thread outgoingConsumerThread = null; + mConnected = true; try { mResolvedHost = InetAddress.getByName(mHost); mUDPSocket = new DatagramSocket(); - } catch (final IOException e) { - if(mListener != null) { - executeOnMainThread(new Runnable() { - @Override - public void run() { - mListener.onUDPConnectionError(e); - } - }); - } - return; - } - mUDPSocket.connect(mResolvedHost, mPort); - final DatagramPacket packet = new DatagramPacket(new byte[BUFFER_SIZE], BUFFER_SIZE); + mUDPSocket.connect(mResolvedHost, mPort); + Log.d(TAG, "Created socket"); - Log.d(Constants.TAG, "[UDP] Created socket"); - mConnected = true; + // Start outgoing consumer once the UDP socket is open, as a child thread. + final OutgoingConsumer outgoingConsumer = new OutgoingConsumer(mUDPSocket, mSendQueue); + outgoingConsumerThread = new Thread(outgoingConsumer); + outgoingConsumerThread.start(); - while(mConnected) { - try { + final DatagramPacket packet = new DatagramPacket(new byte[BUFFER_SIZE], BUFFER_SIZE); + while (mConnected) { mUDPSocket.receive(packet); final byte[] data = packet.getData(); final int length = packet.getLength(); if (!mCryptState.isValid()) { - Log.d(Constants.TAG, "[UDP] CryptState invalid, discarding packet"); + Log.d(TAG, "CryptState invalid, discarding packet"); continue; } if (length < 5) { - Log.d(Constants.TAG, "[UDP] Packet too short, discarding"); + Log.d(TAG, "Packet too short, discarding"); continue; } @@ -111,82 +125,94 @@ public void run() { if (mListener != null) { if (buffer != null) { - executeOnReceiveThread(new Runnable() { + mCallbackHandler.post(new Runnable() { @Override public void run() { mListener.onUDPDataReceived(buffer); } }); - } else if(mCryptState.getLastGoodElapsed() > 5000000 && + } else if (mCryptState.getLastGoodElapsed() > 5000000 && mCryptState.getLastRequestElapsed() > 5000000) { mCryptState.resetLastRequestTime(); - executeOnMainThread(new Runnable() { + mCallbackHandler.post(new Runnable() { @Override public void run() { mListener.resyncCryptState(); } }); - Log.d(Constants.TAG, "[UDP] Packet failed to decrypt, discarding and " + - "requesting crypt state resync"); + Log.d(TAG, "Packet failed to decrypt, discarding and requesting crypt state resync"); } else { - Log.d(Constants.TAG, "[UDP] Packet failed to decrypt, discarding"); + Log.d(TAG, "Packet failed to decrypt, discarding"); } } - } catch (BadPaddingException|IllegalBlockSizeException|ShortBufferException e) { - Log.d(Constants.TAG, "[UDP] Discarding packet", e); - } - } catch (final IOException e) { - // If a UDP exception is thrown while connected, notify the listener to fall back to TCP. - if(mConnected && mListener != null) { - executeOnMainThread(new Runnable() { - @Override - public void run() { - mListener.onUDPConnectionError(e); - } - }); + } catch (BadPaddingException | IllegalBlockSizeException | ShortBufferException e) { + Log.d(Constants.TAG, "Discarding packet", e); } - break; } + } catch (final IOException e) { + // If mConnected is false, then this is a user-triggered disconnection. Report no error. + if (mConnected) { + Log.d(TAG, "UDP socket closed unexpectedly"); + mCallbackHandler.post(new Runnable() { + @Override + public void run() { + mListener.onUDPConnectionError(e); + } + }); + } else { + Log.d(TAG, "UDP socket closed in response to user disconnect"); + } + } finally { + mConnected = false; + + // We want to interrupt the outgoing queue consumer thread to avoid sends after socket + // cleanup. Blocking shouldn't be necessary. + if (outgoingConsumerThread != null) { + outgoingConsumerThread.interrupt(); + } + + // Clear the outgoing queue, in case the caller decides to reconnect with the same socket. + mSendQueue.clear(); + + mUDPSocket.close(); } - disconnect(); // Make sure we close the socket if disconnect wasn't controlled } - public void sendMessage(final byte[] data, final int length) { - if(!mCryptState.isValid() || !mConnected) return; - executeOnSendThread(new Runnable() { - @Override - public void run() { - try { - if(!mCryptState.isValid() || !mConnected) return; - byte[] encryptedData = mCryptState.encrypt(data, length); - final DatagramPacket packet = new DatagramPacket(encryptedData, encryptedData.length); - packet.setAddress(mResolvedHost); - packet.setPort(mPort); - mUDPSocket.send(packet); - } catch (IOException e) { - e.printStackTrace(); - } catch (IllegalBlockSizeException e) { - e.printStackTrace(); - } catch (ShortBufferException e) { - e.printStackTrace(); - } catch (BadPaddingException e) { - e.printStackTrace(); - } - } - }); + public void sendMessage(@NotNull final byte[] data, final int length) { + if (!mCryptState.isValid()) { + Log.w(TAG, "Invalid cryptstate prior to sendMessage call."); + return; + } + if (!mConnected) { + Log.w(TAG, "Tried to send UDP message without an active connection."); + return; + } + + try { + byte[] encryptedData = mCryptState.encrypt(data, length); + final DatagramPacket packet = new DatagramPacket(encryptedData, encryptedData.length); + packet.setAddress(mResolvedHost); + packet.setPort(mPort); + mSendQueue.add(packet); + } catch (BadPaddingException e) { + // TODO + e.printStackTrace(); + } catch (IllegalBlockSizeException e) { + // TODO + e.printStackTrace(); + } catch (ShortBufferException e) { + // TODO + e.printStackTrace(); + } } + /** + * Lazy, non-blocking idempotent disconnect. + */ public void disconnect() { - if(!mConnected) return; mConnected = false; - executeOnSendThread(new Runnable() { - @Override - public void run() { - mUDPSocket.disconnect(); - mUDPSocket.close(); - } - }); - stopThreads(); + // Closing a socket will trigger an IOException on the consumer thread. + mUDPSocket.close(); } /** @@ -194,8 +220,40 @@ public void run() { * onUDPDataReceived is always called on the UDP receive thread. */ public interface UDPConnectionListener { - public void onUDPDataReceived(byte[] data); - public void onUDPConnectionError(Exception e); - public void resyncCryptState(); + void onUDPDataReceived(byte[] data); + void onUDPConnectionError(Exception e); + void resyncCryptState(); + } + + /** + * Runnable that reads from a shared blocking queue, dispatching datagrams when available. + */ + private static class OutgoingConsumer implements Runnable { + private final DatagramSocket mSocket; + private final BlockingQueue mQueue; + + public OutgoingConsumer(@NotNull DatagramSocket socket, + @NotNull BlockingQueue queue) { + mSocket = socket; + mQueue = queue; + } + + @Override + public void run() { + Log.d(TAG, "Datagram outbox consumer active"); + boolean interrupted = false; + while (!interrupted) { + try { + DatagramPacket packet = mQueue.take(); + mSocket.send(packet); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + // Our datagram thread interrupted us. We should stop reading. + interrupted = true; + } + } + Log.d(TAG, "Datagram outbox consumer shutdown"); + } } }