Draft PR for DLO library #77

Draft · wants to merge 14 commits into base: main
1 change: 1 addition & 0 deletions apps/spark/build.gradle
@@ -27,6 +27,7 @@ dependencies {
implementation project(':client:secureclient')
implementation project(':services:common')
implementation project(':cluster:storage')
implementation project(':libs:datalayout')
compileOnly project(':integrations:spark:openhouse-spark-runtime_2.12')
implementation ('org.apache.spark:spark-core_2.12:' + sparkVersion) {
exclude group: 'io.netty'
@@ -22,6 +22,7 @@ public final class DataCompactionConfig {
RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT;
public static final int DEFAULT_PARTIAL_PROGRESS_MAX_COMMITS =
RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT;
public static final double DEFAULT_ENTROPY_THRESHOLD = 10000.0; // 100MB variance

private long targetByteSize;
private long minByteSize;
@@ -30,4 +31,5 @@ public final class DataCompactionConfig {
private int maxConcurrentFileGroupRewrites;
private boolean partialProgressEnabled;
private int partialProgressMaxCommits;
private double entropyThreshold;
}
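
(Not part of the diff: a minimal sketch of how the new entropyThreshold might be consumed by the layout library. It assumes "entropy" means the variance of data-file sizes measured in MB², which is one possible reading of the "100MB variance" comment above (10000.0 ≈ (100 MB)²); the class and method names below are hypothetical.)

```java
import java.util.List;

// Hypothetical illustration only -- not code from this PR.
final class FileSizeEntropySketch {
  /** True if the spread of data-file sizes exceeds the configured entropy threshold. */
  static boolean exceedsThreshold(List<Long> fileSizesBytes, double entropyThresholdMb2) {
    double meanMb = fileSizesBytes.stream().mapToDouble(s -> s / 1e6).average().orElse(0.0);
    double varianceMb2 =
        fileSizesBytes.stream()
            .mapToDouble(s -> (s / 1e6 - meanMb) * (s / 1e6 - meanMb))
            .average()
            .orElse(0.0);
    return varianceMb2 > entropyThresholdMb2; // e.g. DEFAULT_ENTROPY_THRESHOLD = 10000.0
  }
}
```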
@@ -1,5 +1,8 @@
package com.linkedin.openhouse.jobs.spark;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.linkedin.openhouse.datalayout.layoutselection.DataOptimizationLayout;
import com.linkedin.openhouse.jobs.config.DataCompactionConfig;
import com.linkedin.openhouse.jobs.spark.state.StateManager;
import com.linkedin.openhouse.jobs.util.AppConstants;
@@ -10,6 +13,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.iceberg.actions.RewriteDataFiles;

@@ -33,6 +37,16 @@ protected DataCompactionSparkApp(
@Override
protected void runInner(Operations ops) {
log.info("Rewrite data files app start for table {}, config {}", fqtn, config);
Gson gson = new GsonBuilder().create();
String serializedLayout =
ops.spark()
.sql(String.format("SHOW TBLPROPERTIES %s ('data-layout')", fqtn))
.collectAsList()
.get(0)
.getString(1);
DataOptimizationLayout dataCompactionLayout =
gson.fromJson(
StringEscapeUtils.unescapeJava(serializedLayout), DataOptimizationLayout.class);
RewriteDataFiles.Result result =
ops.rewriteDataFiles(
ops.getTable(fqtn),
@@ -127,6 +141,8 @@ public static DataCompactionSparkApp createApp(String[] args) {
"partialProgressMaxCommits",
true,
"Maximum amount of commits that this rewrite is allowed to produce if partial progress is enabled"));
extraOptions.add(
new Option(null, "entropyThreshold", true, "Entropy threshold for file rewrite"));
CommandLine cmdLine = createCommandLine(args, extraOptions);
long targetByteSize =
NumberUtils.toLong(
@@ -146,6 +162,10 @@ public static DataCompactionSparkApp createApp(String[] args) {
if (maxByteSizeRatio <= 1.0) {
throw new RuntimeException("maxByteSizeRatio must be greater than 1.0");
}
double entropyThreshold =
NumberUtils.toDouble(
cmdLine.getOptionValue("entropyThreshold"),
DataCompactionConfig.DEFAULT_ENTROPY_THRESHOLD);
return new DataCompactionSparkApp(
getJobId(cmdLine),
createStateManager(cmdLine),
@@ -168,6 +188,7 @@ public static DataCompactionSparkApp createApp(String[] args) {
cmdLine.getOptionValue("partialProgressMaxCommits"),
com.linkedin.openhouse.jobs.config.DataCompactionConfig
.DEFAULT_PARTIAL_PROGRESS_MAX_COMMITS))
.entropyThreshold(entropyThreshold)
.build());
}
}
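
(Not part of the diff: for context on the SHOW TBLPROPERTIES / unescapeJava round trip in runInner above, a sketch of the matching write path that would store the layout as an escaped-JSON table property. The property key and serialization scheme are assumptions inferred from the read side, not confirmed by this PR.)

```java
import com.google.gson.Gson;
import org.apache.commons.lang.StringEscapeUtils;

// Hypothetical illustration only -- mirrors the read path in runInner above.
final class DataLayoutPropertySketch {
  private static final Gson GSON = new Gson();

  /** Serializes a layout object into an escaped JSON string usable as a table property value. */
  static String toTableProperty(Object layout) {
    return StringEscapeUtils.escapeJava(GSON.toJson(layout));
  }

  /** Reverses the escaping and parses the JSON back, as DataCompactionSparkApp does. */
  static <T> T fromTableProperty(String serialized, Class<T> type) {
    return GSON.fromJson(StringEscapeUtils.unescapeJava(serialized), type);
  }
}
```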
@@ -23,12 +23,13 @@

@SuppressWarnings("unchecked")
public class JobsClientTest {
private static final String testClusterId = "test_cluster_id";
private static final String testJobName = "test_job";
private static final String testJobId = "test_job_id";
private static final String testProxyUser = "test_proxy_user";
private static final JobConf.JobTypeEnum testJobType = JobConf.JobTypeEnum.RETENTION;
private final JobsClientFactory clientFactory = new JobsClientFactory("base_path", testClusterId);
private static final String TEST_CLUSTER_ID = "test_cluster_id";
private static final String TEST_JOB_NAME = "test_job";
private static final String TEST_JOB_ID = "test_job_id";
private static final String TEST_PROXY_USER = "test_proxy_user";
private static final JobConf.JobTypeEnum TEST_JOB_TYPE = JobConf.JobTypeEnum.RETENTION;
private final JobsClientFactory clientFactory =
new JobsClientFactory("base_path", TEST_CLUSTER_ID);
private JobApi apiMock;
private JobsClient client;

@@ -44,22 +45,23 @@ void setup() {
void testLaunch() {
// check success
JobResponseBody responseBody = new JobResponseBody();
responseBody.setJobId(testJobId);
responseBody.setJobId(TEST_JOB_ID);
Mono<JobResponseBody> responseMock = (Mono<JobResponseBody>) Mockito.mock(Mono.class);
Mockito.when(responseMock.block(Mockito.any(Duration.class))).thenReturn(responseBody);
Mockito.when(apiMock.createJob(Mockito.any(CreateJobRequestBody.class)))
.thenReturn(responseMock);

List<String> testArgs = Arrays.asList("-t", "test_table");
Assertions.assertEquals(
testJobId, client.launch(testJobName, testJobType, testProxyUser, testArgs).orElse(null));
TEST_JOB_ID,
client.launch(TEST_JOB_NAME, TEST_JOB_TYPE, TEST_PROXY_USER, testArgs).orElse(null));

ArgumentCaptor<CreateJobRequestBody> argumentCaptor =
ArgumentCaptor.forClass(CreateJobRequestBody.class);
Mockito.verify(apiMock, Mockito.times(1)).createJob(argumentCaptor.capture());
CreateJobRequestBody actualRequest = argumentCaptor.getValue();
Assertions.assertEquals(testClusterId, actualRequest.getClusterId());
Assertions.assertEquals(testJobName, actualRequest.getJobName());
Assertions.assertEquals(TEST_CLUSTER_ID, actualRequest.getClusterId());
Assertions.assertEquals(TEST_JOB_NAME, actualRequest.getJobName());
Assertions.assertNotNull(actualRequest.getJobConf());
Assertions.assertEquals(testArgs, actualRequest.getJobConf().getArgs());
Assertions.assertEquals(JobConf.JobTypeEnum.RETENTION, actualRequest.getJobConf().getJobType());
@@ -69,7 +71,7 @@ void testLaunch() {
Mockito.when(apiMock.createJob(Mockito.any(CreateJobRequestBody.class)))
.thenReturn(responseMock);
Assertions.assertFalse(
client.launch(testJobName, testJobType, testProxyUser, testArgs).isPresent());
client.launch(TEST_JOB_NAME, TEST_JOB_TYPE, TEST_PROXY_USER, testArgs).isPresent());
}

@Test
@@ -79,32 +81,32 @@ void testGetState() {
responseBody.setState(JobResponseBody.StateEnum.RUNNING);
Mono<JobResponseBody> responseMock = (Mono<JobResponseBody>) Mockito.mock(Mono.class);
Mockito.when(responseMock.block(Mockito.any(Duration.class))).thenReturn(responseBody);
Mockito.when(apiMock.getJob(testJobId)).thenReturn(responseMock);
Assertions.assertEquals(JobState.RUNNING, client.getState(testJobId).orElse(null));
Mockito.when(apiMock.getJob(TEST_JOB_ID)).thenReturn(responseMock);
Assertions.assertEquals(JobState.RUNNING, client.getState(TEST_JOB_ID).orElse(null));

// check succeeded
responseBody = new JobResponseBody();
responseBody.setState(JobResponseBody.StateEnum.SUCCEEDED);
Mockito.when(responseMock.block(Mockito.any(Duration.class))).thenReturn(responseBody);
Mockito.when(apiMock.getJob(testJobId)).thenReturn(responseMock);
Assertions.assertEquals(JobState.SUCCEEDED, client.getState(testJobId).orElse(null));
Mockito.when(apiMock.getJob(TEST_JOB_ID)).thenReturn(responseMock);
Assertions.assertEquals(JobState.SUCCEEDED, client.getState(TEST_JOB_ID).orElse(null));

// check no response
Mockito.when(responseMock.block(Mockito.any(Duration.class))).thenReturn(null);
Mockito.when(apiMock.getJob(testJobId)).thenReturn(responseMock);
Assertions.assertFalse(client.getState(testJobId).isPresent());
Mockito.when(apiMock.getJob(TEST_JOB_ID)).thenReturn(responseMock);
Assertions.assertFalse(client.getState(TEST_JOB_ID).isPresent());
}

@Test
void testCancel() {
// check success
Mono<Void> responseMock = (Mono<Void>) Mockito.mock(Mono.class);
Mockito.when(responseMock.block(Mockito.any(Duration.class))).thenReturn(null);
Mockito.when(apiMock.cancelJob(testJobId)).thenReturn(responseMock);
Assertions.assertTrue(client.cancelJob(testJobId));
Mockito.when(apiMock.cancelJob(TEST_JOB_ID)).thenReturn(responseMock);
Assertions.assertTrue(client.cancelJob(TEST_JOB_ID));

// check failure
Mockito.when(apiMock.cancelJob(testJobId)).thenThrow(WebClientResponseException.class);
Assertions.assertFalse(client.cancelJob(testJobId));
Mockito.when(apiMock.cancelJob(TEST_JOB_ID)).thenThrow(WebClientResponseException.class);
Assertions.assertFalse(client.cancelJob(TEST_JOB_ID));
}
}
@@ -706,12 +706,12 @@ private static List<Long> getSnapshotIds(Operations ops, String tableName) {

private static List<String> getSnapshotOperationTypes(Operations ops, String tableName) {
log.info("Getting snapshot Operations");
List<Row> ordered_snapshots =
List<Row> orderedSnapshots =
ops.spark()
.sql(String.format("SELECT * FROM %s.snapshots order by committed_at desc", tableName))
.collectAsList();
ordered_snapshots.forEach(s -> log.info(s.toString()));
return ordered_snapshots.stream()
orderedSnapshots.forEach(s -> log.info(s.toString()));
return orderedSnapshots.stream()
.map(r -> r.getString(r.fieldIndex("operation")))
.collect(Collectors.toList());
}
@@ -18,7 +18,7 @@

@SuppressWarnings("unchecked")
public class StateManagerTest {
private static final String testJobId = "test_job_id";
private static final String TEST_JOB_ID = "test_job_id";
private JobApi apiMock;

@BeforeEach
@@ -35,18 +35,18 @@ void testSuccess() {

Mockito.when(responseEntityMock.getEntity()).thenReturn(jobMock);
Mockito.when(responseMock.block(Mockito.any())).thenReturn(responseEntityMock);
Mockito.when(apiMock.getJob(Mockito.refEq(testJobId))).thenReturn(responseMock);
Mockito.when(apiMock.getJob(Mockito.refEq(TEST_JOB_ID))).thenReturn(responseMock);
Mockito.when(apiMock.putJob(Mockito.any())).thenReturn(responseMock);

Instant now = Instant.now();

StateManager stateManager = createStateManager(0);
try (MockedStatic<Instant> instant = Mockito.mockStatic(Instant.class)) {
instant.when(Instant::now).thenReturn(now);
stateManager.updateStartTime(testJobId);
stateManager.updateFinishTime(testJobId);
stateManager.sendHeartbeat(testJobId);
stateManager.updateState(testJobId, JobState.RUNNING);
stateManager.updateStartTime(TEST_JOB_ID);
stateManager.updateFinishTime(TEST_JOB_ID);
stateManager.sendHeartbeat(TEST_JOB_ID);
stateManager.updateState(TEST_JOB_ID, JobState.RUNNING);
Mockito.verify(jobMock).setStartTimeMs(now.toEpochMilli());
Mockito.verify(jobMock).setFinishTimeMs(now.toEpochMilli());
Mockito.verify(jobMock).setHeartbeatTimeMs(now.toEpochMilli());
@@ -61,18 +61,18 @@ void testGetJobFailure() {
(Mono<EntityResponseBodyJob>) Mockito.mock(Mono.class);

Mockito.when(responseMock.block(Mockito.any())).thenReturn(null);
Mockito.when(apiMock.getJob(Mockito.refEq(testJobId))).thenReturn(responseMock);
Mockito.when(apiMock.getJob(Mockito.refEq(TEST_JOB_ID))).thenReturn(responseMock);

Instant now = Instant.now();
StateManager stateManager = createStateManager(numAttempts);
try (MockedStatic<Instant> instant = Mockito.mockStatic(Instant.class)) {
instant.when(Instant::now).thenReturn(now);
stateManager.updateStartTime(testJobId);
stateManager.updateFinishTime(testJobId);
stateManager.sendHeartbeat(testJobId);
stateManager.updateState(testJobId, JobState.RUNNING);
stateManager.updateStartTime(TEST_JOB_ID);
stateManager.updateFinishTime(TEST_JOB_ID);
stateManager.sendHeartbeat(TEST_JOB_ID);
stateManager.updateState(TEST_JOB_ID, JobState.RUNNING);
Mockito.verify(apiMock, Mockito.never()).putJob(Mockito.any());
Mockito.verify(apiMock, Mockito.times(numAttempts * 4)).getJob(Mockito.eq(testJobId));
Mockito.verify(apiMock, Mockito.times(numAttempts * 4)).getJob(Mockito.eq(TEST_JOB_ID));
}
}

@@ -83,19 +83,19 @@ void testException() {
(Mono<EntityResponseBodyJob>) Mockito.mock(Mono.class);

Mockito.when(responseMock.block(Mockito.any())).thenThrow(ReadTimeoutException.class);
Mockito.when(apiMock.getJob(Mockito.refEq(testJobId))).thenReturn(responseMock);
Mockito.when(apiMock.getJob(Mockito.refEq(TEST_JOB_ID))).thenReturn(responseMock);

Instant now = Instant.now();

StateManager stateManager = createStateManager(numAttempts);
try (MockedStatic<Instant> instant = Mockito.mockStatic(Instant.class)) {
instant.when(Instant::now).thenReturn(now);
stateManager.updateStartTime(testJobId);
stateManager.updateFinishTime(testJobId);
stateManager.sendHeartbeat(testJobId);
stateManager.updateState(testJobId, JobState.RUNNING);
stateManager.updateStartTime(TEST_JOB_ID);
stateManager.updateFinishTime(TEST_JOB_ID);
stateManager.sendHeartbeat(TEST_JOB_ID);
stateManager.updateState(TEST_JOB_ID, JobState.RUNNING);
Mockito.verify(apiMock, Mockito.never()).putJob(Mockito.any());
Mockito.verify(apiMock, Mockito.times(numAttempts * 4)).getJob(Mockito.eq(testJobId));
Mockito.verify(apiMock, Mockito.times(numAttempts * 4)).getJob(Mockito.eq(TEST_JOB_ID));
}
}

@@ -19,46 +19,46 @@

public class DeleteStagedFilesTest extends OpenHouseSparkITest {
static java.nio.file.Path baseDir = FileSystems.getDefault().getPath(".").toAbsolutePath();
static final String testDir = "oh_delete_test";
static final Path testPath = new Path(baseDir.toString(), testDir);
static final String dirPrefix = "dir_";
static final String filePrefix = "file_";
static final String TEST_DIR = "oh_delete_test";
static final Path TEST_PATH = new Path(baseDir.toString(), TEST_DIR);
static final String DIR_PREFIX = "dir_";
static final String FILE_PREFIX = "file_";
private final Meter meter = OtelConfig.getMeter(this.getClass().getName());

@BeforeEach
@AfterEach
void cleanUpTestFiles() throws Exception {
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
FileSystem fs = ops.fs();
fs.delete(testPath, true);
Assertions.assertFalse(fs.exists(testPath));
fs.delete(TEST_PATH, true);
Assertions.assertFalse(fs.exists(TEST_PATH));
}
}

@Test
void testShouldDeleteFilesOlderThanXDaysInDir() throws Exception {
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
FileSystem fs = ops.fs();
generateDirStructure(fs, testPath.toString(), 3, 3);
SparkJobUtil.setModifiedTimeStamp(fs, new Path(testPath.toString(), "dir_2"), 4);
ops.deleteStagedFiles(testPath, 3, true);
Assertions.assertTrue(fs.exists(new Path(testPath + "/dir_0/" + "file_0")));
Assertions.assertFalse(fs.exists(new Path(testPath + "/dir_2/" + "file_0")));
generateDirStructure(fs, TEST_PATH.toString(), 3, 3);
SparkJobUtil.setModifiedTimeStamp(fs, new Path(TEST_PATH.toString(), "dir_2"), 4);
ops.deleteStagedFiles(TEST_PATH, 3, true);
Assertions.assertTrue(fs.exists(new Path(TEST_PATH + "/dir_0/" + "file_0")));
Assertions.assertFalse(fs.exists(new Path(TEST_PATH + "/dir_2/" + "file_0")));
}
}

@Test
void testShouldFindMatchingFilesRecursively() throws Exception {
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
FileSystem fs = ops.fs();
generateDirStructure(fs, testPath.toString(), 4, 2);
SparkJobUtil.setModifiedTimeStamp(fs, new Path(testPath.toString(), "dir_1"), 7);
SparkJobUtil.setModifiedTimeStamp(fs, new Path(testPath.toString(), "dir_3"), 7);
generateDirStructure(fs, TEST_PATH.toString(), 4, 2);
SparkJobUtil.setModifiedTimeStamp(fs, new Path(TEST_PATH.toString(), "dir_1"), 7);
SparkJobUtil.setModifiedTimeStamp(fs, new Path(TEST_PATH.toString(), "dir_3"), 7);
Predicate<FileStatus> predicate =
file ->
file.getModificationTime() < System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);
List<Path> matchingFiles = Lists.newArrayList();
ops.listFiles(testPath, predicate, true, matchingFiles);
ops.listFiles(TEST_PATH, predicate, true, matchingFiles);
Assertions.assertEquals(4, matchingFiles.size());
}
}
@@ -67,10 +67,10 @@ private void generateDirStructure(FileSystem fs, String startDir, int depth, int
throws IOException {
fs.delete(new Path(startDir), true);
for (int row = 0; row < depth; ++row) {
Path basePath = new Path(startDir, dirPrefix + row);
Path basePath = new Path(startDir, DIR_PREFIX + row);
fs.mkdirs(basePath);
for (int count = 0; count < filesAtEach; ++count) {
Path fp = new Path(basePath.toString(), filePrefix + count);
Path fp = new Path(basePath.toString(), FILE_PREFIX + count);
fs.createNewFile(fp);
}
}
34 changes: 34 additions & 0 deletions libs/datalayout/build.gradle
@@ -0,0 +1,34 @@
plugins {
id 'openhouse.java-conventions'
id 'openhouse.hadoop-conventions'
id 'openhouse.iceberg-conventions'
id 'openhouse.maven-publish'
}

ext {
icebergVersion = '1.2.0'
sparkVersion = '3.1.1'
springVersion = '2.6.6'
hadoopVersion = '2.10.0'
}

dependencies {
compileOnly project(':integrations:spark:openhouse-spark-runtime_2.12')
implementation ('org.apache.spark:spark-core_2.12:' + sparkVersion) {
exclude group: 'io.netty'
}
implementation ('org.apache.spark:spark-sql_2.12:' + sparkVersion) {
exclude group: 'io.netty'
}
implementation 'org.apache.hadoop:hadoop-common:2.10.0'
implementation 'org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:' + icebergVersion

testImplementation project(':integrations:spark:openhouse-spark-runtime_2.12')
// Otherwise throws the error: Scala module 2.10.0 requires Jackson Databind version >= 2.10.0 and < 2.11.0
testImplementation 'com.fasterxml.jackson.module:jackson-module-scala_2.12:2.13.1'
testImplementation 'org.mockito:mockito-inline:4.11.0'
testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
testImplementation 'org.powermock:powermock-api-mockito2:2.0.9'
testImplementation(project(':tables-test-fixtures_2.12'))
testRuntime("org.eclipse.jetty:jetty-server:11.0.2")
}