Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/transactions #175

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
implementation group: 'io.netty', name: 'netty-transport', version: 'latest.release'
implementation group: 'io.netty', name: 'netty-codec', version: 'latest.release'
implementation group: 'io.netty', name: 'netty-handler', version: 'latest.release'
api group: 'com.h2database', name: 'h2', version: 'latest.release'

testImplementation group: 'org.mongodb', name: 'mongo-java-driver', version: 'latest.release'
testImplementation "org.mockito:mockito-core:latest.release"
Expand Down
1 change: 1 addition & 0 deletions core/gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# This file is expected to be part of source control.
ch.qos.logback:logback-classic:1.2.3=testRuntimeClasspath
ch.qos.logback:logback-core:1.2.3=testRuntimeClasspath
com.h2database:h2:1.4.200=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-buffer:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-codec:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-common:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.bwaldvogel.mongo;

import java.io.Serializable;
import java.util.concurrent.CompletionStage;

import de.bwaldvogel.mongo.backend.QueryParameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.CompletionStage;

import de.bwaldvogel.mongo.backend.MongoSession;
import de.bwaldvogel.mongo.backend.QueryResult;
import de.bwaldvogel.mongo.bson.Document;
import de.bwaldvogel.mongo.oplog.Oplog;
Expand All @@ -13,7 +14,7 @@

