Merge pull request #1905 from tapdata/merge-develop-into-main
Merge develop into main
jackin-code committed May 8, 2024
2 parents 7f9d385 + eee5614 commit 568554f
Showing 90 changed files with 5,593 additions and 1,333 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/mr-ci.yaml
@@ -156,11 +156,18 @@ jobs:
run: |
apt install -y jq
- name: SonarQube Quality Gate check
id: sonar
uses: sonarsource/sonarqube-quality-gate-action@master
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
with:
scanMetadataReportFile: tapdata/target/sonar/report-task.txt
- name: Send SonarQube Quality Gate to Pr Comment
if: ${{ always() && steps.sonar.outcome == 'failure' }}
run: |
cd tapdata/build/ && bash check_sonarqube.sh --project-key=tapdata --branch=${{ env.OPENSOURCE_BRANCH }} \
--sonar-token=${{ secrets.SONAR_TOKEN }} --github-token=${{ secrets.TAPDATA_ENT_CICD_TOKEN }} \
--repo=tapdata --pr-number=${{ github.event.pull_request.number }}
Build-And-Deploy:
if: ${{ github.event_name == 'schedule' || inputs.mrci_run }}
90 changes: 90 additions & 0 deletions build/check_sonarqube.sh
@@ -0,0 +1,90 @@
#!/bin/bash

. log.sh

BASE_URI='http://58.251.34.123:29000'
PROJECT_KEY=''
BRANCH=''
GITHUB_TOKEN=""
OWNER="tapdata"
REPO=""
PR_NUMBER=

which jq > /dev/null
if [[ $? -ne 0 ]]; then
error "jq is not installed."
fi

for arg in "$@"
do
case $arg in
--project-key=*)
PROJECT_KEY="${arg#*=}"
shift
;;
--branch=*)
BRANCH="${arg#*=}"
shift
;;
--sonar-token=*)
SONAR_TOKEN="${arg#*=}"
shift
;;
--github-token=*)
GITHUB_TOKEN="${arg#*=}"
shift
;;
--repo=*)
REPO="${arg#*=}"
shift
;;
--pr-number=*)
PR_NUMBER="${arg#*=}"
shift
;;
esac
done

if [[ -z $PROJECT_KEY ]]; then
error 'Project Key is not set.'
fi

if [[ -z $BRANCH ]]; then
error 'Branch is not set.'
fi

if [[ -z $SONAR_TOKEN ]]; then
error 'variable $SONAR_TOKEN is not set.'
fi

info "Get Sonar Scan Result"
result=$(curl -L "$BASE_URI/api/qualitygates/project_status?projectKey=$PROJECT_KEY&branch=$BRANCH" -u "$SONAR_TOKEN:" 2>/dev/null)

QUALITY_GATE_STATUS=$(echo $result | jq -r .projectStatus.status)
CONDITIONS=$(echo $result | jq -c .projectStatus.conditions[])

COMMENT="SonarQube Quality Gate Status: **$QUALITY_GATE_STATUS**\n\n"

if [[ $QUALITY_GATE_STATUS == "ERROR" ]]; then
warn "Quality Gate Status: $QUALITY_GATE_STATUS"
for condition in $CONDITIONS
do
status=$(echo "$condition" | jq -r '.status')
metricKey=$(echo "$condition" | jq -r '.metricKey')
actualValue=$(echo "$condition" | jq -r '.actualValue')
if [[ $status == "ERROR" ]]; then
warn "Status: $status, MetricKey: $metricKey, ActualValue: $actualValue"
COMMENT+="- Status: **$status**, MetricKey: **$metricKey**, ActualValue: **$actualValue**\n"
fi
done
COMMENT+="\n\nSee Sonar Scan Result at: $BASE_URI/dashboard?branch=$BRANCH&id=$PROJECT_KEY"
info "Send message to GitHub PR comment"
if [[ -z $PR_NUMBER ]]; then
warn "variable PR_NUMBER is not set, skipping the PR comment."
else
curl -s -H "Authorization: token $GITHUB_TOKEN" -X POST -d "{\"body\": \"$COMMENT\"}" "https://api.github.com/repos/$OWNER/$REPO/issues/$PR_NUMBER/comments" > /dev/null
if [[ $? -ne 0 ]]; then
error "Failed to send message to GitHub PR comment."
fi
fi
fi
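For reference, the workflow step above runs this script after cd-ing into tapdata/build/, which also matters because the script sources log.sh relative to the working directory. A minimal sketch of a manual invocation, assuming SONAR_TOKEN and GITHUB_TOKEN are exported in the environment and 1234 stands in for a real pull-request number:

# Hypothetical manual run; the tokens must hold real credentials and
# 1234 is a placeholder pull-request number.
cd build/ && bash check_sonarqube.sh \
  --project-key=tapdata \
  --branch=develop \
  --sonar-token="$SONAR_TOKEN" \
  --github-token="$GITHUB_TOKEN" \
  --repo=tapdata \
  --pr-number=1234
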
2 changes: 1 addition & 1 deletion build/image/docker-entrypoint.sh
@@ -76,7 +76,7 @@ wait_tm_start() {
local seconds_left=$((timeout - SECONDS))
printf "\r* Wait Starting, Left %02d / 300 Seconds..." "$seconds_left"
sleep 1
curl "http://localhost:3000" &> /dev/null
curl --fail "http://localhost:3000" &> /dev/null
if [[ $? -ne 0 ]]; then
continue
else
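The one-line change above matters because plain curl exits 0 as long as it receives any HTTP response, even a 4xx/5xx, so the readiness loop could report the management server as started while it was still returning errors; with --fail an HTTP error status makes curl return a non-zero exit code (22). A quick illustration, using httpbin.org purely as an example endpoint:

# Without --fail an HTTP 500 still gives exit status 0
curl -s http://httpbin.org/status/500 > /dev/null; echo $?          # prints 0
# With --fail the same response makes curl exit non-zero (22)
curl -s --fail http://httpbin.org/status/500 > /dev/null; echo $?   # prints 22
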
5 changes: 4 additions & 1 deletion iengine/build/start.sh
@@ -3,7 +3,10 @@ ulimit -c unlimited
sbin_file="tapdata-agent.jar"
export app_type=DAAS
export backend_url=http://127.0.0.1:3000/api/
export TAPDATA_MONGO_URI='mongodb://127.0.0.1:27017/tapdata?authSource=admin'
if [[ -z $MONGO_URI ]]; then
MONGO_URI="mongodb://127.0.0.1:27017/tapdata"
fi
export TAPDATA_MONGO_URI=$MONGO_URI

mkdir -p logs/iengine && touch logs/iengine/$sbin_file.log
nohup java -jar components/$sbin_file &> logs/iengine/$sbin_file.log &
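The start script previously hard-coded the local MongoDB URI; with this change it still defaults to mongodb://127.0.0.1:27017/tapdata but honors an externally supplied MONGO_URI. A hypothetical override (the host and credentials below are placeholders):

# Hypothetical override; host, credentials and authSource are placeholders.
export MONGO_URI='mongodb://tapdata:secret@mongo.example.internal:27017/tapdata?authSource=admin'
bash start.sh

Note that the new default omits the authSource=admin parameter the old hard-coded URI carried; deployments that need it can pass it explicitly through MONGO_URI as above.
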
(another changed file; path not shown)
@@ -8,6 +8,7 @@
import io.tapdata.utils.AppType;
import org.apache.commons.lang3.StringUtils;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
@@ -57,6 +58,7 @@ public void report(boolean isExit) {
value.put("usedMemory", usedMemory);
value.put("metricValues", connectorManager.getMetricValues());
value.put("worker_type", ConnectorConstant.WORKER_TYPE_CONNECTOR);
value.put("worker_date", new Date());

connectorManager.setPlatformInfo(value);
configIfNotBlank("version", value::put);
(another changed file; path not shown)
@@ -57,10 +57,7 @@
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

@@ -82,6 +79,7 @@ public abstract class HazelcastPdkBaseNode extends HazelcastDataBaseNode {

protected Integer readBatchSize;
protected Integer increaseReadSize;
private static final String DOUBLE_ACTIVE = "doubleActive";
protected TapRecordSkipDetector skipDetector;
private PdkStateMap pdkStateMap;

@@ -191,14 +189,7 @@ protected void createPdkConnectorNode(DataProcessorContext dataProcessorContext,
Node<?> node = dataProcessorContext.getNode();
ConnectorCapabilities connectorCapabilities = ConnectorCapabilities.create();
initDmlPolicy(node, connectorCapabilities);
Map<String, Object> nodeConfig = null;
if (node instanceof TableNode) {
nodeConfig = ((TableNode) node).getNodeConfig();
} else if (node instanceof DatabaseNode) {
nodeConfig = ((DatabaseNode) node).getNodeConfig();
} else if (node instanceof LogCollectorNode) {
nodeConfig = ((LogCollectorNode) node).getNodeConfig();
}
Map<String, Object> nodeConfig = generateNodeConfig(node, taskDto.getDoubleActive());
this.associateId = ConnectorNodeService.getInstance().putConnectorNode(
PdkUtil.createNode(taskDto.getId().toHexString(),
databaseType,
@@ -219,6 +210,22 @@ protected void createPdkConnectorNode(DataProcessorContext dataProcessorContext,
AspectUtils.executeAspect(PDKNodeInitAspect.class, () -> new PDKNodeInitAspect().dataProcessorContext((DataProcessorContext) processorBaseContext));
}

protected Map<String, Object> generateNodeConfig(Node<?> node, Boolean doubleActive) {
Map<String, Object> nodeConfig = null;
if (node instanceof TableNode) {
nodeConfig = ((TableNode) node).getNodeConfig();
} else if (node instanceof DatabaseNode) {
nodeConfig = ((DatabaseNode) node).getNodeConfig();
} else if (node instanceof LogCollectorNode) {
nodeConfig = ((LogCollectorNode) node).getNodeConfig();
}
if (null == nodeConfig) {
nodeConfig = new HashMap<>();
}
nodeConfig.put(DOUBLE_ACTIVE, doubleActive);
return nodeConfig;
}

protected void initDmlPolicy(Node<?> node, ConnectorCapabilities connectorCapabilities) {
if (node instanceof DataParentNode && null != ((DataParentNode<?>) node).getDmlPolicy()) {
DmlPolicy dmlPolicy = ((DataParentNode<?>) node).getDmlPolicy();
(another changed file; path not shown)
@@ -14,6 +14,7 @@
import com.tapdata.tm.commons.dag.nodes.CacheNode;
import com.tapdata.tm.commons.dag.nodes.TableNode;
import com.tapdata.tm.commons.task.dto.TaskDto;
import com.tapdata.tm.utils.TimeTransFormationUtil;
import io.tapdata.Runnable.LoadSchemaRunner;
import io.tapdata.aspect.*;
import io.tapdata.aspect.taskmilestones.*;
@@ -412,9 +413,9 @@ protected void doSnapshot(List<String> tableList) {
obsLogger.info("Execute result is null");
return;
}
List<Map<String, Object>> maps = (List<Map<String, Object>>) executeResult.getResult();
List<TapEvent> events = maps.stream().map(m -> TapSimplify.insertRecordEvent(m, tableName)).collect(Collectors.toList());
consumer.accept(events, null);

Object result= executeResult.getResult();
handleCustomCommandResult(result,tableName,consumer);
});
} else {
batchReadFunction.batchRead(connectorNode.getConnectorContext(), tapTable, tableOffset, readBatchSize, consumer);
@@ -466,6 +467,16 @@ protected void doSnapshot(List<String> tableList) {
executeAspect(new SnapshotReadEndAspect().dataProcessorContext(dataProcessorContext));
}

private void handleCustomCommandResult(Object result, String tableName, BiConsumer<List<TapEvent>, Object> consumer){
if (result instanceof List) {
List<Map<String, Object>> maps = (List<Map<String, Object>>) result;
List<TapEvent> events = maps.stream().map(m -> TapSimplify.insertRecordEvent(m, tableName)).collect(Collectors.toList());
consumer.accept(events, null);
}else {
obsLogger.info("The execution result is: {}; it will be ignored because it is not a list.", result);
}
}

private void createTargetIndex(List<String> updateConditionFields, boolean createUnique, String tableId, TapTable tapTable) {
CreateIndexFunction createIndexFunction = getConnectorNode().getConnectorFunctions().getCreateIndexFunction();
if (null == createIndexFunction) {
@@ -1195,7 +1206,7 @@ private TapAdvanceFilter batchFilterRead() {
TableNode tableNode = (TableNode) dataProcessorContext.getNode();
TapAdvanceFilter tapAdvanceFilter = new TapAdvanceFilter();
if (isTableFilter(tableNode)) {
List<QueryOperator> conditions = tableNode.getConditions();
List<QueryOperator> conditions = timeTransformation(tableNode.getConditions(),tableNode.getOffsetHours());
if (CollectionUtils.isNotEmpty(conditions)) {
String tableName = tableNode.getTableName();
TapTable tapTable = dataProcessorContext.getTapTableMap().get(tableName);
@@ -1240,6 +1251,38 @@ private TapAdvanceFilter batchFilterRead() {
return tapAdvanceFilter;
}

protected List<QueryOperator> timeTransformation(List<QueryOperator> conditions,Long offsetHours){
List<QueryOperator> finalConditions = new ArrayList<>();
for(QueryOperator queryOperator : conditions){
LocalDateTime currentDateTime = LocalDateTime.now();
if(queryOperator.isFastQuery()){
List<String> timeList = TimeTransFormationUtil.calculatedTimeRange(currentDateTime,queryOperator,offsetHours);
List<QueryOperator> result = constructQueryOperator(timeList,queryOperator);
if(CollectionUtils.isNotEmpty(result))finalConditions.addAll(result);
}else{
finalConditions.add(queryOperator);
}
}
return finalConditions;
}

protected List<QueryOperator> constructQueryOperator(List<String> timeList,QueryOperator queryOperator){
List<QueryOperator> result = new ArrayList<>();
if(CollectionUtils.isEmpty(timeList))return result;
QueryOperator start = new QueryOperator();
start.setKey(queryOperator.getKey());
start.setOperator(QueryOperator.GTE);
start.setValue(timeList.get(0));
QueryOperator end = new QueryOperator();
end.setKey(queryOperator.getKey());
end.setOperator(QueryOperator.LTE);
end.setValue(timeList.get(1));
result.add(start);
result.add(end);
return result;
}


protected void excludeRemoveTable(List<String> tableNames) {
if (null == tableNames) {
return;
Expand Down
(another changed file; path not shown)
@@ -365,12 +365,12 @@ public List<String> apply(TapEvent tapEvent) {
}
}

private boolean isCDCConcurrent(Boolean cdcConcurrent) {
protected boolean isCDCConcurrent(Boolean cdcConcurrent) {
cdcConcurrent = cdcConcurrent && cdcConcurrentWriteNum > 1;
List<? extends Node<?>> predecessors = getNode().predecessors();
for (Node<?> predecessor : predecessors) {
if (predecessor instanceof MergeTableNode) {
obsLogger.info("CDC concurrent write is disabled because the node has a merge table node");
if (predecessor instanceof MergeTableNode || predecessor instanceof UnwindProcessNode) {
obsLogger.info("CDC concurrent write is disabled because the node has a merge table node or unwind process node");
cdcConcurrent = false;
}
}
(another changed file; path not shown)
@@ -279,8 +279,7 @@ protected TapdataEvent processDML(TapdataEvent tapdataEvent, AtomicBoolean singl
if (toSingleMode(tapEvent, partitionValue, singleMode)) {
partition = 0;
} else {
final List<Object> partitionOriginalValues = keySelector.convert2OriginValue(partitionValue);
final PartitionResult<TapdataEvent> partitionResult = partitioner.partition(partitionSize, tapdataEvent, partitionOriginalValues);
final PartitionResult<TapdataEvent> partitionResult = partitioner.partition(partitionSize, tapdataEvent, partitionValue);
partition = partitionResult.getPartition() < 0 ? DEFAULT_PARTITION : partitionResult.getPartition();
}

(another changed file; path not shown)
@@ -35,6 +35,9 @@ public List<Object> select(TapEvent tapEvent, Map<String, Object> row) {
getPartitionValue(partitionValue, keys, row);
}

if (null != partitionValue && !CollectionUtils.isEmpty(partitionValue)) {
partitionValue = convert2OriginValue(partitionValue);
}
return partitionValue;
}

(another changed file; path not shown)
@@ -1,18 +1,24 @@
package io.tapdata.flow.engine.V2.node.hazelcast.processor;

import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

import com.tapdata.entity.TapdataEvent;
import com.tapdata.entity.task.context.ProcessorBaseContext;
import com.tapdata.tm.commons.dag.ArrayModel;
import com.tapdata.tm.commons.dag.Node;
import com.tapdata.tm.commons.dag.UnwindModel;
import com.tapdata.tm.commons.dag.process.UnwindProcessNode;
import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.event.dml.TapDeleteRecordEvent;
import io.tapdata.entity.event.dml.TapRecordEvent;
import io.tapdata.exception.TapCodeException;
import io.tapdata.flow.engine.V2.node.hazelcast.processor.unwind.EventHandel;
import io.tapdata.flow.engine.V2.node.hazelcast.processor.unwind.UnWindNodeUtil;
import io.tapdata.flow.engine.V2.util.TapEventUtil;
import lombok.SneakyThrows;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
@@ -56,9 +62,7 @@ protected void tryProcess(TapdataEvent tapdataEvent, BiConsumer<TapdataEvent, Pr
}

List<TapEvent> eventList = EventHandel.getHandelResult(node, tapdataEvent.getTapEvent());
if (null == eventList || eventList.isEmpty()) {
consumer.accept(tapdataEvent, processResult);
} else {
if (CollectionUtils.isNotEmpty(eventList)) {
for (TapEvent e : eventList) {
TapdataEvent cloneTapdataEvent = (TapdataEvent) tapdataEvent.clone();
cloneTapdataEvent.setTapEvent(e);
(another changed file; path not shown)
@@ -109,16 +109,8 @@ public void copyEvent(List<TapEvent> events, Map<String, Object> item, TapEvent
public List<TapEvent> handel(UnwindProcessNode node, TapEvent event){
List<TapEvent> events = new ArrayList<>();
Map<String, Object> after = UnWindNodeUtil.getAfter(event);
Map<String, Object> before = UnWindNodeUtil.getBefore(event);
Long referenceTime = ((TapUpdateRecordEvent) event).getReferenceTime();
TapDeleteRecordEvent delete = TapDeleteRecordEvent.create();
if (null == before || before.isEmpty()) {
delete.before(after);
} else {
delete.before(before);
}
delete.referenceTime(referenceTime);
List<TapEvent> deletes = EventHandel.getHandelResult(node, delete);
List<TapEvent> deletes = EventHandel.getHandelResult(node, UnWindNodeUtil.genericDeleteEvent(event));
if (null != deletes) {
events.addAll(deletes);
}
