-
Notifications
You must be signed in to change notification settings - Fork 119
/
emitting.py
196 lines (142 loc) · 5.18 KB
/
emitting.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
from __future__ import print_function
import abc
import collections
import json
import os
import pydoc
import re
import sys
import pager
import pygments
import six
from dcos import constants, errors, util
from pygments.formatters import Terminal256Formatter
from pygments.lexers import JsonLexer
# Module-level logger; print_handler uses it to record events of unknown type.
logger = util.get_logger(__name__)
class Emitter(object):
    """Base interface for objects that publish events.

    Subclasses are expected to override :py:meth:`publish`.  The class does
    not use :py:class:`abc.ABCMeta` as its metaclass, so the
    ``abstractmethod`` marker is not enforced at instantiation time; calling
    the base implementation raises :py:exc:`NotImplementedError` instead.
    """

    @abc.abstractmethod
    def publish(self, event):
        """Publish a single event.

        :param event: event to publish
        :type event: any
        :raises NotImplementedError: always, in this base implementation
        """

        raise NotImplementedError()
class FlatEmitter(Emitter):
    """Emitter that forwards every published event to a single handler.

    When no handler is provided, :py:const:`DEFAULT_HANDLER` is used.

    :param handler: callable invoked with each published event
    :type handler: func(event) where event is defined in
                   :py:func:`FlatEmitter.publish`
    """

    def __init__(self, handler=None):
        # DEFAULT_HANDLER is resolved here, at construction time, so it only
        # needs to exist once the module has finished loading.
        self._handler = DEFAULT_HANDLER if handler is None else handler

    def publish(self, event):
        """Hand the event to the configured handler.

        The handler's return value is discarded.

        :param event: event to publish
        :type event: any
        """

        self._handler(event)
def print_handler(event):
    """Default handler: write an event to stdout (or stderr for errors).

    Dispatch by event type:

    * ``None`` -- nothing is printed.
    * text -- paged to stdout as-is.
    * :py:class:`dcos.errors.Error` -- ``event.error()`` is printed to
      stderr, which is then flushed.
    * mappings, sequences, booleans, integers and floats -- rendered as
      JSON and paged to stdout.
    * :py:class:`dcos.errors.DCOSException` -- printed to stderr.
    * anything else -- logged at debug level and paged to stdout.

    :param event: event to emit
    :type event: str, dict, list, or dcos.errors.Error
    """

    # The ``collections.Mapping`` / ``collections.Sequence`` aliases were
    # removed in Python 3.10; on Python 3 the ABCs live in
    # ``collections.abc``.  Python 2 has no such submodule, so fall back to
    # the ``collections`` module itself there.
    try:
        from collections import abc as collections_abc
    except ImportError:
        collections_abc = collections

    pager_command = os.environ.get(constants.DCOS_PAGER_COMMAND_ENV)

    if event is None:
        # Nothing to emit.
        return

    # Every type that maps directly onto a JSON value.  isinstance accepts
    # nested tuples, so six.integer_types can be embedded as-is.
    json_types = (
        collections_abc.Mapping,
        collections_abc.Sequence,
        bool,
        six.integer_types,
        float,
    )

    # Strings are sequences too, so they must be matched before json_types.
    if isinstance(event, six.string_types):
        _page(event, pager_command)
    elif isinstance(event, errors.Error):
        print(event.error(), file=sys.stderr)
        sys.stderr.flush()
    elif isinstance(event, json_types):
        processed_json = _process_json(event, pager_command)
        _page(processed_json, pager_command)
    elif isinstance(event, errors.DCOSException):
        print(event, file=sys.stderr)
    else:
        logger.debug('Printing unknown type: %s, %r.', type(event), event)
        _page(event, pager_command)
def publish_table(emitter, objs, table_fn, json_):
    """Publish `objs` either as JSON or as a rendered table.

    When `json_` is true, `objs` is handed to the emitter unchanged.
    Otherwise `table_fn` builds a table from `objs`, the table is converted
    to text, and the text is published only if it is non-empty.

    :param emitter: emitter to use for publishing
    :type emitter: Emitter
    :param objs: objects to print
    :type objs: [object]
    :param table_fn: function used to generate a PrettyTable from `objs`
    :type table_fn: objs -> PrettyTable
    :param json_: whether or not to publish a json representation
    :type json_: bool
    :rtype: None
    """

    if json_:
        emitter.publish(objs)
        return

    rendered = six.text_type(table_fn(objs))
    if rendered:
        emitter.publish(rendered)
def _process_json(event, pager_command):
"""Conditionally highlights the supplied JSON value.
:param event: event to emit to stdout
:type event: str, dict, list, or dcos.errors.Error
:returns: String representation of the supplied JSON value,
possibly syntax-highlighted.
:rtype: str
"""
json_output = json.dumps(event, sort_keys=True, indent=2)
# Strip trailing whitespace
json_output = re.sub(r'\s+$', '', json_output, 0, re.M)
force_colors = False # TODO(CD): Introduce a --colors flag
if not sys.stdout.isatty():
if force_colors:
return _highlight_json(json_output)
else:
return json_output
supports_colors = not util.is_windows_platform()
pager_is_set = pager_command is not None
should_highlight = force_colors or supports_colors and not pager_is_set
if should_highlight:
json_output = _highlight_json(json_output)
return json_output
def _page(output, pager_command=None):
    """Print the output, routing it through a pager when it is too tall.

    The output is converted to text first.  It is piped through the pager
    only when stdout is a TTY, the platform is not Windows, the text has
    more newlines than the terminal height minus one, and the
    ``core.pagination`` config value is truthy (it defaults to True).
    In every other case the text is printed directly.

    :param output: value to display
    :type output: object
    :param pager_command: pager command line; 'less -R' when None
    :type pager_command: str
    """

    text = six.text_type(output)
    pager_cmd = 'less -R' if pager_command is None else pager_command

    interactive = sys.stdout.isatty() and not util.is_windows_platform()
    if interactive:
        line_count = text.count('\n')
        too_tall = pager.getheight() - 1 < line_count
        pagination_enabled = util.get_config().get("core.pagination", True)

        if too_tall and pagination_enabled:
            pydoc.pipepager(text, cmd=pager_cmd)
            return

    print(text)
def _highlight_json(json_value):
    """Syntax-highlight JSON text for a 256-color terminal.

    :param json_value: JSON text to highlight (the caller in this module
                       passes the output of ``json.dumps``)
    :type json_value: str
    :returns: the highlighted text with leading and trailing whitespace
              removed
    :rtype: str
    """

    lexer = JsonLexer()
    formatter = Terminal256Formatter()
    highlighted = pygments.highlight(json_value, lexer, formatter)
    return highlighted.strip()
# Bound after print_handler is defined; FlatEmitter looks this name up when
# it is constructed without an explicit handler.
DEFAULT_HANDLER = print_handler
"""The default handler for an emitter: :py:func:`print_handler`."""