Skip to content

Commit

Permalink
Merge pull request #6 from opendxl/client_credentials_support_impleme…
Browse files Browse the repository at this point in the history
…ntation

Client credentials support implementation
  • Loading branch information
umanekar-mcafee committed Jun 1, 2022
2 parents f2ce32d + bbae705 commit ee64b54
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 2 deletions.
2 changes: 1 addition & 1 deletion dxlstreamingclient/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.1.1"
__version__ = "0.1.2"
34 changes: 34 additions & 0 deletions dxlstreamingclient/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,40 @@ class PermanentAuthenticationError(PermanentError):
"""
pass

def token(url,clinetid,clientsecret,path_fragment="/iam/v1.4/token",verify_cert_bundle="",scope="",grant_type="",audience=""):
auth = (clinetid, clientsecret)
try:
with warnings.catch_warnings():
if not verify_cert_bundle:
warnings.filterwarnings("ignore", "Unverified HTTPS request")
body ={
'scope': scope,
'grant_type': grant_type,
'audience': audience
}
res = requests.post(furl(url).add(path=path_fragment).url,
auth=auth,data=body,
verify=verify_cert_bundle)
if res.status_code == 200:
try:
token = res.json()['access_token']
return token
except Exception as exp:
raise PermanentAuthenticationError(str(exp))
elif res.status_code == 401 or res.status_code == 403:
raise PermanentAuthenticationError(
"Unauthorized {}: {}".format(res.status_code, res.text))
else:
raise TemporaryAuthenticationError(
"Unexpected status code {}: {}".format(
res.status_code,
res.text
)
)
except RequestException as exp:
raise TemporaryAuthenticationError(
"Unexpected error: {}".format(str(exp))
)

def login(url, username, password, path_fragment="/identity/v1/login",
verify_cert_bundle=""):
Expand Down
43 changes: 42 additions & 1 deletion dxlstreamingclient/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import requests
from retrying import Retrying
from furl import furl
from .auth import login
from .auth import login, token
from .error import TemporaryError, PermanentError, StopError
from ._compat import is_string

Expand Down Expand Up @@ -88,6 +88,47 @@ def __call__(self, r):
return r


class ClientCredentialsChannelAuth(requests.auth.AuthBase):
"""
Authentication class for use with channel requests.
"""
def __init__(self, base, clientid, clientsecret, verify_cert_bundle="",scope="",grant_type="",audience=""):
"""
Constructor parameters:
:param str base: Base URL to forward authentication requests to.
:param str username: User name to supply for request authentication.
:param str password: Password to supply for request authentication.
:param str verify_cert_bundle: Path to a CA bundle file containing
certificates of trusted CAs. The CA bundle is used to validate that
the certificate of the authentication server being connected to was
signed by a valid authority. If set to an empty string, the server
certificate is not validated.
"""
self._clientid = clientid
self._clientsecret = clientsecret
self._base = base
self._token = None
self._verify_cert_bundle = verify_cert_bundle
self._scope = scope
self._grant_type = grant_type
self._audience = audience
super(ClientCredentialsChannelAuth, self).__init__()
def reset(self):
"""
Purge any credentials cached from a previous authentication.
"""
self._token = None
def __call__(self, r):
# Implement my authentication
if not self._token:
self._token = token(self._base, self._clientid,
self._clientsecret,
verify_cert_bundle=self._verify_cert_bundle,
scope=self._scope,
grant_type=self._grant_type,
audience=self._audience)
r.headers['Authorization'] = "Bearer {}".format(self._token)
return r
class Channel(object):
"""
The :class:`Channel` class is responsible for all communication with the
Expand Down
58 changes: 58 additions & 0 deletions sample/basic/basic_consume_with_client_credentials_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from __future__ import absolute_import
import json
import os
import sys
from dxlstreamingclient.channel import Channel, ChannelAuth, ClientCredentialsChannelAuth
root_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(root_dir + "/../..")
sys.path.append(root_dir + "/..")
# Import common logging and configuration
from common import *
# Configure local logger
logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)
# Change these below to match the appropriate details for your
# channel connection.
CHANNEL_URL = "http://127.0.0.1:50080"
CHANNEL_IAM_URL = "https://iam.server-cloud.com/"
CHANNEL_CLIENT_ID = "me"
CHANNEL_CLIENT_SECRET = "secret"
CHANNEL_CONSUMER_GROUP = "sample_consumer_group"
CHANNEL_TOPIC_SUBSCRIPTIONS = ["testTopic"]
CHANNEL_SCOPE=""
CHANNEL_GRANT_TYPE=""
CHANNEL_AUDIENCE=""
# Path to a CA bundle file containing certificates of trusted CAs. The CA
# bundle is used to validate that the certificate of the server being connected
# to was signed by a valid authority. If set to an empty string, the server
# certificate is not validated.
VERIFY_CERTIFICATE_BUNDLE = ""
# This constant controls the frequency (in seconds) at which the channel 'run'
# call below polls the streaming service for new records.
WAIT_BETWEEN_QUERIES = 5
# Create a new channel object
with Channel(CHANNEL_URL,
auth=ClientCredentialsChannelAuth(CHANNEL_IAM_URL,
CHANNEL_CLIENT_ID,
CHANNEL_CLIENT_SECRET,
verify_cert_bundle=VERIFY_CERTIFICATE_BUNDLE,
scope=CHANNEL_SCOPE,
grant_type=CHANNEL_GRANT_TYPE,
audience=CHANNEL_AUDIENCE),
consumer_group=CHANNEL_CONSUMER_GROUP,
verify_cert_bundle=VERIFY_CERTIFICATE_BUNDLE
) as channel:
# Create a function which will be called back upon by the 'run' method (see
# below) when records are received from the channel.
def process_callback(payloads):
# Print the payloads which were received. 'payloads' is a list of
# dictionary objects extracted from the records received from the
# channel.
logger.info("Received payloads: \n%s",
json.dumps(payloads, indent=4, sort_keys=True))
# Return 'True' in order for the 'run' call to continue attempting to
# consume records.
return True
# Consume records indefinitely
channel.run(process_callback, wait_between_queries=WAIT_BETWEEN_QUERIES,
topics=CHANNEL_TOPIC_SUBSCRIPTIONS)
65 changes: 65 additions & 0 deletions sample/basic/basic_produce_with_client_credentials_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import absolute_import
from __future__ import print_function
import base64
import json
import os
import sys
from dxlstreamingclient.channel import Channel, ChannelAuth , ClientCredentialsChannelAuth
root_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(root_dir + "/../..")
sys.path.append(root_dir + "/..")
# Import common logging and configuration
from common import *
# Configure local logger
logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)
# Change these below to match the appropriate details for your
# channel connection.
CHANNEL_URL = "http://127.0.0.1:50080"
CHANNEL_IAM_URL = "https://iam.server-cloud.com/"
CHANNEL_CLIENT_ID = "me"
CHANNEL_CLIENT_SECRET = "secret"
CHANNEL_TOPIC = ["testTopic"]
CHANNEL_SCOPE=""
CHANNEL_GRANT_TYPE=""
CHANNEL_AUDIENCE=""
# Path to a CA bundle file containing certificates of trusted CAs. The CA
# bundle is used to validate that the certificate of the server being connected
# to was signed by a valid authority. If set to an empty string, the server
# certificate is not validated.
VERIFY_CERTIFICATE_BUNDLE = ""
# Create the message payload to be included in a record
message_payload = {
"message": "Hello from OpenDXL"
}
# Create the full payload with records to produce to the channel
channel_payload = {
"records": [
{
"routingData": {
"topic": CHANNEL_TOPIC,
"shardingKey": ""
},
"message": {
"headers": {},
# Convert the message payload from a dictionary to a
# base64-encoded string.
"payload": base64.b64encode(
json.dumps(message_payload).encode()).decode()
}
}
]
}
# Create a new channel object
with Channel(CHANNEL_URL,
auth=ClientCredentialsChannelAuth(CHANNEL_IAM_URL,
CHANNEL_CLIENT_ID,
CHANNEL_CLIENT_SECRET,
verify_cert_bundle=VERIFY_CERTIFICATE_BUNDLE,
scope=CHANNEL_SCOPE,
grant_type=CHANNEL_GRANT_TYPE,
audience=CHANNEL_AUDIENCE),
verify_cert_bundle=VERIFY_CERTIFICATE_BUNDLE) as channel:
# Produce the payload records to the channel
channel.produce(channel_payload)
print("Succeeded.")

0 comments on commit ee64b54

Please sign in to comment.