diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index cc13b8f..f95b238 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,6 +7,17 @@ on: pull_request: jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install Ruff + run: pipx install ruff + - name: Ruff check + run: ruff check + - name: Ruff format + run: ruff format --diff + test: runs-on: ubuntu-latest strategy: diff --git a/fluent/__about__.py b/fluent/__about__.py index 4b4d30f..1cd2317 100644 --- a/fluent/__about__.py +++ b/fluent/__about__.py @@ -1 +1 @@ -__version__ = '0.10.1dev1' +__version__ = "0.10.1dev1" diff --git a/fluent/asynchandler.py b/fluent/asynchandler.py index bbba4c4..e150383 100644 --- a/fluent/asynchandler.py +++ b/fluent/asynchandler.py @@ -1,13 +1,11 @@ -# -*- coding: utf-8 -*- - from fluent import asyncsender from fluent import handler class FluentHandler(handler.FluentHandler): - ''' + """ Asynchronous Logging Handler for fluent. - ''' + """ def getSenderClass(self): return asyncsender.FluentSender @@ -18,7 +16,7 @@ def close(self): try: self.sender.close() finally: - super(FluentHandler, self).close() + super().close() finally: self.release() diff --git a/fluent/asyncsender.py b/fluent/asyncsender.py index 24c6924..73a1e61 100644 --- a/fluent/asyncsender.py +++ b/fluent/asyncsender.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - import threading from queue import Queue, Full, Empty @@ -17,8 +15,7 @@ def _set_global_sender(sender): # pragma: no cover - """ [For testing] Function to set global sender directly - """ + """[For testing] Function to set global sender directly""" global _global_sender _global_sender = sender @@ -37,28 +34,37 @@ def close(): # pragma: no cover class FluentSender(sender.FluentSender): - def __init__(self, - tag, - host='localhost', - port=24224, - bufmax=1 * 1024 * 1024, - timeout=3.0, - verbose=False, - buffer_overflow_handler=None, - nanosecond_precision=False, - msgpack_kwargs=None, - queue_maxsize=DEFAULT_QUEUE_MAXSIZE, - queue_circular=DEFAULT_QUEUE_CIRCULAR, - queue_overflow_handler=None, - **kwargs): + def __init__( + self, + tag, + host="localhost", + port=24224, + bufmax=1 * 1024 * 1024, + timeout=3.0, + verbose=False, + buffer_overflow_handler=None, + nanosecond_precision=False, + msgpack_kwargs=None, + queue_maxsize=DEFAULT_QUEUE_MAXSIZE, + queue_circular=DEFAULT_QUEUE_CIRCULAR, + queue_overflow_handler=None, + **kwargs, + ): """ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. """ - super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout, - verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, - nanosecond_precision=nanosecond_precision, - msgpack_kwargs=msgpack_kwargs, - **kwargs) + super().__init__( + tag=tag, + host=host, + port=port, + bufmax=bufmax, + timeout=timeout, + verbose=verbose, + buffer_overflow_handler=buffer_overflow_handler, + nanosecond_precision=nanosecond_precision, + msgpack_kwargs=msgpack_kwargs, + **kwargs, + ) self._queue_maxsize = queue_maxsize self._queue_circular = queue_circular if queue_circular and queue_overflow_handler: @@ -66,12 +72,15 @@ def __init__(self, else: self._queue_overflow_handler = self._queue_overflow_handler_default - self._thread_guard = threading.Event() # This ensures visibility across all variables + self._thread_guard = ( + threading.Event() + ) # This ensures visibility across all variables self._closed = False self._queue = Queue(maxsize=queue_maxsize) - self._send_thread = threading.Thread(target=self._send_loop, - name="AsyncFluentSender %d" % id(self)) + self._send_thread = threading.Thread( + target=self._send_loop, name="AsyncFluentSender %d" % id(self) + ) self._send_thread.daemon = True self._send_thread.start() @@ -121,7 +130,7 @@ def _send(self, bytes_): return True def _send_loop(self): - send_internal = super(FluentSender, self)._send_internal + send_internal = super()._send_internal try: while True: diff --git a/fluent/event.py b/fluent/event.py index 76f27ca..c69e537 100644 --- a/fluent/event.py +++ b/fluent/event.py @@ -1,13 +1,11 @@ -# -*- coding: utf-8 -*- - import time from fluent import sender -class Event(object): +class Event: def __init__(self, label, data, **kwargs): - assert isinstance(data, dict), 'data must be a dict' - sender_ = kwargs.get('sender', sender.get_global_sender()) - timestamp = kwargs.get('time', int(time.time())) + assert isinstance(data, dict), "data must be a dict" + sender_ = kwargs.get("sender", sender.get_global_sender()) + timestamp = kwargs.get("time", int(time.time())) sender_.emit_with_time(label, timestamp, data) diff --git a/fluent/handler.py b/fluent/handler.py index 7aefd8f..2bc42b4 100644 --- a/fluent/handler.py +++ b/fluent/handler.py @@ -1,19 +1,12 @@ -# -*- coding: utf-8 -*- - +import json import logging import socket -import sys - -try: - import simplejson as json -except ImportError: # pragma: no cover - import json from fluent import sender -class FluentRecordFormatter(logging.Formatter, object): - """ A structured formatter for Fluent. +class FluentRecordFormatter(logging.Formatter): + """A structured formatter for Fluent. Best used with server storing data in an ElasticSearch cluster for example. @@ -33,36 +26,49 @@ class FluentRecordFormatter(logging.Formatter, object): Can be an iterable. """ - def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False, format_json=True, - exclude_attrs=None): - super(FluentRecordFormatter, self).__init__(None, datefmt) - - if sys.version_info[0:2] >= (3, 2) and style != '%': + def __init__( + self, + fmt=None, + datefmt=None, + style="%", + fill_missing_fmt_key=False, + format_json=True, + exclude_attrs=None, + ): + super().__init__(None, datefmt) + + if style != "%": self.__style, basic_fmt_dict = { - '{': (logging.StrFormatStyle, { - 'sys_host': '{hostname}', - 'sys_name': '{name}', - 'sys_module': '{module}', - }), - '$': (logging.StringTemplateStyle, { - 'sys_host': '${hostname}', - 'sys_name': '${name}', - 'sys_module': '${module}', - }), + "{": ( + logging.StrFormatStyle, + { + "sys_host": "{hostname}", + "sys_name": "{name}", + "sys_module": "{module}", + }, + ), + "$": ( + logging.StringTemplateStyle, + { + "sys_host": "${hostname}", + "sys_name": "${name}", + "sys_module": "${module}", + }, + ), }[style] else: self.__style = None basic_fmt_dict = { - 'sys_host': '%(hostname)s', - 'sys_name': '%(name)s', - 'sys_module': '%(module)s', + "sys_host": "%(hostname)s", + "sys_name": "%(name)s", + "sys_module": "%(module)s", } if exclude_attrs is not None: self._exc_attrs = set(exclude_attrs) self._fmt_dict = None self._formatter = self._format_by_exclusion - self.usesTime = super(FluentRecordFormatter, self).usesTime + self.usesTime = super().usesTime else: self._exc_attrs = None if not fmt: @@ -89,7 +95,7 @@ def __init__(self, fmt=None, datefmt=None, style='%', fill_missing_fmt_key=False def format(self, record): # Compute attributes handled by parent class. - super(FluentRecordFormatter, self).format(record) + super().format(record) # Add ours record.hostname = self.hostname @@ -103,7 +109,7 @@ def usesTime(self): """This method is substituted on construction based on settings for performance reasons""" def _structuring(self, data, record): - """ Melds `msg` into `data`. + """Melds `msg` into `data`. :param data: dictionary to be sent to fluent server :param msg: :class:`LogRecord`'s message to add to `data`. @@ -118,7 +124,7 @@ def _structuring(self, data, record): elif isinstance(msg, str): self._add_dic(data, self._format_msg(record, msg)) else: - self._add_dic(data, {'message': msg}) + self._add_dic(data, {"message": msg}) def _format_msg_json(self, record, msg): try: @@ -131,7 +137,7 @@ def _format_msg_json(self, record, msg): return self._format_msg_default(record, msg) def _format_msg_default(self, record, msg): - return {'message': super(FluentRecordFormatter, self).format(record)} + return {"message": super().format(record)} def _format_by_exclusion(self, record): data = {} @@ -175,17 +181,18 @@ class FluentHandler(logging.Handler): Logging Handler for fluent. """ - def __init__(self, - tag, - host='localhost', - port=24224, - timeout=3.0, - verbose=False, - buffer_overflow_handler=None, - msgpack_kwargs=None, - nanosecond_precision=False, - **kwargs): - + def __init__( + self, + tag, + host="localhost", + port=24224, + timeout=3.0, + verbose=False, + buffer_overflow_handler=None, + msgpack_kwargs=None, + nanosecond_precision=False, + **kwargs, + ): self.tag = tag self._host = host self._port = port @@ -213,29 +220,45 @@ def sender(self): buffer_overflow_handler=self._buffer_overflow_handler, msgpack_kwargs=self._msgpack_kwargs, nanosecond_precision=self._nanosecond_precision, - **self._kwargs + **self._kwargs, ) return self._sender - def getSenderInstance(self, tag, host, port, timeout, verbose, - buffer_overflow_handler, msgpack_kwargs, - nanosecond_precision, **kwargs): + def getSenderInstance( + self, + tag, + host, + port, + timeout, + verbose, + buffer_overflow_handler, + msgpack_kwargs, + nanosecond_precision, + **kwargs, + ): sender_class = self.getSenderClass() - return sender_class(tag, - host=host, port=port, - timeout=timeout, verbose=verbose, - buffer_overflow_handler=buffer_overflow_handler, - msgpack_kwargs=msgpack_kwargs, - nanosecond_precision=nanosecond_precision, **kwargs) + return sender_class( + tag, + host=host, + port=port, + timeout=timeout, + verbose=verbose, + buffer_overflow_handler=buffer_overflow_handler, + msgpack_kwargs=msgpack_kwargs, + nanosecond_precision=nanosecond_precision, + **kwargs, + ) def emit(self, record): data = self.format(record) _sender = self.sender - return _sender.emit_with_time(None, - sender.EventTime(record.created) - if _sender.nanosecond_precision - else int(record.created), - data) + return _sender.emit_with_time( + None, + sender.EventTime(record.created) + if _sender.nanosecond_precision + else int(record.created), + data, + ) def close(self): self.acquire() @@ -243,7 +266,7 @@ def close(self): try: self.sender.close() finally: - super(FluentHandler, self).close() + super().close() finally: self.release() diff --git a/fluent/sender.py b/fluent/sender.py index 68e86d5..8770dcd 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - import errno import socket import struct @@ -13,8 +11,7 @@ def _set_global_sender(sender): # pragma: no cover - """ [For testing] Function to set global sender directly - """ + """[For testing] Function to set global sender directly""" global _global_sender _global_sender = sender @@ -35,26 +32,28 @@ def close(): # pragma: no cover class EventTime(msgpack.ExtType): def __new__(cls, timestamp): seconds = int(timestamp) - nanoseconds = int(timestamp % 1 * 10 ** 9) - return super(EventTime, cls).__new__( + nanoseconds = int(timestamp % 1 * 10**9) + return super().__new__( cls, code=0, data=struct.pack(">II", seconds, nanoseconds), ) -class FluentSender(object): - def __init__(self, - tag, - host='localhost', - port=24224, - bufmax=1 * 1024 * 1024, - timeout=3.0, - verbose=False, - buffer_overflow_handler=None, - nanosecond_precision=False, - msgpack_kwargs=None, - **kwargs): +class FluentSender: + def __init__( + self, + tag, + host="localhost", + port=24224, + bufmax=1 * 1024 * 1024, + timeout=3.0, + verbose=False, + buffer_overflow_handler=None, + nanosecond_precision=False, + msgpack_kwargs=None, + **kwargs, + ): """ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. """ @@ -88,23 +87,28 @@ def emit_with_time(self, label, timestamp, data): bytes_ = self._make_packet(label, timestamp, data) except Exception as e: self.last_error = e - bytes_ = self._make_packet(label, timestamp, - {"level": "CRITICAL", - "message": "Can't output to log", - "traceback": traceback.format_exc()}) + bytes_ = self._make_packet( + label, + timestamp, + { + "level": "CRITICAL", + "message": "Can't output to log", + "traceback": traceback.format_exc(), + }, + ) return self._send(bytes_) @property def last_error(self): - return getattr(self._last_error_threadlocal, 'exception', None) + return getattr(self._last_error_threadlocal, "exception", None) @last_error.setter def last_error(self, err): self._last_error_threadlocal.exception = err def clear_last_error(self, _thread_id=None): - if hasattr(self._last_error_threadlocal, 'exception'): - delattr(self._last_error_threadlocal, 'exception') + if hasattr(self._last_error_threadlocal, "exception"): + delattr(self._last_error_threadlocal, "exception") def close(self): with self.lock: @@ -122,7 +126,7 @@ def close(self): def _make_packet(self, label, timestamp, data): if label: - tag = '.'.join((self.tag, label)) if self.tag else label + tag = ".".join((self.tag, label)) if self.tag else label else: tag = self.tag packet = (tag, timestamp, data) @@ -149,7 +153,7 @@ def _send_internal(self, bytes_): self.pendings = None return True - except socket.error as e: + except OSError as e: self.last_error = e # close socket @@ -169,13 +173,13 @@ def _check_recv_side(self): self.socket.settimeout(0.0) try: recvd = self.socket.recv(4096) - except socket.error as recv_e: + except OSError as recv_e: if recv_e.errno != errno.EWOULDBLOCK: raise return - if recvd == b'': - raise socket.error(errno.EPIPE, "Broken pipe") + if recvd == b"": + raise OSError(errno.EPIPE, "Broken pipe") finally: self.socket.settimeout(self.timeout) @@ -189,17 +193,17 @@ def _send_data(self, bytes_): while bytes_sent < bytes_to_send: sent = self.socket.send(bytes_[bytes_sent:]) if sent == 0: - raise socket.error(errno.EPIPE, "Broken pipe") + raise OSError(errno.EPIPE, "Broken pipe") bytes_sent += sent self._check_recv_side() def _reconnect(self): if not self.socket: try: - if self.host.startswith('unix://'): + if self.host.startswith("unix://"): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.settimeout(self.timeout) - sock.connect(self.host[len('unix://'):]) + sock.connect(self.host[len("unix://") :]) else: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.timeout) @@ -219,7 +223,7 @@ def _call_buffer_overflow_handler(self, pending_events): try: if self.buffer_overflow_handler: self.buffer_overflow_handler(pending_events) - except Exception as e: + except Exception: # User should care any exception in handler pass @@ -230,12 +234,12 @@ def _close(self): try: try: sock.shutdown(socket.SHUT_RDWR) - except socket.error: # pragma: no cover + except OSError: # pragma: no cover pass finally: try: sock.close() - except socket.error: # pragma: no cover + except OSError: # pragma: no cover pass finally: self.socket = None diff --git a/tests/mockserver.py b/tests/mockserver.py index f1462a1..6ea2fff 100644 --- a/tests/mockserver.py +++ b/tests/mockserver.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - try: from cStringIO import StringIO as BytesIO except ImportError: @@ -16,13 +14,13 @@ class MockRecvServer(threading.Thread): Single threaded server accepts one connection and recv until EOF. """ - def __init__(self, host='localhost', port=0): - super(MockRecvServer, self).__init__() + def __init__(self, host="localhost", port=0): + super().__init__() - if host.startswith('unix://'): + if host.startswith("unix://"): self.socket_proto = socket.AF_UNIX self.socket_type = socket.SOCK_STREAM - self.socket_addr = host[len('unix://'):] + self.socket_addr = host[len("unix://") :] else: self.socket_proto = socket.AF_INET self.socket_type = socket.SOCK_STREAM @@ -55,7 +53,7 @@ def run(self): if not data: break self._buf.write(data) - except socket.error as e: + except OSError as e: print("MockServer error: %s" % e) break finally: @@ -69,15 +67,13 @@ def get_received(self): return list(Unpacker(self._buf)) def close(self): - try: self._sock.close() except Exception: pass try: - conn = socket.socket(socket.AF_INET, - socket.SOCK_STREAM) + conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: conn.connect((self.socket_addr[0], self.port)) finally: diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py index bbbf52e..7bbf108 100644 --- a/tests/test_asynchandler.py +++ b/tests/test_asynchandler.py @@ -1,18 +1,14 @@ -#  -*- coding: utf-8 -*- - import logging -import sys import unittest try: from unittest import mock except ImportError: - import mock + from unittest import mock try: from unittest.mock import patch except ImportError: - from mock import patch - + from unittest.mock import patch import fluent.asynchandler @@ -28,8 +24,8 @@ def get_logger(name, level=logging.INFO): class TestHandler(unittest.TestCase): def setUp(self): - super(TestHandler, self).setUp() - self._server = mockserver.MockRecvServer('localhost') + super().setUp() + self._server = mockserver.MockRecvServer("localhost") self._port = self._server.port def tearDown(self): @@ -43,233 +39,233 @@ def get_data(self): return self._server.get_received() def test_simple(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info({ - 'from': 'userA', - 'to': 'userB' - }) + log.info({"from": "userA", "to": "userB"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('app.follow', data[0][0]) - eq('userA', data[0][2]['from']) - eq('userB', data[0][2]['to']) + eq("app.follow", data[0][0]) + eq("userA", data[0][2]["from"]) + eq("userB", data[0][2]["to"]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_custom_fmt(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'lineno': '%(lineno)d', - 'emitted_at': '%(asctime)s', - }) + fluent.handler.FluentRecordFormatter( + fmt={ + "name": "%(name)s", + "lineno": "%(lineno)d", + "emitted_at": "%(asctime)s", + } + ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertTrue('emitted_at' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertTrue("emitted_at" in data[0][2]) - @unittest.skipUnless(sys.version_info[0:2] >= (3, 2), 'supported with Python 3.2 or above') def test_custom_fmt_with_format_style(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '{name}', - 'lineno': '{lineno}', - 'emitted_at': '{asctime}', - }, style='{') + fluent.handler.FluentRecordFormatter( + fmt={ + "name": "{name}", + "lineno": "{lineno}", + "emitted_at": "{asctime}", + }, + style="{", + ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertTrue('emitted_at' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertTrue("emitted_at" in data[0][2]) - @unittest.skipUnless(sys.version_info[0:2] >= (3, 2), 'supported with Python 3.2 or above') def test_custom_fmt_with_template_style(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '${name}', - 'lineno': '${lineno}', - 'emitted_at': '${asctime}', - }, style='$') + fluent.handler.FluentRecordFormatter( + fmt={ + "name": "${name}", + "lineno": "${lineno}", + "emitted_at": "${asctime}", + }, + style="$", + ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertTrue('emitted_at' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertTrue("emitted_at" in data[0][2]) def test_custom_field_raise_exception(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'custom_field': '%(custom_field)s' - }) + fluent.handler.FluentRecordFormatter( + fmt={"name": "%(name)s", "custom_field": "%(custom_field)s"} + ) ) log.addHandler(handler) with self.assertRaises(KeyError): - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) def test_custom_field_fill_missing_fmt_key_is_true(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'custom_field': '%(custom_field)s' - }, - fill_missing_fmt_key=True + fluent.handler.FluentRecordFormatter( + fmt={"name": "%(name)s", "custom_field": "%(custom_field)s"}, + fill_missing_fmt_key=True, ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('custom_field' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("custom_field" in data[0][2]) # field defaults to none if not in log record - self.assertIsNone(data[0][2]['custom_field']) + self.assertIsNone(data[0][2]["custom_field"]) def test_json_encoded_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) log.info('{"key": "hello world!", "param": "value"}') data = self.get_data() - self.assertTrue('key' in data[0][2]) - self.assertEqual('hello world!', data[0][2]['key']) + self.assertTrue("key" in data[0][2]) + self.assertEqual("hello world!", data[0][2]["key"]) def test_unstructured_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info('hello %s', 'world') + log.info("hello %s", "world") data = self.get_data() - self.assertTrue('message' in data[0][2]) - self.assertEqual('hello world', data[0][2]['message']) + self.assertTrue("message" in data[0][2]) + self.assertEqual("hello world", data[0][2]["message"]) def test_unstructured_formatted_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info('hello world, %s', 'you!') + log.info("hello world, %s", "you!") data = self.get_data() - self.assertTrue('message' in data[0][2]) - self.assertEqual('hello world, you!', data[0][2]['message']) + self.assertTrue("message" in data[0][2]) + self.assertEqual("hello world, you!", data[0][2]["message"]) def test_number_string_simple_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) log.info("1") data = self.get_data() - self.assertTrue('message' in data[0][2]) + self.assertTrue("message" in data[0][2]) def test_non_string_simple_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) log.info(42) data = self.get_data() - self.assertTrue('message' in data[0][2]) + self.assertTrue("message" in data[0][2]) def test_non_string_dict_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info({42: 'root'}) + log.info({42: "root"}) data = self.get_data() # For some reason, non-string keys are ignored self.assertFalse(42 in data[0][2]) def test_exception_message(self): - handler = self.get_handler_class()('app.follow', port=self._port) + handler = self.get_handler_class()("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) try: - raise Exception('sample exception') + raise Exception("sample exception") except Exception: - log.exception('it failed') + log.exception("it failed") data = self.get_data() - message = data[0][2]['message'] + message = data[0][2]["message"] # Includes the logged message, as well as the stack trace. - self.assertTrue('it failed' in message) + self.assertTrue("it failed" in message) self.assertTrue('tests/test_asynchandler.py", line' in message) - self.assertTrue('Exception: sample exception' in message) + self.assertTrue("Exception: sample exception" in message) class TestHandlerWithCircularQueue(unittest.TestCase): Q_SIZE = 3 def setUp(self): - super(TestHandlerWithCircularQueue, self).setUp() - self._server = mockserver.MockRecvServer('localhost') + super().setUp() + self._server = mockserver.MockRecvServer("localhost") self._port = self._server.port def tearDown(self): @@ -283,21 +279,24 @@ def get_data(self): return self._server.get_received() def test_simple(self): - handler = self.get_handler_class()('app.follow', port=self._port, - queue_maxsize=self.Q_SIZE, - queue_circular=True) + handler = self.get_handler_class()( + "app.follow", + port=self._port, + queue_maxsize=self.Q_SIZE, + queue_circular=True, + ) with handler: self.assertEqual(handler.sender.queue_circular, True) self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 5, 'from': 'userA', 'to': 'userB'}) + log.info({"cnt": 1, "from": "userA", "to": "userB"}) + log.info({"cnt": 2, "from": "userA", "to": "userB"}) + log.info({"cnt": 3, "from": "userA", "to": "userB"}) + log.info({"cnt": 4, "from": "userA", "to": "userB"}) + log.info({"cnt": 5, "from": "userA", "to": "userB"}) data = self.get_data() eq = self.assertEqual @@ -307,9 +306,9 @@ def test_simple(self): el = data[0] eq(3, len(el)) - eq('app.follow', el[0]) - eq('userA', el[2]['from']) - eq('userB', el[2]['to']) + eq("app.follow", el[0]) + eq("userA", el[2]["from"]) + eq("userB", el[2]["to"]) self.assertTrue(el[1]) self.assertTrue(isinstance(el[1], int)) @@ -326,8 +325,8 @@ class TestHandlerWithCircularQueueHandler(unittest.TestCase): Q_SIZE = 1 def setUp(self): - super(TestHandlerWithCircularQueueHandler, self).setUp() - self._server = mockserver.MockRecvServer('localhost') + super().setUp() + self._server = mockserver.MockRecvServer("localhost") self._port = self._server.port def tearDown(self): @@ -338,41 +337,49 @@ def get_handler_class(self): return fluent.asynchandler.FluentHandler def test_simple(self): - handler = self.get_handler_class()('app.follow', port=self._port, - queue_maxsize=self.Q_SIZE, - queue_circular=True, - queue_overflow_handler=queue_overflow_handler) + handler = self.get_handler_class()( + "app.follow", + port=self._port, + queue_maxsize=self.Q_SIZE, + queue_circular=True, + queue_overflow_handler=queue_overflow_handler, + ) with handler: + def custom_full_queue(): - handler.sender._queue.put(b'Mock', block=True) + handler.sender._queue.put(b"Mock", block=True) return True - with patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(side_effect=custom_full_queue)): + with patch.object( + fluent.asynchandler.asyncsender.Queue, + "full", + mock.Mock(side_effect=custom_full_queue), + ): self.assertEqual(handler.sender.queue_circular, True) self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) exc_counter = 0 try: - log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) + log.info({"cnt": 1, "from": "userA", "to": "userB"}) except QueueOverflowException: exc_counter += 1 try: - log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) + log.info({"cnt": 2, "from": "userA", "to": "userB"}) except QueueOverflowException: exc_counter += 1 try: - log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) + log.info({"cnt": 3, "from": "userA", "to": "userB"}) except QueueOverflowException: exc_counter += 1 # we can't be sure to have exception in every case due to multithreading, # so we can test only for a cautelative condition here - print('Exception raised: {} (expected 3)'.format(exc_counter)) + print(f"Exception raised: {exc_counter} (expected 3)") assert exc_counter >= 0 diff --git a/tests/test_asyncsender.py b/tests/test_asyncsender.py index eb36f96..d690ae1 100644 --- a/tests/test_asyncsender.py +++ b/tests/test_asyncsender.py @@ -1,7 +1,3 @@ -# -*- coding: utf-8 -*- - -from __future__ import print_function - import socket import unittest @@ -14,6 +10,7 @@ class TestSetup(unittest.TestCase): def tearDown(self): from fluent.asyncsender import _set_global_sender + _set_global_sender(None) def test_no_kwargs(self): @@ -46,10 +43,11 @@ def test_tolerant(self): class TestSender(unittest.TestCase): def setUp(self): - super(TestSender, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.asyncsender.FluentSender( + tag="test", port=self._server.port + ) def tearDown(self): try: @@ -62,41 +60,41 @@ def get_data(self): def test_simple(self): with self._sender as sender: - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_decorator_simple(self): with self._sender as sender: - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_nanosecond(self): with self._sender as sender: sender.nanosecond_precision = True - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(isinstance(data[0][1], msgpack.ExtType)) eq(data[0][1].code, 0) @@ -104,21 +102,21 @@ def test_nanosecond_coerce_float(self): time_ = 1490061367.8616468906402588 with self._sender as sender: sender.nanosecond_precision = True - sender.emit_with_time('foo', time_, {'bar': 'baz'}) + sender.emit_with_time("foo", time_, {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(isinstance(data[0][1], msgpack.ExtType)) eq(data[0][1].code, 0) - eq(data[0][1].data, b'X\xd0\x8873[\xb0*') + eq(data[0][1].data, b"X\xd0\x8873[\xb0*") def test_no_last_error_on_successful_emit(self): with self._sender as sender: - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) self.assertEqual(sender.last_error, None) @@ -135,8 +133,10 @@ def test_clear_last_error(self): self.assertEqual(self._sender.last_error, None) - @unittest.skip("This test failed with 'TypeError: catching classes that do not " - "inherit from BaseException is not allowed' so skipped") + @unittest.skip( + "This test failed with 'TypeError: catching classes that do not " + "inherit from BaseException is not allowed' so skipped" + ) def test_connect_exception_during_sender_init(self, mock_socket): # Make the socket.socket().connect() call raise a custom exception mock_connect = mock_socket.socket.return_value.connect @@ -147,7 +147,9 @@ def test_connect_exception_during_sender_init(self, mock_socket): def test_sender_without_flush(self): with self._sender as sender: - sender._queue.put(fluent.asyncsender._TOMBSTONE) # This closes without closing + sender._queue.put( + fluent.asyncsender._TOMBSTONE + ) # This closes without closing sender._send_thread.join() for x in range(1, 10): sender._queue.put(x) @@ -157,10 +159,11 @@ def test_sender_without_flush(self): class TestSenderDefaultProperties(unittest.TestCase): def setUp(self): - super(TestSenderDefaultProperties, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.asyncsender.FluentSender( + tag="test", port=self._server.port + ) def tearDown(self): try: @@ -178,11 +181,11 @@ def test_default_properties(self): class TestSenderWithTimeout(unittest.TestCase): def setUp(self): - super(TestSenderWithTimeout, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port, - queue_timeout=0.04) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.asyncsender.FluentSender( + tag="test", port=self._server.port, queue_timeout=0.04 + ) def tearDown(self): try: @@ -195,27 +198,27 @@ def get_data(self): def test_simple(self): with self._sender as sender: - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_simple_with_timeout_props(self): with self._sender as sender: - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) @@ -224,19 +227,21 @@ class TestEventTime(unittest.TestCase): def test_event_time(self): time = fluent.asyncsender.EventTime(1490061367.8616468906402588) self.assertEqual(time.code, 0) - self.assertEqual(time.data, b'X\xd0\x8873[\xb0*') + self.assertEqual(time.data, b"X\xd0\x8873[\xb0*") class TestSenderWithTimeoutAndCircular(unittest.TestCase): Q_SIZE = 3 def setUp(self): - super(TestSenderWithTimeoutAndCircular, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port, - queue_maxsize=self.Q_SIZE, - queue_circular=True) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.asyncsender.FluentSender( + tag="test", + port=self._server.port, + queue_maxsize=self.Q_SIZE, + queue_circular=True, + ) def tearDown(self): try: @@ -253,15 +258,15 @@ def test_simple(self): self.assertEqual(self._sender.queue_circular, True) self.assertEqual(self._sender.queue_blocking, False) - ok = sender.emit('foo1', {'bar': 'baz1'}) + ok = sender.emit("foo1", {"bar": "baz1"}) self.assertTrue(ok) - ok = sender.emit('foo2', {'bar': 'baz2'}) + ok = sender.emit("foo2", {"bar": "baz2"}) self.assertTrue(ok) - ok = sender.emit('foo3', {'bar': 'baz3'}) + ok = sender.emit("foo3", {"bar": "baz3"}) self.assertTrue(ok) - ok = sender.emit('foo4', {'bar': 'baz4'}) + ok = sender.emit("foo4", {"bar": "baz4"}) self.assertTrue(ok) - ok = sender.emit('foo5', {'bar': 'baz5'}) + ok = sender.emit("foo5", {"bar": "baz5"}) self.assertTrue(ok) data = self.get_data() @@ -282,11 +287,11 @@ class TestSenderWithTimeoutMaxSizeNonCircular(unittest.TestCase): Q_SIZE = 3 def setUp(self): - super(TestSenderWithTimeoutMaxSizeNonCircular, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port, - queue_maxsize=self.Q_SIZE) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.asyncsender.FluentSender( + tag="test", port=self._server.port, queue_maxsize=self.Q_SIZE + ) def tearDown(self): try: @@ -303,15 +308,15 @@ def test_simple(self): self.assertEqual(self._sender.queue_blocking, True) self.assertEqual(self._sender.queue_circular, False) - ok = sender.emit('foo1', {'bar': 'baz1'}) + ok = sender.emit("foo1", {"bar": "baz1"}) self.assertTrue(ok) - ok = sender.emit('foo2', {'bar': 'baz2'}) + ok = sender.emit("foo2", {"bar": "baz2"}) self.assertTrue(ok) - ok = sender.emit('foo3', {'bar': 'baz3'}) + ok = sender.emit("foo3", {"bar": "baz3"}) self.assertTrue(ok) - ok = sender.emit('foo4', {'bar': 'baz4'}) + ok = sender.emit("foo4", {"bar": "baz4"}) self.assertTrue(ok) - ok = sender.emit('foo5', {'bar': 'baz5'}) + ok = sender.emit("foo5", {"bar": "baz5"}) self.assertTrue(ok) data = self.get_data() @@ -319,26 +324,25 @@ def test_simple(self): print(data) eq(5, len(data)) eq(3, len(data[0])) - eq('test.foo1', data[0][0]) - eq({'bar': 'baz1'}, data[0][2]) + eq("test.foo1", data[0][0]) + eq({"bar": "baz1"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) eq(3, len(data[2])) - eq('test.foo3', data[2][0]) - eq({'bar': 'baz3'}, data[2][2]) + eq("test.foo3", data[2][0]) + eq({"bar": "baz3"}, data[2][2]) class TestSenderUnlimitedSize(unittest.TestCase): Q_SIZE = 3 def setUp(self): - super(TestSenderUnlimitedSize, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.asyncsender.FluentSender(tag='test', - port=self._server.port, - queue_timeout=0.04, - queue_maxsize=0) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.asyncsender.FluentSender( + tag="test", port=self._server.port, queue_timeout=0.04, queue_maxsize=0 + ) def tearDown(self): try: @@ -357,7 +361,7 @@ def test_simple(self): NUM = 1000 for i in range(1, NUM + 1): - ok = sender.emit("foo{}".format(i), {'bar': "baz{}".format(i)}) + ok = sender.emit(f"foo{i}", {"bar": f"baz{i}"}) self.assertTrue(ok) data = self.get_data() @@ -365,12 +369,12 @@ def test_simple(self): eq(NUM, len(data)) el = data[0] eq(3, len(el)) - eq('test.foo1', el[0]) - eq({'bar': 'baz1'}, el[2]) + eq("test.foo1", el[0]) + eq({"bar": "baz1"}, el[2]) self.assertTrue(el[1]) self.assertTrue(isinstance(el[1], int)) el = data[NUM - 1] eq(3, len(el)) - eq("test.foo{}".format(NUM), el[0]) - eq({'bar': "baz{}".format(NUM)}, el[2]) + eq(f"test.foo{NUM}", el[0]) + eq({"bar": f"baz{NUM}"}, el[2]) diff --git a/tests/test_event.py b/tests/test_event.py index 0f47ffa..6e2f0a0 100644 --- a/tests/test_event.py +++ b/tests/test_event.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - import unittest from fluent import event, sender @@ -12,43 +10,37 @@ class TestException(BaseException): class TestEvent(unittest.TestCase): def setUp(self): - self._server = mockserver.MockRecvServer('localhost') - sender.setup('app', port=self._server.port) + self._server = mockserver.MockRecvServer("localhost") + sender.setup("app", port=self._server.port) def tearDown(self): from fluent.sender import _set_global_sender + sender.close() _set_global_sender(None) - + def test_logging(self): # XXX: This tests succeeds even if the fluentd connection failed # send event with tag app.follow - event.Event('follow', { - 'from': 'userA', - 'to': 'userB' - }) + event.Event("follow", {"from": "userA", "to": "userB"}) def test_logging_with_timestamp(self): # XXX: This tests succeeds even if the fluentd connection failed # send event with tag app.follow, with timestamp - event.Event('follow', { - 'from': 'userA', - 'to': 'userB' - }, time=int(0)) + event.Event("follow", {"from": "userA", "to": "userB"}, time=int(0)) def test_no_last_error_on_successful_event(self): global_sender = sender.get_global_sender() - event.Event('unfollow', { - 'from': 'userC', - 'to': 'userD' - }) + event.Event("unfollow", {"from": "userC", "to": "userD"}) self.assertEqual(global_sender.last_error, None) sender.close() - @unittest.skip("This test failed with 'TypeError: catching classes that do not " - "inherit from BaseException is not allowed' so skipped") + @unittest.skip( + "This test failed with 'TypeError: catching classes that do not " + "inherit from BaseException is not allowed' so skipped" + ) def test_connect_exception_during_event_send(self, mock_socket): # Make the socket.socket().connect() call raise a custom exception mock_connect = mock_socket.socket.return_value.connect @@ -59,10 +51,7 @@ def test_connect_exception_during_event_send(self, mock_socket): global_sender = sender.get_global_sender() global_sender._close() - event.Event('unfollow', { - 'from': 'userE', - 'to': 'userF' - }) + event.Event("unfollow", {"from": "userE", "to": "userF"}) ex = global_sender.last_error self.assertEqual(ex.args, EXCEPTION_MSG) diff --git a/tests/test_handler.py b/tests/test_handler.py index 2ef0695..711d282 100644 --- a/tests/test_handler.py +++ b/tests/test_handler.py @@ -1,7 +1,4 @@ -#  -*- coding: utf-8 -*- - import logging -import sys import unittest import fluent.handler @@ -16,8 +13,8 @@ def get_logger(name, level=logging.INFO): class TestHandler(unittest.TestCase): def setUp(self): - super(TestHandler, self).setUp() - self._server = mockserver.MockRecvServer('localhost') + super().setUp() + self._server = mockserver.MockRecvServer("localhost") self._port = self._server.port def tearDown(self): @@ -27,17 +24,14 @@ def get_data(self): return self._server.get_received() def test_simple(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info({ - 'from': 'userA', - 'to': 'userB' - }) + log.info({"from": "userA", "to": "userB"}) log.removeHandler(handler) @@ -45,206 +39,199 @@ def test_simple(self): eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('app.follow', data[0][0]) - eq('userA', data[0][2]['from']) - eq('userB', data[0][2]['to']) + eq("app.follow", data[0][0]) + eq("userA", data[0][2]["from"]) + eq("userB", data[0][2]["to"]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_custom_fmt(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'lineno': '%(lineno)d', - 'emitted_at': '%(asctime)s', - }) + fluent.handler.FluentRecordFormatter( + fmt={ + "name": "%(name)s", + "lineno": "%(lineno)d", + "emitted_at": "%(asctime)s", + } + ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertTrue('emitted_at' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertTrue("emitted_at" in data[0][2]) def test_exclude_attrs(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(exclude_attrs=[]) - ) + log = get_logger("fluent.test") + handler.setFormatter(fluent.handler.FluentRecordFormatter(exclude_attrs=[])) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) def test_exclude_attrs_with_exclusion(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( fluent.handler.FluentRecordFormatter(exclude_attrs=["funcName"]) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) def test_exclude_attrs_with_extra(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(exclude_attrs=[]) - ) + log = get_logger("fluent.test") + handler.setFormatter(fluent.handler.FluentRecordFormatter(exclude_attrs=[])) log.addHandler(handler) log.info("Test with value '%s'", "test value", extra={"x": 1234}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertEqual("Test with value 'test value'", data[0][2]['message']) - self.assertEqual(1234, data[0][2]['x']) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertEqual("Test with value 'test value'", data[0][2]["message"]) + self.assertEqual(1234, data[0][2]["x"]) def test_format_dynamic(self): def formatter(record): - return { - "message": record.message, - "x": record.x, - "custom_value": 1 - } + return {"message": record.message, "x": record.x, "custom_value": 1} formatter.usesTime = lambda: True - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') - handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt=formatter) - ) + log = get_logger("fluent.test") + handler.setFormatter(fluent.handler.FluentRecordFormatter(fmt=formatter)) log.addHandler(handler) log.info("Test with value '%s'", "test value", extra={"x": 1234}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('x' in data[0][2]) - self.assertEqual(1234, data[0][2]['x']) - self.assertEqual(1, data[0][2]['custom_value']) + self.assertTrue("x" in data[0][2]) + self.assertEqual(1234, data[0][2]["x"]) + self.assertEqual(1, data[0][2]["custom_value"]) - @unittest.skipUnless(sys.version_info[0:2] >= (3, 2), 'supported with Python 3.2 or above') def test_custom_fmt_with_format_style(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '{name}', - 'lineno': '{lineno}', - 'emitted_at': '{asctime}', - }, style='{') + fluent.handler.FluentRecordFormatter( + fmt={ + "name": "{name}", + "lineno": "{lineno}", + "emitted_at": "{asctime}", + }, + style="{", + ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertTrue('emitted_at' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertTrue("emitted_at" in data[0][2]) - @unittest.skipUnless(sys.version_info[0:2] >= (3, 2), 'supported with Python 3.2 or above') def test_custom_fmt_with_template_style(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '${name}', - 'lineno': '${lineno}', - 'emitted_at': '${asctime}', - }, style='$') + fluent.handler.FluentRecordFormatter( + fmt={ + "name": "${name}", + "lineno": "${lineno}", + "emitted_at": "${asctime}", + }, + style="$", + ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('lineno' in data[0][2]) - self.assertTrue('emitted_at' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("lineno" in data[0][2]) + self.assertTrue("emitted_at" in data[0][2]) def test_custom_field_raise_exception(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'custom_field': '%(custom_field)s' - }) + fluent.handler.FluentRecordFormatter( + fmt={"name": "%(name)s", "custom_field": "%(custom_field)s"} + ) ) log.addHandler(handler) with self.assertRaises(KeyError): - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) def test_custom_field_fill_missing_fmt_key_is_true(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter( - fluent.handler.FluentRecordFormatter(fmt={ - 'name': '%(name)s', - 'custom_field': '%(custom_field)s' - }, - fill_missing_fmt_key=True + fluent.handler.FluentRecordFormatter( + fmt={"name": "%(name)s", "custom_field": "%(custom_field)s"}, + fill_missing_fmt_key=True, ) ) log.addHandler(handler) - log.info({'sample': 'value'}) + log.info({"sample": "value"}) log.removeHandler(handler) data = self.get_data() - self.assertTrue('name' in data[0][2]) - self.assertEqual('fluent.test', data[0][2]['name']) - self.assertTrue('custom_field' in data[0][2]) + self.assertTrue("name" in data[0][2]) + self.assertEqual("fluent.test", data[0][2]["name"]) + self.assertTrue("custom_field" in data[0][2]) # field defaults to none if not in log record - self.assertIsNone(data[0][2]['custom_field']) + self.assertIsNone(data[0][2]["custom_field"]) def test_json_encoded_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) @@ -253,15 +240,17 @@ def test_json_encoded_message(self): log.removeHandler(handler) data = self.get_data() - self.assertTrue('key' in data[0][2]) - self.assertEqual('hello world!', data[0][2]['key']) + self.assertTrue("key" in data[0][2]) + self.assertEqual("hello world!", data[0][2]["key"]) def test_json_encoded_message_without_json(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter(format_json=False)) + log = get_logger("fluent.test") + handler.setFormatter( + fluent.handler.FluentRecordFormatter(format_json=False) + ) log.addHandler(handler) log.info('{"key": "hello world!", "param": "value"}') @@ -269,71 +258,73 @@ def test_json_encoded_message_without_json(self): log.removeHandler(handler) data = self.get_data() - self.assertTrue('key' not in data[0][2]) - self.assertEqual('{"key": "hello world!", "param": "value"}', data[0][2]['message']) + self.assertTrue("key" not in data[0][2]) + self.assertEqual( + '{"key": "hello world!", "param": "value"}', data[0][2]["message"] + ) def test_unstructured_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info('hello %s', 'world') + log.info("hello %s", "world") log.removeHandler(handler) data = self.get_data() - self.assertTrue('message' in data[0][2]) - self.assertEqual('hello world', data[0][2]['message']) + self.assertTrue("message" in data[0][2]) + self.assertEqual("hello world", data[0][2]["message"]) def test_unstructured_formatted_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info('hello world, %s', 'you!') + log.info("hello world, %s", "you!") log.removeHandler(handler) data = self.get_data() - self.assertTrue('message' in data[0][2]) - self.assertEqual('hello world, you!', data[0][2]['message']) + self.assertTrue("message" in data[0][2]) + self.assertEqual("hello world, you!", data[0][2]["message"]) def test_number_string_simple_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) log.info("1") log.removeHandler(handler) data = self.get_data() - self.assertTrue('message' in data[0][2]) + self.assertTrue("message" in data[0][2]) def test_non_string_simple_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) log.info(42) log.removeHandler(handler) data = self.get_data() - self.assertTrue('message' in data[0][2]) + self.assertTrue("message" in data[0][2]) def test_non_string_dict_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - log.info({42: 'root'}) + log.info({42: "root"}) log.removeHandler(handler) data = self.get_data() @@ -341,21 +332,21 @@ def test_non_string_dict_message(self): self.assertFalse(42 in data[0][2]) def test_exception_message(self): - handler = fluent.handler.FluentHandler('app.follow', port=self._port) + handler = fluent.handler.FluentHandler("app.follow", port=self._port) with handler: - log = get_logger('fluent.test') + log = get_logger("fluent.test") handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) try: - raise Exception('sample exception') + raise Exception("sample exception") except Exception: - log.exception('it failed') + log.exception("it failed") log.removeHandler(handler) data = self.get_data() - message = data[0][2]['message'] + message = data[0][2]["message"] # Includes the logged message, as well as the stack trace. - self.assertTrue('it failed' in message) + self.assertTrue("it failed" in message) self.assertTrue('tests/test_handler.py", line' in message) - self.assertTrue('Exception: sample exception' in message) + self.assertTrue("Exception: sample exception" in message) diff --git a/tests/test_sender.py b/tests/test_sender.py index 1c0fbe9..e2c5710 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -1,7 +1,3 @@ -# -*- coding: utf-8 -*- - -from __future__ import print_function - import errno import socket import sys @@ -18,6 +14,7 @@ class TestSetup(unittest.TestCase): def tearDown(self): from fluent.sender import _set_global_sender + _set_global_sender(None) def test_no_kwargs(self): @@ -47,10 +44,9 @@ def test_tolerant(self): class TestSender(unittest.TestCase): def setUp(self): - super(TestSender, self).setUp() - self._server = mockserver.MockRecvServer('localhost') - self._sender = fluent.sender.FluentSender(tag='test', - port=self._server.port) + super().setUp() + self._server = mockserver.MockRecvServer("localhost") + self._sender = fluent.sender.FluentSender(tag="test", port=self._server.port) def tearDown(self): try: @@ -63,40 +59,40 @@ def get_data(self): def test_simple(self): sender = self._sender - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) sender._close() data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_decorator_simple(self): with self._sender as sender: - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(data[0][1]) self.assertTrue(isinstance(data[0][1], int)) def test_nanosecond(self): sender = self._sender sender.nanosecond_precision = True - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) sender._close() data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(isinstance(data[0][1], msgpack.ExtType)) eq(data[0][1].code, 0) @@ -104,21 +100,21 @@ def test_nanosecond_coerce_float(self): time = 1490061367.8616468906402588 sender = self._sender sender.nanosecond_precision = True - sender.emit_with_time('foo', time, {'bar': 'baz'}) + sender.emit_with_time("foo", time, {"bar": "baz"}) sender._close() data = self.get_data() eq = self.assertEqual eq(1, len(data)) eq(3, len(data[0])) - eq('test.foo', data[0][0]) - eq({'bar': 'baz'}, data[0][2]) + eq("test.foo", data[0][0]) + eq({"bar": "baz"}, data[0][2]) self.assertTrue(isinstance(data[0][1], msgpack.ExtType)) eq(data[0][1].code, 0) - eq(data[0][1].data, b'X\xd0\x8873[\xb0*') + eq(data[0][1].data, b"X\xd0\x8873[\xb0*") def test_no_last_error_on_successful_emit(self): sender = self._sender - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) sender._close() self.assertEqual(sender.last_error, None) @@ -159,7 +155,7 @@ def test_emit_after_close(self): def test_verbose(self): with self._sender as sender: sender.verbose = True - sender.emit('foo', {'bar': 'baz'}) + sender.emit("foo", {"bar": "baz"}) # No assertions here, just making sure there are no exceptions def test_failure_to_connect(self): @@ -222,13 +218,14 @@ def __init__(self): self.to = 123 self.send_side_effects = [3, 0, 9] self.send_idx = 0 - self.recv_side_effects = [socket.error(errno.EWOULDBLOCK, "Blah"), - b"this data is going to be ignored", - b"", - socket.error(errno.EWOULDBLOCK, "Blah"), - socket.error(errno.EWOULDBLOCK, "Blah"), - socket.error(errno.EACCES, "This error will never happen"), - ] + self.recv_side_effects = [ + socket.error(errno.EWOULDBLOCK, "Blah"), + b"this data is going to be ignored", + b"", + socket.error(errno.EWOULDBLOCK, "Blah"), + socket.error(errno.EWOULDBLOCK, "Blah"), + socket.error(errno.EACCES, "This error will never happen"), + ] self.recv_idx = 0 def send(self, bytes_): @@ -296,16 +293,15 @@ def test_unix_socket(self): self.tearDown() tmp_dir = mkdtemp() try: - server_file = 'unix://' + tmp_dir + "/tmp.unix" + server_file = "unix://" + tmp_dir + "/tmp.unix" self._server = mockserver.MockRecvServer(server_file) - self._sender = fluent.sender.FluentSender(tag='test', - host=server_file) + self._sender = fluent.sender.FluentSender(tag="test", host=server_file) with self._sender as sender: - self.assertTrue(sender.emit('foo', {'bar': 'baz'})) + self.assertTrue(sender.emit("foo", {"bar": "baz"})) data = self._server.get_received() self.assertEqual(len(data), 1) - self.assertEqual(data[0][2], {'bar': 'baz'}) + self.assertEqual(data[0][2], {"bar": "baz"}) finally: rmtree(tmp_dir, True) @@ -315,4 +311,4 @@ class TestEventTime(unittest.TestCase): def test_event_time(self): time = fluent.sender.EventTime(1490061367.8616468906402588) self.assertEqual(time.code, 0) - self.assertEqual(time.data, b'X\xd0\x8873[\xb0*') + self.assertEqual(time.data, b"X\xd0\x8873[\xb0*")