Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor stop_host method to use topology #292

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 13 additions & 9 deletions packaging/bdist_prestoadmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import os
import re
import sys
import urllib
import requests
from distutils import log as logger
from distutils.dir_util import remove_tree

Expand Down Expand Up @@ -112,19 +112,23 @@ def package_dependencies(self, build_dir):
# the internal PyPI. During the build we download wheels for both
# interpreters compiled on Centos 6.6.
for wheel in self.NATIVE_WHEELS:
pypi_pycrypto_url = 'http://bdch-ftp.td.teradata.com:8082/' + \
'packages/' + wheel
wheel_url = 'http://bdch-ftp.td.teradata.com:8082/' + \
'packages/' + wheel
if sys.version.startswith('2.6'):
alternate_interpreter_version = 'cp27' # fetch 2.7 from PyPI
else:
alternate_interpreter_version = 'cp26'

urllib.urlretrieve(
pypi_pycrypto_url.format(alternate_interpreter_version),
os.path.join(
thirdparty_dir,
wheel.format(alternate_interpreter_version))
)
target_path = os.path.join(thirdparty_dir, wheel.format(alternate_interpreter_version))
with open(target_path, 'wb') as handle:
response = requests.get(wheel_url.format(alternate_interpreter_version), stream=True)

if not response.ok:
# Something went wrong
raise Exception

for block in response.iter_content(1024):
handle.write(block)

