Skip to content

Commit

Permalink
feat(rabbitmq): publish using ChannelWrapper (#678)
Browse files Browse the repository at this point in the history
Publish messages using the ChannelWrapper instead of the raw ConfirmChannel to take advantage of the
reliability features of  `amqp-connection-manager`.

BREAKING CHANGE: This changes the behavior of throwing connection related errors

re #673
  • Loading branch information
ttshivers committed Jan 16, 2024
1 parent 83530d3 commit 8962eed
Showing 1 changed file with 2 additions and 22 deletions.
24 changes: 2 additions & 22 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
ConfirmChannel,
Options,
} from 'amqplib';
import { Replies } from 'amqplib/properties';
import {
EMPTY,
interval,
Expand Down Expand Up @@ -589,12 +588,7 @@ export class AmqpConnection {
routingKey: string,
message: T,
options?: Options.Publish
): Promise<Replies.Empty> {
// source amqplib channel is used directly to keep the behavior of throwing connection related errors
if (!this.managedConnection.isConnected() || !this._channel) {
throw new Error('AMQP connection is not available');
}

): Promise<boolean> {
let buffer: Buffer;
if (message instanceof Buffer) {
buffer = message;
Expand All @@ -606,21 +600,7 @@ export class AmqpConnection {
buffer = Buffer.alloc(0);
}

return new Promise((resolve, reject) => {
this._channel.publish(
exchange,
routingKey,
buffer,
options,
(err, ok) => {
if (err) {
reject(err);
} else {
resolve(ok);
}
}
);
});
return this._managedChannel.publish(exchange, routingKey, buffer, options);
}

private handleMessage<T, U>(
Expand Down

0 comments on commit 8962eed

Please sign in to comment.