Skip to content

Commit

Permalink
[FLINK-34549][API] Implement applyToAllPartitions for non-partitioned…
Browse files Browse the repository at this point in the history
… context
  • Loading branch information
reswqa committed May 15, 2024
1 parent ce1d119 commit 90ebaa4
Show file tree
Hide file tree
Showing 22 changed files with 744 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,61 @@

package org.apache.flink.datastream.impl.context;

import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.JobInfo;
import org.apache.flink.datastream.api.context.NonPartitionedContext;
import org.apache.flink.datastream.api.context.TaskInfo;
import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
import org.apache.flink.metrics.MetricGroup;

import java.util.Set;

/** The default implementation of {@link NonPartitionedContext}. */
public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> {
private final DefaultRuntimeContext context;

public DefaultNonPartitionedContext(DefaultRuntimeContext context) {
private final DefaultPartitionedContext partitionedContext;

private final Collector<OUT> collector;

private final boolean isKeyed;

private final Set<Object> keySet;

/**
 * Creates a non-partitioned context from its collaborators.
 *
 * <p>All arguments are stored as-is; in particular {@code keySet} is kept by reference and not
 * copied.
 *
 * @param context the runtime context stored by this instance
 * @param partitionedContext the partitioned context passed to every applied partition function
 * @param collector the output collector passed to every applied partition function
 * @param isKeyed {@code true} if {@code applyToAllPartitions} should invoke the function once
 *     per key of {@code keySet}; {@code false} if it should invoke it exactly once
 * @param keySet the keys iterated by {@code applyToAllPartitions} when {@code isKeyed} is true
 */
public DefaultNonPartitionedContext(
        DefaultRuntimeContext context,
        DefaultPartitionedContext partitionedContext,
        Collector<OUT> collector,
        boolean isKeyed,
        Set<Object> keySet) {
    // Keyed-mode configuration.
    this.isKeyed = isKeyed;
    this.keySet = keySet;
    // Objects handed to the applied partition function.
    this.collector = collector;
    this.partitionedContext = partitionedContext;
    this.context = context;
}

@Override
public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
// TODO implements this method.
/**
 * Applies the given function to every partition of this context.
 *
 * <p>In the non-keyed case the function is invoked exactly once. In the keyed case it is
 * invoked once per key of {@code keySet}, each time through the state manager's {@code
 * executeInKeyContext} with that key.
 *
 * @param applyPartitionFunction the function to invoke with the collector and the partitioned
 *     context
 * @throws Exception if the function fails; in the keyed case the failure is rethrown wrapped
 *     in a {@link RuntimeException}
 */
public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction)
        throws Exception {
    if (!isKeyed) {
        // non-keyed operator has only one partition.
        applyPartitionFunction.apply(collector, partitionedContext);
        return;
    }

    for (Object currentKey : keySet) {
        partitionedContext
                .getStateManager()
                .executeInKeyContext(
                        () -> applyWrappingCheckedException(applyPartitionFunction),
                        currentKey);
    }
}

/** Invokes the function once, rethrowing any exception wrapped in a RuntimeException. */
private void applyWrappingCheckedException(ApplyPartitionFunction<OUT> applyPartitionFunction) {
    try {
        applyPartitionFunction.apply(collector, partitionedContext);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,69 @@

package org.apache.flink.datastream.impl.context;

import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.JobInfo;
import org.apache.flink.datastream.api.context.TaskInfo;
import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction;
import org.apache.flink.metrics.MetricGroup;

import java.util.Set;

/** The default implementation of {@link TwoOutputNonPartitionedContext}. */
public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
implements TwoOutputNonPartitionedContext<OUT1, OUT2> {
private final DefaultRuntimeContext context;
protected final DefaultRuntimeContext context;

private final DefaultPartitionedContext partitionedContext;

protected final Collector<OUT1> firstCollector;

protected final Collector<OUT2> secondCollector;

private final boolean isKeyed;

private final Set<Object> keySet;

public DefaultTwoOutputNonPartitionedContext(DefaultRuntimeContext context) {
/**
 * Creates a two-output non-partitioned context from its collaborators.
 *
 * <p>All arguments are stored as-is; in particular {@code keySet} is kept by reference and not
 * copied.
 *
 * @param context the runtime context stored by this instance
 * @param partitionedContext the partitioned context passed to every applied partition function
 * @param firstCollector the collector passed as the first output of the applied function
 * @param secondCollector the collector passed as the second output of the applied function
 * @param isKeyed {@code true} if {@code applyToAllPartitions} should invoke the function once
 *     per key of {@code keySet}; {@code false} if it should invoke it exactly once
 * @param keySet the keys iterated by {@code applyToAllPartitions} when {@code isKeyed} is true
 */
public DefaultTwoOutputNonPartitionedContext(
        DefaultRuntimeContext context,
        DefaultPartitionedContext partitionedContext,
        Collector<OUT1> firstCollector,
        Collector<OUT2> secondCollector,
        boolean isKeyed,
        Set<Object> keySet) {
    // Keyed-mode configuration.
    this.isKeyed = isKeyed;
    this.keySet = keySet;
    // The two outputs handed to the applied partition function.
    this.firstCollector = firstCollector;
    this.secondCollector = secondCollector;
    // Contexts.
    this.partitionedContext = partitionedContext;
    this.context = context;
}

/**
 * Applies the given two-output function to every partition of this context.
 *
 * <p>When {@code isKeyed} is true the function is invoked once per key of {@code keySet}, each
 * time through the state manager's {@code executeInKeyContext} with that key. Otherwise it is
 * invoked exactly once.
 *
 * @param applyPartitionFunction the function to invoke with both collectors and the
 *     partitioned context
 * @throws Exception if the function fails; in the keyed branch the failure is rethrown wrapped
 *     in a {@link RuntimeException}
 */
@Override
public void applyToAllPartitions(
TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction) {
// TODO implements this method.
TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction) throws Exception {
if (isKeyed) {
// Keyed: run the function once for each key held in keySet.
for (Object key : keySet) {
partitionedContext
.getStateManager()
.executeInKeyContext(
() -> {
try {
applyPartitionFunction.apply(
firstCollector,
secondCollector,
partitionedContext);
} catch (Exception e) {
// Any exception from the function is rethrown wrapped, keeping the cause.
throw new RuntimeException(e);
}
},
key);
}
} else {
// non-keyed operator has only one partition.
applyPartitionFunction.apply(firstCollector, secondCollector, partitionedContext);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,36 @@
package org.apache.flink.datastream.impl.operators;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.context.NonPartitionedContext;
import org.apache.flink.datastream.api.context.ProcessingTimeManager;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
import org.apache.flink.datastream.impl.common.KeyCheckedOutputCollector;
import org.apache.flink.datastream.impl.common.OutputCollector;
import org.apache.flink.datastream.impl.common.TimestampCollector;
import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import javax.annotation.Nullable;

import java.util.HashSet;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Operator for {@link OneInputStreamProcessFunction} in {@link KeyedPartitionStream}. */
public class KeyedProcessOperator<KEY, IN, OUT> extends ProcessOperator<IN, OUT>
implements Triggerable<KEY, VoidNamespace> {
private transient InternalTimerService<VoidNamespace> timerService;

private transient Set<Object> keySet;

@Nullable private final KeySelector<OUT, KEY> outKeySelector;

public KeyedProcessOperator(OneInputStreamProcessFunction<IN, OUT> userFunction) {
Expand All @@ -56,6 +66,7 @@ public KeyedProcessOperator(
/**
 * Prepares operator-local state and then opens the parent operator.
 *
 * <p>Obtains the internal timer service named "processing timer" (with this operator as the
 * trigger target), starts with an empty key set, and finally delegates to {@code super.open()}.
 *
 * @throws Exception if obtaining the timer service or opening the parent fails
 */
public void open() throws Exception {
    final InternalTimerService<VoidNamespace> processingTimerService =
            getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this);
    this.timerService = processingTimerService;
    // Keys are collected from scratch each time the operator is opened.
    this.keySet = new HashSet<>();
    super.open();
}

Expand Down Expand Up @@ -95,4 +106,24 @@ public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exc
/**
 * Builds a processing-time manager on top of this operator's timer service.
 *
 * @return a new {@link DefaultProcessingTimeManager} wrapping {@code timerService}
 */
protected ProcessingTimeManager getProcessingTimeManager() {
    final ProcessingTimeManager manager = new DefaultProcessingTimeManager(timerService);
    return manager;
}

/**
 * Builds the non-partitioned context for this keyed operator.
 *
 * @return a new {@link DefaultNonPartitionedContext} flagged as keyed and sharing this
 *     operator's {@code keySet} instance
 */
@Override
protected NonPartitionedContext<OUT> getNonPartitionedContext() {
    // This operator is keyed, so the context must visit every key in keySet.
    final boolean keyed = true;
    final NonPartitionedContext<OUT> nonPartitionedContext =
            new DefaultNonPartitionedContext<>(
                    context, partitionedContext, outputCollector, keyed, keySet);
    return nonPartitionedContext;
}

/**
 * Sets the key context for a record, using the first state key selector.
 *
 * @param record the incoming record whose key becomes the current key
 * @throws Exception if key extraction fails
 */
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void setKeyContextElement1(StreamRecord record) throws Exception {
    final KeySelector firstInputSelector = getStateKeySelector1();
    setKeyContextElement(record, firstInputSelector);
}

/**
 * Extracts the key of the record, makes it the current key and remembers it in {@code keySet}.
 *
 * @param record the record to extract the key from
 * @param selector the key selector; must not be null
 * @throws Exception if the selector fails to extract the key
 */
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)
        throws Exception {
    checkNotNull(selector);
    final T element = record.getValue();
    final Object extractedKey = selector.getKey(element);
    setCurrentKey(extractedKey);
    // Remember the key so it can be visited later through the shared key set.
    keySet.add(extractedKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,35 @@
package org.apache.flink.datastream.impl.operators;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.context.NonPartitionedContext;
import org.apache.flink.datastream.api.context.ProcessingTimeManager;
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
import org.apache.flink.datastream.impl.common.KeyCheckedOutputCollector;
import org.apache.flink.datastream.impl.common.OutputCollector;
import org.apache.flink.datastream.impl.common.TimestampCollector;
import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import javax.annotation.Nullable;

import java.util.HashSet;
import java.util.Set;

/** Operator for {@link TwoInputBroadcastStreamProcessFunction} in {@link KeyedPartitionStream}. */
public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, IN2, OUT>
extends TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
implements Triggerable<KEY, VoidNamespace> {
private transient InternalTimerService<VoidNamespace> timerService;

private transient Set<Object> keySet;

@Nullable private final KeySelector<OUT, KEY> outKeySelector;

public KeyedTwoInputBroadcastProcessOperator(
Expand All @@ -58,6 +66,7 @@ public KeyedTwoInputBroadcastProcessOperator(
/**
 * Prepares operator-local state and then opens the parent operator.
 *
 * <p>Obtains the internal timer service named "processing timer" (with this operator as the
 * trigger target), starts with an empty key set, and finally delegates to {@code super.open()}.
 *
 * @throws Exception if obtaining the timer service or opening the parent fails
 */
public void open() throws Exception {
    this.timerService =
            getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this);
    // Keys are collected from scratch each time the operator is opened.
    final Set<Object> emptyKeys = new HashSet<>();
    this.keySet = emptyKeys;
    super.open();
}

Expand Down Expand Up @@ -96,4 +105,27 @@ public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exc
partitionedContext),
timer.getKey());
}

/**
 * Builds the non-partitioned context for this keyed operator.
 *
 * @return a new {@link DefaultNonPartitionedContext} flagged as keyed and sharing this
 *     operator's {@code keySet} instance
 */
@Override
protected NonPartitionedContext<OUT> getNonPartitionedContext() {
    // This operator is keyed, so the context must visit every key in keySet.
    final boolean keyed = true;
    final NonPartitionedContext<OUT> nonPartitionedContext =
            new DefaultNonPartitionedContext<>(
                    context, partitionedContext, collector, keyed, keySet);
    return nonPartitionedContext;
}

/**
 * Sets the key context for a record from the first input, using the first state key selector.
 *
 * <p>Only element from input1 should be considered as the other side is broadcast input.
 *
 * @param record the incoming record from the first input
 * @throws Exception if key extraction fails
 */
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void setKeyContextElement1(StreamRecord record) throws Exception {
    final KeySelector firstInputSelector = getStateKeySelector1();
    setKeyContextElement(record, firstInputSelector);
}

/**
 * Extracts the key of the record, makes it the current key and remembers it in {@code keySet}.
 *
 * <p>Does nothing when no selector is given.
 *
 * @param record the record to extract the key from
 * @param selector the key selector, or null to skip key handling
 * @throws Exception if the selector fails to extract the key
 */
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)
        throws Exception {
    if (selector != null) {
        final Object extractedKey = selector.getKey(record.getValue());
        setCurrentKey(extractedKey);
        // Remember the key so it can be visited later through the shared key set.
        keySet.add(extractedKey);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,27 @@
package org.apache.flink.datastream.impl.operators;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.context.NonPartitionedContext;
import org.apache.flink.datastream.api.context.ProcessingTimeManager;
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
import org.apache.flink.datastream.impl.common.KeyCheckedOutputCollector;
import org.apache.flink.datastream.impl.common.OutputCollector;
import org.apache.flink.datastream.impl.common.TimestampCollector;
import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import javax.annotation.Nullable;

import java.util.HashSet;
import java.util.Set;

/**
* Operator for {@link TwoInputNonBroadcastStreamProcessFunction} in {@link KeyedPartitionStream}.
*/
Expand All @@ -42,6 +48,8 @@ public class KeyedTwoInputNonBroadcastProcessOperator<KEY, IN1, IN2, OUT>
implements Triggerable<KEY, VoidNamespace> {
private transient InternalTimerService<VoidNamespace> timerService;

private transient Set<Object> keySet;

@Nullable private final KeySelector<OUT, KEY> outKeySelector;

public KeyedTwoInputNonBroadcastProcessOperator(
Expand All @@ -60,6 +68,7 @@ public KeyedTwoInputNonBroadcastProcessOperator(
/**
 * Prepares operator-local state and then opens the parent operator.
 *
 * <p>Obtains the internal timer service named "processing timer" (with this operator as the
 * trigger target), starts with an empty key set, and finally delegates to {@code super.open()}.
 *
 * @throws Exception if obtaining the timer service or opening the parent fails
 */
public void open() throws Exception {
    final InternalTimerService<VoidNamespace> processingTimerService =
            getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this);
    this.timerService = processingTimerService;
    // Keys are collected from scratch each time the operator is opened.
    this.keySet = new HashSet<>();
    super.open();
}

Expand Down Expand Up @@ -98,4 +107,32 @@ public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exc
partitionedContext),
timer.getKey());
}

/**
 * Builds the non-partitioned context for this keyed operator.
 *
 * @return a new {@link DefaultNonPartitionedContext} flagged as keyed and sharing this
 *     operator's {@code keySet} instance
 */
@Override
protected NonPartitionedContext<OUT> getNonPartitionedContext() {
    // This operator is keyed, so the context must visit every key in keySet.
    final boolean keyed = true;
    final NonPartitionedContext<OUT> nonPartitionedContext =
            new DefaultNonPartitionedContext<>(
                    context, partitionedContext, collector, keyed, keySet);
    return nonPartitionedContext;
}

/**
 * Sets the key context for a record from the first input, using the first state key selector.
 *
 * @param record the incoming record from the first input
 * @throws Exception if key extraction fails
 */
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void setKeyContextElement1(StreamRecord record) throws Exception {
    final KeySelector firstInputSelector = getStateKeySelector1();
    setKeyContextElement(record, firstInputSelector);
}

/**
 * Sets the key context for a record from the second input, using the second state key selector.
 *
 * @param record the incoming record from the second input
 * @throws Exception if key extraction fails
 */
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void setKeyContextElement2(StreamRecord record) throws Exception {
    final KeySelector secondInputSelector = getStateKeySelector2();
    setKeyContextElement(record, secondInputSelector);
}

/**
 * Extracts the key of the record, makes it the current key and remembers it in {@code keySet}.
 *
 * <p>Does nothing when no selector is given.
 *
 * @param record the record to extract the key from
 * @param selector the key selector, or null to skip key handling
 * @throws Exception if the selector fails to extract the key
 */
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)
        throws Exception {
    if (selector != null) {
        final T element = record.getValue();
        final Object extractedKey = selector.getKey(element);
        setCurrentKey(extractedKey);
        // Remember the key so it can be visited later through the shared key set.
        keySet.add(extractedKey);
    }
}
}

0 comments on commit 90ebaa4

Please sign in to comment.