pip.main(['install',
Expand Down
26 changes: 12 additions & 14 deletions tests/base_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import sys

from tests.product import determine_jdk_directory
from tests.product.config_dir_utils import get_config_file_path


class BaseCluster(object):
Expand Down Expand Up @@ -84,20 +85,17 @@ def get_ip_address_dict(self):
"""
pass

@abc.abstractmethod
def stop_host(self, host_name):
    """Stop the given cluster host.

    Paradoxically, start_host doesn't seem to be required
    for the product tests to run successfully.

    :param host_name: external hostname of the host to stop
    """
    pass

@abc.abstractmethod
def get_down_hostname(self, host_name):
    """Return the hostname that represents *host_name* once it is "down".

    This is part of the magic involved in stopping a host. If you're
    rolling a new implementation, you should dig more deeply into the
    existing implementations, figure out how it all works, and update this
    comment.

    :param host_name: hostname whose "down" alias is requested
    """
    pass
@staticmethod
def get_down_hostname():
"""Fake hostname used to mimic stopping a host by uploading a topology
file pointing to an unresolvable host"""
return 'dummyhost'

@staticmethod
def get_down_ip():
"""This method serves the same purpose as get_down_hostname but
returns the corresponding fake IP"""
return '1.0.0.0'

@abc.abstractmethod
def postinstall(self, installer):
Expand Down
32 changes: 2 additions & 30 deletions tests/configurable_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import yaml
from prestoadmin import main_dir
from tests.base_cluster import BaseCluster
from tests.product.config_dir_utils import get_config_file_path, get_install_directory, get_config_directory
from tests.product.config_dir_utils import get_install_directory, get_config_directory

CONFIG_FILE_GLOB = r'*.yaml'
DIST_DIR = os.path.join(main_dir, 'tmp/installer')
Expand Down Expand Up @@ -121,35 +121,6 @@ def tear_down(self):
mount_dir=self.mount_dir)
self.run_script_on_host(script, host)

def stop_host(self, host_name):
    """Simulate stopping *host_name* by rewriting the topology config.

    Rather than actually shutting the machine down, this rewrites every
    occurrence of the host's external name, its IP, and its internal
    name in the config file to an unresolvable "down" hostname, then
    updates the in-memory host bookkeeping to match.

    :param host_name: external hostname of the host to "stop"; must be
        one of self.all_hosts()
    :raises Exception: if host_name is not a known external hostname
    """
    if host_name not in self.all_hosts():
        raise Exception('Must specify external hostname to stop_host')

    # Change the topology to something that doesn't exist
    ips = self.get_ip_address_dict()
    down_hostname = self.get_down_hostname(host_name)
    # Replace the external hostname in the topology config with the
    # unresolvable one.
    self.exec_cmd_on_host(
        self.master,
        'sed -i s/%s/%s/g %s' % (host_name, down_hostname, get_config_file_path())
    )
    # Also replace the host's IP address, in case the config refers to
    # the host by IP instead of name.
    self.exec_cmd_on_host(
        self.master,
        'sed -i s/%s/%s/g %s' % (ips[host_name], down_hostname, get_config_file_path())
    )
    # And the internal hostname at the same position in the host lists.
    # NOTE(review): assumes all_hosts() and all_internal_hosts() are
    # parallel lists — confirm against the implementations.
    index = self.all_hosts().index(host_name)
    self.exec_cmd_on_host(
        self.master,
        'sed -i s/%s/%s/g %s' % (self.all_internal_hosts()[index], down_hostname, get_config_file_path())
    )

    # NOTE(review): presumably all_hosts() lists slaves first and the
    # master last, so an index past the slaves means the master was
    # stopped — verify against all_hosts().
    if index >= len(self.internal_slaves):
        self.internal_master = down_hostname
    else:
        self.internal_slaves[index] = down_hostname

def get_down_hostname(self, host_name):
    """Return the unresolvable address used to stand in for a stopped host."""
    unreachable = '1.0.0.0'
    return unreachable

def exec_cmd_on_host(self, host, cmd, user=None, raise_error=True,
tty=False, invoke_sudo=False):
# If the corresponding variable is set, invoke command with sudo since EMR's login
Expand Down Expand Up @@ -248,6 +219,7 @@ def get_ip_address_dict(self):
for internal_host in self.all_internal_hosts():
ip_addresses[internal_host] = self._get_ip_from_hosts_file(
hosts_file, internal_host)
ip_addresses[self.get_down_hostname()] = self.get_down_ip()
return ip_addresses

@staticmethod
Expand Down
14 changes: 3 additions & 11 deletions tests/docker_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,23 +152,14 @@ def _tear_down_container(self, container_name):
raise

try:
self.stop_host(container_name)
self.client.stop(container_name)
self.client.wait(container_name)
self.client.remove_container(container_name, v=True, force=True)
except APIError as e:
# container does not exist
if e.response.status_code != 404:
raise

def stop_host(self, container_name):
    """Stop the named Docker container and block until it has exited.

    :param container_name: name of the container to stop
    """
    docker_client = self.client
    docker_client.stop(container_name)
    docker_client.wait(container_name)

def start_host(self, container_name):
    """Start the named Docker container.

    :param container_name: name of the container to start
    """
    docker_client = self.client
    docker_client.start(container_name)

def get_down_hostname(self, host_name):
    """Return the "down" alias for *host_name*.

    Docker containers are stopped for real, so the down hostname is
    simply the host's own name.
    """
    return host_name

def _remove_host_mount_dirs(self):
for container_name in self.all_hosts():
try:
Expand Down Expand Up @@ -430,6 +421,7 @@ def get_ip_address_dict(self):
ip_addresses[host] = inspect['NetworkSettings']['IPAddress']
ip_addresses[internal_host] = \
inspect['NetworkSettings']['IPAddress']
ip_addresses[self.get_down_hostname()] = self.get_down_ip()
return ip_addresses

def _post_presto_install(self):
Expand Down
13 changes: 7 additions & 6 deletions tests/product/base_product_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,16 @@ class BaseProductTestCase(BaseTestCase):
r'socket error connecting to host ' \
r'%(host)s on port 22: Network is unreachable ' \
r'\(tried 1 time\)\n\nUnderlying ' \
r'exception:\n Network is unreachable\n'
r'exception:\n Network is unreachable\n' \
r'|Name lookup failed for %(host)s'

status_down_node_string = r'(\tLow level socket error connecting to ' \
r'host %(host)s on port 22: No route to host ' \
r'\(tried 1 time\)|\tTimed out trying to ' \
r'connect to %(host)s \(tried 1 time\))' \
r'|\tLow level socket error connecting to ' \
r'host %(host)s on port 22: Network is unreachable ' \
r'\(tried 1 time\)'
r'\(tried 1 time\)|\tName lookup failed for %(host)s'

len_down_node_error = 6

Expand Down Expand Up @@ -494,12 +495,12 @@ def retry(method_to_check, retry_timeout=RETRY_TIMEOUT,
return Retrying(stop_max_delay=retry_timeout * 1000,
wait_fixed=retry_interval * 1000).call(method_to_check)

def down_node_connection_error(self, host):
hostname = self.cluster.get_down_hostname(host)
def down_node_connection_error(self):
    """Return the regex matching a connection failure to the down host."""
    down_host = self.cluster.get_down_hostname()
    return self.down_node_connection_string % {'host': down_host}

def status_node_connection_error(self, host):
hostname = self.cluster.get_down_hostname(host)
def status_node_connection_error(self):
    """Return the regex matching a status-command failure for the down host."""
    down_host = self.cluster.get_down_hostname()
    return self.status_down_node_string % {'host': down_host}


Expand Down
1 change: 1 addition & 0 deletions tests/product/prestoadmin_installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def _build_installer_in_docker(self, cluster, online_installer=None,
installer_container.run_script_on_host(
'set -e\n'
'pip install --upgrade pip==7.1.2\n'
'pip install requests\n'
'pip install --upgrade wheel==0.23.0\n'
'pip install --upgrade setuptools==20.1.1\n'
'mv %s/presto-admin ~/\n'
Expand Down
24 changes: 15 additions & 9 deletions tests/product/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,35 +172,41 @@ def fatal_error(self, error):
def test_catalog_add_lost_host(self):
installer = StandalonePrestoInstaller(self)
self.setup_cluster(NoHadoopBareImageProvider, STANDALONE_PA_CLUSTER)
self.upload_topology()
installer.install()
self.run_prestoadmin('catalog remove tpch')

self.cluster.stop_host(
self.cluster.slaves[0])
good_hosts = [self.cluster.internal_master,
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]
topology = {"coordinator": self.cluster.internal_master,
"workers": [self.cluster.get_down_hostname(),
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]}
self.upload_topology(topology)
self.cluster.write_content_to_host(
'connector.name=tpch',
os.path.join(get_catalog_directory(), 'tpch.properties'),
self.cluster.master
)

output = self.run_prestoadmin('catalog add tpch', raise_error=False)
for host in self.cluster.all_internal_hosts():

hosts = good_hosts + [self.cluster.get_down_hostname()]
for host in hosts:
deploying_message = 'Deploying tpch.properties catalog configurations on: %s'
self.assertTrue(deploying_message % host in output,
'expected %s \n actual %s'
% (deploying_message % host, output))
self.assertRegexpMatches(
output,
self.down_node_connection_error(self.cluster.internal_slaves[0])
self.down_node_connection_error()
)
self.assertEqual(len(output.splitlines()),
len(self.cluster.all_hosts()) +
self.len_down_node_error)
self.run_prestoadmin('server start', raise_error=False)

for host in [self.cluster.master,
self.cluster.slaves[1],
self.cluster.slaves[2]]:
self.run_prestoadmin('server start', raise_error=False)
for host in good_hosts:
self.assert_has_default_catalog(host)
self._assert_catalogs_loaded([['system'], ['tpch']])

Expand Down
14 changes: 7 additions & 7 deletions tests/product/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,31 +196,31 @@ def test_configuration_deploy_using_dash_x_coord_worker(self):
self.default_workers_test_config_)

def test_lost_coordinator_connection(self):
internal_bad_host = self.cluster.internal_slaves[0]
bad_host = self.cluster.slaves[0]
bad_host = self.cluster.get_down_hostname()
good_hosts = [self.cluster.internal_master,
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]
topology = {'coordinator': internal_bad_host,
topology = {'coordinator': bad_host,
'workers': good_hosts}
self.upload_topology(topology)
self.cluster.stop_host(bad_host)

output = self.run_prestoadmin('configuration deploy',
raise_error=False)
self.assertRegexpMatches(
output,
self.down_node_connection_error(internal_bad_host)
self.down_node_connection_error()
)
for host in self.cluster.all_internal_hosts():
for host in good_hosts:
self.assertTrue('Deploying configuration on: %s' % host in output)
self.assertTrue('Deploying configuration on: %s' % bad_host in output)
expected_size = self.len_down_node_error + len(self.cluster.all_hosts())
self.assertEqual(len(output.splitlines()), expected_size)

output = self.run_prestoadmin('configuration show config',
raise_error=False)
self.assertRegexpMatches(
output,
self.down_node_connection_error(internal_bad_host)
self.down_node_connection_error()
)
with open(os.path.join(LOCAL_RESOURCES_DIR,
'configuration_show_down_node.txt'), 'r') as f:
Expand Down
50 changes: 27 additions & 23 deletions tests/product/test_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,55 +81,59 @@ def test_server_restart_nothing_started(self):
def test_start_coordinator_down(self):
installer = StandalonePrestoInstaller(self)
self.setup_cluster(NoHadoopBareImageProvider, STANDALONE_PA_CLUSTER)
topology = {"coordinator": "slave1", "workers":
["master", "slave2", "slave3"]}
good_hosts = [self.cluster.internal_master,
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]
topology = {"coordinator": self.cluster.internal_slaves[0],
"workers": good_hosts}
self.upload_topology(topology=topology)
installer.install(coordinator='slave1')
self.assert_start_coordinator_down(
self.cluster.slaves[0],
self.cluster.internal_slaves[0])
installer.install(coordinator=self.cluster.internal_slaves[0])

self.assert_start_coordinator_down(good_hosts)

def test_start_worker_down(self):
self.setup_cluster(NoHadoopBareImageProvider, STANDALONE_PRESTO_CLUSTER)
self.assert_start_worker_down(
self.cluster.slaves[0],
self.cluster.internal_slaves[0])
good_hosts = [self.cluster.internal_master,
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]
topology = {"coordinator": self.cluster.internal_master,
"workers": [self.cluster.get_down_hostname(),
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]}
self.upload_topology(topology=topology)

self.assert_start_worker_down(good_hosts)

def assert_start_coordinator_down(self, coordinator, coordinator_internal):
self.cluster.stop_host(coordinator)
alive_hosts = self.cluster.all_internal_hosts()[:]
alive_hosts.remove(self.cluster.get_down_hostname(coordinator_internal))
def assert_start_coordinator_down(self, good_hosts):
self.upload_topology({'coordinator': self.cluster.get_down_hostname(),
'workers': good_hosts})

# test server start
start_output = self.run_prestoadmin('server start', raise_error=False)

# when the coordinator is down, you can't confirm that the server is started
# on any of the nodes
expected_start = self.expected_start(failed_hosts=alive_hosts)
for host in alive_hosts:
expected_start = self.expected_start(failed_hosts=good_hosts)
for host in good_hosts:
expected_start.append(self.expected_no_status_message(host))
expected_start.append(self.down_node_connection_error(coordinator_internal))
expected_start.append(self.down_node_connection_error())
for message in expected_start:
self.assertRegexpMatches(start_output, message, 'expected %s \n '
'actual %s' % (message, start_output))

process_per_host = self.get_process_per_host(start_output.splitlines())
self.assert_started(process_per_host)

def assert_start_worker_down(self, down_node, down_internal_node):
self.cluster.stop_host(down_node)
alive_hosts = self.cluster.all_internal_hosts()[:]
alive_hosts.remove(self.cluster.get_down_hostname(down_internal_node))

def assert_start_worker_down(self, good_hosts):
# test server start
start_output = self.run_prestoadmin('server start', raise_error=False)

self.assertRegexpMatches(
start_output,
self.down_node_connection_error(down_internal_node)
self.down_node_connection_error()
)

expected_start = self.expected_start(start_success=alive_hosts)
expected_start = self.expected_start(start_success=good_hosts)
for message in expected_start:
self.assertRegexpMatches(start_output, message, 'expected %s \n '
'actual %s' % (message, start_output))
Expand Down