Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't close subscriber if channel is recoverable. #64

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/main/scala/io/scalac/amqp/impl/QueuePublisher.scala
Expand Up @@ -37,8 +37,8 @@ private[amqp] class QueuePublisher(
case _ ⇒
Try(connection.createChannel()) match {
case Success(channel) ⇒
channel.addShutdownListener(newShutdownListener(subscriber))
val subscription = new QueueSubscription(channel, queue, subscriber)
subscription.addShutdownListener(newShutdownListener(subscriber))

try {
subscriber.onSubscribe(subscription)
Expand All @@ -54,10 +54,8 @@ private[amqp] class QueuePublisher(
}
}

def newShutdownListener(subscriber: Subscriber[_ >: Delivery]) = new ShutdownListener {
override def shutdownCompleted(cause: ShutdownSignalException) =
subscribers.single.transform(_ - subscriber)
}
def newShutdownListener(subscriber: Subscriber[_ >: Delivery]) =
() => subscribers.single.transform(_ - subscriber)

override def toString = s"QueuePublisher(connection=$connection, queue=$queue, prefetch=$prefetch)"
}
99 changes: 53 additions & 46 deletions src/main/scala/io/scalac/amqp/impl/QueueSubscription.scala
Expand Up @@ -16,34 +16,30 @@ import scala.util.control.NonFatal
private[amqp] class QueueSubscription(channel: Channel, queue: String, subscriber: Subscriber[_ >: Delivery])
extends DefaultConsumer(channel) with Subscription {

type Listener = () ⇒ Unit

val demand = Ref(0L)

/** Number of messages stored in this buffer is limited by channel QOS. */
val buffer = Ref(Queue[Delivery]())
val running = Ref(false)
var closeRequested = Ref(false)
val shutdownListeners = Ref(List[Listener]())

override def finalize(): Unit = {
try
closeChannel()
finally
super.finalize()
}
override def finalize(): Unit =
try closeChannel()
finally super.finalize()

override def handleCancel(consumerTag: String) = try {
subscriber.onComplete()
} catch {
case NonFatal(exception) ⇒
subscriber.onError(new IllegalStateException("Rule 2.13: onComplete threw an exception", exception))
}
override def handleCancel(consumerTag: String) = closeAndComplete()

override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException) = sig match {
case sig if !sig.isInitiatedByApplication ⇒ subscriber.onError(sig)
case _ ⇒ // shutdown initiated by us
override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException) = {
if (sig.isInitiatedByApplication) {
closeAndComplete()
} else if (!channel.isInstanceOf[Recoverable]) {
closeWithError(sig)
}
}



override def handleDelivery(consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
Expand All @@ -57,7 +53,7 @@ private[amqp] class QueueSubscription(channel: Channel, queue: String, subscribe
private def deliverRequested(): Unit = {
val go = atomic { implicit txn ⇒
if (demand() > 0 && buffer().nonEmpty)
running.transformAndExtract(r => (true, !r))
running.transformAndExtract(r (true, !r))
else
false
}
Expand All @@ -84,52 +80,63 @@ private[amqp] class QueueSubscription(channel: Channel, queue: String, subscribe
subscriber.onNext(delivery)
}
} catch {
case NonFatal(exception) ⇒
// 2.13: exception from onNext cancels subscription
try closeChannel() catch {
case NonFatal(_) ⇒ // mute
}
subscriber.onError(exception)
case NonFatal(exception) ⇒ closeWithError(exception)
}

def closeChannel(): Unit = synchronized {
if (closeRequested.single.compareAndSet(false, true) && channel.isOpen) {
try {
channel.close()
} catch {
case NonFatal(_) =>
if (channel.isOpen()) {
try channel.close() catch {
case _: AlreadyClosedException ⇒ // ignore
}
}
}

override def request(n: Long) = n match {
case n if n <= 0 ⇒
try closeChannel() catch {
case NonFatal(_) ⇒ // mute
}
subscriber.onError(new IllegalArgumentException("Rule 3.9: n <= 0"))

case n if channel.isOpen ⇒
case n if n <= 0 ⇒
closeWithError(new IllegalArgumentException("Rule 3.9: n <= 0"))
case n if channel.isOpen() ⇒
val newDemand = demand.single.transformAndGet(_ + n)
newDemand match {
case d if d < 0 ⇒ // 3.17: overflow
try closeChannel() catch {
case NonFatal(_) ⇒ // mute
}
subscriber.onError(new IllegalStateException("Rule 3.17: Pending + n > Long.MaxValue"))
closeWithError(new IllegalStateException("Rule 3.17: Pending + n > Long.MaxValue"))
case d ⇒
Future(deliverRequested())
}

case _ ⇒ // 3.6: nop
}

override def cancel() = try {
closeChannel()
} catch {
case _: AlreadyClosedException ⇒ // 3.7: nop
case NonFatal(exception) ⇒
subscriber.onError(exception)
override def cancel() = close()

def addShutdownListener(listener: Listener): Unit = {
shutdownListeners.single.transform(listener :: _)
}

private def notifyShutdownListeners(): Unit = {
shutdownListeners.single.get.foreach(_())
}

private def closeAndComplete(): Unit = close(subscriber.onComplete, subscriber.onError)

private def closeWithError(cause: Throwable): Unit =
close(() ⇒ subscriber.onError(cause), _ ⇒ subscriber.onError(cause))

private val nop = () ⇒ ()

private def close(
onSuccess: () ⇒ Unit = nop,
onError: Throwable ⇒ Unit = subscriber.onError
): Unit = {
if (closeRequested.single.compareAndSet(false, true)) {
try {
closeChannel()
onSuccess()
} catch {
case NonFatal(e) ⇒ onError(e)
} finally {
notifyShutdownListeners()
}
}
}

override def toString() = atomic { implicit txn ⇒
Expand Down