Skip to content

Commit

Permalink
[GOBBLIN-2065] Increase TestMetastoreDatabaseServer concurrent cnxn…
Browse files Browse the repository at this point in the history
…s; plus make closing `ITestMetastoreDatabase` `alwaysRun = true` and doc reason (#3948)

* Make closing `ITestMetastoreDatabase` more uniform, document reason, and ensure `alwaysRun = true`
* Increase concurrent cnxns for `TestMetastoreDatabaseServer` to 501
* Make remaining `GobblinServiceHATest` `tearDown` resilient to `mysql.stop()` failure
  • Loading branch information
phet committed May 9, 2024
1 parent 8eeb743 commit c823e98
Show file tree
Hide file tree
Showing 23 changed files with 162 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public void setUp() throws Exception {
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
if (testMetastoreDatabase != null) {
// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
testMetastoreDatabase.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class TestMetastoreDatabaseServer implements Closeable {
.withPort(this.dbPort)
.withUser(this.dbUserName, this.dbUserPassword)
.withServerVariable("explicit_defaults_for_timestamp", "off")
// default `max_connections` is apparently 151 - see: https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_max_connections
.withServerVariable("max_connections", "501")
.build();
if (this.embeddedMysqlEnabled) {
testingMySqlServer = EmbeddedMysql.anEmbeddedMysql(config).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.gobblin.runtime;

import java.io.IOException;
import java.net.URI;

import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -124,8 +123,9 @@ public void setUp() throws Exception {
this.testDb = TestMetastoreDatabaseFactory.get();
}

@AfterClass
public void tearDown() throws IOException {
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
this.testDb.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,12 @@ public void testDeleteDatasetJobState() throws IOException {
Assert.assertNull(datasetState);
}

@AfterClass
public void tearDown() throws IOException {
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
dbJobStateStore.delete(TEST_JOB_NAME);
dbDatasetStateStore.delete(TEST_JOB_NAME);
if (testMetastoreDatabase != null) {
// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
testMetastoreDatabase.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ public void setUp() throws Exception {
jobState);
}

@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
if (this.testMetastoreDatabase != null) {
// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
this.testMetastoreDatabase.close();
}
}

@Test
public void testClBulkDelete() throws Exception {
String deleteFileText = TEST_JOB_NAME +"\n" + TEST_JOB_NAME2;
Expand Down Expand Up @@ -175,11 +183,4 @@ public void testCliDeleteSingle() throws Exception {

Assert.assertNull(jobState);
}

@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
if (this.testMetastoreDatabase != null) {
this.testMetastoreDatabase.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.common.base.Predicates;
Expand All @@ -46,17 +47,21 @@ public class TestMysqlJobCatalog {
private static final String PASSWORD = "testPassword";
private static final String TABLE = "job_catalog";

private ITestMetastoreDatabase testDb;
private MysqlJobCatalog cat;
private static ITestMetastoreDatabase testDb;

@BeforeClass
public void setUpClass() throws Exception {
// PERF: when within `@{Before,After}Class` the 2 current tests take only 24s; when `@{Before,After}Method` `.get()`s a per-test DB, the same take 38s
this.testDb = TestMetastoreDatabaseFactory.get();
}

/** create a new DB/`JobCatalog` for each test, so they're completely independent */
@BeforeMethod
public void setUp() throws Exception {
testDb = TestMetastoreDatabaseFactory.get();

Config config = ConfigBuilder.create()
.addPrimitive(ConfigurationKeys.METRICS_ENABLED_KEY, "true")
.addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
.addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
.addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
.addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
Expand All @@ -65,6 +70,14 @@ public void setUp() throws Exception {
this.cat = new MysqlJobCatalog(config);
}

@AfterClass(alwaysRun = true)
public void tearDownClass() throws Exception {
if (this.testDb != null) {
// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
this.testDb.close();
}
}

@Test
public void testCallbacks() throws Exception {
cat.startAsync();
Expand Down Expand Up @@ -214,11 +227,4 @@ public void testMetrics() throws Exception {
cat.stopAsync();
cat.awaitTerminated(1, TimeUnit.SECONDS);
}

@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
if (testDb != null) {
testDb.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,23 @@ public class MysqlBaseSpecStoreTest {
private static final String PASSWORD = "testPassword";
private static final String TABLE = "base_spec_store";

private ITestMetastoreDatabase testDb;
private MysqlBaseSpecStore specStore;
private final URI uri1 = new URI(new TopologySpec.Builder().getDefaultTopologyCatalogURI().toString() + "1");
private final URI uri2 = new URI(new TopologySpec.Builder().getDefaultTopologyCatalogURI().toString() + "2");
private TopologySpec topoSpec1, topoSpec2;
private static ITestMetastoreDatabase testDb;

public MysqlBaseSpecStoreTest()
throws URISyntaxException { // (based on `uri1` and other initializations just above)
}

@BeforeClass
public void setUp() throws Exception {
testDb = TestMetastoreDatabaseFactory.get();
this.testDb = TestMetastoreDatabaseFactory.get();

// prefix keys to demonstrate disambiguation mechanism used to side-step intentially-sabatoged non-prefixed, 'fallback'
Config config = ConfigBuilder.create()
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, " SABATOGE! !" + testDb.getJdbcUrl())
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, " SABATOGE! !" + this.testDb.getJdbcUrl())
.addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
.addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
Expand Down Expand Up @@ -93,8 +93,9 @@ public void setUp() throws Exception {

@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
if (testDb != null) {
testDb.close();
if (this.testDb != null) {
// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
this.testDb.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,25 @@ public class MysqlSpecStoreTest {
private static final String PASSWORD = "testPassword";
private static final String TABLE = "spec_store";

private ITestMetastoreDatabase testDb;
private MysqlSpecStore specStore;
private MysqlSpecStore oldSpecStore;
private final URI uri1 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg1").setFlowGroup("fn1"));
private final URI uri2 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg2").setFlowGroup("fn2"));
private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg3").setFlowGroup("fn3"));
private final URI uri4 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg4").setFlowGroup("fn4"));
private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4;
private static ITestMetastoreDatabase testDb;

public MysqlSpecStoreTest()
throws URISyntaxException { // (based on `uri1` and other initializations just above)
}

@BeforeClass
public void setUp() throws Exception {
testDb = TestMetastoreDatabaseFactory.get();
this.testDb = TestMetastoreDatabaseFactory.get();

Config config = ConfigBuilder.create()
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
Expand Down Expand Up @@ -134,8 +134,9 @@ public void setUp() throws Exception {

@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
if (testDb != null) {
testDb.close();
if (this.testDb != null) {
// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
this.testDb.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,22 @@ public class MysqlSpecStoreWithUpdateTest {
private static final String PASSWORD = "testPassword";
private static final String TABLE = "spec_store";

private ITestMetastoreDatabase testDb;
private MysqlSpecStoreWithUpdate specStore;
private MysqlSpecStore oldSpecStore;
private final URI uri1 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg1").setFlowGroup("fn1"));
private final URI uri2 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg2").setFlowGroup("fn2"));
private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg3").setFlowGroup("fn3"));
private final URI uri4 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg4").setFlowGroup("fn4"));
private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4, flowSpec4_update;
private static ITestMetastoreDatabase testDb;

public MysqlSpecStoreWithUpdateTest()
throws URISyntaxException { // (based on `uri1` and other initializations just above)
}

@BeforeClass
public void setUp() throws Exception {
testDb = TestMetastoreDatabaseFactory.get();
this.testDb = TestMetastoreDatabaseFactory.get();

Config config = ConfigBuilder.create()
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
Expand Down Expand Up @@ -144,8 +144,9 @@ public void setUp() throws Exception {

@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
if (testDb != null) {
testDb.close();
if (this.testDb != null) {
// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
this.testDb.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public class GobblinServiceHATest {

private TestingServer testingZKServer;

private ITestMetastoreDatabase testMetastoreDatabase;
private MySQLContainer mysql;
private static ITestMetastoreDatabase testMetastoreDatabase;

@BeforeClass
public void setup() throws Exception {
Expand All @@ -122,7 +122,6 @@ public void setup() throws Exception {
logger.info("Testing ZK Server listening on: " + testingZKServer.getConnectString());
HelixUtils.createGobblinHelixCluster(testingZKServer.getConnectString(), TEST_HELIX_CLUSTER_NAME);


testMetastoreDatabase = TestMetastoreDatabaseFactory.get();

Properties commonServiceCoreProperties = new Properties();
Expand Down Expand Up @@ -202,9 +201,6 @@ private void cleanUpDir(String dir) throws Exception {

@AfterClass(alwaysRun = true)
public void cleanUp() throws Exception {
if (testMetastoreDatabase != null) {
testMetastoreDatabase.close();
}
// Shutdown Node 1
try {
logger.info("+++++++++++++++++++ start shutdown noad1");
Expand Down Expand Up @@ -254,7 +250,14 @@ public void cleanUp() throws Exception {

cleanUpDir(COMMON_SPEC_STORE_PARENT_DIR);

mysql.stop();
try {
mysql.stop();
} catch (Exception e) {
logger.warn("Could not completely stop Mysql");
}

// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
testMetastoreDatabase.close();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ public class GobblinServiceRedirectTest {
private Properties node1ServiceCoreProperties;
private Properties node2ServiceCoreProperties;

private ITestMetastoreDatabase testMetastoreDatabase;
private MySQLContainer mysql;
private static ITestMetastoreDatabase testMetastoreDatabase;

@BeforeClass
public void setup() throws Exception {
Expand All @@ -124,7 +124,7 @@ public void setup() throws Exception {
logger.info("Testing ZK Server listening on: " + testingZKServer.getConnectString());
HelixUtils.createGobblinHelixCluster(testingZKServer.getConnectString(), TEST_HELIX_CLUSTER_NAME);

testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();

Properties commonServiceCoreProperties = new Properties();

Expand Down Expand Up @@ -224,8 +224,9 @@ public void cleanUp() throws Exception {
}

mysql.stop();
if (testMetastoreDatabase != null) {
testMetastoreDatabase.close();
if (this.testMetastoreDatabase != null) {
// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
this.testMetastoreDatabase.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,19 @@ public class DagManagementTaskStreamImplTest {
private static final String TEST_USER = "testUser";
private static final String TEST_PASSWORD = "testPassword";
private static final String TEST_TABLE = "quotas";
static ITestMetastoreDatabase testMetastoreDatabase;
private ITestMetastoreDatabase testMetastoreDatabase;
DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
DagManagementTaskStreamImpl dagManagementTaskStream;
DagProcFactory dagProcFactory;

@BeforeClass
public void setUp() throws Exception {
// Setting up mock DB
testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();

ConfigBuilder configBuilder = ConfigBuilder.create();
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), testMetastoreDatabase.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), this.testMetastoreDatabase.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE);
Expand All @@ -85,6 +85,12 @@ public void setUp() throws Exception {
this.dagManagementTaskStream, this.dagProcFactory, dagManagementStateStore, 0);
}

@AfterClass(alwaysRun = true)
public void tearDown() throws IOException {
// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
this.testMetastoreDatabase.close();
}

/* This tests adding and removal of dag actions from dag task stream with a launch task. It verifies that the
{@link DagManagementTaskStreamImpl#next()} call blocks until a {@link LeaseAttemptStatus.LeaseObtainedStatus} is
returned for a particular action.
Expand All @@ -111,9 +117,4 @@ statuses that should cause the next() method to continue polling for tasks befor
DagProc dagProc = dagTask.host(this.dagProcFactory);
Assert.assertNotNull(dagProc);
}

@AfterClass
public void tearDown() throws IOException {
testMetastoreDatabase.close();
}
}

0 comments on commit c823e98

Please sign in to comment.