[FLINK-35293][hive] Hive source supports dynamic parallelism inference
SinBex authored and zhuzhurk committed May 14, 2024
1 parent 0158678 commit ddb5a53
Showing 17 changed files with 632 additions and 48 deletions.
15 changes: 10 additions & 5 deletions docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -136,16 +136,21 @@ Flink allows you to flexibly configure the parallelism inference policy. You can configure the following parameters in `TableConfig`
</thead>
<tbody>
<tr>
<td><h5>table.exec.hive.infer-source-parallelism</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>If true, the source parallelism is inferred based on the number of splits. If false, the source parallelism is set by configuration.</td>
<td><h5>table.exec.hive.infer-source-parallelism.mode</h5></td>
<td style="word-wrap: break-word;">dynamic</td>
<td>InferMode</td>
<td>An option for selecting the Hive source parallelism inference mode, which infers parallelism based on the number of splits.
'static' represents static inference, which infers the source parallelism at the job creation stage.
'dynamic' represents dynamic inference, which infers the source parallelism at the job execution stage and can do so more accurately using runtime information.
'none' represents disabling parallelism inference.
Note that it is still affected by the deprecated option 'table.exec.hive.infer-source-parallelism', which must be true for parallelism inference to be enabled.
</td>
</tr>
<tr>
<td><h5>table.exec.hive.infer-source-parallelism.max</h5></td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
<td>Sets the maximum inferred parallelism for the source operator.</td>
<td>Sets the maximum inferred parallelism for the source operator. Note that the default value is effective only in the static parallelism inference mode.</td>
</tr>
</tbody>
</table>
17 changes: 12 additions & 5 deletions docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -152,16 +152,23 @@ following parameters in `TableConfig` (note that these parameters affect all sources of the job):
</thead>
<tbody>
<tr>
<td><h5>table.exec.hive.infer-source-parallelism</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>If true, the source parallelism is inferred based on the number of splits. If false, the source parallelism is set by configuration.</td>
<td><h5>table.exec.hive.infer-source-parallelism.mode</h5></td>
<td style="word-wrap: break-word;">dynamic</td>
<td>InferMode</td>
<td>An option for selecting the Hive source parallelism inference mode, which infers parallelism based on the number of splits.
'static' represents static inference, which infers the source parallelism at the job creation stage.
'dynamic' represents dynamic inference, which infers the source parallelism at the job execution stage and can do so more accurately using runtime information.
'none' represents disabling parallelism inference.
Note that it is still affected by the deprecated option 'table.exec.hive.infer-source-parallelism', which must be true for parallelism inference to be enabled.
</td>
</tr>
<tr>
<td><h5>table.exec.hive.infer-source-parallelism.max</h5></td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
<td>Sets the maximum inferred parallelism for the source operator.</td>
<td>Sets the maximum inferred parallelism for the source operator.
Note that the default value is effective only in the static parallelism inference mode.
</td>
</tr>
</tbody>
</table>
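
For reference, these options can be set per job through `TableConfig`. A minimal sketch in Java — the catalog and table names (`my_hive_catalog`, `orders`) are illustrative assumptions, not part of this change:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveSourceParallelismExample {
    public static void main(String[] args) {
        // Dynamic inference takes effect in batch mode (adaptive batch scheduler).
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Infer the source parallelism at the job execution stage (the new default).
        tEnv.getConfig().set("table.exec.hive.infer-source-parallelism.mode", "dynamic");
        // Upper bound for the inferred parallelism; in dynamic mode the scheduler's
        // own parallelism upper bound is applied as well.
        tEnv.getConfig().set("table.exec.hive.infer-source-parallelism.max", "100");

        // Hypothetical Hive-backed table, for illustration only.
        tEnv.executeSql("SELECT * FROM my_hive_catalog.`default`.orders").print();
    }
}
```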
@@ -100,7 +100,7 @@ protected AbstractFileSource(
// Getters
// ------------------------------------------------------------------------

FileEnumerator.Provider getEnumeratorFactory() {
protected FileEnumerator.Provider getEnumeratorFactory() {
return enumeratorFactory;
}

@@ -84,6 +84,10 @@ private boolean reachLimit() {
return globalNumberRead != null && globalNumberRead.get() >= limit;
}

public long getLimit() {
return this.limit;
}

@Override
public boolean isSplittable() {
return format.isSplittable();
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connectors.hive;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.table.catalog.ObjectPath;

import org.apache.hadoop.mapred.JobConf;

/**
* The factory class for {@link HiveParallelismInference} to support Hive source dynamic parallelism
* inference.
*/
class HiveDynamicParallelismInferenceFactory implements HiveParallelismInference.Provider {

private final ObjectPath tablePath;
private final JobConf jobConf;
private final int globalMaxParallelism;

HiveDynamicParallelismInferenceFactory(
ObjectPath tablePath, JobConf jobConf, int globalMaxParallelism) {
this.tablePath = tablePath;
this.jobConf = jobConf;
this.globalMaxParallelism = globalMaxParallelism;
}

@Override
public HiveParallelismInference create() {
boolean inferEnabled =
jobConf.getBoolean(
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(),
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.defaultValue());
HiveOptions.InferMode inferMode =
jobConf.getEnum(
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(),
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.defaultValue());
// This logic should be fixed if config option `table.exec.hive.infer-source-parallelism`
// is removed.
boolean infer = inferEnabled && inferMode == HiveOptions.InferMode.DYNAMIC;
int inferMaxParallelism =
Math.min(
(int)
jobConf.getLong(
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX
.key(),
globalMaxParallelism),
globalMaxParallelism);
int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
return new HiveParallelismInference(tablePath, infer, inferMaxParallelism, parallelism);
}
}
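
As a usage sketch (same package as the factory; the values are illustrative assumptions): the scheduler-provided upper bound is passed at construction time, and the `JobConf` is consulted only when `create()` is called:

```java
// Build a runtime inference instance from a JobConf.
// The upper bound (128) stands in for the scheduler-provided value.
HiveParallelismInference inference =
        new HiveDynamicParallelismInferenceFactory(tablePath, jobConf, 128).create();
```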
@@ -18,6 +18,7 @@

package org.apache.flink.connectors.hive;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.MemorySize;
@@ -50,6 +51,8 @@ public class HiveOptions {
"If it is true, flink will read the files of partitioned hive table from subdirectories under the partition directory to be read.\n"
+ "If it is false, an exception that 'not a file: xxx' will be thrown when the partition directory contains any sub-directory.");

/** @deprecated Use {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} instead. */
@Deprecated
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
key("table.exec.hive.infer-source-parallelism")
.booleanType()
@@ -58,11 +61,33 @@ public class HiveOptions {
"If is false, parallelism of source are set by config.\n"
+ "If is true, source parallelism is inferred according to splits number.\n");

@PublicEvolving
public static final ConfigOption<InferMode> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE =
key("table.exec.hive.infer-source-parallelism.mode")
.enumType(InferMode.class)
.defaultValue(InferMode.DYNAMIC)
.withDescription(
Description.builder()
.text(
"An option for selecting the hive source parallelism inference mode to infer parallelism according to splits number.")
.list(
text(
"'static' represents static inference, which will infer source parallelism at job creation stage."),
text(
"'dynamic' represents dynamic inference, which will infer parallelism at job execution stage and could more accurately infer the source parallelism."),
text(
"'none' represents disabling parallelism inference."),
text(
"Note that it is still affected by the deprecated option 'table.exec.hive.infer-source-parallelism', requiring its value to be true for enabling parallelism inference."))
.build());

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
key("table.exec.hive.infer-source-parallelism.max")
.intType()
.defaultValue(1000)
.withDescription("Sets max infer parallelism for source operator.");
.withDescription(
"Sets max infer parallelism for source operator. "
+ "Note that the default value is effective only in the static parallelism inference mode.");

public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER =
key("table.exec.hive.fallback-mapred-writer")
@@ -281,4 +306,30 @@ public InlineElement getDescription() {
return description;
}
}

/** Infer mode used for {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE}. */
public enum InferMode implements DescribedEnum {
STATIC("static", text("Static parallelism inference mode.")),
DYNAMIC("dynamic", text("Dynamic parallelism inference mode.")),
NONE("none", text("Disable parallelism inference."));

private final String value;

private final InlineElement description;

InferMode(String value, InlineElement description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return description;
}
}
}
@@ -18,10 +18,7 @@

package org.apache.flink.connectors.hive;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

import org.slf4j.Logger;
@@ -40,18 +37,12 @@ class HiveParallelismInference {

private int parallelism;

HiveParallelismInference(ObjectPath tablePath, ReadableConfig flinkConf) {
HiveParallelismInference(
ObjectPath tablePath, boolean infer, int inferMaxParallelism, int parallelism) {
this.tablePath = tablePath;
this.infer = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM);
this.inferMaxParallelism =
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
Preconditions.checkArgument(
inferMaxParallelism >= 1,
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key()
+ " cannot be less than 1");

this.parallelism =
flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
this.infer = infer;
this.inferMaxParallelism = inferMaxParallelism;
this.parallelism = parallelism;
}

/**
@@ -73,7 +64,8 @@ int limit(Long limit) {

/**
* Infer parallelism by number of files and number of splits. If {@link
* HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is not set this method does nothing.
* HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is false or {@link
* HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} is none, this method does nothing.
*/
HiveParallelismInference infer(
SupplierWithException<Integer, IOException> numFiles,
@@ -113,4 +105,15 @@ private int logRunningTime(
result);
return result;
}

/** Factory for the {@code HiveParallelismInference}. */
interface Provider {

/**
* Creates a new {@code HiveParallelismInference}.
*
* @return a new {@code HiveParallelismInference} with designated factors.
*/
HiveParallelismInference create();
}
}
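
Because `Provider` has a single abstract method, an implementation can be a lambda. A minimal sketch (same package; the numbers are illustrative, not from this change) of a hypothetical provider that disables inference and pins a fixed parallelism:

```java
// Illustrative only: never infer, always fall back to parallelism 4.
HiveParallelismInference.Provider fixedProvider =
        () -> new HiveParallelismInference(tablePath, false, 1, 4);
```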
@@ -20,6 +20,8 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.DynamicFilteringInfo;
import org.apache.flink.api.connector.source.DynamicParallelismInference;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.file.src.AbstractFileSource;
@@ -29,10 +31,12 @@
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.ContinuousPartitionFetcher;
import org.apache.flink.connector.file.table.LimitableBulkFormat;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;

@@ -55,7 +59,8 @@
* @param <T> the type of record returned by this source
*/
@PublicEvolving
public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit>
implements DynamicParallelismInference {

private static final long serialVersionUID = 1L;

@@ -68,6 +73,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
private final ContinuousPartitionFetcher<Partition, ?> fetcher;
private final HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext;
private final ObjectPath tablePath;
private Long limit = null;

HiveSource(
Path[] inputPaths,
@@ -97,6 +103,9 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
this.partitionBytes = partitionBytes;
this.fetcher = fetcher;
this.fetcherContext = fetcherContext;
if (readerFormat instanceof LimitableBulkFormat) {
limit = ((LimitableBulkFormat<?, ?>) readerFormat).getLimit();
}
}

@Override
@@ -186,4 +195,48 @@ private boolean continuousPartitionedEnumerator() {
jobConfWrapper),
getAssignerFactory());
}

@Override
public int inferParallelism(Context dynamicParallelismContext) {
FileEnumerator fileEnumerator;
List<HiveTablePartition> partitions;
if (dynamicFilterPartitionKeys != null) {
fileEnumerator =
new HiveSourceDynamicFileEnumerator.Provider(
tablePath.getFullName(),
dynamicFilterPartitionKeys,
partitionBytes,
hiveVersion,
jobConfWrapper)
.create();
if (dynamicParallelismContext.getDynamicFilteringInfo().isPresent()) {
DynamicFilteringInfo dynamicFilteringInfo =
dynamicParallelismContext.getDynamicFilteringInfo().get();
if (dynamicFilteringInfo instanceof DynamicFilteringEvent) {
((HiveSourceDynamicFileEnumerator) fileEnumerator)
.setDynamicFilteringData(
((DynamicFilteringEvent) dynamicFilteringInfo).getData());
}
}
partitions = ((HiveSourceDynamicFileEnumerator) fileEnumerator).getFinalPartitions();
} else {
fileEnumerator = getEnumeratorFactory().create();
partitions = ((HiveSourceFileEnumerator) fileEnumerator).getPartitions();
}

return new HiveDynamicParallelismInferenceFactory(
tablePath,
jobConfWrapper.conf(),
dynamicParallelismContext.getParallelismInferenceUpperBound())
.create()
.infer(
() ->
HiveSourceFileEnumerator.getNumFiles(
partitions, jobConfWrapper.conf()),
() ->
HiveSourceFileEnumerator.createInputSplits(
0, partitions, jobConfWrapper.conf(), true)
.size())
.limit(limit);
}
}
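
`inferParallelism` is invoked by the runtime (the adaptive batch scheduler) rather than by user code. The following is a sketch of the dispatch pattern only, assuming nothing beyond the `DynamicParallelismInference` contract used above — it is not the scheduler's actual code:

```java
import org.apache.flink.api.connector.source.DynamicParallelismInference;
import org.apache.flink.api.connector.source.Source;

final class SourceParallelismDecider {
    private SourceParallelismDecider() {}

    /**
     * Returns the parallelism reported by the source, or the given default
     * if the source does not implement {@link DynamicParallelismInference}.
     */
    static int decideParallelism(
            Source<?, ?, ?> source,
            DynamicParallelismInference.Context context,
            int defaultParallelism) {
        if (source instanceof DynamicParallelismInference) {
            // Sources that opt in report a parallelism based on runtime information,
            // e.g. the number of splits after dynamic partition pruning.
            return ((DynamicParallelismInference) source).inferParallelism(context);
        }
        return defaultParallelism;
    }
}
```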
