Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add throughput summary and fix timmer blocked by end=3 count=0 settings #346

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 26 additions & 11 deletions splunk_eventgen/eventgen_api_server/eventgen_controller_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ def __init__(self, redis_connector, host):
self.interval = 0.001

self.server_responses = {}

def get_blueprint(self):
return self.bp

def __create_blueprint(self):
bp = Blueprint('api', __name__)

def publish_message(job, request_method, body=None, target="all"):
message_uuid = str(uuid.uuid4())
formatted_message = json.dumps({'job': job, 'target': target, 'body': body, 'request_method': request_method, 'message_uuid': message_uuid})
self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, formatted_message)
self.logger.info("Published {}".format(formatted_message))
return message_uuid

def gather_response(target_job, message_uuid, response_number_target=0):
if not response_number_target:
response_number_target = int(self.redis_connector.message_connection.pubsub_numsub(self.redis_connector.servers_channel)[0][1])
Expand All @@ -55,6 +55,8 @@ def gather_response(target_job, message_uuid, response_number_target=0):
if response_message_uuid not in self.server_responses:
self.server_responses[response_message_uuid] = {}
self.server_responses[response_message_uuid][server_response['host']] = server_response['response']
if target_job == 'status':
self.server_responses[message_uuid] = self.calculate_throughput(data=self.server_responses[message_uuid])
return self.server_responses.get(message_uuid, {})

