Remove the stop_host method
- Stopping a host is now done implicitly by uploading
  a topology that points to a host that doesn't exist.
  This was done because stopping a container caused nasty
  race conditions.
petroav committed Jan 6, 2017
1 parent eee320a commit 0ab3512
Showing 12 changed files with 130 additions and 153 deletions.
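Taken together, the diffs below replace explicit container stops with a topology trick: a test that wants a node to look dead uploads a topology naming the shared fake host and lets the connection fail on its own. A minimal sketch of the pattern, assembled from the tests in this commit (the method and attribute names are the ones visible in the diffs; the specific command is illustrative):

    # Simulate a lost worker without stopping any container: put the fake,
    # unresolvable hostname into the topology and upload it.
    topology = {'coordinator': self.cluster.internal_master,
                'workers': [self.cluster.get_down_hostname(),
                            self.cluster.internal_slaves[1],
                            self.cluster.internal_slaves[2]]}
    self.upload_topology(topology)

    # Any presto-admin command that touches the fake host now reports a
    # down-node connection error, which the tests assert on.
    output = self.run_prestoadmin('server start', raise_error=False)
    self.assertRegexpMatches(output, self.down_node_connection_error())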
26 changes: 12 additions & 14 deletions tests/base_cluster.py
@@ -23,6 +23,7 @@
import sys

from tests.product import determine_jdk_directory
from tests.product.config_dir_utils import get_config_file_path


class BaseCluster(object):
@@ -84,20 +85,17 @@ def get_ip_address_dict(self):
"""
pass

@abc.abstractmethod
def stop_host(self, host_name):
"""Stops a host. Paradoxically, start_host doesn't seem to be required
for the product tests to run successfully."""
pass

@abc.abstractmethod
def get_down_hostname(self, host_name):
"""This is part of the magic involved in stopping a host. If you're
rolling a new implementation, you should dig more deeply into the
existing implementations, figure out how it all works, and update this
comment.
"""
pass
@staticmethod
def get_down_hostname():
"""Fake hostname used to mimic stopping a host by uploading a topology
file pointing to an unresolvable host"""
return 'dummyhost'

@staticmethod
def get_down_ip():
"""This method serves the same purpose as get_down_hostname but
returns the corresponding fake IP"""
return '1.0.0.0'

@abc.abstractmethod
def postinstall(self, installer):
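The abstract stop_host/get_down_hostname pair is replaced by two concrete static methods, so every BaseCluster subclass shares one fake host instead of rolling its own. A quick usage sketch (the return values are the ones defined in the hunk above):

    from tests.base_cluster import BaseCluster

    BaseCluster.get_down_hostname()  # 'dummyhost' - never resolves
    BaseCluster.get_down_ip()        # '1.0.0.0'   - unroutable stand-in IP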
32 changes: 2 additions & 30 deletions tests/configurable_cluster.py
@@ -28,7 +28,7 @@
import yaml
from prestoadmin import main_dir
from tests.base_cluster import BaseCluster
from tests.product.config_dir_utils import get_config_file_path, get_install_directory, get_config_directory
from tests.product.config_dir_utils import get_install_directory, get_config_directory

CONFIG_FILE_GLOB = r'*.yaml'
DIST_DIR = os.path.join(main_dir, 'tmp/installer')
@@ -121,35 +121,6 @@ def tear_down(self):
mount_dir=self.mount_dir)
self.run_script_on_host(script, host)

def stop_host(self, host_name):
if host_name not in self.all_hosts():
raise Exception('Must specify external hostname to stop_host')

# Change the topology to something that doesn't exist
ips = self.get_ip_address_dict()
down_hostname = self.get_down_hostname(host_name)
self.exec_cmd_on_host(
self.master,
'sed -i s/%s/%s/g %s' % (host_name, down_hostname, get_config_file_path())
)
self.exec_cmd_on_host(
self.master,
'sed -i s/%s/%s/g %s' % (ips[host_name], down_hostname, get_config_file_path())
)
index = self.all_hosts().index(host_name)
self.exec_cmd_on_host(
self.master,
'sed -i s/%s/%s/g %s' % (self.all_internal_hosts()[index], down_hostname, get_config_file_path())
)

if index >= len(self.internal_slaves):
self.internal_master = down_hostname
else:
self.internal_slaves[index] = down_hostname

def get_down_hostname(self, host_name):
return '1.0.0.0'

def exec_cmd_on_host(self, host, cmd, user=None, raise_error=True,
tty=False, invoke_sudo=False):
# If the corresponding variable is set, invoke command with sudo since EMR's login
@@ -248,6 +219,7 @@ def get_ip_address_dict(self):
for internal_host in self.all_internal_hosts():
ip_addresses[internal_host] = self._get_ip_from_hosts_file(
hosts_file, internal_host)
ip_addresses[self.get_down_hostname()] = self.get_down_ip()
return ip_addresses

@staticmethod
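The extra entry in get_ip_address_dict (mirrored in docker_cluster.py below) means lookups for the fake host still succeed but point at the unroutable address, so code that translates hostnames to IPs keeps working for the "down" node. Roughly, the returned map now looks like this (the real-host addresses here are illustrative, not taken from the repository):

    {
        'master': '10.0.0.2',     # real hosts, addresses read from the hosts file
        'slave1': '10.0.0.3',
        # ...
        'dummyhost': '1.0.0.0',   # fake pair added by this commit
    }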
14 changes: 3 additions & 11 deletions tests/docker_cluster.py
@@ -152,23 +152,14 @@ def _tear_down_container(self, container_name):
raise

try:
self.stop_host(container_name)
self.client.stop(container_name)
self.client.wait(container_name)
self.client.remove_container(container_name, v=True, force=True)
except APIError as e:
# container does not exist
if e.response.status_code != 404:
raise

def stop_host(self, container_name):
self.client.stop(container_name)
self.client.wait(container_name)

def start_host(self, container_name):
self.client.start(container_name)

def get_down_hostname(self, host_name):
return host_name

def _remove_host_mount_dirs(self):
for container_name in self.all_hosts():
try:
@@ -430,6 +421,7 @@ def get_ip_address_dict(self):
ip_addresses[host] = inspect['NetworkSettings']['IPAddress']
ip_addresses[internal_host] = \
inspect['NetworkSettings']['IPAddress']
ip_addresses[self.get_down_hostname()] = self.get_down_ip()
return ip_addresses

def _post_presto_install(self):
13 changes: 7 additions & 6 deletions tests/product/base_product_case.py
@@ -103,15 +103,16 @@ class BaseProductTestCase(BaseTestCase):
r'socket error connecting to host ' \
r'%(host)s on port 22: Network is unreachable ' \
r'\(tried 1 time\)\n\nUnderlying ' \
r'exception:\n Network is unreachable\n'
r'exception:\n Network is unreachable\n' \
r'|Name lookup failed for %(host)s'

status_down_node_string = r'(\tLow level socket error connecting to ' \
r'host %(host)s on port 22: No route to host ' \
r'\(tried 1 time\)|\tTimed out trying to ' \
r'connect to %(host)s \(tried 1 time\))' \
r'|\tLow level socket error connecting to ' \
r'host %(host)s on port 22: Network is unreachable ' \
r'\(tried 1 time\)'
r'\(tried 1 time\)|\tName lookup failed for %(host)s'

len_down_node_error = 6

@@ -494,12 +495,12 @@ def retry(method_to_check, retry_timeout=RETRY_TIMEOUT,
return Retrying(stop_max_delay=retry_timeout * 1000,
wait_fixed=retry_interval * 1000).call(method_to_check)

def down_node_connection_error(self, host):
hostname = self.cluster.get_down_hostname(host)
def down_node_connection_error(self):
hostname = self.cluster.get_down_hostname()
return self.down_node_connection_string % {'host': hostname}

def status_node_connection_error(self, host):
hostname = self.cluster.get_down_hostname(host)
def status_node_connection_error(self):
hostname = self.cluster.get_down_hostname()
return self.status_down_node_string % {'host': hostname}


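Both error helpers drop their host parameter: with a single shared fake host there is nothing to pass in, and each pattern gains a 'Name lookup failed for %(host)s' alternative, presumably because dummyhost fails name resolution instead of producing a socket error. Call sites change accordingly (old form reproduced from the removed test lines for contrast; `output` is captured presto-admin output):

    # before: self.assertRegexpMatches(output, self.down_node_connection_error(internal_bad_host))
    # after:
    self.assertRegexpMatches(output, self.down_node_connection_error())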
24 changes: 15 additions & 9 deletions tests/product/test_catalog.py
@@ -172,35 +172,41 @@ def fatal_error(self, error):
def test_catalog_add_lost_host(self):
installer = StandalonePrestoInstaller(self)
self.setup_cluster(NoHadoopBareImageProvider, STANDALONE_PA_CLUSTER)
self.upload_topology()
installer.install()
self.run_prestoadmin('catalog remove tpch')

self.cluster.stop_host(
self.cluster.slaves[0])
good_hosts = [self.cluster.internal_master,
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]
topology = {"coordinator": self.cluster.internal_master,
"workers": [self.cluster.get_down_hostname(),
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]}
self.upload_topology(topology)
self.cluster.write_content_to_host(
'connector.name=tpch',
os.path.join(get_catalog_directory(), 'tpch.properties'),
self.cluster.master
)

output = self.run_prestoadmin('catalog add tpch', raise_error=False)
for host in self.cluster.all_internal_hosts():

hosts = good_hosts + [self.cluster.get_down_hostname()]
for host in hosts:
deploying_message = 'Deploying tpch.properties catalog configurations on: %s'
self.assertTrue(deploying_message % host in output,
'expected %s \n actual %s'
% (deploying_message % host, output))
self.assertRegexpMatches(
output,
self.down_node_connection_error(self.cluster.internal_slaves[0])
self.down_node_connection_error()
)
self.assertEqual(len(output.splitlines()),
len(self.cluster.all_hosts()) +
self.len_down_node_error)
self.run_prestoadmin('server start', raise_error=False)

for host in [self.cluster.master,
self.cluster.slaves[1],
self.cluster.slaves[2]]:
self.run_prestoadmin('server start', raise_error=False)
for host in good_hosts:
self.assert_has_default_catalog(host)
self._assert_catalogs_loaded([['system'], ['tpch']])

14 changes: 7 additions & 7 deletions tests/product/test_configuration.py
@@ -196,31 +196,31 @@ def test_configuration_deploy_using_dash_x_coord_worker(self):
self.default_workers_test_config_)

def test_lost_coordinator_connection(self):
internal_bad_host = self.cluster.internal_slaves[0]
bad_host = self.cluster.slaves[0]
bad_host = self.cluster.get_down_hostname()
good_hosts = [self.cluster.internal_master,
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]
topology = {'coordinator': internal_bad_host,
topology = {'coordinator': bad_host,
'workers': good_hosts}
self.upload_topology(topology)
self.cluster.stop_host(bad_host)

output = self.run_prestoadmin('configuration deploy',
raise_error=False)
self.assertRegexpMatches(
output,
self.down_node_connection_error(internal_bad_host)
self.down_node_connection_error()
)
for host in self.cluster.all_internal_hosts():
for host in good_hosts:
self.assertTrue('Deploying configuration on: %s' % host in output)
self.assertTrue('Deploying configuration on: %s' % bad_host in output)
expected_size = self.len_down_node_error + len(self.cluster.all_hosts())
self.assertEqual(len(output.splitlines()), expected_size)

output = self.run_prestoadmin('configuration show config',
raise_error=False)
self.assertRegexpMatches(
output,
self.down_node_connection_error(internal_bad_host)
self.down_node_connection_error()
)
with open(os.path.join(LOCAL_RESOURCES_DIR,
'configuration_show_down_node.txt'), 'r') as f:
50 changes: 27 additions & 23 deletions tests/product/test_control.py
@@ -81,55 +81,59 @@ def test_server_restart_nothing_started(self):
def test_start_coordinator_down(self):
installer = StandalonePrestoInstaller(self)
self.setup_cluster(NoHadoopBareImageProvider, STANDALONE_PA_CLUSTER)
topology = {"coordinator": "slave1", "workers":
["master", "slave2", "slave3"]}
good_hosts = [self.cluster.internal_master,
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]
topology = {"coordinator": self.cluster.internal_slaves[0],
"workers": good_hosts}
self.upload_topology(topology=topology)
installer.install(coordinator='slave1')
self.assert_start_coordinator_down(
self.cluster.slaves[0],
self.cluster.internal_slaves[0])
installer.install(coordinator=self.cluster.internal_slaves[0])

self.assert_start_coordinator_down(good_hosts)

def test_start_worker_down(self):
self.setup_cluster(NoHadoopBareImageProvider, STANDALONE_PRESTO_CLUSTER)
self.assert_start_worker_down(
self.cluster.slaves[0],
self.cluster.internal_slaves[0])
good_hosts = [self.cluster.internal_master,
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]
topology = {"coordinator": self.cluster.internal_master,
"workers": [self.cluster.get_down_hostname(),
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]}
self.upload_topology(topology=topology)

self.assert_start_worker_down(good_hosts)

def assert_start_coordinator_down(self, coordinator, coordinator_internal):
self.cluster.stop_host(coordinator)
alive_hosts = self.cluster.all_internal_hosts()[:]
alive_hosts.remove(self.cluster.get_down_hostname(coordinator_internal))
def assert_start_coordinator_down(self, good_hosts):
self.upload_topology({'coordinator': self.cluster.get_down_hostname(),
'workers': good_hosts})

# test server start
start_output = self.run_prestoadmin('server start', raise_error=False)

# when the coordinator is down, you can't confirm that the server is started
# on any of the nodes
expected_start = self.expected_start(failed_hosts=alive_hosts)
for host in alive_hosts:
expected_start = self.expected_start(failed_hosts=good_hosts)
for host in good_hosts:
expected_start.append(self.expected_no_status_message(host))
expected_start.append(self.down_node_connection_error(coordinator_internal))
expected_start.append(self.down_node_connection_error())
for message in expected_start:
self.assertRegexpMatches(start_output, message, 'expected %s \n '
'actual %s' % (message, start_output))

process_per_host = self.get_process_per_host(start_output.splitlines())
self.assert_started(process_per_host)

def assert_start_worker_down(self, down_node, down_internal_node):
self.cluster.stop_host(down_node)
alive_hosts = self.cluster.all_internal_hosts()[:]
alive_hosts.remove(self.cluster.get_down_hostname(down_internal_node))

def assert_start_worker_down(self, good_hosts):
# test server start
start_output = self.run_prestoadmin('server start', raise_error=False)

self.assertRegexpMatches(
start_output,
self.down_node_connection_error(down_internal_node)
self.down_node_connection_error()
)

expected_start = self.expected_start(start_success=alive_hosts)
expected_start = self.expected_start(start_success=good_hosts)
for message in expected_start:
self.assertRegexpMatches(start_output, message, 'expected %s \n '
'actual %s' % (message, start_output))
20 changes: 7 additions & 13 deletions tests/product/test_plugin.py
@@ -57,41 +57,35 @@ def test_basic_add_jars(self):
self.cluster.exec_cmd_on_host(host, 'rm %s' % temp_jar_location, invoke_sudo=True)

def test_lost_coordinator(self):
internal_bad_host = self.cluster.internal_slaves[0]
bad_host = self.cluster.slaves[0]
good_hosts = [self.cluster.internal_master,
good_hosts = [self.cluster.internal_slaves[0],
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]
topology = {'coordinator': internal_bad_host,
topology = {'coordinator': self.cluster.get_down_hostname(),
'workers': good_hosts}
self.upload_topology(topology)
self.cluster.stop_host(bad_host)
self.deploy_jar_to_master()
output = self.run_prestoadmin(
'plugin add_jar %s hive-cdh5' % TMP_JAR_PATH, raise_error=False)
self.assertRegexpMatches(output, self.down_node_connection_error(
internal_bad_host))
self.assertRegexpMatches(output, self.down_node_connection_error())
self.assertEqual(len(output.splitlines()), self.len_down_node_error)
for host in good_hosts:
self.assert_path_exists(host, STD_REMOTE_PATH)
self.cluster.exec_cmd_on_host(host, 'rm %s' % STD_REMOTE_PATH,
raise_error=False)

def test_lost_worker(self):
internal_bad_host = self.cluster.internal_slaves[0]
bad_host = self.cluster.slaves[0]
good_hosts = [self.cluster.internal_master,
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]
topology = {'coordinator': self.cluster.internal_master,
'workers': self.cluster.internal_slaves}
'workers': [self.cluster.get_down_hostname(),
self.cluster.internal_slaves[1],
self.cluster.internal_slaves[2]]}
self.upload_topology(topology)
self.cluster.stop_host(bad_host)
self.deploy_jar_to_master()
output = self.run_prestoadmin(
'plugin add_jar %s hive-cdh5' % TMP_JAR_PATH, raise_error=False)
self.assertRegexpMatches(output, self.down_node_connection_error(
internal_bad_host))
self.assertRegexpMatches(output, self.down_node_connection_error())
self.assertEqual(len(output.splitlines()), self.len_down_node_error)
for host in good_hosts:
self.assert_path_exists(host, STD_REMOTE_PATH)
