Skip to content
This repository has been archived by the owner on Jan 12, 2020. It is now read-only.

Commit

Permalink
Apply patch from Christian Weisskopf
Browse files Browse the repository at this point in the history
  • Loading branch information
mgodave committed Dec 5, 2014
1 parent 7a04847 commit 0f2caa9
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package org.robotninjas.barge;

import com.google.common.base.Optional;

public class NotLeaderException extends RaftException {

private final Replica leader;
private final Optional<Replica> leader;

public NotLeaderException(Replica leader) {
public NotLeaderException(Optional<Replica> leader) {
this.leader = leader;
}

public Replica getLeader() {
public Optional<Replica> getLeader() {
return leader;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ public Builder withLogDir(File logDir) {
return this;
}

public Builder withExecutor(Executor executor) {
this.executor = Optional.of(executor);

return this;
}

public RaftCoreModule build() {
checkState(config.isPresent());
checkState(stateMachine.isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import journal.io.api.Journal;
import journal.io.api.JournalBuilder;
import org.robotninjas.barge.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.io.File;
Expand All @@ -33,6 +35,8 @@

public class LogModule extends PrivateModule {

private static final Logger LOGGER = LoggerFactory.getLogger(LogModule.class);

private final File logDirectory;
private final StateMachine stateMachine;
private final Executor executor;
Expand Down Expand Up @@ -68,11 +72,10 @@ Journal getJournal() {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
//noinspection EmptyCatchBlock
try {
journal.close();
} catch (IOException e) {
//TODO log it
LOGGER.error("Error closing journal", e);
}
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ public void replay(Visitor visitor) {

}

public void close() {
try {
journal.close();
} catch (IOException e) {
propagate(e);
}
}

static interface Visitor {

void term(Mark mark, long term);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public void append(RaftJournal.Mark mark, Entry entry, long index) {
lastLogIndex, currentTerm, commitIndex, votedFor.orNull());
}

public void close() {
LOGGER.debug("Closing raft log and journal");
journal.close();
}

private SettableFuture<Object> storeEntry(final long index, @Nonnull Entry entry) {
LOGGER.debug("{} storing {}", config.local(), entry);
RaftJournal.Mark mark = journal.appendEntry(entry, index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public abstract class BaseState implements State {

private final StateType type;
private final RaftLog log;
private Optional<Replica> leader;
private Optional<Replica> leader = Optional.absent();

protected BaseState(@Nullable StateType type, @Nonnull RaftLog log) {
this.log = checkNotNull(log);
Expand All @@ -43,6 +43,10 @@ protected RaftLog getLog() {
return log;
}

public Optional<Replica> getLeader() {
return leader;
}

@Override
public void destroy(RaftStateContext ctx) {
}
Expand Down Expand Up @@ -173,7 +177,7 @@ public ListenableFuture<Object> commitOperation(@Nonnull RaftStateContext ctx, @
StateType stateType = ctx.type();
Preconditions.checkNotNull(stateType);
if (stateType.equals(FOLLOWER)) {
throw new NotLeaderException(leader.get());
throw new NotLeaderException(leader);
} else if (stateType.equals(CANDIDATE)) {
throw new NoLeaderException();
}
Expand All @@ -183,6 +187,7 @@ public ListenableFuture<Object> commitOperation(@Nonnull RaftStateContext ctx, @
@Override
public void doStop(RaftStateContext ctx) {
ctx.setState(this, STOPPED);
log.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.robotninjas.barge.log.RaftLog;
import static org.robotninjas.barge.state.Raft.StateType.FOLLOWER;
import static org.robotninjas.barge.state.Raft.StateType.LEADER;
import static org.robotninjas.barge.state.Raft.StateType.STOPPED;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -181,6 +182,10 @@ List<ListenableFuture<AppendEntriesResponse>> sendRequests(final RaftStateContex
Futures.addCallback(response, new FutureCallback<AppendEntriesResponse>() {
@Override
public void onSuccess(@Nullable AppendEntriesResponse result) {
if (ctx.type().equals(STOPPED)) {
return;
}

updateCommitted();
checkTermOnResponse(ctx, result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
class RaftPredicates {

@Nonnull
static Predicate<AppendEntriesResponse> appendSuccessul() {
static Predicate<AppendEntriesResponse> appendSuccessful() {
return AppendSuccessPredicate.Success;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package org.robotninjas.barge.jaxrs;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import org.robotninjas.barge.ClusterConfig;
import org.robotninjas.barge.NotLeaderException;
import org.robotninjas.barge.Replica;
import org.robotninjas.barge.api.AppendEntries;
import org.robotninjas.barge.api.AppendEntriesResponse;
import org.robotninjas.barge.api.RequestVote;
Expand Down Expand Up @@ -97,7 +99,11 @@ public Response commit(byte[] operation) {

return Response.noContent().build();
} catch (NotLeaderException e) {
return Response.status(Response.Status.FOUND).location(((HttpReplica) e.getLeader()).getUri()).build();
Optional<Replica> leader = e.getLeader();
if (leader.isPresent() == false) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); // FIXME find better solution for missing leader

This comment has been minimized.

Copy link
@abailly

abailly Dec 6, 2014

Collaborator

Given there is a test case for returning 302, would be good to have a test case for 500 also.

}
return Response.status(Response.Status.FOUND).location(((HttpReplica) leader.get()).getUri()).build();
} catch (Exception e) {
throw Throwables.propagate(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.robotninjas.barge.jaxrs;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
Expand All @@ -27,6 +28,7 @@
import org.junit.Test;
import org.robotninjas.barge.ClusterConfig;
import org.robotninjas.barge.NotLeaderException;
import org.robotninjas.barge.Replica;
import org.robotninjas.barge.api.AppendEntriesResponse;
import org.robotninjas.barge.api.RequestVoteResponse;
import org.robotninjas.barge.state.Raft;
Expand Down Expand Up @@ -94,7 +96,8 @@ public void onPOSTCommitReturn204GivenServiceReturnsResponse() throws Exception
@Test
public void onPOSTCommitReturn302WithLeaderURIGivenRaftThrowsNotLeaderException() throws Exception {
URI leaderURI = new URI("http://localhost:1234");
when(raftService.commitOperation("foo".getBytes())).thenThrow(new NotLeaderException(new HttpReplica(leaderURI)));
Replica replica = new HttpReplica(leaderURI);
when(raftService.commitOperation("foo".getBytes())).thenThrow(new NotLeaderException(Optional.of(replica)));

Response value = client().target("/commit")
.request()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import com.google.inject.PrivateModule;

import org.robotninjas.barge.RaftCoreModule.Builder;
import org.robotninjas.barge.state.AbstractListenersModule;

import java.io.File;
import java.util.concurrent.Executor;


public class NettyRaftModule extends PrivateModule {
Expand All @@ -13,12 +15,14 @@ public class NettyRaftModule extends PrivateModule {
private final File logDir;
private final StateMachine stateMachine;
private final long timeout;
private final Executor executor;

public NettyRaftModule(NettyClusterConfig config, File logDir, StateMachine stateMachine, long timeout) {
public NettyRaftModule(NettyClusterConfig config, File logDir, StateMachine stateMachine, long timeout, Executor executor) {
this.config = config;
this.logDir = logDir;
this.stateMachine = stateMachine;
this.timeout = timeout;
this.executor = executor;
}

@Override
Expand All @@ -29,12 +33,17 @@ protected void configureListeners() {
}
});

install(RaftCoreModule.builder()
.withTimeout(timeout)
.withConfig(config)
.withLogDir(logDir)
.withStateMachine(stateMachine)
.build());
Builder builder = RaftCoreModule.builder()
.withTimeout(timeout)
.withConfig(config)
.withLogDir(logDir)
.withStateMachine(stateMachine);

if (executor != null) {
builder.withExecutor(executor);
}

install(builder.build());

install(new RaftProtoRpcModule(config.local()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import javax.inject.Inject;
import java.io.File;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate;
Expand Down Expand Up @@ -129,6 +130,7 @@ public static class Builder {
private final NettyClusterConfig config;
private File logDir = Files.createTempDir();
private long timeout = TIMEOUT;
private Executor executor;
private StateTransitionListener listener;

protected Builder(NettyClusterConfig config) {
Expand All @@ -145,9 +147,14 @@ public Builder logDir(File logDir) {
return this;
}

public Builder executor(Executor executor) {
this.executor = executor;
return this;
}

public NettyRaftService build(StateMachine stateMachine) {
NettyRaftService nettyRaftService = Guice.createInjector(
new NettyRaftModule(config, logDir, stateMachine, timeout))
new NettyRaftModule(config, logDir, stateMachine, timeout, executor))
.getInstance(NettyRaftService.class);

if (listener != null) {
Expand Down

0 comments on commit 0f2caa9

Please sign in to comment.