Skip to content

Commit

Permalink
Merge pull request #2 from jbarlow-mcafee/add-extra-config-param
Browse files Browse the repository at this point in the history
Add extra_configs parameter to the channel constructor
  • Loading branch information
chrissmith-mcafee committed Jul 2, 2018
2 parents f2563eb + fab9cf0 commit 1155254
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 21 deletions.
51 changes: 33 additions & 18 deletions dxlstreamingclient/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import absolute_import
from functools import wraps
import base64
import copy
import json
import logging
import threading
Expand Down Expand Up @@ -111,14 +112,21 @@ class Channel(object):
# streaming service
_DEFAULT_WAIT_BETWEEN_QUERIES = 30

# Constants for consumer config settings
_AUTO_OFFSET_RESET_CONFIG_SETTING = "auto.offset.reset"
_ENABLE_AUTO_COMMIT_CONFIG_SETTING = "enable.auto.commit"
_REQUEST_TIMEOUT_CONFIG_SETTING = "request.timeout.ms"
_SESSION_TIMEOUT_CONFIG_SETTING = "session.timeout.ms"

def __init__(self, base, auth,
consumer_group,
path_prefix="/databus/consumer-service/v1",
offset="latest", # earliest
request_timeout=None,
session_timeout=None,
retry_on_fail=True,
verify_cert_bundle=""):
verify_cert_bundle="",
extra_configs=None):
"""
Constructor parameters:
Expand Down Expand Up @@ -164,6 +172,12 @@ def __init__(self, base, auth,
the certificate of the authentication server being connected to was
signed by a valid authority. If set to an empty string, the server
certificate is not validated.
:param dict extra_configs: Dictionary of key/value pairs containing
any custom configuration settings which should be sent to the
streaming service when a consumer is created. Note that any
values specified for the `offset`, `request_timeout`, and/or
`session_timeout` parameters will override the corresponding
values, if specified, in the `extra_configs` parameter.
"""
self._base = base
self._path_prefix = path_prefix
Expand All @@ -177,13 +191,24 @@ def __init__(self, base, auth,
raise PermanentError(
"Value for 'offset' must be one of: {}".format(
', '.join(offset_values)))
self._offset = offset

# Convert from seconds to milliseconds
self._request_timeout = request_timeout * 1000 \
if request_timeout else request_timeout
self._session_timeout = session_timeout * 1000 \
if session_timeout else session_timeout
# Setup customer configs from supplied parameters
self._configs = copy.deepcopy(extra_configs) if extra_configs else {}

if self._ENABLE_AUTO_COMMIT_CONFIG_SETTING not in self._configs:
# this has to be false for now
self._configs[self._ENABLE_AUTO_COMMIT_CONFIG_SETTING] = "false"
self._configs[self._AUTO_OFFSET_RESET_CONFIG_SETTING] = offset

if session_timeout is not None:
# Convert from seconds to milliseconds
self._configs[self._SESSION_TIMEOUT_CONFIG_SETTING] = str(
session_timeout * 1000)

if request_timeout is not None:
# Convert from seconds to milliseconds
self._configs[self._REQUEST_TIMEOUT_CONFIG_SETTING] = str(
request_timeout * 1000)

# state variables
self._consumer_id = None
Expand Down Expand Up @@ -278,19 +303,9 @@ def create(self):
path="consumers").url
payload = {
"consumerGroup": self._consumer_group,
"configs": {
"enable.auto.commit": "false", # this has to be false for now
"auto.offset.reset": self._offset
}
"configs": self._configs
}

if self._session_timeout is not None:
payload["configs"]["session.timeout.ms"] = \
str(self._session_timeout)
if self._request_timeout is not None:
payload["configs"]["request.timeout.ms"] = \
str(self._request_timeout)

res = self._post_request(url, json=payload)

if res.status_code in [200, 201, 202, 204]:
Expand Down
13 changes: 10 additions & 3 deletions tests/test_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,12 @@ def test_main(self):
verify_cert_bundle="cabundle.crt",
request_timeout=70,
session_timeout=60,
offset="earliest")
offset="earliest",
extra_configs={
"enable.auto.commit": "true",
"one.extra.setting": "one extra value",
"another.extra.setting": 42
})

self.assertEqual(channel._session.verify, "cabundle.crt")

Expand All @@ -197,8 +202,10 @@ def test_main(self):
"configs": {
"request.timeout.ms": "70000",
"session.timeout.ms": "60000",
"enable.auto.commit": "false",
"auto.offset.reset": "earliest"
"enable.auto.commit": "true",
"auto.offset.reset": "earliest",
"one.extra.setting": "one extra value",
"another.extra.setting": 42
}
}
)
Expand Down

0 comments on commit 1155254

Please sign in to comment.