Skip to content

Commit

Permalink
Merge pull request #179 from snorwin/await-termination-timeout
Browse files Browse the repository at this point in the history
Added parameter to set await termination timeout
  • Loading branch information
fantavlik committed Sep 22, 2021
2 parents 4b7f686 + 427bca6 commit f681fb0
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 4 deletions.
Expand Up @@ -157,6 +157,7 @@ public static HttpEventCollectorLog4jAppender createAppender(
@PluginAttribute(value = "call_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_CALL_TIMEOUT) final long callTimeout,
@PluginAttribute(value = "read_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_READ_TIMEOUT) final long readTimeout,
@PluginAttribute(value = "write_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_WRITE_TIMEOUT) final long writeTimeout,
@PluginAttribute(value = "termination_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_TERMINATION_TIMEOUT) final long terminationTimeout,
@PluginElement("Layout") Layout<? extends Serializable> layout,
@PluginElement("Filter") final Filter filter
)
Expand Down Expand Up @@ -219,7 +220,7 @@ public static HttpEventCollectorLog4jAppender createAppender(
disableCertificateValidation,
eventBodySerializer,
eventHeaderSerializer,
new HttpEventCollectorSender.TimeoutSettings(connectTimeout, callTimeout, readTimeout, writeTimeout)
new HttpEventCollectorSender.TimeoutSettings(connectTimeout, callTimeout, readTimeout, writeTimeout, terminationTimeout)
);
}

Expand Down
Expand Up @@ -384,6 +384,14 @@ public long getWriteTimeout(long milliseconds) {
return this.timeoutSettings.writeTimeout = milliseconds;
}

public void setTerminationTimeout(long milliseconds) {
this.timeoutSettings.terminationTimeout = milliseconds;
}

public long getTerminationTimeout(long milliseconds) {
return this.timeoutSettings.terminationTimeout = milliseconds;
}

private static long parseLong(String string, int defaultValue) {
try {
return Long.parseLong(string);
Expand Down
Expand Up @@ -114,6 +114,7 @@ public final class HttpEventCollectorLoggingHandler extends Handler {
private final String CallTimeoutConfTag = "call_timeout";
private final String ReadTimeoutConfTag = "read_timeout";
private final String WriteTimeoutConfTag = "write_timeout";
private final String TerminationTimeoutConfTag = "termination_timeout";

/** HttpEventCollectorLoggingHandler c-or */
public HttpEventCollectorLoggingHandler() {
Expand Down Expand Up @@ -165,7 +166,8 @@ public HttpEventCollectorLoggingHandler() {
getConfigurationNumericProperty(ConnectTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_CONNECT_TIMEOUT),
getConfigurationNumericProperty(CallTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_CALL_TIMEOUT),
getConfigurationNumericProperty(ReadTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_READ_TIMEOUT),
getConfigurationNumericProperty(WriteTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_WRITE_TIMEOUT)
getConfigurationNumericProperty(WriteTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_WRITE_TIMEOUT),
getConfigurationNumericProperty(TerminationTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_TERMINATION_TIMEOUT)
);

if ("raw".equalsIgnoreCase(type)) {
Expand Down
36 changes: 34 additions & 2 deletions src/main/java/com/splunk/logging/HttpEventCollectorSender.java
Expand Up @@ -29,6 +29,7 @@
import java.io.Serializable;
import java.security.cert.CertificateException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;


Expand Down Expand Up @@ -263,8 +264,36 @@ public static void putIfPresent(JsonObject collection, String tag, Object value)

private void stopHttpClient() {
if (httpClient != null) {
httpClient.dispatcher().executorService().shutdown();
Dispatcher dispatcher = httpClient.dispatcher();
httpClient = null;

if (timeoutSettings.terminationTimeout > 0) {
// wait for queued messages in the dispatcher to be promoted to the executor service
long start = System.currentTimeMillis();
while (dispatcher.queuedCallsCount() > 0 && start + timeoutSettings.terminationTimeout > System.currentTimeMillis()) {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}

// initialize the shutdown of the executor service
dispatcher.executorService().shutdown();

// wait for the messages in the dispatcher's executor service to be sent out
long awaitTerminationTimeout = timeoutSettings.terminationTimeout - (System.currentTimeMillis() - start);
if (awaitTerminationTimeout > 0) {
try {
dispatcher.executorService().awaitTermination(awaitTerminationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} else {
dispatcher.executorService().shutdown();
}
}
}

Expand Down Expand Up @@ -393,19 +422,22 @@ public static class TimeoutSettings {
public static final long DEFAULT_WRITE_TIMEOUT = 0; // 0 means no timeout
public static final long DEFAULT_CALL_TIMEOUT = 0;
public static final long DEFAULT_READ_TIMEOUT = 0;
public static final long DEFAULT_TERMINATION_TIMEOUT = 0;

public long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
public long callTimeout = DEFAULT_CALL_TIMEOUT;
public long readTimeout = DEFAULT_READ_TIMEOUT;
public long writeTimeout = DEFAULT_WRITE_TIMEOUT;
public long terminationTimeout = DEFAULT_TERMINATION_TIMEOUT;

public TimeoutSettings() {}

public TimeoutSettings(long connectTimeout, long callTimeout, long readTimeout, long writeTimeout) {
public TimeoutSettings(long connectTimeout, long callTimeout, long readTimeout, long writeTimeout, long terminationTimeout) {
this.connectTimeout = connectTimeout;
this.callTimeout = callTimeout;
this.readTimeout = readTimeout;
this.writeTimeout = writeTimeout;
this.terminationTimeout = terminationTimeout;
}
}
}
1 change: 1 addition & 0 deletions src/test/resources/log4j2.xml
Expand Up @@ -45,6 +45,7 @@ under the License.
batch_size_count="0"
batch_interval="0"
connect_timeout="5000"
termination_timeout="1000"
disableCertificateValidation="true">
<PatternLayout pattern="%m"/>
</SplunkHttp>
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/logback.xml
Expand Up @@ -60,6 +60,7 @@ under the License.
<messageFormat>text</messageFormat>
<middleware>HttpEventCollectorUnitTestMiddleware</middleware>
<connectTimeout>5000</connectTimeout>
<terminationTieout>2000</terminationTieout>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%msg</pattern>
</layout>
Expand Down

0 comments on commit f681fb0

Please sign in to comment.