
Commit c1370aa

remove Spark 2 support
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 6e18732 commit c1370aa

30 files changed, +12 -1317 lines

.circleci/workflows/openlineage-spark.yml

Lines changed: 0 additions & 2 deletions
@@ -22,7 +22,6 @@ workflows:
       matrix:
         parameters:
           env-variant: [
-            'java:8-spark:2.4.8-scala:2.12',
             'java:8-spark:3.2.4-scala:2.12-full-tests',
             'java:8-spark:3.2.4-scala:2.13-full-tests',
             'java:8-spark:3.3.4-scala:2.12-full-tests',
@@ -114,7 +113,6 @@ workflows:
       matrix:
         parameters:
           env-variant: [
-            'java:8-spark:2.4.8-scala:2.12',
             'java:8-spark:3.2.4-scala:2.12-full-tests',
             'java:8-spark:3.2.4-scala:2.13-full-tests',
             'java:8-spark:3.3.4-scala:2.12-full-tests',

integration/README.md

Lines changed: 2 additions & 2 deletions
@@ -4,8 +4,8 @@
 |Apache Airflow| 2.5 - 2.7 |https://github.com/apache/airflow/releases|[README](./airflow/README.md)|Support for Airflow 2.7+ is maintained in official Apache Airflow repository|
 |Dagster| 0.13.8+ |https://github.com/dagster-io/dagster/releases|[README](./dagster/README.md)| |
 |dbt| 0.20+ |https://github.com/dbt-labs/dbt-core/releases|[README](./dbt/README.md)| |
-|Apache Spark| 2.4 - 4.0 |https://github.com/apache/spark/tags|[README](./spark/README.md)| |
-|Apache Flink| 1.15 - 1.19 |https://github.com/apache/flink/tags|[README](./flink/README.md)| |
+|Apache Spark| 3.0 - 4.0 |https://github.com/apache/spark/tags|[README](./spark/README.md)| |
+|Apache Flink| 1.15 - 1.19, 2.0 |https://github.com/apache/flink/tags|[README](./flink/README.md)| |
 
 ----
 SPDX-License-Identifier: Apache-2.0\

integration/spark/app/build.gradle

Lines changed: 0 additions & 3 deletions
@@ -85,8 +85,6 @@ addDependenciesToConfiguration(additionalJarsConfigurationName, spark, scala, th
 dependencies {
   implementation(project(path: ":shared"))
   implementation(project(path: ":shared", configuration: activeRuntimeElementsConfiguration))
-  implementation(project(path: ":spark2"))
-  implementation(project(path: ":spark2", configuration: "scala212RuntimeElements"))
   implementation(project(path: ":spark3"))
   implementation(project(path: ":spark3", configuration: activeRuntimeElementsConfiguration))
 
@@ -253,7 +251,6 @@ tasks.named("shadowJar", ShadowJar.class) {
 
   minimize() {
     exclude(project(path: ":shared", configuration: activeRuntimeElementsConfiguration))
-    exclude(project(path: ":spark2", configuration: "scala212RuntimeElements"))
    exclude(project(path: ":spark3", configuration: activeRuntimeElementsConfiguration))
    exclude(project(path: ":spark31", configuration: "scala212RuntimeElements"))
    exclude(project(path: ":spark32", configuration: activeRuntimeElementsConfiguration))

integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/DatasetBuilderFactoryProvider.java

Lines changed: 1 addition & 5 deletions
@@ -9,8 +9,6 @@
 
 public class DatasetBuilderFactoryProvider {
 
-  private static final String SPARK2_FACTORY_NAME =
-      "io.openlineage.spark.agent.lifecycle.Spark2DatasetBuilderFactory";
   private static final String SPARK3_FACTORY_NAME =
       "io.openlineage.spark.agent.lifecycle.Spark3DatasetBuilderFactory";
   private static final String SPARK32_FACTORY_NAME =
@@ -38,9 +36,7 @@ public static DatasetBuilderFactory getInstance() {
   }
 
   static String getDatasetBuilderFactoryForVersion(String version) {
-    if (version.startsWith("2.")) {
-      return SPARK2_FACTORY_NAME;
-    } else if (version.startsWith("3.2")) {
+    if (version.startsWith("3.2")) {
       return SPARK32_FACTORY_NAME;
     } else if (version.startsWith("3.3")) {
       return SPARK33_FACTORY_NAME;
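
With the "2." branch gone, version dispatch now starts at the "3.2" prefix. For context, a minimal sketch of how a caller can resolve and instantiate the selected factory reflectively; the getInstance() body is not shown in this diff, "loadFactory" is a hypothetical name, and it assumes the factory classes expose public no-arg constructors:

  // Sketch only, not the shipped code.
  static DatasetBuilderFactory loadFactory() throws ReflectiveOperationException {
    // From Java, Spark's version string is reachable via the Scala package object,
    // e.g. "3.3.4".
    String sparkVersion = org.apache.spark.package$.MODULE$.SPARK_VERSION();
    String className = getDatasetBuilderFactoryForVersion(sparkVersion);
    return (DatasetBuilderFactory)
        Class.forName(className).getDeclaredConstructor().newInstance();
  }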

integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java

Lines changed: 2 additions & 2 deletions
@@ -149,9 +149,9 @@ public void setActiveJob(ActiveJob activeJob) {
    */
   private Field getConfigField(ResultStage resultStage) throws NoSuchFieldException {
     try {
-      return resultStage.func().getClass().getDeclaredField("config$1");
-    } catch (NoSuchFieldException e) {
       return resultStage.func().getClass().getDeclaredField("arg$1");
+    } catch (NoSuchFieldException e) {
+      return resultStage.func().getClass().getDeclaredField("config$1");
     }
   }
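
The lookup order flips because the field names are compiler-generated and version-dependent: on the Spark 3 closures this code now targets, "arg$1" is the common case, so it is tried first and "config$1" is kept as the fallback. A hedged sketch of the same try/catch generalized to an ordered candidate list, should a future release rename the captured field again; "firstDeclaredField" is hypothetical, not part of this codebase:

  import java.lang.reflect.Field;

  static Field firstDeclaredField(Class<?> cls, String... names) throws NoSuchFieldException {
    for (String name : names) {
      try {
        return cls.getDeclaredField(name); // found the captured-config field
      } catch (NoSuchFieldException ignored) {
        // fall through and try the next candidate
      }
    }
    throw new NoSuchFieldException("none of: " + String.join(", ", names));
  }

  // usage, mirroring the new lookup order:
  // firstDeclaredField(resultStage.func().getClass(), "arg$1", "config$1")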

integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark2DatasetBuilderFactory.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark2VisitorFactoryImpl.java

Lines changed: 0 additions & 39 deletions
This file was deleted.

integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/VisitorFactoryProvider.java

Lines changed: 1 addition & 6 deletions
@@ -9,9 +9,6 @@
 
 class VisitorFactoryProvider {
 
-  private static final String SPARK2_FACTORY_NAME =
-      "io.openlineage.spark.agent.lifecycle.Spark2VisitorFactoryImpl";
-
   private static final String SPARK3_FACTORY_NAME =
       "io.openlineage.spark.agent.lifecycle.Spark3VisitorFactoryImpl";
 
@@ -29,9 +26,7 @@ static VisitorFactory getInstance() {
   }
 
   static String getVisitorFactoryForVersion(String version) {
-    if (version.startsWith("2.")) {
-      return SPARK2_FACTORY_NAME;
-    } else if (version.startsWith("3.2")) {
+    if (version.startsWith("3.2")) {
       return SPARK32_FACTORY_NAME;
     } else {
       return SPARK3_FACTORY_NAME;
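
Note the changed fall-through: a "2.x" version string no longer resolves to a dedicated Spark 2 factory but reaches the else branch. A minimal JUnit 5 sketch of that observable behavior; the test class is hypothetical and assumes it lives in the same package as the package-private method:

  import static org.junit.jupiter.api.Assertions.assertEquals;

  import org.junit.jupiter.api.Test;

  class VisitorFactoryProviderDispatchTest {

    @Test
    void spark2VersionsFallThroughToSpark3Factory() {
      // "2.4.8" does not start with "3.2", so it hits the else branch
      // and resolves to the generic Spark 3 factory.
      assertEquals(
          "io.openlineage.spark.agent.lifecycle.Spark3VisitorFactoryImpl",
          VisitorFactoryProvider.getVisitorFactoryForVersion("2.4.8"));
    }
  }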

integration/spark/app/src/test/java/io/openlineage/spark/agent/SparkContainerIntegrationTest.java

Lines changed: 1 addition & 35 deletions
@@ -54,9 +54,7 @@ class SparkContainerIntegrationTest {
   private static final MockServerContainer openLineageClientMockContainer =
       SparkContainerUtils.makeMockServerContainer(network);
 
-  private static final String GREATER_THAN_SPARK2 = "([34].*)";
   private static final String PACKAGES = "--packages";
-  private static final String SPARK_VERSION = "spark.version";
 
   private static GenericContainer<?> pyspark;
   private static GenericContainer<?> kafka;
@@ -103,9 +101,6 @@ public static void tearDownMockServer() {
   }
 
   @Test
-  @EnabledIfSystemProperty(
-      named = SPARK_VERSION,
-      matches = GREATER_THAN_SPARK2) // Spark version >= 3.*
   void testPysparkWordCountWithCliArgs() {
     SparkContainerUtils.runPysparkContainerWithDefaultConf(
         network,
@@ -120,9 +115,6 @@ void testPysparkWordCountWithCliArgs() {
   }
 
   @Test
-  @EnabledIfSystemProperty(
-      named = SPARK_VERSION,
-      matches = GREATER_THAN_SPARK2) // Spark version >= 3.*
   void testPysparkRddToTable() {
     SparkContainerUtils.runPysparkContainerWithDefaultConf(
         network, openLineageClientMockContainer, "testPysparkRddToTable", "spark_rdd_to_table.py");
@@ -188,9 +180,6 @@ void testPysparkKafkaReadAssign() {
   }
 
   @Test
-  @EnabledIfSystemProperty(
-      named = SPARK_VERSION,
-      matches = GREATER_THAN_SPARK2) // Spark version >= 3.*
   void testPysparkSQLHiveTest() {
     SparkContainerUtils.runPysparkContainerWithDefaultConf(
         network, openLineageClientMockContainer, "testPysparkSQLHiveTest", "spark_hive.py");
@@ -204,9 +193,6 @@ void testPysparkSQLHiveTest() {
   }
 
   @Test
-  @EnabledIfSystemProperty(
-      named = SPARK_VERSION,
-      matches = GREATER_THAN_SPARK2) // Spark version >= 3.*
   void testPysparkSQLHadoopFSTest() {
     SparkContainerUtils.runPysparkContainerWithDefaultConf(
         network,
@@ -219,9 +205,6 @@ void testPysparkSQLHadoopFSTest() {
   }
 
   @Test
-  @EnabledIfSystemProperty(
-      named = SPARK_VERSION,
-      matches = GREATER_THAN_SPARK2) // Spark version >= 3.*
   void testOverwriteName() {
     SparkContainerUtils.runPysparkContainerWithDefaultConf(
         network,
@@ -239,9 +222,6 @@ void testOverwriteName() {
   }
 
   @Test
-  @EnabledIfSystemProperty(
-      named = SPARK_VERSION,
-      matches = GREATER_THAN_SPARK2) // Spark version >= 3.*
   void testPysparkSQLOverwriteDirHiveTest() {
     SparkContainerUtils.runPysparkContainerWithDefaultConf(
         network,
@@ -255,9 +235,6 @@ void testPysparkSQLOverwriteDirHiveTest() {
   }
 
   @Test
-  @EnabledIfSystemProperty(
-      named = SPARK_VERSION,
-      matches = GREATER_THAN_SPARK2) // Spark version >= 3.*
   void testCreateAsSelectAndLoad() {
     SparkContainerUtils.runPysparkContainerWithDefaultConf(
         network, openLineageClientMockContainer, "testCreateAsSelectAndLoad", "spark_ctas_load.py");
@@ -268,26 +245,17 @@ void testCreateAsSelectAndLoad() {
         "pysparkLoadStart.json",
         "pysparkLoadComplete.json");
 
-    if (System.getProperty(SPARK_VERSION).matches(GREATER_THAN_SPARK2)) {
-      // verify CTAS contains column level lineage
-      verifyEvents(mockServerClient, "pysparkCTASWithColumnLineageEnd.json");
-    }
+    verifyEvents(mockServerClient, "pysparkCTASWithColumnLineageEnd.json");
   }
 
   @Test
-  @EnabledIfSystemProperty(
-      named = SPARK_VERSION,
-      matches = GREATER_THAN_SPARK2) // Spark version >= 3.*
   void testCachedDataset() {
     SparkContainerUtils.runPysparkContainerWithDefaultConf(
         network, openLineageClientMockContainer, "cachedDataset", "spark_cached.py");
     verifyEvents(mockServerClient, "pysparkCachedDatasetComplete.json");
   }
 
   @Test
-  @EnabledIfSystemProperty(
-      named = SPARK_VERSION,
-      matches = GREATER_THAN_SPARK2) // Spark version >= 3.*
   void testSymlinksFacetForHiveCatalog() {
     SparkContainerUtils.runPysparkContainerWithDefaultConf(
         network, openLineageClientMockContainer, "symlinks", "spark_hive_catalog.py");
@@ -335,7 +303,6 @@ void testTruncateTable() {
   }
 
   @Test
-  @EnabledIfSystemProperty(named = SPARK_VERSION, matches = GREATER_THAN_SPARK2)
   void testOptimizedCreateAsSelectAndLoad() {
     SparkContainerUtils.runPysparkContainerWithDefaultConf(
         network,
@@ -346,7 +313,6 @@ void testOptimizedCreateAsSelectAndLoad() {
   }
 
   @Test
-  @EnabledIfSystemProperty(named = SPARK_VERSION, matches = GREATER_THAN_SPARK2)
   void testColumnLevelLineage() {
     SparkContainerUtils.runPysparkContainerWithDefaultConf(
         network, openLineageClientMockContainer, "testColumnLevelLineage", "spark_cll.py");
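
For context, the deleted guards relied on JUnit 5's @EnabledIfSystemProperty condition; with Spark 2 builds gone from the CI matrix, the condition always held, which is why the annotations could be dropped here and in SparkReadWriteIntegTest below. A standalone sketch of how such a guard behaves (class and test names are hypothetical):

  import org.junit.jupiter.api.Test;
  import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

  class SparkVersionGuardSketch {

    @Test
    @EnabledIfSystemProperty(named = "spark.version", matches = "([34].*)")
    void runsOnlyOnSpark3OrLater() {
      // JUnit 5 skips this test unless the JVM was started with a matching
      // -Dspark.version, e.g. "3.3.4" matches "([34].*)" while "2.4.8" does not.
    }
  }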

integration/spark/app/src/test/java/io/openlineage/spark/agent/lifecycle/SparkReadWriteIntegTest.java

Lines changed: 0 additions & 10 deletions
@@ -101,7 +101,6 @@ class SparkReadWriteIntegTest {
   private static final String NAME = "name";
   private static final String AGE = "age";
   private static final String FILE_URI_PREFIX = "file://";
-  private static final String GREATER_THAN_SPARK2 = "([34].*)";
   private static final String SPARK_VERSION = "spark.version";
 
   private final KafkaContainer kafkaContainer =
@@ -127,9 +126,6 @@ public void tearDown() {
   }
 
   @Test
-  @EnabledIfSystemProperty(
-      named = SPARK_VERSION,
-      matches = GREATER_THAN_SPARK2) // Spark version >= 3.*
   void testReadFromFileWriteToJdbc(@TempDir Path writeDir, SparkSession spark)
       throws InterruptedException, TimeoutException, IOException {
     Path testFile = writeTestDataToFile(writeDir);
@@ -356,9 +352,6 @@ void testWithLogicalRdd(@TempDir Path tmpDir, SparkSession spark)
   }
 
   @Test
-  @EnabledIfSystemProperty(
-      named = SPARK_VERSION,
-      matches = GREATER_THAN_SPARK2) // Spark version >= 3.*
   void testCreateDataSourceTableAsSelect(@TempDir Path tmpDir, SparkSession spark)
       throws InterruptedException, TimeoutException, IOException {
     Path testFile = writeTestDataToFile(tmpDir);
@@ -602,9 +595,6 @@ void testSingleFileDatasets(@TempDir Path writeDir, SparkSession spark)
 
   @Test
   @EnabledIfEnvironmentVariable(named = "CI", matches = "true")
-  @EnabledIfSystemProperty(
-      named = SPARK_VERSION,
-      matches = GREATER_THAN_SPARK2) // Spark version >= 3.*
   void testExternalRDDWithS3Bucket(SparkSession spark)
       throws InterruptedException, TimeoutException, IOException, URISyntaxException {
     spark.conf().set("fs.s3a.secret.key", System.getenv("S3_SECRET_KEY"));
