Skip to content

Commit

Permalink
Merge pull request #1567 from tapdata/sam/fix-develop-150260
Browse files Browse the repository at this point in the history
Sam/fix develop 150260
  • Loading branch information
jackin-code committed Dec 5, 2023
2 parents b0ed4b9 + 2332cb2 commit f253077
Show file tree
Hide file tree
Showing 11 changed files with 1,378 additions and 517 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ public ExactlyOnceWriteCleanerStateMap(String nodeId, HazelcastInstance hazelcas
super(nodeId, hazelcastInstance);
}

public ExactlyOnceWriteCleanerStateMap(String nodeId, HazelcastInstance hazelcastInstance, TaskDto taskDto, Node<?> node, ClientMongoOperator clientMongoOperator, String func) {
super(nodeId, hazelcastInstance, taskDto, node, clientMongoOperator, func);
public ExactlyOnceWriteCleanerStateMap(HazelcastInstance hazelcastInstance, Node<?> node) {
super(hazelcastInstance, node);
}

@Override
Expand Down Expand Up @@ -250,11 +250,6 @@ public void reset() {
public Object get(String key) {
return map.get(key);
}

@Override
public DocumentIMap<Document> getConstructIMap() {
return null;
}
}

private static class ExactlyOnceWriteCleanerLog implements Log {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ protected void createPdkConnectorNode(DataProcessorContext dataProcessorContext,
Map<String, Object> connectionConfig = dataProcessorContext.getConnectionConfig();
DatabaseTypeEnum.DatabaseType databaseType = dataProcessorContext.getDatabaseType();
PdkTableMap pdkTableMap = new PdkTableMap(dataProcessorContext.getTapTableMap());
PdkStateMap pdkStateMap = new PdkStateMap(dataProcessorContext.getNode().getId(), hazelcastInstance, taskDto, getNode(), clientMongoOperator, "processor");
PdkStateMap pdkStateMap = new PdkStateMap(hazelcastInstance, getNode());
PdkStateMap globalStateMap = PdkStateMap.globalStateMap(hazelcastInstance);
Node<?> node = dataProcessorContext.getNode();
ConnectorCapabilities connectorCapabilities = ConnectorCapabilities.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private void dataNodeDestroy(Node<?> node) {
databaseType = ConnectionUtil.getDatabaseType(clientMongoOperator, connections.getPdkHash());
if (null == databaseType) return;
Connections finalConnections = connections;
PdkStateMap pdkStateMap = new PdkStateMap(node.getId(), HazelcastTaskService.getHazelcastInstance(), taskDto, node, clientMongoOperator, "processor");
PdkStateMap pdkStateMap = new PdkStateMap(HazelcastTaskService.getHazelcastInstance(), node);
PdkStateMap globalStateMap = PdkStateMap.globalStateMap(HazelcastTaskService.getHazelcastInstance());
PdkUtil.downloadPdkFileIfNeed((HttpClientMongoOperator) clientMongoOperator,
databaseType.getPdkHash(), databaseType.getJarFile(), databaseType.getJarRid());
Expand Down
25 changes: 24 additions & 1 deletion iengine/iengine-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,32 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<artifactId>mockito-core</artifactId>
<version>4.11.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>4.11.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>4.11.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>1.12.23</version>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-agent</artifactId>
<version>1.12.23</version>
</dependency>

<dependency>
<groupId>com.github.jsqlparser</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,7 @@ public void clear() throws Exception {

@Override
public boolean isEmpty() {
if (null == this.iMap) {
return true;
}
return this.iMap.isEmpty();
return PersistenceStorage.getInstance().isEmpty(ConstructType.IMAP, name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.tapdata.error;

import io.tapdata.exception.TapExClass;
import io.tapdata.exception.TapExCode;

/**
* @author samuel
* @Description
* @create 2023-11-30 15:59
**/
@TapExClass(code = 28, module = "Pdk State Map", prefix = "PSM")
public interface PdkStateMapExCode_28 {
@TapExCode
String UNKNOWN_ERROR = "28001";
@TapExCode(
describe = "Write Tap Info into PDK State Map failed"
)
String INSERT_TAPDATA_INFO_FAILED = "28002";
@TapExCode(describe = "Init PDK state map failed")
String INIT_PDK_STATE_MAP_FAILED = "28003";
}

0 comments on commit f253077

Please sign in to comment.