Skip to content
This repository has been archived by the owner on Jan 12, 2020. It is now read-only.

Apply patch from Christian Weisskopf #79

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion barge-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.10</version>
<version>2.13</version>
</plugin>
</plugins>
</reporting>
Expand All @@ -112,6 +112,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.7.2.201409121644</version>
<executions>
<execution>
<id>jacoco-initialize</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class BatchExecutor extends BatchExecutorImpl {
@Override
public void execute(EventReader toExecute) {

Optional<Map> oldContext = Optional.fromNullable(MDC.getCopyOfContextMap());
Optional<Map<String, String>> oldContext = Optional.fromNullable(MDC.getCopyOfContextMap());
MDC.setContextMap(contextMap);

super.execute(toExecute);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package org.robotninjas.barge;

import com.google.common.base.Optional;

public class NotLeaderException extends RaftException {

private final Replica leader;
private final Optional<Replica> leader;

public NotLeaderException(Replica leader) {
public NotLeaderException(Optional<Replica> leader) {
this.leader = leader;
}

public Replica getLeader() {
public Optional<Replica> getLeader() {
return leader;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ public Builder withLogDir(File logDir) {
return this;
}

public Builder withExecutor(Executor executor) {
this.executor = Optional.of(executor);

return this;
}

public RaftCoreModule build() {
checkState(config.isPresent());
checkState(stateMachine.isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import journal.io.api.Journal;
import journal.io.api.JournalBuilder;
import org.robotninjas.barge.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.io.File;
Expand All @@ -33,6 +35,8 @@

public class LogModule extends PrivateModule {

private static final Logger LOGGER = LoggerFactory.getLogger(LogModule.class);

private final File logDirectory;
private final StateMachine stateMachine;
private final Executor executor;
Expand Down Expand Up @@ -68,11 +72,10 @@ Journal getJournal() {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
//noinspection EmptyCatchBlock
try {
journal.close();
} catch (IOException e) {
//TODO log it
LOGGER.error("Error closing journal", e);
}
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ public void replay(Visitor visitor) {

}

public void close() {
try {
journal.close();
} catch (IOException e) {
propagate(e);
}
}

static interface Visitor {

void term(Mark mark, long term);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public void append(RaftJournal.Mark mark, Entry entry, long index) {
lastLogIndex, currentTerm, commitIndex, votedFor.orNull());
}

public void close() {
LOGGER.debug("Closing raft log and journal");
journal.close();
}

private SettableFuture<Object> storeEntry(final long index, @Nonnull Entry entry) {
LOGGER.debug("{} storing {}", config.local(), entry);
RaftJournal.Mark mark = journal.appendEntry(entry, index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public abstract class BaseState implements State {

private final StateType type;
private final RaftLog log;
private Optional<Replica> leader;
private Optional<Replica> leader = Optional.absent();

protected BaseState(@Nullable StateType type, @Nonnull RaftLog log) {
this.log = checkNotNull(log);
Expand All @@ -43,6 +43,10 @@ protected RaftLog getLog() {
return log;
}

public Optional<Replica> getLeader() {
return leader;
}

@Override
public void destroy(RaftStateContext ctx) {
}
Expand Down Expand Up @@ -173,7 +177,7 @@ public ListenableFuture<Object> commitOperation(@Nonnull RaftStateContext ctx, @
StateType stateType = ctx.type();
Preconditions.checkNotNull(stateType);
if (stateType.equals(FOLLOWER)) {
throw new NotLeaderException(leader.get());
throw new NotLeaderException(leader);
} else if (stateType.equals(CANDIDATE)) {
throw new NoLeaderException();
}
Expand All @@ -183,6 +187,7 @@ public ListenableFuture<Object> commitOperation(@Nonnull RaftStateContext ctx, @
@Override
public void doStop(RaftStateContext ctx) {
ctx.setState(this, STOPPED);
log.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.robotninjas.barge.log.RaftLog;
import static org.robotninjas.barge.state.Raft.StateType.FOLLOWER;
import static org.robotninjas.barge.state.Raft.StateType.LEADER;
import static org.robotninjas.barge.state.Raft.StateType.STOPPED;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -181,6 +182,10 @@ List<ListenableFuture<AppendEntriesResponse>> sendRequests(final RaftStateContex
Futures.addCallback(response, new FutureCallback<AppendEntriesResponse>() {
@Override
public void onSuccess(@Nullable AppendEntriesResponse result) {
if (ctx.type().equals(STOPPED)) {
return;
}

updateCommitted();
checkTermOnResponse(ctx, result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
class RaftPredicates {

@Nonnull
static Predicate<AppendEntriesResponse> appendSuccessul() {
static Predicate<AppendEntriesResponse> appendSuccessful() {
return AppendSuccessPredicate.Success;
}

Expand Down
16 changes: 11 additions & 5 deletions barge-jax-rs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<version>1.7.4</version>
<version>${slf4j.version}</version>
</dependency>

<dependency>
Expand All @@ -62,7 +62,7 @@
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-api</artifactId>
<version>9.1.0.v20131115</version>
<version>${jetty.version}</version>
</dependency>

<dependency>
Expand All @@ -75,13 +75,13 @@
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<version>9.1.0.v20131115</version>
<version>${jetty.version}</version>
</dependency>

<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
<version>9.1.0.v20131115</version>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>

Expand Down Expand Up @@ -131,14 +131,20 @@
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>2.1</version>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.glassfish.hk2.external:aopalliance-repackaged</exclude>
<exclude>org.glassfish.hk2.external:javax.inject</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package org.robotninjas.barge.jaxrs;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import org.robotninjas.barge.ClusterConfig;
import org.robotninjas.barge.NotLeaderException;
import org.robotninjas.barge.Replica;
import org.robotninjas.barge.api.AppendEntries;
import org.robotninjas.barge.api.AppendEntriesResponse;
import org.robotninjas.barge.api.RequestVote;
Expand Down Expand Up @@ -97,7 +99,11 @@ public Response commit(byte[] operation) {

return Response.noContent().build();
} catch (NotLeaderException e) {
return Response.status(Response.Status.FOUND).location(((HttpReplica) e.getLeader()).getUri()).build();
Optional<Replica> leader = e.getLeader();
if (leader.isPresent() == false) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); // FIXME find better solution for missing leader
}
return Response.status(Response.Status.FOUND).location(((HttpReplica) leader.get()).getUri()).build();
} catch (Exception e) {
throw Throwables.propagate(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.robotninjas.barge.jaxrs;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
Expand All @@ -27,6 +28,7 @@
import org.junit.Test;
import org.robotninjas.barge.ClusterConfig;
import org.robotninjas.barge.NotLeaderException;
import org.robotninjas.barge.Replica;
import org.robotninjas.barge.api.AppendEntriesResponse;
import org.robotninjas.barge.api.RequestVoteResponse;
import org.robotninjas.barge.state.Raft;
Expand Down Expand Up @@ -94,7 +96,8 @@ public void onPOSTCommitReturn204GivenServiceReturnsResponse() throws Exception
@Test
public void onPOSTCommitReturn302WithLeaderURIGivenRaftThrowsNotLeaderException() throws Exception {
URI leaderURI = new URI("http://localhost:1234");
when(raftService.commitOperation("foo".getBytes())).thenThrow(new NotLeaderException(new HttpReplica(leaderURI)));
Replica replica = new HttpReplica(leaderURI);
when(raftService.commitOperation("foo".getBytes())).thenThrow(new NotLeaderException(Optional.of(replica)));

Response value = client().target("/commit")
.request()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import com.google.inject.PrivateModule;

import org.robotninjas.barge.RaftCoreModule.Builder;
import org.robotninjas.barge.state.AbstractListenersModule;

import java.io.File;
import java.util.concurrent.Executor;


public class NettyRaftModule extends PrivateModule {
Expand All @@ -13,12 +15,14 @@ public class NettyRaftModule extends PrivateModule {
private final File logDir;
private final StateMachine stateMachine;
private final long timeout;
private final Executor executor;

public NettyRaftModule(NettyClusterConfig config, File logDir, StateMachine stateMachine, long timeout) {
public NettyRaftModule(NettyClusterConfig config, File logDir, StateMachine stateMachine, long timeout, Executor executor) {
this.config = config;
this.logDir = logDir;
this.stateMachine = stateMachine;
this.timeout = timeout;
this.executor = executor;
}

@Override
Expand All @@ -29,12 +33,17 @@ protected void configureListeners() {
}
});

install(RaftCoreModule.builder()
.withTimeout(timeout)
.withConfig(config)
.withLogDir(logDir)
.withStateMachine(stateMachine)
.build());
Builder builder = RaftCoreModule.builder()
.withTimeout(timeout)
.withConfig(config)
.withLogDir(logDir)
.withStateMachine(stateMachine);

if (executor != null) {
builder.withExecutor(executor);
}

install(builder.build());

install(new RaftProtoRpcModule(config.local()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import javax.inject.Inject;
import java.io.File;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate;
Expand Down Expand Up @@ -129,6 +130,7 @@ public static class Builder {
private final NettyClusterConfig config;
private File logDir = Files.createTempDir();
private long timeout = TIMEOUT;
private Executor executor;
private StateTransitionListener listener;

protected Builder(NettyClusterConfig config) {
Expand All @@ -145,9 +147,14 @@ public Builder logDir(File logDir) {
return this;
}

public Builder executor(Executor executor) {
this.executor = executor;
return this;
}

public NettyRaftService build(StateMachine stateMachine) {
NettyRaftService nettyRaftService = Guice.createInjector(
new NettyRaftModule(config, logDir, stateMachine, timeout))
new NettyRaftModule(config, logDir, stateMachine, timeout, executor))
.getInstance(NettyRaftService.class);

if (listener != null) {
Expand Down