You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
publicvoidprocessElement(StreamRecord<IN> element) throwsException {
// 通过windowAssiger分配windowfinalCollection<W> elementWindows =
windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
// if element is handled by none of assigned elementWindowsbooleanisSkippedElement = true;
finalKkey = this.<K>getKeyedStateBackend().getCurrentKey();
// 如果是merge窗口则进行合并操作if (windowAssignerinstanceofMergingWindowAssigner) {
// 获取merge windowMergingWindowSet<W> mergingWindows = getMergingWindowSet();
for (Wwindow : elementWindows) {
// adding the new window might result in a merge, in that case the actualWindow// is the merged window and we work with that. If we don't merge then// actualWindow == windowWactualWindow =
// 当前window加入merge window集合mergingWindows.addWindow(
window,
newMergingWindowSet.MergeFunction<W>() {
// window merge核心逻辑@Overridepublicvoidmerge(
WmergeResult,
Collection<W> mergedWindows,
WstateWindowResult,
Collection<W> mergedStateWindows)
throwsException {
// eventTime并且需要合并的window的最大时间戳+运行延迟的时间小于等于当前的watermark,如果大于则可以进行输出 无法合并if ((windowAssigner.isEventTime()
&& mergeResult.maxTimestamp() + allowedLateness
<= internalTimerService
.currentWatermark())) {
thrownewUnsupportedOperationException(
"The end timestamp of an "
+ "event-time window cannot become earlier than the current watermark "
+ "by merging. Current watermark: "
+ internalTimerService
.currentWatermark()
+ " window: "
+ mergeResult);
// 如果不是eventTime
} elseif (!windowAssigner.isEventTime()) {
longcurrentProcessingTime =
internalTimerService.currentProcessingTime();
// 判断是否在process window范围if (mergeResult.maxTimestamp()
<= currentProcessingTime) {
thrownewUnsupportedOperationException(
"The end timestamp of a "
+ "processing-time window cannot become earlier than the current processing time "
+ "by merging. Current processing time: "
+ currentProcessingTime
+ " window: "
+ mergeResult);
}
}
// 更新triggerContext信息,将key和合并的window已经mergewindow集合进行操作triggerContext.key = key;
triggerContext.window = mergeResult;
triggerContext.onMerge(mergedWindows);
for (Wm : mergedWindows) {
triggerContext.window = m;
triggerContext.clear();
deleteCleanupTimer(m);
}
// merge the merged state windows into the newly resulting// state windowwindowMergingState.mergeNamespaces(
stateWindowResult, mergedStateWindows);
}
});
// drop if the window is already late 删除延迟的windowif (isWindowLate(actualWindow)) {
mergingWindows.retireWindow(actualWindow);
continue;
}
isSkippedElement = false;
WstateWindow = mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
thrownewIllegalStateException(
"Window " + window + " is not in in-flight window set.");
}
// 更新状态windowState.setCurrentNamespace(stateWindow);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = actualWindow;
// 放入触发器获取TriggerResultTriggerResulttriggerResult = triggerContext.onElement(element);
// 输出当前窗口数据if (triggerResult.isFire()) {
ACCcontents = windowState.get();
if (contents == null) {
continue;
}
// 数据调用windowFunctionemitWindowContents(actualWindow, contents);
}
// 清空当前窗口状态if (triggerResult.isPurge()) {
windowState.clear();
}
// 注册Trigger Window clean定时器registerCleanupTimer(actualWindow);
}
// need to make sure to update the merging state in statemergingWindows.persist();
// 如果不是merge window
} else {
for (Wwindow : elementWindows) {
// merge后的逻辑类似// drop if the window is already lateif (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = window;
TriggerResulttriggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACCcontents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(window);
}
}
// side output input event if// element not handled by any window// late arriving tag has been set// windowAssigner is event time and current timestamp + allowed lateness no less than// element timestamp// 延迟数据旁路输出if (isSkippedElement && isElementLate(element)) {
if (lateDataOutputTag != null) {
sideOutput(element);
} else {
this.numLateRecordsDropped.inc();
}
}
}
获取element归属的windows
获取element对应的key
如果late data,跳过
将element存入window state
判断element是否触发trigger
获取window state,注入window function
清除window state
注册timer,到窗口结束时间清理window
Window State
ListState
process()/evitor()
全量状态计算
/** The state that holds the merging window metadata (the sets that describe what is merged). */privatetransientInternalListState<K, VoidNamespace, Tuple2<W, W>> mergingSetsState;
publicvoidprocessElement(StreamRecord<IN> element) throwsException {
// 获取元素窗口集合finalCollection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindowsbooleanisSkippedElement = true;
finalKkey = this.<K>getKeyedStateBackend().getCurrentKey();
// 如果为合并窗口if (windowAssignerinstanceofMergingWindowAssigner) {
// 获取MergingWindowSet<W> mergingWindows = getMergingWindowSet();
// 遍历窗口for (Wwindow: elementWindows) {
// adding the new window might result in a merge, in that case the actualWindow// is the merged window and we work with that. If we don't merge then// actualWindow == window// 添加窗口WactualWindow = mergingWindows.addWindow(window, newMergingWindowSet.MergeFunction<W>() {
/** * merge函数 * @param mergeResult The newly resulting merged {@code Window}. * @param mergedWindows The merged {@code Window Windows}. * @param stateWindowResult The state window of the merge result. * @param mergedStateWindows The merged state windows. * @throws Exception */@Overridepublicvoidmerge(WmergeResult,
Collection<W> mergedWindows, WstateWindowResult,
Collection<W> mergedStateWindows) throwsException {
// 如果是事件时间,获取最大时间+延迟时间如果小于等于watermark则抛出一次,窗口的最新时间不能低于当前watermark在合并的时候 Evicting与正常窗口相反,过滤掉小于wm的数据if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
thrownewUnsupportedOperationException("The end timestamp of an " +
"event-time window cannot become earlier than the current watermark " +
"by merging. Current watermark: " + internalTimerService.currentWatermark() +
" window: " + mergeResult);
} elseif (!windowAssigner.isEventTime()
&& mergeResult.maxTimestamp()
<= internalTimerService
.currentProcessingTime()) {
thrownewUnsupportedOperationException(
"The end timestamp of a "
+ "processing-time window cannot become earlier than the current processing time "
+ "by merging. Current processing time: "
+ internalTimerService
.currentProcessingTime()
+ " window: "
+ mergeResult);
}
}
triggerContext.key = key;
triggerContext.window = mergeResult;
triggerContext.onMerge(mergedWindows);
for (Wm: mergedWindows) {
triggerContext.window = m;
triggerContext.clear();
// 合并窗口,删除需要输出和清空的窗口deleteCleanupTimer(m);
}
// merge the merged state windows into the newly resulting state windowwindowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
}
});
// drop if the window is already late// 移除合并窗口if (isWindowLate(actualWindow)) {
mergingWindows.retireWindow(actualWindow);
continue;
}
isSkippedElement = false;
WstateWindow = mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
thrownewIllegalStateException("Window " + window + " is not in in-flight window set.");
}
windowState.setCurrentNamespace(stateWindow);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = actualWindow;
// 处理元素TriggerResulttriggerResult = triggerContext.onElement(element);
// 如果结果为输出if (triggerResult.isFire()) {
// 获取最窗口内容ACCcontents = windowState.get();
if (contents == null) {
continue;
}
// 输出窗口,数据放入udf,收集时间戳emitWindowContents(actualWindow, contents);
}
// 清空窗口if (triggerResult.isPurge()) {
windowState.clear();
}
// 注册清空时间器,用于清空窗口内容registerCleanupTimer(actualWindow);
}
// need to make sure to update the merging state in statemergingWindows.persist();
} else {
for (Wwindow: elementWindows) {
// drop if the window is already lateif (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = window;
TriggerResulttriggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACCcontents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
// 输出并清空registerCleanupTimer(window);
}
}
// side output input event if// element not handled by any window// late arriving tag has been set// windowAssigner is event time and current timestamp + allowed lateness no less than element timestampif (isSkippedElement && isElementLate(element)) {
if (lateDataOutputTag != null){
sideOutput(element);
} else {
this.numLateRecordsDropped.inc();
}
}
}
registerCleanupTimer注册的定时器
// timer触发逻辑publicvoidonEventTime(InternalTimer<K, W> timer) throwsException {
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();
evictorContext.key = timer.getKey();
evictorContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows = null;
if (windowAssignerinstanceofMergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
WstateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a// trigger did not clean up timers. We have already cleared the merging// window and therefore the Trigger state, however, so nothing to do.return;
} else {
evictingWindowState.setCurrentNamespace(stateWindow);
}
} else {
evictingWindowState.setCurrentNamespace(triggerContext.window);
}
TriggerResulttriggerResult = triggerContext.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents != null) {
emitWindowContents(triggerContext.window, contents, evictingWindowState);
}
}
// 清理状态if (triggerResult.isPurge()) {
evictingWindowState.clear();
}
// 清理全部状态if (windowAssigner.isEventTime()
&& isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
}
if (mergingWindows != null) {
// need to make sure to update the merging state in statemergingWindows.persist();
}
}