@bp.route('/index', methods=['GET'])
Expand All @@ -65,7 +67,7 @@ def index():
You are running Eventgen Controller.\n'''
host = self.host
return home_page.format(host, self.redis_connector.get_registered_servers())

@bp.route('/status', methods=['GET'], defaults={'target': 'all'})
@bp.route('/status/<string:target>', methods=['GET'])
def http_status(target):
Expand Down Expand Up @@ -96,7 +98,7 @@ def http_bundle(target):
except Exception as e:
self.logger.error(e)
return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)

@bp.route('/setup', methods=['POST'], defaults={'target': 'all'})
@bp.route('/setup/<string:target>', methods=['POST'])
def http_setup(target):
Expand All @@ -106,7 +108,7 @@ def http_setup(target):
except Exception as e:
self.logger.error(e)
return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)

@bp.route('/volume', methods=['GET', 'POST'], defaults={'target': 'all'})
@bp.route('/volume/<string:target>', methods=['GET', 'POST'])
def http_volume(target):
Expand All @@ -117,7 +119,7 @@ def http_volume(target):
except Exception as e:
self.logger.error(e)
return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)

@bp.route('/start', methods=['POST'], defaults={'target': 'all'})
@bp.route('/start/<string:target>', methods=['POST'])
def http_start(target):
Expand All @@ -137,7 +139,7 @@ def http_stop(target):
except Exception as e:
self.logger.error(e)
return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)

@bp.route('/restart', methods=['POST'], defaults={'target': 'all'})
@bp.route('/restart/<string:target>', methods=['POST'])
def http_restart(target):
Expand All @@ -147,7 +149,7 @@ def http_restart(target):
except Exception as e:
self.logger.error(e)
return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)

@bp.route('/reset', methods=['POST'], defaults={'target': 'all'})
@bp.route('/reset/<string:target>', methods=['POST'])
def http_reset(target):
Expand Down Expand Up @@ -176,8 +178,21 @@ def http_healthcheck(target):
except Exception as e:
self.logger.error(e)
return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500)

return bp

def calculate_throughput(self, data):
throughput_summary = {'TOTAL_VOLUME_MB': 0, 'TOTAL_COUNT': 0, 'THROUGHPUT_VOLUME_KB': 0, 'THROUGHPUT_COUNT': 0}
for server_name, server_status in data.items():
if server_name != 'time' and 'THROUGHPUT_STATUS' in server_status:
server_throughput = server_status['THROUGHPUT_STATUS']
throughput_summary['TOTAL_VOLUME_MB'] += server_throughput['TOTAL_VOLUME_MB']
throughput_summary['TOTAL_COUNT'] += server_throughput['TOTAL_COUNT']
throughput_summary['THROUGHPUT_VOLUME_KB'] += server_throughput['THROUGHPUT_VOLUME_KB']
throughput_summary['THROUGHPUT_COUNT'] += server_throughput['THROUGHPUT_COUNT']
data['THROUGHTPUT_SUMMARY'] = throughput_summary
self.logger.debug("throughput summary: {}".format(throughput_summary))
return data

def __make_error_response(self, status, message):
return Response(json.dumps({'message': message}), status=status)
1 change: 1 addition & 0 deletions splunk_eventgen/lib/eventgentimer.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def real_run(self):
logger.info(
"There is no data to be generated in worker {0} because the count is {1}.".format(
self.sample.config.generatorWorkers, count))
self.executions += 1
else:
# Spawn workers at the beginning of job rather than wait for next interval
logger.info("Starting '%d' generatorWorkers for sample '%s'" %
Expand Down
58 changes: 55 additions & 3 deletions tests/large/test_eventgen_orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def setup_class(cls):
print('creating server')
redis_host = container["Id"][:12]
container = cls.client.create_container(
image=IMAGE_NAME, command="server", environment=["REDIS_HOST={}".format(redis_host)],
image=IMAGE_NAME, command="server", environment=["REDIS_HOST={}".format(redis_host)],
host_config=host_config,
networking_config=networking_config)
cls.client.start(container["Id"])
Expand Down Expand Up @@ -136,7 +136,7 @@ def test_controller_status(self):
current_retry += 1
time.sleep(10)
assert output

def test_controller_conf(self):
r = requests.post("http://127.0.0.1:{}/conf".format(self.controller_eventgen_webport), json=self.test_json)
assert r.status_code == 200
Expand Down Expand Up @@ -212,7 +212,7 @@ def test_controller_set_volume_with_volume_and_target(self):
assert r.status_code == 200
output = json.loads(r.content)
assert output[TestEventgenOrchestration.server_id[:12]]["perDayVolume"] == 20

def test_controller_stop(self):
r = requests.post("http://127.0.0.1:{}/stop".format(self.controller_eventgen_webport))
assert r.status_code == 200
Expand Down Expand Up @@ -255,6 +255,32 @@ def test_server_status(self):
assert output
assert output['EVENTGEN_STATUS'] == 0
assert output['TOTAL_VOLUME'] == 20
# check if eventgen status changed when it finished
config_json = {
"windbag": {
"end": "3",
"count": "0"
}
}
r = requests.post("http://127.0.0.1:{}/conf".format(self.server_eventgen_webport), json=config_json)
assert r.status_code == 200
assert json.loads(r.content) == config_json
r = requests.post("http://127.0.0.1:{}/start".format(self.server_eventgen_webport), timeout=5)
assert r.status_code == 200
timeout = 60
while True:
r = requests.get("http://127.0.0.1:{}/status".format(self.server_eventgen_webport))
assert r.status_code == 200
output = json.loads(r.content)
assert output
if output['EVENTGEN_STATUS'] == 1:
time.sleep(5)
timeout -= 5
if timeout < 0:
break
assert timeout >= 0
assert output['EVENTGEN_STATUS'] == 2


def test_server_get_and_set_conf(self):
r = requests.get("http://127.0.0.1:{}/conf".format(self.server_eventgen_webport))
Expand Down Expand Up @@ -308,3 +334,29 @@ def test_server_get_and_set_volume(self):
assert r.status_code == 200
output = json.loads(r.content)
assert output["perDayVolume"] == 150.0

def test_server_get_throughput(self):
r = requests.put("http://127.0.0.1:{}/conf".format(self.server_eventgen_webport), json={"windbag": { 'count' : 1000, "end":1}, "general": {"outputCounter": True}})
assert r.status_code == 200
assert json.loads(r.content)
r = requests.post("http://127.0.0.1:{}/start".format(self.server_eventgen_webport), timeout=5)
assert r.status_code == 200
timeout = 60
while True:
r = requests.get("http://127.0.0.1:{}/status".format(self.server_eventgen_webport))
assert r.status_code == 200
output = json.loads(r.content)
assert output
if output['EVENTGEN_STATUS'] == 1:
time.sleep(5)
timeout -= 5
if timeout < 0:
break
assert timeout >= 0
r = requests.get("http://127.0.0.1:{}/status".format(self.server_eventgen_webport))
assert r.status_code == 200
output = json.loads(r.content)
assert output
assert output['THROUGHTPUT_SUMMARY']['TOTAL_COUNT'] == 1000
assert output['THROUGHTPUT_SUMMARY']['TOTAL_VOLUME_MB'] > 0