public interface AsyncMongoDatabase {

CompletionStage<Document> handleCommandAsync(Channel channel, String command, Document query, Oplog oplog);
CompletionStage<Document> handleCommandAsync(Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession);

CompletionStage<QueryResult> handleQueryAsync(MongoQuery query);

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/de/bwaldvogel/mongo/MongoBackend.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,6 @@ default CompletionStage<Void> closeAsync() {

MongoBackend version(ServerVersion version);

void setServerAddress(String serverAddress);

}
13 changes: 12 additions & 1 deletion core/src/main/java/de/bwaldvogel/mongo/MongoCollection.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import de.bwaldvogel.mongo.backend.ArrayFilters;
import de.bwaldvogel.mongo.backend.Index;
import de.bwaldvogel.mongo.backend.MongoSession;
import de.bwaldvogel.mongo.backend.QueryParameters;
import de.bwaldvogel.mongo.backend.QueryResult;
import de.bwaldvogel.mongo.bson.Document;
Expand Down Expand Up @@ -67,6 +68,8 @@ default QueryResult handleQuery(Document query, int numberToSkip, int limit) {

QueryResult handleQuery(QueryParameters queryParameters);

QueryResult handleQuery(QueryParameters queryParameters, MongoSession mongoSession);

@Override
default CompletionStage<QueryResult> handleQueryAsync(QueryParameters queryParameters) {
return FutureUtils.wrap(() -> handleQuery(queryParameters));
Expand All @@ -81,6 +84,11 @@ default List<Document> insertDocuments(List<Document> documents) {
Document updateDocuments(Document selector, Document update, ArrayFilters arrayFilters,
boolean isMulti, boolean isUpsert, Oplog oplog);

default Document updateDocuments(Document selector, Document update, ArrayFilters arrayFilters,
boolean isMulti, boolean isUpsert, Oplog oplog, MongoSession mongoSession) {
return updateDocuments(selector, update, arrayFilters, isMulti, isUpsert, oplog, mongoSession);
}

default int deleteDocuments(Document selector, int limit) {
return deleteDocuments(selector, limit, NoopOplog.get());
}
Expand All @@ -89,6 +97,8 @@ default int deleteDocuments(Document selector, int limit) {

Document handleDistinct(Document query);

Document handleDistinct(Document query, MongoSession mongoSession);

Document getStats();

Document validate();
Expand All @@ -97,6 +107,8 @@ default int deleteDocuments(Document selector, int limit) {

int count(Document query, int skip, int limit);

int count(Document query, int skip, int limit, MongoSession mongoSession);

default boolean isEmpty() {
return count() == 0;
}
Expand All @@ -112,5 +124,4 @@ default int getNumIndexes() {
void renameTo(MongoDatabase newDatabase, String newCollectionName);

void drop();

}
9 changes: 6 additions & 3 deletions core/src/main/java/de/bwaldvogel/mongo/MongoDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import de.bwaldvogel.mongo.backend.CollectionOptions;
import de.bwaldvogel.mongo.backend.QueryResult;
import de.bwaldvogel.mongo.backend.MongoSession;
import de.bwaldvogel.mongo.bson.Document;
import de.bwaldvogel.mongo.oplog.Oplog;
import de.bwaldvogel.mongo.util.FutureUtils;
Expand All @@ -19,11 +20,11 @@ public interface MongoDatabase extends AsyncMongoDatabase {

void handleClose(Channel channel);

Document handleCommand(Channel channel, String command, Document query, Oplog oplog);
Document handleCommand(Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession);

@Override
default CompletionStage<Document> handleCommandAsync(Channel channel, String command, Document query, Oplog oplog) {
return FutureUtils.wrap(() -> handleCommand(channel, command, query, oplog));
default CompletionStage<Document> handleCommandAsync(Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession) {
return FutureUtils.wrap(() -> handleCommand(channel, command, query, oplog, mongoSession));
}

QueryResult handleQuery(MongoQuery query);
Expand Down Expand Up @@ -55,6 +56,8 @@ default CompletionStage<Void> handleDeleteAsync(MongoDelete delete, Oplog oplog)

void handleUpdate(MongoUpdate update, Oplog oplog);

void handleUpdate(MongoUpdate update, Oplog oplog, MongoSession mongoSession);

@Override
default CompletionStage<Void> handleUpdateAsync(MongoUpdate update, Oplog oplog) {
return FutureUtils.wrap(() -> {
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/de/bwaldvogel/mongo/MongoServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public void initChannel(SocketChannel ch) throws Exception {
});

channel = bootstrap.bind().syncUninterruptibly().channel();
InetSocketAddress localAddress = getLocalAddress();
if (localAddress != null) {
backend.setServerAddress(String.format("%s:%d", localAddress.getHostName(), localAddress.getPort()));
}

log.info("started {}", this);
} catch (RuntimeException e) {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/de/bwaldvogel/mongo/ServerVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

public enum ServerVersion {
MONGO_3_0(Arrays.asList(3, 0, 0), 2),
MONGO_3_6(Arrays.asList(3, 6, 0), 6);
MONGO_3_6(Arrays.asList(3, 6, 0), 6),
MONGO_4_2(Arrays.asList(4, 2, 0), 8);

private final List<Integer> versionArray;
private final int wireVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.h2.mvstore.tx.Transaction;
import org.h2.mvstore.tx.TransactionStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -52,7 +55,7 @@ public abstract class AbstractMongoBackend implements MongoBackend {

protected static final String OPLOG_COLLECTION_NAME = "oplog.rs";

static final String ADMIN_DB_NAME = "admin";
public static final String ADMIN_DB_NAME = "admin";

private final Map<String, MongoDatabase> databases = new ConcurrentHashMap<>();

Expand All @@ -64,6 +67,10 @@ public abstract class AbstractMongoBackend implements MongoBackend {
private final CursorRegistry cursorRegistry = new CursorRegistry();

protected Oplog oplog = NoopOplog.get();
private String serverAddress;

protected final ConcurrentHashMap<UUID, MongoSession> sessions = new ConcurrentHashMap<>();
protected TransactionStore transactionStore;

protected AbstractMongoBackend() {
this(defaultClock());
Expand Down Expand Up @@ -150,7 +157,7 @@ private Document getLog(String argument) {
return response;
}

private Document handleAdminCommand(String command, Document query) {
protected Document handleAdminCommand(String command, Document query) {
if (command.equalsIgnoreCase("listdatabases")) {
List<Document> databases = listDatabaseNames().stream()
.sorted()
Expand Down Expand Up @@ -201,13 +208,21 @@ private Document handleAdminCommand(String command, Document query) {
} else if (command.equalsIgnoreCase("ping")) {
return successResponse();
} else if (command.equalsIgnoreCase("endSessions")) {
log.debug("endSessions on admin database");
handleEndSessions(query);
return successResponse();
} else {
throw new NoSuchCommandException(command);
}
}

private void handleEndSessions(Document query) {
log.debug("endSessions");
ArrayList<Document> endingSessions = (ArrayList<Document>)query.get("endSessions");
endingSessions.stream().map(s -> s.get("id"))
.filter(sessions::containsKey)
.forEach(sessions::remove);
}

private static Document successResponse() {
Document response = new Document();
Utils.markOkay(response);
Expand Down Expand Up @@ -314,6 +329,12 @@ private Document handleCommandSync(Channel channel, String databaseName, String
response.put("maxWireVersion", Integer.valueOf(version.getWireVersion()));
response.put("minWireVersion", Integer.valueOf(0));
response.put("localTime", Instant.now(clock));
response.put("setName", "rs0");
response.put("hosts", Collections.singleton(serverAddress));
response.put("me", serverAddress);
response.put("primary", serverAddress);
response.put("logicalSessionTimeoutMinutes", 100);
response.put("connectionId", 21210);
Utils.markOkay(response);
return response;
} else if (command.equalsIgnoreCase("buildinfo")) {
Expand All @@ -328,6 +349,12 @@ private Document handleCommandSync(Channel channel, String databaseName, String
return handleGetMore(databaseName, command, query);
} else if (command.equalsIgnoreCase("killCursors")) {
return handleKillCursors(query);
} else if (command.equalsIgnoreCase("commitTransaction")) {
UUID sessionId = Utils.getSessionId(query);
sessions.get(sessionId).commit();
Document response = new Document("lsid", sessionId);
Utils.markOkay(response);
return response;
}
return null;
}
Expand All @@ -342,8 +369,26 @@ public Document handleCommand(Channel channel, String databaseName, String comma
if (databaseName.equals(ADMIN_DB_NAME)) {
return handleAdminCommand(command, query);
}
MongoSession mongoSession = MongoSession.NoopSession();
if (query != null) {
if ((boolean)query.getOrDefault("autocommit", true)) {
return resolveDatabase(databaseName).handleCommand(channel, command, query, oplog, null);
}

return resolveDatabase(databaseName).handleCommand(channel, command, query, oplog);
UUID sessionId = Utils.getSessionId(query);
if (sessionId == null) {
throw new RuntimeException("SessionId cannot be null. Make sure you are using a mongo driver version that support sessions and transactions.");
}
if (sessions.containsKey(sessionId)) {
mongoSession = sessions.get(sessionId);
} else {
Transaction transaction = transactionStore.begin();
log.info(String.format("Starting new transaction with id %d: %s", transaction.getId(), transaction.getName()));
mongoSession = new MongoSession(sessionId, transactionStore.begin());
sessions.put(sessionId, mongoSession);
}
}
return resolveDatabase(databaseName).handleCommand(channel, command, query, oplog, mongoSession);
}

@Override
Expand Down Expand Up @@ -373,7 +418,7 @@ public CompletionStage<Document> handleCommandAsync(Channel channel, String data
return FutureUtils.wrap(() -> handleAdminCommand(command, query));
}

return resolveDatabase(database).handleCommandAsync(channel, command, query, oplog);
return resolveDatabase(database).handleCommandAsync(channel, command, query, oplog, null);
}

@Override
Expand Down Expand Up @@ -498,6 +543,11 @@ public void dropDatabase(String databaseName) {
}
}

@Override
public void setServerAddress(String serverAddress) {
this.serverAddress = serverAddress;
}

@Override
public void handleClose(Channel channel) {
for (MongoDatabase db : databases.values()) {
Expand Down