--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
+
+public class ClientActor extends UntypedActor {
+ protected final LoggingAdapter LOG =
+ Logging.getLogger(getContext().system(), this);
+
+ private final ActorRef target;
+
+ public ClientActor(ActorRef target){
+ this.target = target;
+ }
+
+ public static Props props(final ActorRef target){
+ return Props.create(new Creator<ClientActor>(){
+
+ @Override public ClientActor create() throws Exception {
+ return new ClientActor(target);
+ }
+ });
+ }
+
+ @Override public void onReceive(Object message) throws Exception {
+ if(message instanceof KeyValue) {
+ target.tell(message, getSelf());
+ } else if(message instanceof KeyValueSaved){
+ LOG.info("KeyValue saved");
+ }
+ }
+}
package org.opendaylight.controller.cluster.example;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* A sample actor showing how the RaftActor is to be extended
*/
public class ExampleActor extends RaftActor {
- public ExampleActor(String id) {
- super(id);
+
+ private final Map<String, String> state = new HashMap();
+
+ private long persistIdentifier = 1;
+
+
+ public ExampleActor(String id, Map<String, String> peerAddresses) {
+ super(id, peerAddresses);
+ }
+
+ public static Props props(final String id, final Map<String, String> peerAddresses){
+ return Props.create(new Creator<ExampleActor>(){
+
+ @Override public ExampleActor create() throws Exception {
+ return new ExampleActor(id, peerAddresses);
+ }
+ });
}
@Override public void onReceiveCommand(Object message){
- /*
- Here the extended class does whatever it needs to do.
- If it cannot handle a message then it passes it on to the super
- class for handling
- */
+ if(message instanceof KeyValue){
+
+ if(isLeader()) {
+ String persistId = Long.toString(persistIdentifier++);
+ persistData(getSender(), persistId, message);
+ } else {
+ getLeader().forward(message, getContext());
+ }
+ }
super.onReceiveCommand(message);
}
+ @Override protected void applyState(ActorRef clientActor, String identifier,
+ Object data) {
+ if(data instanceof KeyValue){
+ KeyValue kv = (KeyValue) data;
+ state.put(kv.getKey(), kv.getValue());
+ if(clientActor != null) {
+ clientActor.tell(new KeyValueSaved(), getSelf());
+ }
+ }
+ }
+
@Override public void onReceiveRecover(Object message) {
super.onReceiveRecover(message);
}
+ @Override public String persistenceId() {
+ return getId();
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Main {
+ private static final ActorSystem actorSystem = ActorSystem.create();
+ // Create three example actors
+ private static Map<String, String> allPeers = new HashMap<>();
+
+ static {
+ allPeers.put("example-1", "akka://default/user/example-1");
+ allPeers.put("example-2", "akka://default/user/example-2");
+ allPeers.put("example-3", "akka://default/user/example-3");
+ }
+
+ public static void main(String[] args) throws Exception{
+ ActorRef example1Actor =
+ actorSystem.actorOf(ExampleActor.props("example-1",
+ withoutPeer("example-1")), "example-1");
+
+ ActorRef example2Actor =
+ actorSystem.actorOf(ExampleActor.props("example-2",
+ withoutPeer("example-2")), "example-2");
+
+ ActorRef example3Actor =
+ actorSystem.actorOf(ExampleActor.props("example-3",
+ withoutPeer("example-3")), "example-3");
+
+ ActorRef clientActor = actorSystem.actorOf(ClientActor.props(example1Actor));
+ BufferedReader br =
+ new BufferedReader(new InputStreamReader(System.in));
+
+ while(true) {
+ System.out.print("Enter Integer (0 to exit):");
+ try {
+ int i = Integer.parseInt(br.readLine());
+ if(i == 0){
+ System.exit(0);
+ }
+ clientActor.tell(new KeyValue("key " + i, "value " + i), null);
+ } catch (NumberFormatException nfe) {
+ System.err.println("Invalid Format!");
+ }
+ }
+ }
+
+ private static Map<String, String> withoutPeer(String peerId) {
+ Map<String, String> without = new HashMap<>(allPeers);
+ without.remove(peerId);
+ return without;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example.messages;
+
+import java.io.Serializable;
+
+public class KeyValue implements Serializable{
+ private final String key;
+ private final String value;
+
+ public KeyValue(String key, String value){
+ this.key = key;
+ this.value = value;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example.messages;
+
+public class KeyValueSaved {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+
+public interface ClientRequestTracker {
+ /**
+ * The client actor who is waiting for a response
+ *
+ * @return
+ */
+ ActorRef getClientActor();
+
+ /**
+ *
+ * @return
+ */
+ String getIdentifier();
+
+ /**
+ * The index of the log entry which needs to be replicated
+ *
+ * @return
+ */
+ long getIndex();
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+
+public class ClientRequestTrackerImpl implements ClientRequestTracker {
+
+ private final ActorRef clientActor;
+ private final String identifier;
+ private final long logIndex;
+
+ public ClientRequestTrackerImpl(ActorRef clientActor, String identifier,
+ long logIndex) {
+
+ this.clientActor = clientActor;
+
+ this.identifier = identifier;
+
+ this.logIndex = logIndex;
+ }
+
+ @Override public ActorRef getClientActor() {
+ return clientActor;
+ }
+
+ @Override public long getIndex() {
+ return logIndex;
+ }
+
+ public String getIdentifier() {
+ return identifier;
+ }
+}
*/
public long incrNextIndex();
+ /**
+ * Decrement the value of the nextIndex
+ * @return
+ */
+ public long decrNextIndex();
+
+ /**
+ *
+ * @param nextIndex
+ */
+ void setNextIndex(long nextIndex);
+
/**
* Increment the value of the matchIndex
* @return
*/
public long incrMatchIndex();
+ public void setMatchIndex(long matchIndex);
+
/**
* The identifier of the follower
* This could simply be the url of the remote actor
return nextIndex.incrementAndGet();
}
+ @Override public long decrNextIndex() {
+ return nextIndex.decrementAndGet();
+ }
+
+ @Override public void setNextIndex(long nextIndex) {
+ this.nextIndex.set(nextIndex);
+ }
+
public long incrMatchIndex(){
return matchIndex.incrementAndGet();
}
+ @Override public void setMatchIndex(long matchIndex) {
+ this.matchIndex.set(matchIndex);
+ }
+
public String getId() {
return id;
}
package org.opendaylight.controller.cluster.raft;
-import akka.persistence.UntypedEventsourcedProcessor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.UntypedPersistentActor;
import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
-import java.util.Collections;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
*
* <a href="http://doc.akka.io/api/akka/2.3.3/index.html#akka.persistence.UntypedEventsourcedProcessor">UntypeEventSourceProcessor</a>
*/
-public abstract class RaftActor extends UntypedEventsourcedProcessor {
+public abstract class RaftActor extends UntypedPersistentActor {
+ protected final LoggingAdapter LOG =
+ Logging.getLogger(getContext().system(), this);
/**
* The current state determines the current behavior of a RaftActor
*/
private RaftActorContext context;
- public RaftActor(String id){
+ /**
+ * The in-memory journal
+ */
+ private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
+
+
+
+ public RaftActor(String id, Map<String, String> peerAddresses){
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(),
- id, new ElectionTermImpl(id),
- 0, 0, new ReplicatedLogImpl());
+ id, new ElectionTermImpl(getSelf().path().toString()),
+ -1, -1, replicatedLog, peerAddresses, LOG);
currentBehavior = switchBehavior(RaftState.Follower);
}
@Override public void onReceiveRecover(Object message) {
- throw new UnsupportedOperationException("onReceiveRecover");
+ if(message instanceof ReplicatedLogEntry) {
+ replicatedLog.append((ReplicatedLogEntry) message);
+ } else if(message instanceof RecoveryCompleted){
+ LOG.debug("Log now has messages to index : " + replicatedLog.lastIndex());
+ }
}
@Override public void onReceiveCommand(Object message) {
- RaftState state = currentBehavior.handleMessage(getSender(), message);
- currentBehavior = switchBehavior(state);
+ if(message instanceof ApplyState){
+
+ ApplyState applyState = (ApplyState) message;
+
+ LOG.debug("Applying state for log index {}", applyState.getReplicatedLogEntry().getIndex());
+
+ applyState(applyState.getClientActor(), applyState.getIdentifier(),
+ applyState.getReplicatedLogEntry().getData());
+ } else if(message instanceof FindLeader){
+ getSender().tell(new FindLeaderReply(
+ context.getPeerAddress(currentBehavior.getLeaderId())),
+ getSelf());
+ } else {
+ RaftState state =
+ currentBehavior.handleMessage(getSender(), message);
+ currentBehavior = switchBehavior(state);
+ }
}
private RaftActorBehavior switchBehavior(RaftState state){
+ if(currentBehavior != null) {
+ if (currentBehavior.state() == state) {
+ return currentBehavior;
+ }
+ LOG.info("Switching from state " + currentBehavior.state() + " to "
+ + state);
+
+ try {
+ currentBehavior.close();
+ } catch (Exception e) {
+ LOG.error(e, "Failed to close behavior : " + currentBehavior.state());
+ }
+
+ } else {
+ LOG.info("Switching behavior to " + state);
+ }
RaftActorBehavior behavior = null;
if(state == RaftState.Candidate){
- behavior = new Candidate(context, Collections.EMPTY_LIST);
+ behavior = new Candidate(context);
} else if(state == RaftState.Follower){
behavior = new Follower(context);
} else {
- behavior = new Leader(context, Collections.EMPTY_LIST);
+ behavior = new Leader(context);
}
return behavior;
}
+ /**
+ * When a derived RaftActor needs to persist something it must call
+ * persistData.
+ *
+ * @param clientActor
+ * @param identifier
+ * @param data
+ */
+ protected void persistData(ActorRef clientActor, String identifier, Object data){
+ LOG.debug("Persist data " + identifier);
+ ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
+ context.getReplicatedLog().lastIndex() + 1,
+ context.getTermInformation().getCurrentTerm(), data);
+
+ replicatedLog.appendAndPersist(clientActor, identifier, replicatedLogEntry);
+ }
+
+ protected abstract void applyState(ActorRef clientActor, String identifier, Object data);
+
+ protected String getId(){
+ return context.getId();
+ }
+
+ protected boolean isLeader(){
+ return context.getId().equals(currentBehavior.getLeaderId());
+ }
+
+ protected ActorSelection getLeader(){
+ String leaderId = currentBehavior.getLeaderId();
+ String peerAddress = context.getPeerAddress(leaderId);
+ LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " + peerAddress);
+ return context.actorSelection(peerAddress);
+ }
+
private class ReplicatedLogImpl implements ReplicatedLog {
+ private final List<ReplicatedLogEntry> journal = new ArrayList();
+ private long snapshotIndex = 0;
+ private Object snapShot = null;
+
@Override public ReplicatedLogEntry get(long index) {
- throw new UnsupportedOperationException("get");
+ if(index < 0 || index >= journal.size()){
+ return null;
+ }
+
+ return journal.get((int) (index - snapshotIndex));
}
@Override public ReplicatedLogEntry last() {
- throw new UnsupportedOperationException("last");
+ if(journal.size() == 0){
+ return null;
+ }
+ return get(journal.size() - 1);
}
+ @Override public long lastIndex() {
+ if(journal.size() == 0){
+ return -1;
+ }
+
+ return last().getIndex();
+ }
+
+ @Override public long lastTerm() {
+ if(journal.size() == 0){
+ return -1;
+ }
+
+ return last().getTerm();
+ }
+
+
@Override public void removeFrom(long index) {
- throw new UnsupportedOperationException("removeFrom");
+ if(index < 0 || index >= journal.size()){
+ return;
+ }
+ for(int i= (int) (index - snapshotIndex) ; i < journal.size() ; i++){
+ deleteMessage(i);
+ journal.remove(i);
+ }
+ }
+
+ @Override public void append(final ReplicatedLogEntry replicatedLogEntry) {
+ journal.add(replicatedLogEntry);
+ }
+
+ @Override public List<ReplicatedLogEntry> getFrom(long index) {
+ List<ReplicatedLogEntry> entries = new ArrayList<>(100);
+ if(index < 0 || index >= journal.size()){
+ return entries;
+ }
+ for(int i= (int) (index - snapshotIndex); i < journal.size() ; i++){
+ entries.add(journal.get(i));
+ }
+ return entries;
+ }
+
+ @Override public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry){
+ appendAndPersist(null, null, replicatedLogEntry);
+ }
+
+ public void appendAndPersist(final ActorRef clientActor, final String identifier, final ReplicatedLogEntry replicatedLogEntry){
+ context.getLogger().debug("Append log entry and persist" + replicatedLogEntry.getIndex());
+ // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
+ journal.add(replicatedLogEntry);
+ persist(replicatedLogEntry,
+ new Procedure<ReplicatedLogEntry>() {
+ public void apply(ReplicatedLogEntry evt) throws Exception {
+ // Send message for replication
+ if(clientActor != null) {
+ currentBehavior.handleMessage(getSelf(),
+ new Replicate(clientActor, identifier,
+ replicatedLogEntry));
+ }
+ }
+ });
}
- @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
- throw new UnsupportedOperationException("append");
+ @Override public long size() {
+ return journal.size() + snapshotIndex;
}
}
+
+ private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
+ Serializable {
+
+ private final long index;
+ private final long term;
+ private final Object payload;
+
+ public ReplicatedLogImplEntry(long index, long term, Object payload){
+
+ this.index = index;
+ this.term = term;
+ this.payload = payload;
+ }
+
+ @Override public Object getData() {
+ return payload;
+ }
+
+ @Override public long getTerm() {
+ return term;
+ }
+
+ @Override public long getIndex() {
+ return index;
+ }
+ }
+
+
}
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import akka.event.LoggingAdapter;
+
+import java.util.Map;
/**
* The RaftActorContext contains that portion of the RaftActors state that
* @return The ActorSystem associated with this context
*/
ActorSystem getActorSystem();
+
+ /**
+ *
+ * @return
+ */
+ LoggingAdapter getLogger();
+
+ /**
+ * Get a mapping of peer id's their addresses
+ * @return
+ */
+ Map<String, String> getPeerAddresses();
+
+ /**
+ *
+ * @param peerId
+ * @return
+ */
+ String getPeerAddress(String peerId);
}
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActorContext;
+import akka.event.LoggingAdapter;
+
+import java.util.Map;
public class RaftActorContextImpl implements RaftActorContext{
private final ReplicatedLog replicatedLog;
+ private final Map<String, String> peerAddresses;
+
+ private final LoggingAdapter LOG;
+
+
public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
String id,
ElectionTerm termInformation, long commitIndex,
- long lastApplied, ReplicatedLog replicatedLog) {
+ long lastApplied, ReplicatedLog replicatedLog, Map<String, String> peerAddresses, LoggingAdapter logger) {
this.actor = actor;
this.context = context;
this.id = id;
this.commitIndex = commitIndex;
this.lastApplied = lastApplied;
this.replicatedLog = replicatedLog;
+ this.peerAddresses = peerAddresses;
+ this.LOG = logger;
}
public ActorRef actorOf(Props props){
@Override public ActorSystem getActorSystem() {
return context.system();
}
+
+ @Override public LoggingAdapter getLogger() {
+ return this.LOG;
+ }
+
+ @Override public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ @Override public String getPeerAddress(String peerId) {
+ return peerAddresses.get(peerId);
+ }
}
package org.opendaylight.controller.cluster.raft;
+import java.util.List;
+
/**
* Represents the ReplicatedLog that needs to be kept in sync by the RaftActor
*/
*/
ReplicatedLogEntry last();
+ /**
+ *
+ * @return
+ */
+ long lastIndex();
+
+ /**
+ *
+ * @return
+ */
+ long lastTerm();
+
/**
* Remove all the entries from the logs >= index
*
* @param replicatedLogEntry
*/
void append(ReplicatedLogEntry replicatedLogEntry);
+
+ /**
+ *
+ * @param replicatedLogEntry
+ */
+ void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
+
+ /**
+ *
+ * @param index
+ */
+ List<ReplicatedLogEntry> getFrom(long index);
+
+
+ /**
+ *
+ * @return
+ */
+ long size();
}
import akka.actor.ActorRef;
import akka.actor.Cancellable;
+import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
/**
*
*/
-
private Cancellable electionCancel = null;
+ /**
+ *
+ */
+ protected String leaderId = null;
+
protected AbstractRaftActorBehavior(RaftActorContext context) {
this.context = context;
protected RaftState appendEntries(ActorRef sender,
- AppendEntries appendEntries, RaftState raftState){
+ AppendEntries appendEntries, RaftState raftState) {
- // 1. Reply false if term < currentTerm (§5.1)
- if(appendEntries.getTerm() < currentTerm()){
- sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
- return state();
+ if (raftState != state()) {
+ context.getLogger().debug("Suggested state is " + raftState
+ + " current behavior state is " + state());
}
- // 2. Reply false if log doesn’t contain an entry at prevLogIndex
- // whose term matches prevLogTerm (§5.3)
- ReplicatedLogEntry previousEntry = context.getReplicatedLog()
- .get(appendEntries.getPrevLogIndex());
-
- if(previousEntry == null || previousEntry.getTerm() != appendEntries.getPrevLogTerm()){
- sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
+ // 1. Reply false if term < currentTerm (§5.1)
+ if (appendEntries.getTerm() < currentTerm()) {
+ context.getLogger().debug(
+ "Cannot append entries because sender term " + appendEntries
+ .getTerm() + " is less than " + currentTerm());
+ sender.tell(
+ new AppendEntriesReply(context.getId(), currentTerm(), false,
+ lastIndex(), lastTerm()), actor()
+ );
return state();
}
- if(appendEntries.getEntries() != null) {
- // 3. If an existing entry conflicts with a new one (same index
- // but different terms), delete the existing entry and all that
- // follow it (§5.3)
- int addEntriesFrom = 0;
- for (int i = 0;
- i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
- ReplicatedLogEntry newEntry = context.getReplicatedLog()
- .get(i + 1);
-
- if (newEntry != null && newEntry.getTerm() == appendEntries.getEntries().get(i).getTerm()){
- break;
- }
- if (newEntry != null && newEntry.getTerm() != appendEntries
- .getEntries().get(i).getTerm()) {
- context.getReplicatedLog().removeFrom(i + 1);
- break;
- }
- }
-
- // 4. Append any new entries not already in the log
- for (int i = addEntriesFrom;
- i < appendEntries.getEntries().size(); i++) {
- context.getReplicatedLog()
- .append(appendEntries.getEntries().get(i));
- }
- }
-
-
- // 5. If leaderCommit > commitIndex, set commitIndex =
- // min(leaderCommit, index of last new entry)
- context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
- context.getReplicatedLog().last().getIndex()));
-
- // If commitIndex > lastApplied: increment lastApplied, apply
- // log[lastApplied] to state machine (§5.3)
- if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
- applyLogToStateMachine(appendEntries.getLeaderCommit());
- }
-
- sender.tell(new AppendEntriesReply(currentTerm(), true), actor());
return handleAppendEntries(sender, appendEntries, raftState);
}
if (requestVote.getLastLogTerm() > lastTerm()) {
candidateLatest = true;
} else if ((requestVote.getLastLogTerm() == lastTerm())
- && requestVote.getLastLogIndex() >= lastTerm()) {
+ && requestVote.getLastLogIndex() >= lastIndex()) {
candidateLatest = true;
}
protected abstract RaftState handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply, RaftState suggestedState);
- /**
- * @return The derived class should return the state that corresponds to
- * it's behavior
- */
- protected abstract RaftState state();
-
protected FiniteDuration electionDuration() {
long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
TimeUnit.MILLISECONDS);
}
- protected void scheduleElection(FiniteDuration interval) {
-
+ protected void stopElection() {
if (electionCancel != null && !electionCancel.isCancelled()) {
electionCancel.cancel();
}
+ }
+
+ protected void scheduleElection(FiniteDuration interval) {
+
+ stopElection();
// Schedule an election. When the scheduler triggers an ElectionTimeout
// message is sent to itself
}
protected long lastTerm() {
- return context.getReplicatedLog().last().getTerm();
+ return context.getReplicatedLog().lastTerm();
}
protected long lastIndex() {
- return context.getReplicatedLog().last().getIndex();
+ return context.getReplicatedLog().lastIndex();
}
+ protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+ return null;
+ }
+
+ protected void applyLogToStateMachine(long index) {
+ // Now maybe we apply to the state machine
+ for (long i = context.getLastApplied() + 1;
+ i < index + 1; i++) {
+ ActorRef clientActor = null;
+ String identifier = null;
+ ClientRequestTracker tracker = findClientRequestTracker(i);
+
+ if (tracker != null) {
+ clientActor = tracker.getClientActor();
+ identifier = tracker.getIdentifier();
+ }
+ ReplicatedLogEntry replicatedLogEntry =
+ context.getReplicatedLog().get(i);
+
+ if (replicatedLogEntry != null) {
+ actor().tell(new ApplyState(clientActor, identifier,
+ replicatedLogEntry), actor());
+ } else {
+ context.getLogger().error(
+ "Missing index " + i + " from log. Cannot apply state.");
+ }
+ }
+ // Send a local message to the local RaftActor (it's derived class to be
+ // specific to apply the log to it's index)
+ context.setLastApplied(index);
+ }
@Override
public RaftState handleMessage(ActorRef sender, Object message) {
return raftState;
}
+ @Override public String getLeaderId() {
+ return leaderId;
+ }
+
private RaftState applyTerm(RaftRPC rpc) {
// If RPC request or response contains term T > currentTerm:
// set currentTerm = T, convert to follower (§5.1)
return state();
}
- private void applyLogToStateMachine(long index) {
- // Send a local message to the local RaftActor (it's derived class to be
- // specific to apply the log to it's index)
- context.setLastApplied(index);
- }
-
-
}
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
/**
* The behavior of a RaftActor when it is in the CandidateState
- * <p>
+ * <p/>
* Candidates (§5.2):
* <ul>
* <li> On conversion to candidate, start election:
private final int votesRequired;
- public Candidate(RaftActorContext context, List<String> peerPaths) {
+ public Candidate(RaftActorContext context) {
super(context);
+ Collection<String> peerPaths = context.getPeerAddresses().values();
+
for (String peerPath : peerPaths) {
peerToActor.put(peerPath,
context.actorSelection(peerPath));
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries, RaftState suggestedState) {
- // There is some peer who thinks it's a leader but is not
- // I will not accept this append entries
- sender.tell(new AppendEntriesReply(
- context.getTermInformation().getCurrentTerm(), false),
- context.getActor());
+ context.getLogger().error("An unexpected AppendEntries received in state " + state());
return suggestedState;
}
@Override protected RaftState handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply, RaftState suggestedState) {
- if(suggestedState == RaftState.Follower) {
+ if (suggestedState == RaftState.Follower) {
// If base class thinks I should be follower then I am
return suggestedState;
}
- if(requestVoteReply.isVoteGranted()){
+ if (requestVoteReply.isVoteGranted()) {
voteCount++;
}
- if(voteCount >= votesRequired){
+ if (voteCount >= votesRequired) {
return RaftState.Leader;
}
return state();
}
- @Override protected RaftState state() {
+ @Override public RaftState state() {
return RaftState.Candidate;
}
@Override
public RaftState handleMessage(ActorRef sender, Object message) {
- if(message instanceof ElectionTimeout){
- if(votesRequired == 0){
+ if (message instanceof ElectionTimeout) {
+ if (votesRequired == 0) {
// If there are no peers then we should be a Leader
// We wait for the election timeout to occur before declare
// ourselves the leader. This gives enough time for a leader
}
- private void startNewTerm(){
+ private void startNewTerm() {
+
+
// set voteCount back to 1 (that is voting for self)
voteCount = 1;
// Increment the election term and vote for self
long currentTerm = context.getTermInformation().getCurrentTerm();
- context.getTermInformation().update(currentTerm+1, context.getId());
+ context.getTermInformation().update(currentTerm + 1, context.getId());
+
+ context.getLogger().debug("Starting new term " + (currentTerm+1));
// Request for a vote
- for(ActorSelection peerActor : peerToActor.values()){
+ for (ActorSelection peerActor : peerToActor.values()) {
peerActor.tell(new RequestVote(
context.getTermInformation().getCurrentTerm(),
- context.getId(), context.getReplicatedLog().last().getIndex(),
- context.getReplicatedLog().last().getTerm()),
- context.getActor());
+ context.getId(),
+ context.getReplicatedLog().lastIndex(),
+ context.getReplicatedLog().lastTerm()),
+ context.getActor()
+ );
}
}
+ @Override public void close() throws Exception {
+ stopElection();
+ }
}
import akka.actor.ActorRef;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries, RaftState suggestedState) {
+
+ // If we got here then we do appear to be talking to the leader
+ leaderId = appendEntries.getLeaderId();
+
+ // 2. Reply false if log doesn’t contain an entry at prevLogIndex
+ // whose term matches prevLogTerm (§5.3)
+ ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+ .get(appendEntries.getPrevLogIndex());
+
+
+ if (lastIndex() > -1 && previousEntry != null
+ && previousEntry.getTerm() != appendEntries
+ .getPrevLogTerm()) {
+
+ context.getLogger().debug(
+ "Cannot append entries because previous entry term "
+ + previousEntry.getTerm()
+ + " is not equal to append entries prevLogTerm "
+ + appendEntries.getPrevLogTerm());
+
+ sender.tell(
+ new AppendEntriesReply(context.getId(), currentTerm(), false,
+ lastIndex(), lastTerm()), actor()
+ );
+ return state();
+ }
+
+ if (appendEntries.getEntries() != null
+ && appendEntries.getEntries().size() > 0) {
+ context.getLogger().debug(
+ "Number of entries to be appended = " + appendEntries
+ .getEntries().size());
+
+ // 3. If an existing entry conflicts with a new one (same index
+ // but different terms), delete the existing entry and all that
+ // follow it (§5.3)
+ int addEntriesFrom = 0;
+ if (context.getReplicatedLog().size() > 0) {
+ for (int i = 0;
+ i < appendEntries.getEntries()
+ .size(); i++, addEntriesFrom++) {
+ ReplicatedLogEntry matchEntry =
+ appendEntries.getEntries().get(i);
+ ReplicatedLogEntry newEntry = context.getReplicatedLog()
+ .get(matchEntry.getIndex());
+
+ if (newEntry == null) {
+ //newEntry not found in the log
+ break;
+ }
+
+ if (newEntry != null && newEntry.getTerm() == matchEntry
+ .getTerm()) {
+ continue;
+ }
+ if (newEntry != null && newEntry.getTerm() != matchEntry
+ .getTerm()) {
+ context.getLogger().debug(
+ "Removing entries from log starting at "
+ + matchEntry.getIndex());
+ context.getReplicatedLog()
+ .removeFrom(matchEntry.getIndex());
+ break;
+ }
+ }
+ }
+
+ context.getLogger().debug(
+ "After cleanup entries to be added from = " + (addEntriesFrom
+ + lastIndex()));
+
+ // 4. Append any new entries not already in the log
+ for (int i = addEntriesFrom;
+ i < appendEntries.getEntries().size(); i++) {
+ context.getLogger().debug(
+ "Append entry to log " + appendEntries.getEntries().get(i)
+ .toString());
+ context.getReplicatedLog()
+ .appendAndPersist(appendEntries.getEntries().get(i));
+ }
+
+ context.getLogger().debug(
+ "Log size is now " + context.getReplicatedLog().size());
+ }
+
+
+ // 5. If leaderCommit > commitIndex, set commitIndex =
+ // min(leaderCommit, index of last new entry)
+
+ long prevCommitIndex = context.getCommitIndex();
+
+ context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
+ context.getReplicatedLog().lastIndex()));
+
+ if (prevCommitIndex != context.getCommitIndex()) {
+ context.getLogger()
+ .debug("Commit index set to " + context.getCommitIndex());
+ }
+
+ // If commitIndex > lastApplied: increment lastApplied, apply
+ // log[lastApplied] to state machine (§5.3)
+ if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+ applyLogToStateMachine(appendEntries.getLeaderCommit());
+ }
+
+ sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
+ lastIndex(), lastTerm()), actor());
+
return suggestedState;
}
return suggestedState;
}
- @Override protected RaftState state() {
+ @Override public RaftState state() {
return RaftState.Follower;
}
return super.handleMessage(sender, message);
}
+
+ @Override public void close() throws Exception {
+ stopElection();
+ }
}
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
+import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import scala.concurrent.duration.FiniteDuration;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
private Cancellable heartbeatCancel = null;
- public Leader(RaftActorContext context, List<String> followerPaths) {
+ private List<ClientRequestTracker> trackerList = new ArrayList<>();
+
+ private final int minReplicationCount;
+
+ public Leader(RaftActorContext context) {
super(context);
- for (String followerPath : followerPaths) {
+ if(lastIndex() >= 0) {
+ context.setCommitIndex(lastIndex());
+ }
+
+ for (String followerId : context.getPeerAddresses().keySet()) {
FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(followerPath,
- new AtomicLong(0),
- new AtomicLong(0));
+ new FollowerLogInformationImpl(followerId,
+ new AtomicLong(lastIndex()),
+ new AtomicLong(-1));
- followerToActor.put(followerPath,
- context.actorSelection(followerLogInformation.getId()));
- followerToLog.put(followerPath, followerLogInformation);
+ followerToActor.put(followerId,
+ context.actorSelection(context.getPeerAddress(followerId)));
+
+ followerToLog.put(followerId, followerLogInformation);
+
+ }
+ if (followerToActor.size() > 0) {
+ minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
+ } else {
+ minReplicationCount = 0;
}
+
// Immediately schedule a heartbeat
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries, RaftState suggestedState) {
+
+ context.getLogger()
+ .error("An unexpected AppendEntries received in state " + state());
+
return suggestedState;
}
@Override protected RaftState handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
+
+ // Do not take any other action since a behavior change is coming
+ if (suggestedState != state())
+ return suggestedState;
+
+ // Update the FollowerLogInformation
+ String followerId = appendEntriesReply.getFollowerId();
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+ if (appendEntriesReply.isSuccess()) {
+ followerLogInformation
+ .setMatchIndex(appendEntriesReply.getLogLastIndex());
+ followerLogInformation
+ .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
+ } else {
+ followerLogInformation.decrNextIndex();
+ }
+
+ // Now figure out if this reply warrants a change in the commitIndex
+ // If there exists an N such that N > commitIndex, a majority
+ // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
+ // set commitIndex = N (§5.3, §5.4).
+ for (long N = context.getCommitIndex() + 1; ; N++) {
+ int replicatedCount = 1;
+
+ for (FollowerLogInformation info : followerToLog.values()) {
+ if (info.getMatchIndex().get() >= N) {
+ replicatedCount++;
+ }
+ }
+
+ if (replicatedCount >= minReplicationCount){
+ ReplicatedLogEntry replicatedLogEntry =
+ context.getReplicatedLog().get(N);
+ if (replicatedLogEntry != null
+ && replicatedLogEntry.getTerm()
+ == currentTerm()) {
+ context.setCommitIndex(N);
+ }
+ } else {
+ break;
+ }
+ }
+
+ if(context.getCommitIndex() > context.getLastApplied()){
+ applyLogToStateMachine(context.getCommitIndex());
+ }
+
return suggestedState;
}
+ protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+ for (ClientRequestTracker tracker : trackerList) {
+ if (tracker.getIndex() == logIndex) {
+ return tracker;
+ }
+ }
+
+ return null;
+ }
+
@Override protected RaftState handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply, RaftState suggestedState) {
return suggestedState;
}
- @Override protected RaftState state() {
+ @Override public RaftState state() {
return RaftState.Leader;
}
@Override public RaftState handleMessage(ActorRef sender, Object message) {
Preconditions.checkNotNull(sender, "sender should not be null");
- scheduleHeartBeat(HEART_BEAT_INTERVAL);
-
- if (message instanceof SendHeartBeat) {
- for (ActorSelection follower : followerToActor.values()) {
- follower.tell(new AppendEntries(
- context.getTermInformation().getCurrentTerm(),
- context.getId(),
- context.getReplicatedLog().last().getIndex(),
- context.getReplicatedLog().last().getTerm(),
- Collections.EMPTY_LIST, context.getCommitIndex()),
- context.getActor());
+ try {
+ if (message instanceof SendHeartBeat) {
+ return sendHeartBeat();
+ } else if (message instanceof Replicate) {
+
+ Replicate replicate = (Replicate) message;
+ long logIndex = replicate.getReplicatedLogEntry().getIndex();
+
+ context.getLogger().debug("Replicate message " + logIndex);
+
+ if (followerToActor.size() == 0) {
+ context.setCommitIndex(
+ replicate.getReplicatedLogEntry().getIndex());
+
+ context.getActor()
+ .tell(new ApplyState(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ replicate.getReplicatedLogEntry()),
+ context.getActor()
+ );
+ } else {
+
+ trackerList.add(
+ new ClientRequestTrackerImpl(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ logIndex)
+ );
+
+ ReplicatedLogEntry prevEntry =
+ context.getReplicatedLog().get(lastIndex() - 1);
+ long prevLogIndex = -1;
+ long prevLogTerm = -1;
+ if (prevEntry != null) {
+ prevLogIndex = prevEntry.getIndex();
+ prevLogTerm = prevEntry.getTerm();
+ }
+ // Send an AppendEntries to all followers
+ for (String followerId : followerToActor.keySet()) {
+ ActorSelection followerActor =
+ followerToActor.get(followerId);
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+ followerActor.tell(
+ new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex, prevLogTerm,
+ context.getReplicatedLog().getFrom(
+ followerLogInformation.getNextIndex()
+ .get()
+ ), context.getCommitIndex()
+ ),
+ actor()
+ );
+ }
+ }
}
- return state();
+ } finally {
+ scheduleHeartBeat(HEART_BEAT_INTERVAL);
}
+
return super.handleMessage(sender, message);
}
- private void scheduleHeartBeat(FiniteDuration interval) {
+ private RaftState sendHeartBeat() {
+ if (followerToActor.size() > 0) {
+ for (String follower : followerToActor.keySet()) {
+
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(follower);
+
+ AtomicLong nextIndex =
+ followerLogInformation.getNextIndex();
+
+ List<ReplicatedLogEntry> entries =
+ context.getReplicatedLog().getFrom(nextIndex.get());
+
+ followerToActor.get(follower).tell(new AppendEntries(
+ context.getTermInformation().getCurrentTerm(),
+ context.getId(),
+ context.getReplicatedLog().lastIndex(),
+ context.getReplicatedLog().lastTerm(),
+ entries, context.getCommitIndex()),
+ context.getActor()
+ );
+ }
+ }
+ return state();
+ }
+
+ private void stopHeartBeat() {
if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
heartbeatCancel.cancel();
}
+ }
+
+ private void scheduleHeartBeat(FiniteDuration interval) {
+ stopHeartBeat();
// Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
// message is sent to itself.
// need to be sent if there are other messages being sent to the remote
// actor.
heartbeatCancel =
- context.getActorSystem().scheduler().scheduleOnce(interval,
+ context.getActorSystem().scheduler().scheduleOnce(
+ interval,
context.getActor(), new SendHeartBeat(),
context.getActorSystem().dispatcher(), context.getActor());
}
+ @Override public void close() throws Exception {
+ stopHeartBeat();
+ }
+
+ @Override public String getLeaderId() {
+ return context.getId();
+ }
+
}
* In each of these behaviors the Raft Actor handles the same Raft messages
* differently.
*/
-public interface RaftActorBehavior {
+public interface RaftActorBehavior extends AutoCloseable{
/**
* Handle a message. If the processing of the message warrants a state
* change then a new state should be returned otherwise this method should
* @return The new state or self (this)
*/
RaftState handleMessage(ActorRef sender, Object message);
+
+ /**
+ * The state associated with a given behavior
+ *
+ * @return
+ */
+ RaftState state();
+
+ /**
+ *
+ * @return
+ */
+ String getLeaderId();
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+public class FindLeader {
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+public class FindLeaderReply {
+ private final String leaderActor;
+
+ public FindLeaderReply(String leaderActor) {
+ this.leaderActor = leaderActor;
+ }
+
+ public String getLeaderActor() {
+ return leaderActor;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
+public class ApplyState {
+ private final ActorRef clientActor;
+ private final String identifier;
+ private final ReplicatedLogEntry replicatedLogEntry;
+
+ public ApplyState(ActorRef clientActor, String identifier,
+ ReplicatedLogEntry replicatedLogEntry) {
+ this.clientActor = clientActor;
+ this.identifier = identifier;
+ this.replicatedLogEntry = replicatedLogEntry;
+ }
+
+ public ActorRef getClientActor() {
+ return clientActor;
+ }
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ public ReplicatedLogEntry getReplicatedLogEntry() {
+ return replicatedLogEntry;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
+public class Replicate {
+ private final ActorRef clientActor;
+ private final String identifier;
+ private final ReplicatedLogEntry replicatedLogEntry;
+
+ public Replicate(ActorRef clientActor, String identifier,
+ ReplicatedLogEntry replicatedLogEntry) {
+
+ this.clientActor = clientActor;
+ this.identifier = identifier;
+ this.replicatedLogEntry = replicatedLogEntry;
+ }
+
+ public ActorRef getClientActor() {
+ return clientActor;
+ }
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ public ReplicatedLogEntry getReplicatedLogEntry() {
+ return replicatedLogEntry;
+ }
+}
public long getLeaderCommit() {
return leaderCommit;
}
+
+ @Override public String toString() {
+ return "AppendEntries{" +
+ "leaderId='" + leaderId + '\'' +
+ ", prevLogIndex=" + prevLogIndex +
+ ", prevLogTerm=" + prevLogTerm +
+ ", entries=" + entries +
+ ", leaderCommit=" + leaderCommit +
+ '}';
+ }
}
// prevLogIndex and prevLogTerm
private final boolean success;
- public AppendEntriesReply(long term, boolean success) {
+ // The index of the last entry in the followers log
+ // This will be used to set the matchIndex for the follower on the
+ // Leader
+ private final long logLastIndex;
+
+ private final long logLastTerm;
+
+ // The followerId - this will be used to figure out which follower is
+ // responding
+ private final String followerId;
+
+ public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm) {
super(term);
+
+ this.followerId = followerId;
this.success = success;
+ this.logLastIndex = logLastIndex;
+ this.logLastTerm = logLastTerm;
}
public long getTerm() {
public boolean isSuccess() {
return success;
}
+
+ public long getLogLastIndex() {
+ return logLastIndex;
+ }
+
+ public long getLogLastTerm() {
+ return logLastTerm;
+ }
+
+ public String getFollowerId() {
+ return followerId;
+ }
}
--- /dev/null
+akka {
+ loglevel = "DEBUG"
+ actor {
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ }
+
+ serialization-bindings {
+ "org.opendaylight.controller.cluster.raft.RaftActor$ReplicatedLogImplEntry" = java
+ }
+ }
+}
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class MockRaftActorContext implements RaftActorContext {
private long lastApplied = 0;
private final ElectionTerm electionTerm;
private ReplicatedLog replicatedLog;
+ private Map<String, String> peerAddresses = new HashMap();
public MockRaftActorContext(){
electionTerm = null;
return this.system;
}
+ @Override public LoggingAdapter getLogger() {
+ return Logging.getLogger(system, this);
+ }
+
+ @Override public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ @Override public String getPeerAddress(String peerId) {
+ return peerAddresses.get(peerId);
+ }
+
+ public void setPeerAddresses(Map<String, String> peerAddresses) {
+ this.peerAddresses = peerAddresses;
+ }
+
public static class MockReplicatedLog implements ReplicatedLog {
private ReplicatedLogEntry replicatedLogEntry = new MockReplicatedLogEntry(0,0, "");
return last;
}
+ @Override public long lastIndex() {
+ return last.getIndex();
+ }
+
+ @Override public long lastTerm() {
+ return last.getTerm();
+ }
+
@Override public void removeFrom(long index) {
}
@Override public void append(ReplicatedLogEntry replicatedLogEntry) {
}
+ @Override public void appendAndPersist(
+ ReplicatedLogEntry replicatedLogEntry) {
+ }
+
+ @Override public List<ReplicatedLogEntry> getFrom(long index) {
+ return Collections.EMPTY_LIST;
+ }
+
+ @Override public long size() {
+ if(replicatedLogEntry != null){
+ return 1;
+ }
+ return 0;
+ }
+
public void setReplicatedLogEntry(
ReplicatedLogEntry replicatedLogEntry) {
this.replicatedLogEntry = replicatedLogEntry;
private final List<ReplicatedLogEntry> log = new ArrayList<>(10000);
@Override public ReplicatedLogEntry get(long index) {
+ if(index >= log.size() || index < 0){
+ return null;
+ }
return log.get((int) index);
}
@Override public ReplicatedLogEntry last() {
+ if(log.size() == 0){
+ return null;
+ }
return log.get(log.size()-1);
}
+ @Override public long lastIndex() {
+ if(log.size() == 0){
+ return -1;
+ }
+
+ return last().getIndex();
+ }
+
+ @Override public long lastTerm() {
+ if(log.size() == 0){
+ return -1;
+ }
+
+ return last().getTerm();
+ }
+
@Override public void removeFrom(long index) {
for(int i=(int) index ; i < log.size() ; i++) {
log.remove(i);
@Override public void append(ReplicatedLogEntry replicatedLogEntry) {
log.add(replicatedLogEntry);
}
+
+ @Override public void appendAndPersist(
+ ReplicatedLogEntry replicatedLogEntry) {
+ append(replicatedLogEntry);
+ }
+
+ @Override public List<ReplicatedLogEntry> getFrom(long index) {
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ for(int i=(int) index ; i < log.size() ; i++) {
+ entries.add(get(i));
+ }
+ return entries;
+ }
+
+ @Override public long size() {
+ return log.size();
+ }
}
public static class MockReplicatedLogEntry implements ReplicatedLogEntry {
import java.util.List;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
}};
}
- /**
- * This test verifies that when an AppendEntries RPC is received by a RaftActor
- * with a commitIndex that is greater than what has been applied to the
- * state machine of the RaftActor, the RaftActor applies the state and
- * sets it current applied state to the commitIndex of the sender.
- *
- * @throws Exception
- */
- @Test
- public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
- new JavaTestKit(getSystem()) {{
-
- RaftActorContext context =
- createActorContext();
-
- context.setLastApplied(100);
- setLastLogEntry((MockRaftActorContext) context, 0, 0, "");
-
- // The new commitIndex is 101
- AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
-
- RaftState raftState =
- createBehavior(context).handleMessage(getRef(), appendEntries);
-
- assertEquals(101L, context.getLastApplied());
-
- }};
- }
/**
* This test verifies that when an AppendEntries is received with a term that
}};
}
- /**
- * This test verifies that when an AppendEntries is received a specific prevLogTerm
- * which does not match the term that is in RaftActors log entry at prevLogIndex
- * then the RaftActor does not change it's state and it returns a failure.
- *
- * @throws Exception
- */
- @Test
- public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
- throws Exception {
- new JavaTestKit(getSystem()) {{
-
- MockRaftActorContext context = (MockRaftActorContext)
- createActorContext();
-
- // First set the receivers term to lower number
- context.getTermInformation().update(95, "test");
-
- // Set the last log entry term for the receiver to be greater than
- // what we will be sending as the prevLogTerm in AppendEntries
- MockRaftActorContext.MockReplicatedLog mockReplicatedLog =
- setLastLogEntry(context, 20, 0, "");
-
- // Also set the entry at index 0 with term 20 which will be greater
- // than the prevLogTerm sent by the sender
- mockReplicatedLog.setReplicatedLogEntry(
- new MockRaftActorContext.MockReplicatedLogEntry(20, 0, ""));
-
- // AppendEntries is now sent with a bigger term
- // this will set the receivers term to be the same as the sender's term
- AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
-
- RaftActorBehavior behavior = createBehavior(context);
-
- // Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
-
- RaftState raftState =
- behavior.handleMessage(getRef(), appendEntries);
-
- assertEquals(expected, raftState);
-
- // Also expect an AppendEntriesReply to be sent where success is false
- final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
- "AppendEntriesReply") {
- // do not put code outside this method, will run afterwards
- protected Boolean match(Object in) {
- if (in instanceof AppendEntriesReply) {
- AppendEntriesReply reply = (AppendEntriesReply) in;
- return reply.isSuccess();
- } else {
- throw noMatch();
- }
- }
- }.get();
-
- assertEquals(false, out);
-
- }};
- }
-
- /**
- * This test verifies that when a new AppendEntries message is received with
- * new entries and the logs of the sender and receiver match that the new
- * entries get added to the log and the log is incremented by the number of
- * entries received in appendEntries
- *
- * @throws Exception
- */
@Test
- public void testHandleAppendEntriesAddNewEntries() throws Exception {
- new JavaTestKit(getSystem()) {{
+ public void testHandleAppendEntriesAddSameEntryToLog(){
+ new JavaTestKit(getSystem()) {
+ {
- MockRaftActorContext context = (MockRaftActorContext)
- createActorContext();
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
- // First set the receivers term to lower number
- context.getTermInformation().update(1, "test");
-
- // Prepare the receivers log
- MockRaftActorContext.SimpleReplicatedLog log =
- new MockRaftActorContext.SimpleReplicatedLog();
- log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
- log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
- log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
-
- context.setReplicatedLog(log);
-
- // Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 3, "three"));
- entries.add(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 4, "four"));
-
- // Send appendEntries with the same term as was set on the receiver
- // before the new behavior was created (1 in this case)
- // This will not work for a Candidate because as soon as a Candidate
- // is created it increments the term
- AppendEntries appendEntries =
- new AppendEntries(1, "leader-1", 2, 1, entries, 101);
+ // First set the receivers term to lower number
+ context.getTermInformation().update(2, "test");
- RaftActorBehavior behavior = createBehavior(context);
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
- if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
- // Resetting the Candidates term to make sure it will match
- // the term sent by AppendEntries. If this was not done then
- // the test will fail because the Candidate will assume that
- // the message was sent to it from a lower term peer and will
- // thus respond with a failure
- context.getTermInformation().update(1, "test");
- }
+ context.setReplicatedLog(log);
- // Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
- RaftState raftState =
- behavior.handleMessage(getRef(), appendEntries);
+ AppendEntries appendEntries =
+ new AppendEntries(2, "leader-1", -1, 1, entries, 0);
- assertEquals(expected, raftState);
- assertEquals(5, log.last().getIndex() + 1);
- assertNotNull(log.get(3));
- assertNotNull(log.get(4));
+ RaftActorBehavior behavior = createBehavior(context);
- // Also expect an AppendEntriesReply to be sent where success is false
- final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
- "AppendEntriesReply") {
- // do not put code outside this method, will run afterwards
- protected Boolean match(Object in) {
- if (in instanceof AppendEntriesReply) {
- AppendEntriesReply reply = (AppendEntriesReply) in;
- return reply.isSuccess();
- } else {
- throw noMatch();
- }
+ if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
+ // Resetting the Candidates term to make sure it will match
+ // the term sent by AppendEntries. If this was not done then
+ // the test will fail because the Candidate will assume that
+ // the message was sent to it from a lower term peer and will
+ // thus respond with a failure
+ context.getTermInformation().update(2, "test");
}
- }.get();
- assertEquals(true, out);
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ RaftState raftState =
+ behavior.handleMessage(getRef(), appendEntries);
- }};
- }
+ assertEquals(expected, raftState);
- /**
- * This test verifies that when a new AppendEntries message is received with
- * new entries and the logs of the sender and receiver are out-of-sync that
- * the log is first corrected by removing the out of sync entries from the
- * log and then adding in the new entries sent with the AppendEntries message
- *
- * @throws Exception
- */
- @Test
- public void testHandleAppendEntriesCorrectReceiverLogEntries()
- throws Exception {
- new JavaTestKit(getSystem()) {{
+ assertEquals(1, log.size());
- MockRaftActorContext context = (MockRaftActorContext)
- createActorContext();
- // First set the receivers term to lower number
- context.getTermInformation().update(2, "test");
-
- // Prepare the receivers log
- MockRaftActorContext.SimpleReplicatedLog log =
- new MockRaftActorContext.SimpleReplicatedLog();
- log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
- log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
- log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
-
- context.setReplicatedLog(log);
-
- // Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(
- new MockRaftActorContext.MockReplicatedLogEntry(2, 2, "two-1"));
- entries.add(
- new MockRaftActorContext.MockReplicatedLogEntry(2, 3, "three"));
-
- // Send appendEntries with the same term as was set on the receiver
- // before the new behavior was created (1 in this case)
- // This will not work for a Candidate because as soon as a Candidate
- // is created it increments the term
- AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", 1, 1, entries, 101);
-
- RaftActorBehavior behavior = createBehavior(context);
-
- if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
- // Resetting the Candidates term to make sure it will match
- // the term sent by AppendEntries. If this was not done then
- // the test will fail because the Candidate will assume that
- // the message was sent to it from a lower term peer and will
- // thus respond with a failure
- context.getTermInformation().update(2, "test");
- }
-
- // Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
-
- RaftState raftState =
- behavior.handleMessage(getRef(), appendEntries);
-
- assertEquals(expected, raftState);
-
- // The entry at index 2 will be found out-of-sync with the leader
- // and will be removed
- // Then the two new entries will be added to the log
- // Thus making the log to have 4 entries
- assertEquals(4, log.last().getIndex() + 1);
- assertNotNull(log.get(2));
-
- // Check that the entry at index 2 has the new data
- assertEquals("two-1", log.get(2).getData());
- assertNotNull(log.get(3));
-
- // Also expect an AppendEntriesReply to be sent where success is false
- final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
- "AppendEntriesReply") {
- // do not put code outside this method, will run afterwards
- protected Boolean match(Object in) {
- if (in instanceof AppendEntriesReply) {
- AppendEntriesReply reply = (AppendEntriesReply) in;
- return reply.isSuccess();
- } else {
- throw noMatch();
- }
- }
- }.get();
-
- assertEquals(true, out);
-
-
- }};
+ }};
}
/**
}
protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
- return new AppendEntriesReply(100, false);
+ return new AppendEntriesReply("follower-1", 100, false, 100, 100);
}
protected RequestVote createRequestVoteWithNewerTerm() {
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import junit.framework.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
private final ActorRef peerActor4 = getSystem().actorOf(Props.create(
DoNothingActor.class));
+ private final Map<String, String> onePeer = new HashMap<>();
+ private final Map<String, String> twoPeers = new HashMap<>();
+ private final Map<String, String> fourPeers = new HashMap<>();
+
+ @Before
+ public void setUp(){
+ onePeer.put(peerActor1.path().toString(),
+ peerActor1.path().toString());
+
+ twoPeers.put(peerActor1.path().toString(),
+ peerActor1.path().toString());
+ twoPeers.put(peerActor2.path().toString(),
+ peerActor2.path().toString());
+
+ fourPeers.put(peerActor1.path().toString(),
+ peerActor1.path().toString());
+ fourPeers.put(peerActor2.path().toString(),
+ peerActor2.path().toString());
+ fourPeers.put(peerActor3.path().toString(),
+ peerActor3.path().toString());
+ fourPeers.put(peerActor4.path().toString(),
+ peerActor3.path().toString());
+
+
+ }
+
@Test
public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){
RaftActorContext raftActorContext = createActorContext();
long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm();
- new Candidate(raftActorContext, Collections.EMPTY_LIST);
+ new Candidate(raftActorContext);
assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm());
assertEquals(raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor());
new Within(duration("1 seconds")) {
protected void run() {
- Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST);
+ Candidate candidate = new Candidate(createActorContext(getTestActor()));
final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
// do not put code outside this method, will run afterwards
public void testHandleElectionTimeoutWhenThereAreZeroPeers(){
RaftActorContext raftActorContext = createActorContext();
Candidate candidate =
- new Candidate(raftActorContext, Collections.EMPTY_LIST);
+ new Candidate(raftActorContext);
RaftState raftState =
candidate.handleMessage(candidateActor, new ElectionTimeout());
}
@Test
- public void testHandleElectionTimeoutWhenThereAreTwoPeers(){
- RaftActorContext raftActorContext = createActorContext();
+ public void testHandleElectionTimeoutWhenThereAreTwoNodesInCluster(){
+ MockRaftActorContext raftActorContext =
+ (MockRaftActorContext) createActorContext();
+ raftActorContext.setPeerAddresses(onePeer);
Candidate candidate =
- new Candidate(raftActorContext, Arrays
- .asList(peerActor1.path().toString(),
- peerActor2.path().toString()));
+ new Candidate(raftActorContext);
RaftState raftState =
candidate.handleMessage(candidateActor, new ElectionTimeout());
}
@Test
- public void testBecomeLeaderOnReceivingMajorityVotesInThreePeerCluster(){
- RaftActorContext raftActorContext = createActorContext();
+ public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodesInCluster(){
+ MockRaftActorContext raftActorContext =
+ (MockRaftActorContext) createActorContext();
+ raftActorContext.setPeerAddresses(twoPeers);
Candidate candidate =
- new Candidate(raftActorContext, Arrays
- .asList(peerActor1.path().toString(),
- peerActor2.path().toString()));
+ new Candidate(raftActorContext);
RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
}
@Test
- public void testBecomeLeaderOnReceivingMajorityVotesInFivePeerCluster(){
- RaftActorContext raftActorContext = createActorContext();
+ public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodesInCluster(){
+ MockRaftActorContext raftActorContext =
+ (MockRaftActorContext) createActorContext();
+ raftActorContext.setPeerAddresses(fourPeers);
Candidate candidate =
- new Candidate(raftActorContext, Arrays
- .asList(peerActor1.path().toString(),
- peerActor2.path().toString(),
- peerActor3.path().toString()));
+ new Candidate(raftActorContext);
RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
- RaftState stateOnSecondVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+ RaftState stateOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true));
Assert.assertEquals(RaftState.Candidate, stateOnFirstVote);
Assert.assertEquals(RaftState.Leader, stateOnSecondVote);
new Within(duration("1 seconds")) {
protected void run() {
- Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST);
+ Candidate candidate = new Candidate(createActorContext(getTestActor()));
candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.EMPTY_LIST, 0));
new Within(duration("1 seconds")) {
protected void run() {
- Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST);
+ Candidate candidate = new Candidate(createActorContext(getTestActor()));
candidate.handleMessage(getTestActor(), new RequestVote(0, "test", 0, 0));
@Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
- return new Candidate(actorContext, Collections.EMPTY_LIST);
+ return new Candidate(actorContext);
}
@Override protected RaftActorContext createActorContext() {
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import java.util.ArrayList;
+import java.util.List;
+
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
public class FollowerTest extends AbstractRaftActorBehaviorTest {
}};
}
+ /**
+ * This test verifies that when an AppendEntries RPC is received by a RaftActor
+ * with a commitIndex that is greater than what has been applied to the
+ * state machine of the RaftActor, the RaftActor applies the state and
+ * sets it current applied state to the commitIndex of the sender.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ RaftActorContext context =
+ createActorContext();
+
+ context.setLastApplied(100);
+ setLastLogEntry((MockRaftActorContext) context, 0, 0, "");
+
+ // The new commitIndex is 101
+ AppendEntries appendEntries =
+ new AppendEntries(100, "leader-1", 0, 0, null, 101);
+
+ RaftState raftState =
+ createBehavior(context).handleMessage(getRef(), appendEntries);
+
+ assertEquals(101L, context.getLastApplied());
+
+ }};
+ }
+
+ /**
+ * This test verifies that when an AppendEntries is received a specific prevLogTerm
+ * which does not match the term that is in RaftActors log entry at prevLogIndex
+ * then the RaftActor does not change it's state and it returns a failure.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
+ throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+ // First set the receivers term to lower number
+ context.getTermInformation().update(95, "test");
+
+ // Set the last log entry term for the receiver to be greater than
+ // what we will be sending as the prevLogTerm in AppendEntries
+ MockRaftActorContext.MockReplicatedLog mockReplicatedLog =
+ setLastLogEntry(context, 20, 0, "");
+
+ // Also set the entry at index 0 with term 20 which will be greater
+ // than the prevLogTerm sent by the sender
+ mockReplicatedLog.setReplicatedLogEntry(
+ new MockRaftActorContext.MockReplicatedLogEntry(20, 0, ""));
+
+ // AppendEntries is now sent with a bigger term
+ // this will set the receivers term to be the same as the sender's term
+ AppendEntries appendEntries =
+ new AppendEntries(100, "leader-1", 0, 0, null, 101);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftState raftState =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftState);
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+
+
+ }};
+ }
+
+
+
+ /**
+ * This test verifies that when a new AppendEntries message is received with
+ * new entries and the logs of the sender and receiver match that the new
+ * entries get added to the log and the log is incremented by the number of
+ * entries received in appendEntries
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleAppendEntriesAddNewEntries() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+ // First set the receivers term to lower number
+ context.getTermInformation().update(1, "test");
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 3, "three"));
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 4, "four"));
+
+ // Send appendEntries with the same term as was set on the receiver
+ // before the new behavior was created (1 in this case)
+ // This will not work for a Candidate because as soon as a Candidate
+ // is created it increments the term
+ AppendEntries appendEntries =
+ new AppendEntries(1, "leader-1", 2, 1, entries, 4);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftState raftState =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftState);
+ assertEquals(5, log.last().getIndex() + 1);
+ assertNotNull(log.get(3));
+ assertNotNull(log.get(4));
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+
+
+ }};
+ }
+
+
+
+ /**
+ * This test verifies that when a new AppendEntries message is received with
+ * new entries and the logs of the sender and receiver are out-of-sync that
+ * the log is first corrected by removing the out of sync entries from the
+ * log and then adding in the new entries sent with the AppendEntries message
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleAppendEntriesCorrectReceiverLogEntries()
+ throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+ // First set the receivers term to lower number
+ context.getTermInformation().update(2, "test");
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(2, 2, "two-1"));
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(2, 3, "three"));
+
+ // Send appendEntries with the same term as was set on the receiver
+ // before the new behavior was created (1 in this case)
+ // This will not work for a Candidate because as soon as a Candidate
+ // is created it increments the term
+ AppendEntries appendEntries =
+ new AppendEntries(2, "leader-1", 1, 1, entries, 3);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftState raftState =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftState);
+
+ // The entry at index 2 will be found out-of-sync with the leader
+ // and will be removed
+ // Then the two new entries will be added to the log
+ // Thus making the log to have 4 entries
+ assertEquals(4, log.last().getIndex() + 1);
+ assertNotNull(log.get(2));
+
+ // Check that the entry at index 2 has the new data
+ assertEquals("two-1", log.get(2).getData());
+ assertNotNull(log.get(3));
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+
+
+ }};
+ }
+
}
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
public void testHandleMessageForUnknownMessage() throws Exception {
new JavaTestKit(getSystem()) {{
Leader leader =
- new Leader(createActorContext(), Collections.EMPTY_LIST);
+ new Leader(createActorContext());
// handle message should return the Leader state when it receives an
// unknown message
ActorRef followerActor = getTestActor();
- List<String> followers = new ArrayList();
+ MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
- followers.add(followerActor.path().toString());
+ Map<String, String> peerAddresses = new HashMap();
- Leader leader = new Leader(createActorContext(), followers);
+ peerAddresses.put(followerActor.path().toString(), followerActor.path().toString());
+
+ actorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(actorContext);
leader.handleMessage(senderActor, new SendHeartBeat());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
}
@Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
- return new Leader(actorContext, Collections.EMPTY_LIST);
+ return new Leader(actorContext);
}
@Override protected RaftActorContext createActorContext() {