Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to update filter through API #25

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
224 changes: 212 additions & 12 deletions src/main/java/com/opendxl/streaming/client/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,16 @@ public Channel(final String base, final ChannelAuth auth, final String consumerG
* @param httpProxySettings contains http proxy hostname, port, username and password.
* @throws TemporaryError if http client request object failed to be created.
*/
public Channel(final String base, final ChannelAuth auth, final String consumerGroup, final String pathPrefix,
public Channel(final String base, final ChannelAuth auth, final String consumerGroup, final String pathPrefix,
final String consumerPathPrefix, final String producerPathPrefix, final boolean retryOnFail,
final String verifyCertBundle, final Properties extraConfigs,
final HttpProxySettings httpProxySettings)
throws TemporaryError {
this(base, auth, consumerGroup, pathPrefix, consumerPathPrefix, producerPathPrefix, retryOnFail,
verifyCertBundle, extraConfigs, httpProxySettings, true, Collections.EMPTY_MAP);
}
verifyCertBundle, extraConfigs, httpProxySettings, true, Collections.EMPTY_MAP);
}

public Channel(final String base, final ChannelAuth auth, final String consumerGroup, final String pathPrefix,
public Channel(final String base, final ChannelAuth auth, final String consumerGroup, final String pathPrefix,
final String consumerPathPrefix, final String producerPathPrefix, final boolean retryOnFail,
final String verifyCertBundle, final Properties extraConfigs,
final HttpProxySettings httpProxySettings, final boolean multiTenant,
Expand Down Expand Up @@ -346,7 +346,7 @@ public Channel(final String base, final ChannelAuth auth, final String consumerG
this.isHttps = base.toLowerCase().startsWith("https");
this.httpProxySettings = httpProxySettings;
this.request = new Request(base, auth, this.verifyCertBundle, this.isHttps, this.httpProxySettings,
requestHeaders);
requestHeaders);

this.retryOnFail = retryOnFail;

Expand Down Expand Up @@ -406,7 +406,7 @@ public void create() throws PermanentError, TemporaryError {

try {
final StringBuilder api = new StringBuilder(consumerPathPrefix)
.append("/consumers");
.append("/consumers");
if (!this.isMultiTenant) {
api.append("?multi_tenant=false");
}
Expand Down Expand Up @@ -455,8 +455,8 @@ public void subscribe(final List<String> topics) throws ConsumerError, Permanent
* @throws PermanentError if no topics were specified.
* @throws TemporaryError if the subscription attempt fails.
*/
public void subscribe(final List<String> topics, final Map<String, Object> filter,
final boolean payloadLookupForFilter) throws ConsumerError, PermanentError, TemporaryError {
public void subscribe(final List<String> topics, final Map<String, Map<String, Object>> filter,
final boolean payloadLookupForFilter) throws ConsumerError, PermanentError, TemporaryError {
acquireAndEnsureChannelIsActive();
try {
if (topics == null) {
Expand Down Expand Up @@ -522,6 +522,81 @@ public void subscribe(final List<String> topics, final Map<String, Object> filte

}

/**
* Update consumer filter with updated filter values
*
* @param topics Topic list.
* @throws ConsumerError if the consumer associated with the channel does not exist on the server.
* @throws PermanentError if no topics were specified.
* @throws TemporaryError if the subscription attempt fails.
*/
public void updateFilterSubscribe(final List<String> topics, final Map<String, Map<String, Object>> filter,
final boolean payloadLookupForFilter) throws ConsumerError, PermanentError, TemporaryError {
acquireAndEnsureChannelIsActive();
try {
if (topics == null) {

String error = "Non-empty value must be specified for topics.";
logger.error(error);
throw new PermanentError(error);

}

// Remove any null or empty topic from list
if (topics.isEmpty()) {

String error = "Non-empty value must be specified for topics";
logger.error(error);
throw new PermanentError(error);

}

if (consumerId == null || consumerId.isEmpty()) {
// Auto-create consumer group if none present
create();
}

Gson gson = new Gson();
byte[] body = null;
if (null == filter || filter.isEmpty()) {
Topics topicsToBeSubscribed = new Topics(topics);
body = gson.toJson(topicsToBeSubscribed).getBytes();
} else {
SubscribePayload payload = new SubscribePayload(topics, filter, payloadLookupForFilter);
body = gson.toJson(payload).getBytes();
}

StringBuilder api = new StringBuilder(consumerPathPrefix)
.append("/consumers/")
.append(consumerId)
.append("/updateFilter");
if (!this.isMultiTenant) {
api.append("?multi_tenant=false");
}

final String logMessage = new StringBuilder(logConsumerId())
.append(" to ").append(topics).append(" topics.").toString();
try {

request.post(api.toString(), body, SUBSCRIBE_ERROR_MAP);
if (logger.isDebugEnabled()) {
logger.debug("Subscribed call with updateFilter " + logMessage);
}

subscriptions.clear();
subscriptions.addAll(topics);

} catch (final ClientError error) {
error.setApi("subscribe");
logger.error("Failed to subscribe " + logMessage, error);
throw error;
}
} finally {
release();
}

}

/**
* List the topic names to which the consumer is subscribed.
*
Expand Down Expand Up @@ -653,7 +728,7 @@ public ConsumerRecords consume(final int timeout) throws ConsumerError, Permanen
* @throws TemporaryError if the consume attempt fails.
*/
public ConsumerRecords consume(final int timeout, final boolean filter)
throws ConsumerError, PermanentError, TemporaryError {
throws ConsumerError, PermanentError, TemporaryError {

acquireAndEnsureChannelIsActive();
try {
Expand Down Expand Up @@ -840,7 +915,7 @@ public void run(final ConsumerRecordProcessor processCallback, final List<String
* @throws TemporaryError consume or commit attempts failed with errors other than ConsumerError.
*/
public void run(final ConsumerRecordProcessor processCallback, final List<String> topics,
final Map<String, Object> filter, final boolean payloadLookupForFilter, final int timeout)
final Map<String, Map<String, Object>> filter, final boolean payloadLookupForFilter, final int timeout)
throws PermanentError, TemporaryError {

acquireAndEnsureChannelIsActive();
Expand Down Expand Up @@ -879,6 +954,67 @@ public void run(final ConsumerRecordProcessor processCallback, final List<String

}

/**
* <p>Update consumer filter for the subscribed topics.</p>
*
* <p>The supplied
* {@link ConsumerRecordProcessor#processCallback(ConsumerRecords, String)} method is invoked with a list containing
* each consumer record.</p>
*
* <p>{@link ConsumerRecordProcessor#processCallback(ConsumerRecords, String)} return value is <b>currently
* ignored</b>. It is <b>reserved for future use</b>.</p>
*
* <p>The {@link Channel#stop()} method can also be called to halt an execution of this method.</p>
*
* @param processCallback Callable which is invoked with a list of records which have been consumed.
* @param topics If set to a non-empty value, the channel will be subscribed to the specified topics.
* If set to an empty value, the channel will use topics previously subscribed via a call to the
* subscribe method.
* @param timeout Timeout in milliseconds to wait for records before returning
* @throws PermanentError if a prior run is already in progress or no consumer group value was specified or
* callback to deliver records was not specified
* @throws TemporaryError consume or commit attempts failed with errors other than ConsumerError.
*/
public void updateFilter(final ConsumerRecordProcessor processCallback, final List<String> topics,
final Map<String, Map<String, Object>> filter, final boolean payloadLookupForFilter, final int timeout)
throws PermanentError, TemporaryError {

acquireAndEnsureChannelIsActive();
try {
if (consumerGroup == null || consumerGroup.isEmpty()) {
String error = "No value specified for 'consumerGroup' during channel init";
logger.error(error);
throw new PermanentError(error);
}

if (processCallback == null) {
String error = "processCallback not provided";
logger.error(error);
throw new PermanentError(error);
}

List<String> topicsOfInterest = topics != null && !topics.isEmpty() ? topics : subscriptions;

if (running.compareAndSet(false, true)) {

logger.info("Channel is running");

while (!stopRequested.get()) {
consumeLoopToFilter(processCallback, topicsOfInterest, filter, payloadLookupForFilter, timeout);
}

if (logger.isDebugEnabled()) {
logger.debug("Exiting run method.");
}
}

} finally {
running.set(false);
release();
}

}

/**
* <p>Repeatedly consume records from the subscribed topic.</p>
*
Expand Down Expand Up @@ -1060,8 +1196,8 @@ public void close() throws TemporaryError, StopError, PermanentError {
* @throws PermanentError the callback asks to stop consuming records.
*/
private void consumeLoop(final ConsumerRecordProcessor processCallback, final List<String> topics,
final Map<String, Object> filter, final boolean payloadLookupForFilter, final int timeout)
throws PermanentError, TemporaryError {
final Map<String, Map<String, Object>> filter, final boolean payloadLookupForFilter, final int timeout)
throws PermanentError, TemporaryError {

boolean continueRunning = true;
boolean subscribed = false;
Expand Down Expand Up @@ -1124,6 +1260,70 @@ private void consumeLoop(final ConsumerRecordProcessor processCallback, final Li

}

/**
* <p>Calls consume to update filter
* </p>
*
* @param processCallback Callable which is invoked with a list of records which have been consumed.
* @param topics the channel will be subscribed to the specified topics.
* @param timeout Timeout in milliseconds to wait for records before returning
* @throws TemporaryError the consume or commit attempt failed with an error other than ConsumerError.
* @throws PermanentError the callback asks to stop consuming records.
*/
private void consumeLoopToFilter(final ConsumerRecordProcessor processCallback, final List<String> topics,
final Map<String, Map<String, Object>> filter, final boolean payloadLookupForFilter, final int timeout)
throws PermanentError, TemporaryError {

boolean continueRunning = true;
boolean subscribed = false;

while (continueRunning) {
try {
// if consumer is not subscribed yet, then subscribe it
if (!subscribed) {
updateFilterSubscribe(topics, filter, payloadLookupForFilter);
subscribed = true;
if (logger.isDebugEnabled()) {
// show topics consumer is subscribed to
subscriptions();
}
}

} catch (final ConsumerError error) {
// ConsumerError exception can be raised if the consumer has been removed or if callback found errors
// in records and it wants them to be consumed again.
// Then, current consumer is deleted and a brand new one is created to resume consuming from
// last commit.
logger.info("Creating a new consumer to resume consuming.");
logger.error("Consumer error was: ", error);
subscribed = false;
recreateConsumer(topics, error);

if (!retryOnFail) {
continueRunning = false;
if (logger.isDebugEnabled()) {
logger.debug("Exiting run method because retryOnFail is " + retryOnFail);
}
}

} catch (final PermanentError | TemporaryError error) {
// Delete consumer instance.
delete();
error.setApi("run");
logger.error("Exiting run method due to error", error);
throw error;
} finally {
// Check if there is a request to stop consuming records
if (stopRequested.get()) {
// Exit consume loop immediately
continueRunning = false;
}
}
}

}


/**
* <p>Produce records to the channel.</p>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
public class SubscribePayload {
private List<String> topics;

private Map<String, Object> filter;
private Map<String, Map<String, Object>> filter;

private boolean payloadLookupForFilter;

/**
* @param topics list of topic names
*/
public SubscribePayload(final List<String> topics, final Map<String, Object> filter,
final boolean payloadLookupForFilter) {
public SubscribePayload(final List<String> topics, final Map<String, Map<String, Object>> filter,
final boolean payloadLookupForFilter) {
this.topics = topics;
this.filter = filter;
this.payloadLookupForFilter = payloadLookupForFilter;
Expand Down