Test RaftActor using a simple program 98/8998/7
authorMoiz Raja <moraja@cisco.com>
Mon, 14 Jul 2014 01:52:50 +0000 (18:52 -0700)
committerMoiz Raja <moraja@cisco.com>
Sat, 26 Jul 2014 22:18:09 +0000 (15:18 -0700)
Change-Id: Id0a8091c8f6ee89178bf774be398647c60a25d03
Signed-off-by: Moiz Raja <moraja@cisco.com>
30 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValueSaved.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplyState.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java
new file mode 100644 (file)
index 0000000..2560f16
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
+
+public class ClientActor extends UntypedActor {
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
+
+    private final ActorRef target;
+
+    public ClientActor(ActorRef target){
+        this.target = target;
+    }
+
+    public static Props props(final ActorRef target){
+        return Props.create(new Creator<ClientActor>(){
+
+            @Override public ClientActor create() throws Exception {
+                return new ClientActor(target);
+            }
+        });
+    }
+
+    @Override public void onReceive(Object message) throws Exception {
+        if(message instanceof KeyValue) {
+            target.tell(message, getSelf());
+        } else if(message instanceof KeyValueSaved){
+            LOG.info("KeyValue saved");
+        }
+    }
+}
index 3c8e12b..35a2c98 100644 (file)
@@ -8,27 +8,68 @@
 
 package org.opendaylight.controller.cluster.example;
 
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * A sample actor showing how the RaftActor is to be extended
  */
 public class ExampleActor extends RaftActor {
-    public ExampleActor(String id) {
-        super(id);
+
+    private final Map<String, String> state = new HashMap();
+
+    private long persistIdentifier = 1;
+
+
+    public ExampleActor(String id, Map<String, String> peerAddresses) {
+        super(id, peerAddresses);
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses){
+        return Props.create(new Creator<ExampleActor>(){
+
+            @Override public ExampleActor create() throws Exception {
+                return new ExampleActor(id, peerAddresses);
+            }
+        });
     }
 
     @Override public void onReceiveCommand(Object message){
-        /*
-            Here the extended class does whatever it needs to do.
-            If it cannot handle a message then it passes it on to the super
-            class for handling
-         */
+        if(message instanceof KeyValue){
+
+            if(isLeader()) {
+                String persistId = Long.toString(persistIdentifier++);
+                persistData(getSender(), persistId, message);
+            } else {
+                getLeader().forward(message, getContext());
+            }
+        }
         super.onReceiveCommand(message);
     }
 
+    @Override protected void applyState(ActorRef clientActor, String identifier,
+        Object data) {
+        if(data instanceof KeyValue){
+            KeyValue kv = (KeyValue) data;
+            state.put(kv.getKey(), kv.getValue());
+            if(clientActor != null) {
+                clientActor.tell(new KeyValueSaved(), getSelf());
+            }
+        }
+    }
+
     @Override public void onReceiveRecover(Object message) {
         super.onReceiveRecover(message);
     }
 
+    @Override public String persistenceId() {
+        return getId();
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java
new file mode 100644 (file)
index 0000000..27d083f
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Main {
+    private static final ActorSystem actorSystem = ActorSystem.create();
+    // Create three example actors
+    private static Map<String, String> allPeers = new HashMap<>();
+
+    static {
+        allPeers.put("example-1", "akka://default/user/example-1");
+        allPeers.put("example-2", "akka://default/user/example-2");
+        allPeers.put("example-3", "akka://default/user/example-3");
+    }
+
+    public static void main(String[] args) throws Exception{
+        ActorRef example1Actor =
+            actorSystem.actorOf(ExampleActor.props("example-1",
+                withoutPeer("example-1")), "example-1");
+
+        ActorRef example2Actor =
+            actorSystem.actorOf(ExampleActor.props("example-2",
+                withoutPeer("example-2")), "example-2");
+
+        ActorRef example3Actor =
+            actorSystem.actorOf(ExampleActor.props("example-3",
+                withoutPeer("example-3")), "example-3");
+
+        ActorRef clientActor = actorSystem.actorOf(ClientActor.props(example1Actor));
+        BufferedReader br =
+            new BufferedReader(new InputStreamReader(System.in));
+
+        while(true) {
+            System.out.print("Enter Integer (0 to exit):");
+            try {
+                int i = Integer.parseInt(br.readLine());
+                if(i == 0){
+                    System.exit(0);
+                }
+                clientActor.tell(new KeyValue("key " + i, "value " + i), null);
+            } catch (NumberFormatException nfe) {
+                System.err.println("Invalid Format!");
+            }
+        }
+    }
+
+    private static Map<String, String> withoutPeer(String peerId) {
+        Map<String, String> without = new HashMap<>(allPeers);
+        without.remove(peerId);
+        return without;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java
new file mode 100644 (file)
index 0000000..05e9ba7
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example.messages;
+
+import java.io.Serializable;
+
+public class KeyValue implements Serializable{
+    private final String key;
+    private final String value;
+
+    public KeyValue(String key, String value){
+        this.key = key;
+        this.value = value;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public String getValue() {
+        return value;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValueSaved.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValueSaved.java
new file mode 100644 (file)
index 0000000..e10e5a7
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example.messages;
+
+public class KeyValueSaved {
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java
new file mode 100644 (file)
index 0000000..4972b34
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+
+public interface ClientRequestTracker {
+    /**
+     * The client actor who is waiting for a response
+     *
+     * @return
+     */
+    ActorRef getClientActor();
+
+    /**
+     *
+     * @return
+     */
+    String getIdentifier();
+
+    /**
+     * The index of the log entry which needs to be replicated
+     *
+     * @return
+     */
+    long getIndex();
+
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java
new file mode 100644 (file)
index 0000000..15de6d0
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+
+public class ClientRequestTrackerImpl implements ClientRequestTracker {
+
+    private final ActorRef clientActor;
+    private final String identifier;
+    private final long logIndex;
+
+    public ClientRequestTrackerImpl(ActorRef clientActor, String identifier,
+        long logIndex) {
+
+        this.clientActor = clientActor;
+
+        this.identifier = identifier;
+
+        this.logIndex = logIndex;
+    }
+
+    @Override public ActorRef getClientActor() {
+        return clientActor;
+    }
+
+    @Override public long getIndex() {
+        return logIndex;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+}
index 5f185cb..f3de983 100644 (file)
@@ -21,12 +21,26 @@ public interface FollowerLogInformation {
      */
     public long incrNextIndex();
 
+    /**
+     * Decrement the value of the nextIndex
+     * @return
+     */
+    public long decrNextIndex();
+
+    /**
+     *
+     * @param nextIndex
+     */
+    void setNextIndex(long nextIndex);
+
     /**
      * Increment the value of the matchIndex
      * @return
      */
     public long incrMatchIndex();
 
+    public void setMatchIndex(long matchIndex);
+
     /**
      * The identifier of the follower
      * This could simply be the url of the remote actor
index 6aa7650..94f9a53 100644 (file)
@@ -29,10 +29,22 @@ public class FollowerLogInformationImpl implements FollowerLogInformation{
         return nextIndex.incrementAndGet();
     }
 
+    @Override public long decrNextIndex() {
+        return nextIndex.decrementAndGet();
+    }
+
+    @Override public void setNextIndex(long nextIndex) {
+        this.nextIndex.set(nextIndex);
+    }
+
     public long incrMatchIndex(){
         return matchIndex.incrementAndGet();
     }
 
+    @Override public void setMatchIndex(long matchIndex) {
+        this.matchIndex.set(matchIndex);
+    }
+
     public String getId() {
         return id;
     }
index 186de02..826faf7 100644 (file)
@@ -8,13 +8,26 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import akka.persistence.UntypedEventsourcedProcessor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.UntypedPersistentActor;
 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
 
-import java.util.Collections;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
@@ -58,7 +71,9 @@ import java.util.Collections;
  *
  * <a href="http://doc.akka.io/api/akka/2.3.3/index.html#akka.persistence.UntypedEventsourcedProcessor">UntypeEventSourceProcessor</a>
  */
-public abstract class RaftActor extends UntypedEventsourcedProcessor {
+public abstract class RaftActor extends UntypedPersistentActor {
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
 
     /**
      *  The current state determines the current behavior of a RaftActor
@@ -72,51 +87,226 @@ public abstract class RaftActor extends UntypedEventsourcedProcessor {
      */
     private RaftActorContext context;
 
-    public RaftActor(String id){
+    /**
+     * The in-memory journal
+     */
+    private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
+
+
+
+    public RaftActor(String id, Map<String, String> peerAddresses){
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(),
-            id, new ElectionTermImpl(id),
-            0, 0, new ReplicatedLogImpl());
+            id, new ElectionTermImpl(getSelf().path().toString()),
+            -1, -1, replicatedLog, peerAddresses, LOG);
         currentBehavior = switchBehavior(RaftState.Follower);
     }
 
     @Override public void onReceiveRecover(Object message) {
-        throw new UnsupportedOperationException("onReceiveRecover");
+        if(message instanceof ReplicatedLogEntry) {
+            replicatedLog.append((ReplicatedLogEntry) message);
+        } else if(message instanceof RecoveryCompleted){
+            LOG.debug("Log now has messages to index : " + replicatedLog.lastIndex());
+        }
     }
 
     @Override public void onReceiveCommand(Object message) {
-        RaftState state = currentBehavior.handleMessage(getSender(), message);
-        currentBehavior = switchBehavior(state);
+        if(message instanceof ApplyState){
+
+            ApplyState applyState = (ApplyState)  message;
+
+            LOG.debug("Applying state for log index {}", applyState.getReplicatedLogEntry().getIndex());
+
+            applyState(applyState.getClientActor(), applyState.getIdentifier(),
+                applyState.getReplicatedLogEntry().getData());
+        } else if(message instanceof FindLeader){
+            getSender().tell(new FindLeaderReply(
+                context.getPeerAddress(currentBehavior.getLeaderId())),
+                getSelf());
+        } else {
+            RaftState state =
+                currentBehavior.handleMessage(getSender(), message);
+            currentBehavior = switchBehavior(state);
+        }
     }
 
     private RaftActorBehavior switchBehavior(RaftState state){
+        if(currentBehavior != null) {
+            if (currentBehavior.state() == state) {
+                return currentBehavior;
+            }
+            LOG.info("Switching from state " + currentBehavior.state() + " to "
+                + state);
+
+            try {
+                currentBehavior.close();
+            } catch (Exception e) {
+                LOG.error(e, "Failed to close behavior : " + currentBehavior.state());
+            }
+
+        } else {
+            LOG.info("Switching behavior to " + state);
+        }
         RaftActorBehavior behavior = null;
         if(state == RaftState.Candidate){
-            behavior = new Candidate(context, Collections.EMPTY_LIST);
+            behavior = new Candidate(context);
         } else if(state == RaftState.Follower){
             behavior = new Follower(context);
         } else {
-            behavior = new Leader(context, Collections.EMPTY_LIST);
+            behavior = new Leader(context);
         }
         return behavior;
     }
 
+    /**
+     * When a derived RaftActor needs to persist something it must call
+     * persistData.
+     *
+     * @param clientActor
+     * @param identifier
+     * @param data
+     */
+    protected void persistData(ActorRef clientActor, String identifier, Object data){
+        LOG.debug("Persist data " + identifier);
+        ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
+            context.getReplicatedLog().lastIndex() + 1,
+            context.getTermInformation().getCurrentTerm(), data);
+
+        replicatedLog.appendAndPersist(clientActor, identifier, replicatedLogEntry);
+    }
+
+    protected abstract void applyState(ActorRef clientActor, String identifier, Object data);
+
+    protected String getId(){
+        return context.getId();
+    }
+
+    protected boolean isLeader(){
+        return context.getId().equals(currentBehavior.getLeaderId());
+    }
+
+    protected ActorSelection getLeader(){
+        String leaderId = currentBehavior.getLeaderId();
+        String peerAddress = context.getPeerAddress(leaderId);
+        LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " + peerAddress);
+        return context.actorSelection(peerAddress);
+    }
+
     private class ReplicatedLogImpl implements ReplicatedLog {
+        private final List<ReplicatedLogEntry> journal = new ArrayList();
+        private long snapshotIndex = 0;
+        private Object snapShot = null;
+
 
         @Override public ReplicatedLogEntry get(long index) {
-            throw new UnsupportedOperationException("get");
+            if(index < 0 || index >= journal.size()){
+                return null;
+            }
+
+            return journal.get((int) (index - snapshotIndex));
         }
 
         @Override public ReplicatedLogEntry last() {
-            throw new UnsupportedOperationException("last");
+            if(journal.size() == 0){
+                return null;
+            }
+            return get(journal.size() - 1);
         }
 
+        @Override public long lastIndex() {
+            if(journal.size() == 0){
+                return -1;
+            }
+
+            return last().getIndex();
+        }
+
+        @Override public long lastTerm() {
+            if(journal.size() == 0){
+                return -1;
+            }
+
+            return last().getTerm();
+        }
+
+
         @Override public void removeFrom(long index) {
-            throw new UnsupportedOperationException("removeFrom");
+            if(index < 0 || index >= journal.size()){
+                return;
+            }
+            for(int i= (int) (index - snapshotIndex) ; i < journal.size() ; i++){
+                deleteMessage(i);
+                journal.remove(i);
+            }
+        }
+
+        @Override public void append(final ReplicatedLogEntry replicatedLogEntry) {
+            journal.add(replicatedLogEntry);
+        }
+
+        @Override public List<ReplicatedLogEntry> getFrom(long index) {
+            List<ReplicatedLogEntry> entries = new ArrayList<>(100);
+            if(index < 0 || index >= journal.size()){
+                return entries;
+            }
+            for(int i= (int) (index - snapshotIndex); i < journal.size() ; i++){
+                entries.add(journal.get(i));
+            }
+            return entries;
+        }
+
+        @Override public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry){
+            appendAndPersist(null, null, replicatedLogEntry);
+        }
+
+        public void appendAndPersist(final ActorRef clientActor, final String identifier, final ReplicatedLogEntry replicatedLogEntry){
+            context.getLogger().debug("Append log entry and persist" + replicatedLogEntry.getIndex());
+            // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
+            journal.add(replicatedLogEntry);
+            persist(replicatedLogEntry,
+                new Procedure<ReplicatedLogEntry>() {
+                    public void apply(ReplicatedLogEntry evt) throws Exception {
+                        // Send message for replication
+                        if(clientActor != null) {
+                            currentBehavior.handleMessage(getSelf(),
+                                new Replicate(clientActor, identifier,
+                                    replicatedLogEntry));
+                        }
+                    }
+                });
         }
 
-        @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
-            throw new UnsupportedOperationException("append");
+        @Override public long size() {
+            return journal.size() + snapshotIndex;
         }
     }
+
+    private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
+        Serializable {
+
+        private final long index;
+        private final long term;
+        private final Object payload;
+
+        public ReplicatedLogImplEntry(long index, long term, Object payload){
+
+            this.index = index;
+            this.term = term;
+            this.payload = payload;
+        }
+
+        @Override public Object getData() {
+            return payload;
+        }
+
+        @Override public long getTerm() {
+            return term;
+        }
+
+        @Override public long getIndex() {
+            return index;
+        }
+    }
+
+
 }
index 4bc9162..cd5865b 100644 (file)
@@ -12,6 +12,9 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.event.LoggingAdapter;
+
+import java.util.Map;
 
 /**
  * The RaftActorContext contains that portion of the RaftActors state that
@@ -91,4 +94,23 @@ public interface RaftActorContext {
      * @return The ActorSystem associated with this context
      */
     ActorSystem getActorSystem();
+
+    /**
+     *
+     * @return
+     */
+    LoggingAdapter getLogger();
+
+    /**
+     * Get a mapping of peer id's their addresses
+     * @return
+     */
+    Map<String, String> getPeerAddresses();
+
+    /**
+     *
+     * @param peerId
+     * @return
+     */
+    String getPeerAddress(String peerId);
 }
index 845011a..03534d6 100644 (file)
@@ -13,6 +13,9 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActorContext;
+import akka.event.LoggingAdapter;
+
+import java.util.Map;
 
 public class RaftActorContextImpl implements RaftActorContext{
 
@@ -30,10 +33,15 @@ public class RaftActorContextImpl implements RaftActorContext{
 
     private final ReplicatedLog replicatedLog;
 
+    private final Map<String, String> peerAddresses;
+
+    private final LoggingAdapter LOG;
+
+
     public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
         String id,
         ElectionTerm termInformation, long commitIndex,
-        long lastApplied, ReplicatedLog replicatedLog) {
+        long lastApplied, ReplicatedLog replicatedLog, Map<String, String> peerAddresses, LoggingAdapter logger) {
         this.actor = actor;
         this.context = context;
         this.id = id;
@@ -41,6 +49,8 @@ public class RaftActorContextImpl implements RaftActorContext{
         this.commitIndex = commitIndex;
         this.lastApplied = lastApplied;
         this.replicatedLog = replicatedLog;
+        this.peerAddresses = peerAddresses;
+        this.LOG = logger;
     }
 
     public ActorRef actorOf(Props props){
@@ -86,4 +96,16 @@ public class RaftActorContextImpl implements RaftActorContext{
     @Override public ActorSystem getActorSystem() {
         return context.system();
     }
+
+    @Override public LoggingAdapter getLogger() {
+        return this.LOG;
+    }
+
+    @Override public Map<String, String> getPeerAddresses() {
+        return peerAddresses;
+    }
+
+    @Override public String getPeerAddress(String peerId) {
+        return peerAddresses.get(peerId);
+    }
 }
index f12bc9a..34e7ac9 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import java.util.List;
+
 /**
  * Represents the ReplicatedLog that needs to be kept in sync by the RaftActor
  */
@@ -28,6 +30,18 @@ public interface ReplicatedLog {
      */
     ReplicatedLogEntry last();
 
+    /**
+     *
+     * @return
+     */
+    long lastIndex();
+
+    /**
+     *
+     * @return
+     */
+    long lastTerm();
+
     /**
      * Remove all the entries from the logs >= index
      *
@@ -40,4 +54,23 @@ public interface ReplicatedLog {
      * @param replicatedLogEntry
      */
     void append(ReplicatedLogEntry replicatedLogEntry);
+
+    /**
+     *
+     * @param replicatedLogEntry
+     */
+    void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
+
+    /**
+     *
+     * @param index
+     */
+    List<ReplicatedLogEntry> getFrom(long index);
+
+
+    /**
+     *
+     * @return
+     */
+    long size();
 }
index 1670827..304b2fd 100644 (file)
@@ -10,9 +10,11 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
+import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -65,9 +67,13 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     /**
      *
      */
-
     private Cancellable electionCancel = null;
 
+    /**
+     *
+     */
+    protected String leaderId = null;
+
 
     protected AbstractRaftActorBehavior(RaftActorContext context) {
         this.context = context;
@@ -93,65 +99,25 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
 
     protected RaftState appendEntries(ActorRef sender,
-        AppendEntries appendEntries, RaftState raftState){
+        AppendEntries appendEntries, RaftState raftState) {
 
-        // 1. Reply false if term < currentTerm (§5.1)
-        if(appendEntries.getTerm() < currentTerm()){
-            sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
-            return state();
+        if (raftState != state()) {
+            context.getLogger().debug("Suggested state is " + raftState
+                + " current behavior state is " + state());
         }
 
-        // 2. Reply false if log doesn’t contain an entry at prevLogIndex
-        // whose term matches prevLogTerm (§5.3)
-        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
-            .get(appendEntries.getPrevLogIndex());
-
-        if(previousEntry == null || previousEntry.getTerm() != appendEntries.getPrevLogTerm()){
-            sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
+        // 1. Reply false if term < currentTerm (§5.1)
+        if (appendEntries.getTerm() < currentTerm()) {
+            context.getLogger().debug(
+                "Cannot append entries because sender term " + appendEntries
+                    .getTerm() + " is less than " + currentTerm());
+            sender.tell(
+                new AppendEntriesReply(context.getId(), currentTerm(), false,
+                    lastIndex(), lastTerm()), actor()
+            );
             return state();
         }
 
-        if(appendEntries.getEntries() != null) {
-            // 3. If an existing entry conflicts with a new one (same index
-            // but different terms), delete the existing entry and all that
-            // follow it (§5.3)
-            int addEntriesFrom = 0;
-            for (int i = 0;
-                 i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
-                ReplicatedLogEntry newEntry = context.getReplicatedLog()
-                    .get(i + 1);
-
-                if (newEntry != null && newEntry.getTerm() == appendEntries.getEntries().get(i).getTerm()){
-                    break;
-                }
-                if (newEntry != null && newEntry.getTerm() != appendEntries
-                    .getEntries().get(i).getTerm()) {
-                    context.getReplicatedLog().removeFrom(i + 1);
-                    break;
-                }
-            }
-
-            // 4. Append any new entries not already in the log
-            for (int i = addEntriesFrom;
-                 i < appendEntries.getEntries().size(); i++) {
-                context.getReplicatedLog()
-                    .append(appendEntries.getEntries().get(i));
-            }
-        }
-
-
-        // 5. If leaderCommit > commitIndex, set commitIndex =
-        // min(leaderCommit, index of last new entry)
-        context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
-            context.getReplicatedLog().last().getIndex()));
-
-        // If commitIndex > lastApplied: increment lastApplied, apply
-        // log[lastApplied] to state machine (§5.3)
-        if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
-            applyLogToStateMachine(appendEntries.getLeaderCommit());
-        }
-
-        sender.tell(new AppendEntriesReply(currentTerm(), true), actor());
 
         return handleAppendEntries(sender, appendEntries, raftState);
     }
@@ -201,7 +167,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             if (requestVote.getLastLogTerm() > lastTerm()) {
                 candidateLatest = true;
             } else if ((requestVote.getLastLogTerm() == lastTerm())
-                && requestVote.getLastLogIndex() >= lastTerm()) {
+                && requestVote.getLastLogIndex() >= lastIndex()) {
                 candidateLatest = true;
             }
 
@@ -236,23 +202,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected abstract RaftState handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply, RaftState suggestedState);
 
-    /**
-     * @return The derived class should return the state that corresponds to
-     * it's behavior
-     */
-    protected abstract RaftState state();
-
     protected FiniteDuration electionDuration() {
         long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
         return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
             TimeUnit.MILLISECONDS);
     }
 
-    protected void scheduleElection(FiniteDuration interval) {
-
+    protected void stopElection() {
         if (electionCancel != null && !electionCancel.isCancelled()) {
             electionCancel.cancel();
         }
+    }
+
+    protected void scheduleElection(FiniteDuration interval) {
+
+        stopElection();
 
         // Schedule an election. When the scheduler triggers an ElectionTimeout
         // message is sent to itself
@@ -275,13 +239,44 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     protected long lastTerm() {
-        return context.getReplicatedLog().last().getTerm();
+        return context.getReplicatedLog().lastTerm();
     }
 
     protected long lastIndex() {
-        return context.getReplicatedLog().last().getIndex();
+        return context.getReplicatedLog().lastIndex();
     }
 
+    protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+        return null;
+    }
+
+    protected void applyLogToStateMachine(long index) {
+        // Now maybe we apply to the state machine
+        for (long i = context.getLastApplied() + 1;
+             i < index + 1; i++) {
+            ActorRef clientActor = null;
+            String identifier = null;
+            ClientRequestTracker tracker = findClientRequestTracker(i);
+
+            if (tracker != null) {
+                clientActor = tracker.getClientActor();
+                identifier = tracker.getIdentifier();
+            }
+            ReplicatedLogEntry replicatedLogEntry =
+                context.getReplicatedLog().get(i);
+
+            if (replicatedLogEntry != null) {
+                actor().tell(new ApplyState(clientActor, identifier,
+                    replicatedLogEntry), actor());
+            } else {
+                context.getLogger().error(
+                    "Missing index " + i + " from log. Cannot apply state.");
+            }
+        }
+        // Send a local message to the local RaftActor (it's derived class to be
+        // specific to apply the log to it's index)
+        context.setLastApplied(index);
+    }
 
     @Override
     public RaftState handleMessage(ActorRef sender, Object message) {
@@ -307,6 +302,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return raftState;
     }
 
+    @Override public String getLeaderId() {
+        return leaderId;
+    }
+
     private RaftState applyTerm(RaftRPC rpc) {
         // If RPC request or response contains term T > currentTerm:
         // set currentTerm = T, convert to follower (§5.1)
@@ -318,11 +317,4 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return state();
     }
 
-    private void applyLogToStateMachine(long index) {
-        // Send a local message to the local RaftActor (it's derived class to be
-        // specific to apply the log to it's index)
-        context.setLastApplied(index);
-    }
-
-
 }
index 3e6b502..0d035db 100644 (file)
@@ -18,13 +18,13 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
  * The behavior of a RaftActor when it is in the CandidateState
- * <p>
+ * <p/>
  * Candidates (§5.2):
  * <ul>
  * <li> On conversion to candidate, start election:
@@ -48,9 +48,11 @@ public class Candidate extends AbstractRaftActorBehavior {
 
     private final int votesRequired;
 
-    public Candidate(RaftActorContext context, List<String> peerPaths) {
+    public Candidate(RaftActorContext context) {
         super(context);
 
+        Collection<String> peerPaths = context.getPeerAddresses().values();
+
         for (String peerPath : peerPaths) {
             peerToActor.put(peerPath,
                 context.actorSelection(peerPath));
@@ -83,11 +85,7 @@ public class Candidate extends AbstractRaftActorBehavior {
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries, RaftState suggestedState) {
 
-        // There is some peer who thinks it's a leader but is not
-        // I will not accept this append entries
-        sender.tell(new AppendEntriesReply(
-            context.getTermInformation().getCurrentTerm(), false),
-            context.getActor());
+        context.getLogger().error("An unexpected AppendEntries received in state " + state());
 
         return suggestedState;
     }
@@ -102,30 +100,30 @@ public class Candidate extends AbstractRaftActorBehavior {
 
     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply, RaftState suggestedState) {
-        if(suggestedState == RaftState.Follower) {
+        if (suggestedState == RaftState.Follower) {
             // If base class thinks I should be follower then I am
             return suggestedState;
         }
 
-        if(requestVoteReply.isVoteGranted()){
+        if (requestVoteReply.isVoteGranted()) {
             voteCount++;
         }
 
-        if(voteCount >= votesRequired){
+        if (voteCount >= votesRequired) {
             return RaftState.Leader;
         }
 
         return state();
     }
 
-    @Override protected RaftState state() {
+    @Override public RaftState state() {
         return RaftState.Candidate;
     }
 
     @Override
     public RaftState handleMessage(ActorRef sender, Object message) {
-        if(message instanceof ElectionTimeout){
-            if(votesRequired == 0){
+        if (message instanceof ElectionTimeout) {
+            if (votesRequired == 0) {
                 // If there are no peers then we should be a Leader
                 // We wait for the election timeout to occur before declare
                 // ourselves the leader. This gives enough time for a leader
@@ -141,24 +139,33 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
 
-    private void startNewTerm(){
+    private void startNewTerm() {
+
+
         // set voteCount back to 1 (that is voting for self)
         voteCount = 1;
 
         // Increment the election term and vote for self
         long currentTerm = context.getTermInformation().getCurrentTerm();
-        context.getTermInformation().update(currentTerm+1, context.getId());
+        context.getTermInformation().update(currentTerm + 1, context.getId());
+
+        context.getLogger().debug("Starting new term " + (currentTerm+1));
 
         // Request for a vote
-        for(ActorSelection peerActor : peerToActor.values()){
+        for (ActorSelection peerActor : peerToActor.values()) {
             peerActor.tell(new RequestVote(
                     context.getTermInformation().getCurrentTerm(),
-                    context.getId(), context.getReplicatedLog().last().getIndex(),
-                    context.getReplicatedLog().last().getTerm()),
-                context.getActor());
+                    context.getId(),
+                    context.getReplicatedLog().lastIndex(),
+                    context.getReplicatedLog().lastTerm()),
+                context.getActor()
+            );
         }
 
 
     }
 
+    @Override public void close() throws Exception {
+        stopElection();
+    }
 }
index d932710..88558ca 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -36,6 +37,114 @@ public class Follower extends AbstractRaftActorBehavior {
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries, RaftState suggestedState) {
+
+        // If we got here then we do appear to be talking to the leader
+        leaderId = appendEntries.getLeaderId();
+
+        // 2. Reply false if log doesn’t contain an entry at prevLogIndex
+        // whose term matches prevLogTerm (§5.3)
+        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+            .get(appendEntries.getPrevLogIndex());
+
+
+        if (lastIndex() > -1 && previousEntry != null
+            && previousEntry.getTerm() != appendEntries
+            .getPrevLogTerm()) {
+
+            context.getLogger().debug(
+                "Cannot append entries because previous entry term "
+                    + previousEntry.getTerm()
+                    + " is not equal to append entries prevLogTerm "
+                    + appendEntries.getPrevLogTerm());
+
+            sender.tell(
+                new AppendEntriesReply(context.getId(), currentTerm(), false,
+                    lastIndex(), lastTerm()), actor()
+            );
+            return state();
+        }
+
+        if (appendEntries.getEntries() != null
+            && appendEntries.getEntries().size() > 0) {
+            context.getLogger().debug(
+                "Number of entries to be appended = " + appendEntries
+                    .getEntries().size());
+
+            // 3. If an existing entry conflicts with a new one (same index
+            // but different terms), delete the existing entry and all that
+            // follow it (§5.3)
+            int addEntriesFrom = 0;
+            if (context.getReplicatedLog().size() > 0) {
+                for (int i = 0;
+                     i < appendEntries.getEntries()
+                         .size(); i++, addEntriesFrom++) {
+                    ReplicatedLogEntry matchEntry =
+                        appendEntries.getEntries().get(i);
+                    ReplicatedLogEntry newEntry = context.getReplicatedLog()
+                        .get(matchEntry.getIndex());
+
+                    if (newEntry == null) {
+                        //newEntry not found in the log
+                        break;
+                    }
+
+                    if (newEntry != null && newEntry.getTerm() == matchEntry
+                        .getTerm()) {
+                        continue;
+                    }
+                    if (newEntry != null && newEntry.getTerm() != matchEntry
+                        .getTerm()) {
+                        context.getLogger().debug(
+                            "Removing entries from log starting at "
+                                + matchEntry.getIndex());
+                        context.getReplicatedLog()
+                            .removeFrom(matchEntry.getIndex());
+                        break;
+                    }
+                }
+            }
+
+            context.getLogger().debug(
+                "After cleanup entries to be added from = " + (addEntriesFrom
+                    + lastIndex()));
+
+            // 4. Append any new entries not already in the log
+            for (int i = addEntriesFrom;
+                 i < appendEntries.getEntries().size(); i++) {
+                context.getLogger().debug(
+                    "Append entry to log " + appendEntries.getEntries().get(i)
+                        .toString());
+                context.getReplicatedLog()
+                    .appendAndPersist(appendEntries.getEntries().get(i));
+            }
+
+            context.getLogger().debug(
+                "Log size is now " + context.getReplicatedLog().size());
+        }
+
+
+        // 5. If leaderCommit > commitIndex, set commitIndex =
+        // min(leaderCommit, index of last new entry)
+
+        long prevCommitIndex = context.getCommitIndex();
+
+        context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
+            context.getReplicatedLog().lastIndex()));
+
+        if (prevCommitIndex != context.getCommitIndex()) {
+            context.getLogger()
+                .debug("Commit index set to " + context.getCommitIndex());
+        }
+
+        // If commitIndex > lastApplied: increment lastApplied, apply
+        // log[lastApplied] to state machine (§5.3)
+        if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+            applyLogToStateMachine(appendEntries.getLeaderCommit());
+        }
+
+        sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
+            lastIndex(), lastTerm()), actor());
+
         return suggestedState;
     }
 
@@ -49,7 +158,7 @@ public class Follower extends AbstractRaftActorBehavior {
         return suggestedState;
     }
 
-    @Override protected RaftState state() {
+    @Override public RaftState state() {
         return RaftState.Follower;
     }
 
@@ -62,4 +171,8 @@ public class Follower extends AbstractRaftActorBehavior {
 
         return super.handleMessage(sender, message);
     }
+
+    @Override public void close() throws Exception {
+        stopElection();
+    }
 }
index cfefd21..c06ee9b 100644 (file)
@@ -12,17 +12,22 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
+import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,21 +66,37 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private Cancellable heartbeatCancel = null;
 
-    public Leader(RaftActorContext context, List<String> followerPaths) {
+    private List<ClientRequestTracker> trackerList = new ArrayList<>();
+
+    private final int minReplicationCount;
+
+    public Leader(RaftActorContext context) {
         super(context);
 
-        for (String followerPath : followerPaths) {
+        if(lastIndex() >= 0) {
+            context.setCommitIndex(lastIndex());
+        }
+
+        for (String followerId : context.getPeerAddresses().keySet()) {
             FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerPath,
-                    new AtomicLong(0),
-                    new AtomicLong(0));
+                new FollowerLogInformationImpl(followerId,
+                    new AtomicLong(lastIndex()),
+                    new AtomicLong(-1));
 
-            followerToActor.put(followerPath,
-                context.actorSelection(followerLogInformation.getId()));
-            followerToLog.put(followerPath, followerLogInformation);
+            followerToActor.put(followerId,
+                context.actorSelection(context.getPeerAddress(followerId)));
+
+            followerToLog.put(followerId, followerLogInformation);
+
+        }
 
+        if (followerToActor.size() > 0) {
+            minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
+        } else {
+            minReplicationCount = 0;
         }
 
+
         // Immediately schedule a heartbeat
         // Upon election: send initial empty AppendEntries RPCs
         // (heartbeat) to each server; repeat during idle periods to
@@ -87,47 +108,184 @@ public class Leader extends AbstractRaftActorBehavior {
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries, RaftState suggestedState) {
+
+        context.getLogger()
+            .error("An unexpected AppendEntries received in state " + state());
+
         return suggestedState;
     }
 
     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
+
+        // Do not take any other action since a behavior change is coming
+        if (suggestedState != state())
+            return suggestedState;
+
+        // Update the FollowerLogInformation
+        String followerId = appendEntriesReply.getFollowerId();
+        FollowerLogInformation followerLogInformation =
+            followerToLog.get(followerId);
+        if (appendEntriesReply.isSuccess()) {
+            followerLogInformation
+                .setMatchIndex(appendEntriesReply.getLogLastIndex());
+            followerLogInformation
+                .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
+        } else {
+            followerLogInformation.decrNextIndex();
+        }
+
+        // Now figure out if this reply warrants a change in the commitIndex
+        // If there exists an N such that N > commitIndex, a majority
+        // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
+        // set commitIndex = N (§5.3, §5.4).
+        for (long N = context.getCommitIndex() + 1; ; N++) {
+            int replicatedCount = 1;
+
+            for (FollowerLogInformation info : followerToLog.values()) {
+                if (info.getMatchIndex().get() >= N) {
+                    replicatedCount++;
+                }
+            }
+
+            if (replicatedCount >= minReplicationCount){
+                ReplicatedLogEntry replicatedLogEntry =
+                    context.getReplicatedLog().get(N);
+                if (replicatedLogEntry != null
+                    && replicatedLogEntry.getTerm()
+                    == currentTerm()) {
+                    context.setCommitIndex(N);
+                }
+            } else {
+                break;
+            }
+        }
+
+        if(context.getCommitIndex() > context.getLastApplied()){
+            applyLogToStateMachine(context.getCommitIndex());
+        }
+
         return suggestedState;
     }
 
+    protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+        for (ClientRequestTracker tracker : trackerList) {
+            if (tracker.getIndex() == logIndex) {
+                return tracker;
+            }
+        }
+
+        return null;
+    }
+
     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply, RaftState suggestedState) {
         return suggestedState;
     }
 
-    @Override protected RaftState state() {
+    @Override public RaftState state() {
         return RaftState.Leader;
     }
 
     @Override public RaftState handleMessage(ActorRef sender, Object message) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
-        scheduleHeartBeat(HEART_BEAT_INTERVAL);
-
-        if (message instanceof SendHeartBeat) {
-            for (ActorSelection follower : followerToActor.values()) {
-                follower.tell(new AppendEntries(
-                    context.getTermInformation().getCurrentTerm(),
-                    context.getId(),
-                    context.getReplicatedLog().last().getIndex(),
-                    context.getReplicatedLog().last().getTerm(),
-                    Collections.EMPTY_LIST, context.getCommitIndex()),
-                    context.getActor());
+        try {
+            if (message instanceof SendHeartBeat) {
+                return sendHeartBeat();
+            } else if (message instanceof Replicate) {
+
+                Replicate replicate = (Replicate) message;
+                long logIndex = replicate.getReplicatedLogEntry().getIndex();
+
+                context.getLogger().debug("Replicate message " + logIndex);
+
+                if (followerToActor.size() == 0) {
+                    context.setCommitIndex(
+                        replicate.getReplicatedLogEntry().getIndex());
+
+                    context.getActor()
+                        .tell(new ApplyState(replicate.getClientActor(),
+                                replicate.getIdentifier(),
+                                replicate.getReplicatedLogEntry()),
+                            context.getActor()
+                        );
+                } else {
+
+                    trackerList.add(
+                        new ClientRequestTrackerImpl(replicate.getClientActor(),
+                            replicate.getIdentifier(),
+                            logIndex)
+                    );
+
+                    ReplicatedLogEntry prevEntry =
+                        context.getReplicatedLog().get(lastIndex() - 1);
+                    long prevLogIndex = -1;
+                    long prevLogTerm = -1;
+                    if (prevEntry != null) {
+                        prevLogIndex = prevEntry.getIndex();
+                        prevLogTerm = prevEntry.getTerm();
+                    }
+                    // Send an AppendEntries to all followers
+                    for (String followerId : followerToActor.keySet()) {
+                        ActorSelection followerActor =
+                            followerToActor.get(followerId);
+                        FollowerLogInformation followerLogInformation =
+                            followerToLog.get(followerId);
+                        followerActor.tell(
+                            new AppendEntries(currentTerm(), context.getId(),
+                                prevLogIndex, prevLogTerm,
+                                context.getReplicatedLog().getFrom(
+                                    followerLogInformation.getNextIndex()
+                                        .get()
+                                ), context.getCommitIndex()
+                            ),
+                            actor()
+                        );
+                    }
+                }
             }
-            return state();
+        } finally {
+            scheduleHeartBeat(HEART_BEAT_INTERVAL);
         }
+
         return super.handleMessage(sender, message);
     }
 
-    private void scheduleHeartBeat(FiniteDuration interval) {
+    private RaftState sendHeartBeat() {
+        if (followerToActor.size() > 0) {
+            for (String follower : followerToActor.keySet()) {
+
+                FollowerLogInformation followerLogInformation =
+                    followerToLog.get(follower);
+
+                AtomicLong nextIndex =
+                    followerLogInformation.getNextIndex();
+
+                List<ReplicatedLogEntry> entries =
+                    context.getReplicatedLog().getFrom(nextIndex.get());
+
+                followerToActor.get(follower).tell(new AppendEntries(
+                        context.getTermInformation().getCurrentTerm(),
+                        context.getId(),
+                        context.getReplicatedLog().lastIndex(),
+                        context.getReplicatedLog().lastTerm(),
+                        entries, context.getCommitIndex()),
+                    context.getActor()
+                );
+            }
+        }
+        return state();
+    }
+
+    private void stopHeartBeat() {
         if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
             heartbeatCancel.cancel();
         }
+    }
+
+    private void scheduleHeartBeat(FiniteDuration interval) {
+        stopHeartBeat();
 
         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
         // message is sent to itself.
@@ -135,9 +293,18 @@ public class Leader extends AbstractRaftActorBehavior {
         // need to be sent if there are other messages being sent to the remote
         // actor.
         heartbeatCancel =
-            context.getActorSystem().scheduler().scheduleOnce(interval,
+            context.getActorSystem().scheduler().scheduleOnce(
+                interval,
                 context.getActor(), new SendHeartBeat(),
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
+    @Override public void close() throws Exception {
+        stopHeartBeat();
+    }
+
+    @Override public String getLeaderId() {
+        return context.getId();
+    }
+
 }
index 6811678..ca2d916 100644 (file)
@@ -24,7 +24,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
  * In each of these behaviors the Raft Actor handles the same Raft messages
  * differently.
  */
-public interface RaftActorBehavior {
+public interface RaftActorBehavior extends AutoCloseable{
     /**
      * Handle a message. If the processing of the message warrants a state
      * change then a new state should be returned otherwise this method should
@@ -36,4 +36,17 @@ public interface RaftActorBehavior {
      * @return The new state or self (this)
      */
     RaftState handleMessage(ActorRef sender, Object message);
+
+    /**
+     * The state associated with a given behavior
+     *
+     * @return
+     */
+    RaftState state();
+
+    /**
+     *
+     * @return
+     */
+    String getLeaderId();
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java
new file mode 100644 (file)
index 0000000..a60aea4
--- /dev/null
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+public class FindLeader {
+
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java
new file mode 100644 (file)
index 0000000..b36ef11
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+public class FindLeaderReply {
+    private final String leaderActor;
+
+    public FindLeaderReply(String leaderActor) {
+        this.leaderActor = leaderActor;
+    }
+
+    public String getLeaderActor() {
+        return leaderActor;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplyState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplyState.java
new file mode 100644 (file)
index 0000000..c9ba26e
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
+public class ApplyState {
+    private final ActorRef clientActor;
+    private final String identifier;
+    private final ReplicatedLogEntry replicatedLogEntry;
+
+    public ApplyState(ActorRef clientActor, String identifier,
+        ReplicatedLogEntry replicatedLogEntry) {
+        this.clientActor = clientActor;
+        this.identifier = identifier;
+        this.replicatedLogEntry = replicatedLogEntry;
+    }
+
+    public ActorRef getClientActor() {
+        return clientActor;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public ReplicatedLogEntry getReplicatedLogEntry() {
+        return replicatedLogEntry;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java
new file mode 100644 (file)
index 0000000..6ff7cfc
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
+public class Replicate {
+    private final ActorRef clientActor;
+    private final String identifier;
+    private final ReplicatedLogEntry replicatedLogEntry;
+
+    public Replicate(ActorRef clientActor, String identifier,
+        ReplicatedLogEntry replicatedLogEntry) {
+
+        this.clientActor = clientActor;
+        this.identifier = identifier;
+        this.replicatedLogEntry = replicatedLogEntry;
+    }
+
+    public ActorRef getClientActor() {
+        return clientActor;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public ReplicatedLogEntry getReplicatedLogEntry() {
+        return replicatedLogEntry;
+    }
+}
index c0e89a8..9bb5029 100644 (file)
@@ -62,4 +62,14 @@ public class AppendEntries extends AbstractRaftRPC {
     public long getLeaderCommit() {
         return leaderCommit;
     }
+
+    @Override public String toString() {
+        return "AppendEntries{" +
+            "leaderId='" + leaderId + '\'' +
+            ", prevLogIndex=" + prevLogIndex +
+            ", prevLogTerm=" + prevLogTerm +
+            ", entries=" + entries +
+            ", leaderCommit=" + leaderCommit +
+            '}';
+    }
 }
index 28f0f6b..7524d8f 100644 (file)
@@ -17,9 +17,24 @@ public class AppendEntriesReply extends AbstractRaftRPC{
     // prevLogIndex and prevLogTerm
     private final boolean success;
 
-    public AppendEntriesReply(long term, boolean success) {
+    // The index of the last entry in the followers log
+    // This will be used to set the matchIndex for the follower on the
+    // Leader
+    private final long logLastIndex;
+
+    private final long logLastTerm;
+
+    // The followerId - this will be used to figure out which follower is
+    // responding
+    private final String followerId;
+
+    public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm) {
         super(term);
+
+        this.followerId = followerId;
         this.success = success;
+        this.logLastIndex = logLastIndex;
+        this.logLastTerm = logLastTerm;
     }
 
     public long getTerm() {
@@ -29,4 +44,16 @@ public class AppendEntriesReply extends AbstractRaftRPC{
     public boolean isSuccess() {
         return success;
     }
+
+    public long getLogLastIndex() {
+        return logLastIndex;
+    }
+
+    public long getLogLastTerm() {
+        return logLastTerm;
+    }
+
+    public String getFollowerId() {
+        return followerId;
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf b/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf
new file mode 100644 (file)
index 0000000..494a99e
--- /dev/null
@@ -0,0 +1,12 @@
+akka {
+    loglevel = "DEBUG"
+    actor {
+        serializers {
+          java = "akka.serialization.JavaSerializer"
+        }
+
+        serialization-bindings {
+            "org.opendaylight.controller.cluster.raft.RaftActor$ReplicatedLogImplEntry" = java
+        }
+    }
+}
index 3b332e4..addf51a 100644 (file)
@@ -12,9 +12,14 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class MockRaftActorContext implements RaftActorContext {
 
@@ -25,6 +30,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private long lastApplied = 0;
     private final ElectionTerm electionTerm;
     private ReplicatedLog replicatedLog;
+    private Map<String, String> peerAddresses = new HashMap();
 
     public MockRaftActorContext(){
         electionTerm = null;
@@ -102,6 +108,22 @@ public class MockRaftActorContext implements RaftActorContext {
         return this.system;
     }
 
+    @Override public LoggingAdapter getLogger() {
+        return Logging.getLogger(system, this);
+    }
+
+    @Override public Map<String, String> getPeerAddresses() {
+        return peerAddresses;
+    }
+
+    @Override public String getPeerAddress(String peerId) {
+        return peerAddresses.get(peerId);
+    }
+
+    public void setPeerAddresses(Map<String, String> peerAddresses) {
+        this.peerAddresses = peerAddresses;
+    }
+
 
     public static class MockReplicatedLog implements ReplicatedLog {
         private ReplicatedLogEntry replicatedLogEntry = new MockReplicatedLogEntry(0,0, "");
@@ -115,12 +137,35 @@ public class MockRaftActorContext implements RaftActorContext {
             return last;
         }
 
+        @Override public long lastIndex() {
+            return last.getIndex();
+        }
+
+        @Override public long lastTerm() {
+            return last.getTerm();
+        }
+
         @Override public void removeFrom(long index) {
         }
 
         @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
         }
 
+        @Override public void appendAndPersist(
+            ReplicatedLogEntry replicatedLogEntry) {
+        }
+
+        @Override public List<ReplicatedLogEntry> getFrom(long index) {
+            return Collections.EMPTY_LIST;
+        }
+
+        @Override public long size() {
+            if(replicatedLogEntry != null){
+                return 1;
+            }
+            return 0;
+        }
+
         public void setReplicatedLogEntry(
             ReplicatedLogEntry replicatedLogEntry) {
             this.replicatedLogEntry = replicatedLogEntry;
@@ -135,13 +180,35 @@ public class MockRaftActorContext implements RaftActorContext {
         private final List<ReplicatedLogEntry> log = new ArrayList<>(10000);
 
         @Override public ReplicatedLogEntry get(long index) {
+            if(index >= log.size() || index < 0){
+                return null;
+            }
             return log.get((int) index);
         }
 
         @Override public ReplicatedLogEntry last() {
+            if(log.size() == 0){
+                return null;
+            }
             return log.get(log.size()-1);
         }
 
+        @Override public long lastIndex() {
+            if(log.size() == 0){
+                return -1;
+            }
+
+            return last().getIndex();
+        }
+
+        @Override public long lastTerm() {
+            if(log.size() == 0){
+                return -1;
+            }
+
+            return last().getTerm();
+        }
+
         @Override public void removeFrom(long index) {
             for(int i=(int) index ; i < log.size() ; i++) {
                 log.remove(i);
@@ -151,6 +218,23 @@ public class MockRaftActorContext implements RaftActorContext {
         @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
             log.add(replicatedLogEntry);
         }
+
+        @Override public void appendAndPersist(
+            ReplicatedLogEntry replicatedLogEntry) {
+            append(replicatedLogEntry);
+        }
+
+        @Override public List<ReplicatedLogEntry> getFrom(long index) {
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            for(int i=(int) index ; i < log.size() ; i++) {
+                entries.add(get(i));
+            }
+            return entries;
+        }
+
+        @Override public long size() {
+            return log.size();
+        }
     }
 
     public static class MockReplicatedLogEntry implements ReplicatedLogEntry {
index 3cd373a..273342e 100644 (file)
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 
 public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
 
@@ -53,35 +52,6 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         }};
     }
 
-    /**
-     * This test verifies that when an AppendEntries RPC is received by a RaftActor
-     * with a commitIndex that is greater than what has been applied to the
-     * state machine of the RaftActor, the RaftActor applies the state and
-     * sets it current applied state to the commitIndex of the sender.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
-        new JavaTestKit(getSystem()) {{
-
-            RaftActorContext context =
-                createActorContext();
-
-            context.setLastApplied(100);
-            setLastLogEntry((MockRaftActorContext) context, 0, 0, "");
-
-            // The new commitIndex is 101
-            AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, null, 101);
-
-            RaftState raftState =
-                createBehavior(context).handleMessage(getRef(), appendEntries);
-
-            assertEquals(101L, context.getLastApplied());
-
-        }};
-    }
 
     /**
      * This test verifies that when an AppendEntries is received with a term that
@@ -134,248 +104,56 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         }};
     }
 
-    /**
-     * This test verifies that when an AppendEntries is received a specific prevLogTerm
-     * which does not match the term that is in RaftActors log entry at prevLogIndex
-     * then the RaftActor does not change it's state and it returns a failure.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
-        throws Exception {
-        new JavaTestKit(getSystem()) {{
-
-            MockRaftActorContext context = (MockRaftActorContext)
-                createActorContext();
-
-            // First set the receivers term to lower number
-            context.getTermInformation().update(95, "test");
-
-            // Set the last log entry term for the receiver to be greater than
-            // what we will be sending as the prevLogTerm in AppendEntries
-            MockRaftActorContext.MockReplicatedLog mockReplicatedLog =
-                setLastLogEntry(context, 20, 0, "");
-
-            // Also set the entry at index 0 with term 20 which will be greater
-            // than the prevLogTerm sent by the sender
-            mockReplicatedLog.setReplicatedLogEntry(
-                new MockRaftActorContext.MockReplicatedLogEntry(20, 0, ""));
-
-            // AppendEntries is now sent with a bigger term
-            // this will set the receivers term to be the same as the sender's term
-            AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, null, 101);
-
-            RaftActorBehavior behavior = createBehavior(context);
-
-            // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
-
-            RaftState raftState =
-                behavior.handleMessage(getRef(), appendEntries);
-
-            assertEquals(expected, raftState);
-
-            // Also expect an AppendEntriesReply to be sent where success is false
-            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
-                "AppendEntriesReply") {
-                // do not put code outside this method, will run afterwards
-                protected Boolean match(Object in) {
-                    if (in instanceof AppendEntriesReply) {
-                        AppendEntriesReply reply = (AppendEntriesReply) in;
-                        return reply.isSuccess();
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get();
-
-            assertEquals(false, out);
 
-
-        }};
-    }
-
-    /**
-     * This test verifies that when a new AppendEntries message is received with
-     * new entries and the logs of the sender and receiver match that the new
-     * entries get added to the log and the log is incremented by the number of
-     * entries received in appendEntries
-     *
-     * @throws Exception
-     */
     @Test
-    public void testHandleAppendEntriesAddNewEntries() throws Exception {
-        new JavaTestKit(getSystem()) {{
+    public void testHandleAppendEntriesAddSameEntryToLog(){
+        new JavaTestKit(getSystem()) {
+            {
 
-            MockRaftActorContext context = (MockRaftActorContext)
-                createActorContext();
+                MockRaftActorContext context = (MockRaftActorContext)
+                    createActorContext();
 
-            // First set the receivers term to lower number
-            context.getTermInformation().update(1, "test");
-
-            // Prepare the receivers log
-            MockRaftActorContext.SimpleReplicatedLog log =
-                new MockRaftActorContext.SimpleReplicatedLog();
-            log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
-            log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
-            log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
-
-            context.setReplicatedLog(log);
-
-            // Prepare the entries to be sent with AppendEntries
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            entries.add(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 3, "three"));
-            entries.add(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 4, "four"));
-
-            // Send appendEntries with the same term as was set on the receiver
-            // before the new behavior was created (1 in this case)
-            // This will not work for a Candidate because as soon as a Candidate
-            // is created it increments the term
-            AppendEntries appendEntries =
-                new AppendEntries(1, "leader-1", 2, 1, entries, 101);
+                // First set the receivers term to lower number
+                context.getTermInformation().update(2, "test");
 
-            RaftActorBehavior behavior = createBehavior(context);
+                // Prepare the receivers log
+                MockRaftActorContext.SimpleReplicatedLog log =
+                    new MockRaftActorContext.SimpleReplicatedLog();
+                log.append(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
 
-            if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
-                // Resetting the Candidates term to make sure it will match
-                // the term sent by AppendEntries. If this was not done then
-                // the test will fail because the Candidate will assume that
-                // the message was sent to it from a lower term peer and will
-                // thus respond with a failure
-                context.getTermInformation().update(1, "test");
-            }
+                context.setReplicatedLog(log);
 
-            // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+                List<ReplicatedLogEntry> entries = new ArrayList<>();
+                entries.add(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
 
-            RaftState raftState =
-                behavior.handleMessage(getRef(), appendEntries);
+                AppendEntries appendEntries =
+                    new AppendEntries(2, "leader-1", -1, 1, entries, 0);
 
-            assertEquals(expected, raftState);
-            assertEquals(5, log.last().getIndex() + 1);
-            assertNotNull(log.get(3));
-            assertNotNull(log.get(4));
+                RaftActorBehavior behavior = createBehavior(context);
 
-            // Also expect an AppendEntriesReply to be sent where success is false
-            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
-                "AppendEntriesReply") {
-                // do not put code outside this method, will run afterwards
-                protected Boolean match(Object in) {
-                    if (in instanceof AppendEntriesReply) {
-                        AppendEntriesReply reply = (AppendEntriesReply) in;
-                        return reply.isSuccess();
-                    } else {
-                        throw noMatch();
-                    }
+                if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
+                    // Resetting the Candidates term to make sure it will match
+                    // the term sent by AppendEntries. If this was not done then
+                    // the test will fail because the Candidate will assume that
+                    // the message was sent to it from a lower term peer and will
+                    // thus respond with a failure
+                    context.getTermInformation().update(2, "test");
                 }
-            }.get();
 
-            assertEquals(true, out);
+                // Send an unknown message so that the state of the RaftActor remains unchanged
+                RaftState expected = behavior.handleMessage(getRef(), "unknown");
 
+                RaftState raftState =
+                    behavior.handleMessage(getRef(), appendEntries);
 
-        }};
-    }
+                assertEquals(expected, raftState);
 
-    /**
-     * This test verifies that when a new AppendEntries message is received with
-     * new entries and the logs of the sender and receiver are out-of-sync that
-     * the log is first corrected by removing the out of sync entries from the
-     * log and then adding in the new entries sent with the AppendEntries message
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testHandleAppendEntriesCorrectReceiverLogEntries()
-        throws Exception {
-        new JavaTestKit(getSystem()) {{
+                assertEquals(1, log.size());
 
-            MockRaftActorContext context = (MockRaftActorContext)
-                createActorContext();
 
-            // First set the receivers term to lower number
-            context.getTermInformation().update(2, "test");
-
-            // Prepare the receivers log
-            MockRaftActorContext.SimpleReplicatedLog log =
-                new MockRaftActorContext.SimpleReplicatedLog();
-            log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
-            log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
-            log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
-
-            context.setReplicatedLog(log);
-
-            // Prepare the entries to be sent with AppendEntries
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            entries.add(
-                new MockRaftActorContext.MockReplicatedLogEntry(2, 2, "two-1"));
-            entries.add(
-                new MockRaftActorContext.MockReplicatedLogEntry(2, 3, "three"));
-
-            // Send appendEntries with the same term as was set on the receiver
-            // before the new behavior was created (1 in this case)
-            // This will not work for a Candidate because as soon as a Candidate
-            // is created it increments the term
-            AppendEntries appendEntries =
-                new AppendEntries(2, "leader-1", 1, 1, entries, 101);
-
-            RaftActorBehavior behavior = createBehavior(context);
-
-            if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
-                // Resetting the Candidates term to make sure it will match
-                // the term sent by AppendEntries. If this was not done then
-                // the test will fail because the Candidate will assume that
-                // the message was sent to it from a lower term peer and will
-                // thus respond with a failure
-                context.getTermInformation().update(2, "test");
-            }
-
-            // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
-
-            RaftState raftState =
-                behavior.handleMessage(getRef(), appendEntries);
-
-            assertEquals(expected, raftState);
-
-            // The entry at index 2 will be found out-of-sync with the leader
-            // and will be removed
-            // Then the two new entries will be added to the log
-            // Thus making the log to have 4 entries
-            assertEquals(4, log.last().getIndex() + 1);
-            assertNotNull(log.get(2));
-
-            // Check that the entry at index 2 has the new data
-            assertEquals("two-1", log.get(2).getData());
-            assertNotNull(log.get(3));
-
-            // Also expect an AppendEntriesReply to be sent where success is false
-            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
-                "AppendEntriesReply") {
-                // do not put code outside this method, will run afterwards
-                protected Boolean match(Object in) {
-                    if (in instanceof AppendEntriesReply) {
-                        AppendEntriesReply reply = (AppendEntriesReply) in;
-                        return reply.isSuccess();
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get();
-
-            assertEquals(true, out);
-
-
-        }};
+            }};
     }
 
     /**
@@ -566,7 +344,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
     }
 
     protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
-        return new AppendEntriesReply(100, false);
+        return new AppendEntriesReply("follower-1", 100, false, 100, 100);
     }
 
     protected RequestVote createRequestVoteWithNewerTerm() {
index 183c668..8bcee58 100644 (file)
@@ -4,6 +4,7 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import junit.framework.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@ -15,8 +16,9 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
-import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -37,12 +39,38 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     private final ActorRef peerActor4 = getSystem().actorOf(Props.create(
         DoNothingActor.class));
 
+    private final Map<String, String> onePeer = new HashMap<>();
+    private final Map<String, String> twoPeers = new HashMap<>();
+    private final Map<String, String> fourPeers = new HashMap<>();
+
+    @Before
+    public void setUp(){
+        onePeer.put(peerActor1.path().toString(),
+            peerActor1.path().toString());
+
+        twoPeers.put(peerActor1.path().toString(),
+            peerActor1.path().toString());
+        twoPeers.put(peerActor2.path().toString(),
+            peerActor2.path().toString());
+
+        fourPeers.put(peerActor1.path().toString(),
+            peerActor1.path().toString());
+        fourPeers.put(peerActor2.path().toString(),
+            peerActor2.path().toString());
+        fourPeers.put(peerActor3.path().toString(),
+            peerActor3.path().toString());
+        fourPeers.put(peerActor4.path().toString(),
+            peerActor3.path().toString());
+
+
+    }
+
     @Test
     public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){
         RaftActorContext raftActorContext = createActorContext();
         long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm();
 
-        new Candidate(raftActorContext, Collections.EMPTY_LIST);
+        new Candidate(raftActorContext);
 
         assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm());
         assertEquals(raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor());
@@ -55,7 +83,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
             new Within(duration("1 seconds")) {
                 protected void run() {
 
-                    Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST);
+                    Candidate candidate = new Candidate(createActorContext(getTestActor()));
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
                         // do not put code outside this method, will run afterwards
@@ -78,7 +106,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     public void testHandleElectionTimeoutWhenThereAreZeroPeers(){
         RaftActorContext raftActorContext = createActorContext();
         Candidate candidate =
-            new Candidate(raftActorContext, Collections.EMPTY_LIST);
+            new Candidate(raftActorContext);
 
         RaftState raftState =
             candidate.handleMessage(candidateActor, new ElectionTimeout());
@@ -87,12 +115,12 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testHandleElectionTimeoutWhenThereAreTwoPeers(){
-        RaftActorContext raftActorContext = createActorContext();
+    public void testHandleElectionTimeoutWhenThereAreTwoNodesInCluster(){
+        MockRaftActorContext raftActorContext =
+            (MockRaftActorContext) createActorContext();
+        raftActorContext.setPeerAddresses(onePeer);
         Candidate candidate =
-            new Candidate(raftActorContext, Arrays
-                .asList(peerActor1.path().toString(),
-                    peerActor2.path().toString()));
+            new Candidate(raftActorContext);
 
         RaftState raftState =
             candidate.handleMessage(candidateActor, new ElectionTimeout());
@@ -101,12 +129,12 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testBecomeLeaderOnReceivingMajorityVotesInThreePeerCluster(){
-        RaftActorContext raftActorContext = createActorContext();
+    public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodesInCluster(){
+        MockRaftActorContext raftActorContext =
+            (MockRaftActorContext) createActorContext();
+        raftActorContext.setPeerAddresses(twoPeers);
         Candidate candidate =
-            new Candidate(raftActorContext, Arrays
-                .asList(peerActor1.path().toString(),
-                    peerActor2.path().toString()));
+            new Candidate(raftActorContext);
 
         RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
 
@@ -115,17 +143,16 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testBecomeLeaderOnReceivingMajorityVotesInFivePeerCluster(){
-        RaftActorContext raftActorContext = createActorContext();
+    public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodesInCluster(){
+        MockRaftActorContext raftActorContext =
+            (MockRaftActorContext) createActorContext();
+        raftActorContext.setPeerAddresses(fourPeers);
         Candidate candidate =
-            new Candidate(raftActorContext, Arrays
-                .asList(peerActor1.path().toString(),
-                    peerActor2.path().toString(),
-                    peerActor3.path().toString()));
+            new Candidate(raftActorContext);
 
         RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
 
-        RaftState stateOnSecondVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+        RaftState stateOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true));
 
         Assert.assertEquals(RaftState.Candidate, stateOnFirstVote);
         Assert.assertEquals(RaftState.Leader, stateOnSecondVote);
@@ -139,7 +166,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
             new Within(duration("1 seconds")) {
                 protected void run() {
 
-                    Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST);
+                    Candidate candidate = new Candidate(createActorContext(getTestActor()));
 
                     candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.EMPTY_LIST, 0));
 
@@ -168,7 +195,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
             new Within(duration("1 seconds")) {
                 protected void run() {
 
-                    Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST);
+                    Candidate candidate = new Candidate(createActorContext(getTestActor()));
 
                     candidate.handleMessage(getTestActor(), new RequestVote(0, "test", 0, 0));
 
@@ -261,7 +288,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
 
     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
-        return new Candidate(actorContext, Collections.EMPTY_LIST);
+        return new Candidate(actorContext);
     }
 
     @Override protected RaftActorContext createActorContext() {
index 90acbb1..ca0e13d 100644 (file)
@@ -8,12 +8,19 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
@@ -133,4 +140,264 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+    /**
+     * This test verifies that when an AppendEntries RPC is received by a RaftActor
+     * with a commitIndex that is greater than what has been applied to the
+     * state machine of the RaftActor, the RaftActor applies the state and
+     * sets it current applied state to the commitIndex of the sender.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            RaftActorContext context =
+                createActorContext();
+
+            context.setLastApplied(100);
+            setLastLogEntry((MockRaftActorContext) context, 0, 0, "");
+
+            // The new commitIndex is 101
+            AppendEntries appendEntries =
+                new AppendEntries(100, "leader-1", 0, 0, null, 101);
+
+            RaftState raftState =
+                createBehavior(context).handleMessage(getRef(), appendEntries);
+
+            assertEquals(101L, context.getLastApplied());
+
+        }};
+    }
+
+    /**
+     * This test verifies that when an AppendEntries is received a specific prevLogTerm
+     * which does not match the term that is in RaftActors log entry at prevLogIndex
+     * then the RaftActor does not change it's state and it returns a failure.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
+        throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                createActorContext();
+
+            // First set the receivers term to lower number
+            context.getTermInformation().update(95, "test");
+
+            // Set the last log entry term for the receiver to be greater than
+            // what we will be sending as the prevLogTerm in AppendEntries
+            MockRaftActorContext.MockReplicatedLog mockReplicatedLog =
+                setLastLogEntry(context, 20, 0, "");
+
+            // Also set the entry at index 0 with term 20 which will be greater
+            // than the prevLogTerm sent by the sender
+            mockReplicatedLog.setReplicatedLogEntry(
+                new MockRaftActorContext.MockReplicatedLogEntry(20, 0, ""));
+
+            // AppendEntries is now sent with a bigger term
+            // this will set the receivers term to be the same as the sender's term
+            AppendEntries appendEntries =
+                new AppendEntries(100, "leader-1", 0, 0, null, 101);
+
+            RaftActorBehavior behavior = createBehavior(context);
+
+            // Send an unknown message so that the state of the RaftActor remains unchanged
+            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+            RaftState raftState =
+                behavior.handleMessage(getRef(), appendEntries);
+
+            assertEquals(expected, raftState);
+
+            // Also expect an AppendEntriesReply to be sent where success is false
+            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+                "AppendEntriesReply") {
+                // do not put code outside this method, will run afterwards
+                protected Boolean match(Object in) {
+                    if (in instanceof AppendEntriesReply) {
+                        AppendEntriesReply reply = (AppendEntriesReply) in;
+                        return reply.isSuccess();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get();
+
+            assertEquals(false, out);
+
+
+        }};
+    }
+
+
+
+    /**
+     * This test verifies that when a new AppendEntries message is received with
+     * new entries and the logs of the sender and receiver match that the new
+     * entries get added to the log and the log is incremented by the number of
+     * entries received in appendEntries
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleAppendEntriesAddNewEntries() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                createActorContext();
+
+            // First set the receivers term to lower number
+            context.getTermInformation().update(1, "test");
+
+            // Prepare the receivers log
+            MockRaftActorContext.SimpleReplicatedLog log =
+                new MockRaftActorContext.SimpleReplicatedLog();
+            log.append(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+            log.append(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
+            log.append(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
+
+            context.setReplicatedLog(log);
+
+            // Prepare the entries to be sent with AppendEntries
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            entries.add(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 3, "three"));
+            entries.add(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 4, "four"));
+
+            // Send appendEntries with the same term as was set on the receiver
+            // before the new behavior was created (1 in this case)
+            // This will not work for a Candidate because as soon as a Candidate
+            // is created it increments the term
+            AppendEntries appendEntries =
+                new AppendEntries(1, "leader-1", 2, 1, entries, 4);
+
+            RaftActorBehavior behavior = createBehavior(context);
+
+            // Send an unknown message so that the state of the RaftActor remains unchanged
+            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+            RaftState raftState =
+                behavior.handleMessage(getRef(), appendEntries);
+
+            assertEquals(expected, raftState);
+            assertEquals(5, log.last().getIndex() + 1);
+            assertNotNull(log.get(3));
+            assertNotNull(log.get(4));
+
+            // Also expect an AppendEntriesReply to be sent where success is false
+            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+                "AppendEntriesReply") {
+                // do not put code outside this method, will run afterwards
+                protected Boolean match(Object in) {
+                    if (in instanceof AppendEntriesReply) {
+                        AppendEntriesReply reply = (AppendEntriesReply) in;
+                        return reply.isSuccess();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get();
+
+            assertEquals(true, out);
+
+
+        }};
+    }
+
+
+
+    /**
+     * This test verifies that when a new AppendEntries message is received with
+     * new entries and the logs of the sender and receiver are out-of-sync that
+     * the log is first corrected by removing the out of sync entries from the
+     * log and then adding in the new entries sent with the AppendEntries message
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleAppendEntriesCorrectReceiverLogEntries()
+        throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                createActorContext();
+
+            // First set the receivers term to lower number
+            context.getTermInformation().update(2, "test");
+
+            // Prepare the receivers log
+            MockRaftActorContext.SimpleReplicatedLog log =
+                new MockRaftActorContext.SimpleReplicatedLog();
+            log.append(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+            log.append(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
+            log.append(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
+
+            context.setReplicatedLog(log);
+
+            // Prepare the entries to be sent with AppendEntries
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            entries.add(
+                new MockRaftActorContext.MockReplicatedLogEntry(2, 2, "two-1"));
+            entries.add(
+                new MockRaftActorContext.MockReplicatedLogEntry(2, 3, "three"));
+
+            // Send appendEntries with the same term as was set on the receiver
+            // before the new behavior was created (1 in this case)
+            // This will not work for a Candidate because as soon as a Candidate
+            // is created it increments the term
+            AppendEntries appendEntries =
+                new AppendEntries(2, "leader-1", 1, 1, entries, 3);
+
+            RaftActorBehavior behavior = createBehavior(context);
+
+            // Send an unknown message so that the state of the RaftActor remains unchanged
+            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+            RaftState raftState =
+                behavior.handleMessage(getRef(), appendEntries);
+
+            assertEquals(expected, raftState);
+
+            // The entry at index 2 will be found out-of-sync with the leader
+            // and will be removed
+            // Then the two new entries will be added to the log
+            // Thus making the log to have 4 entries
+            assertEquals(4, log.last().getIndex() + 1);
+            assertNotNull(log.get(2));
+
+            // Check that the entry at index 2 has the new data
+            assertEquals("two-1", log.get(2).getData());
+            assertNotNull(log.get(3));
+
+            // Also expect an AppendEntriesReply to be sent where success is false
+            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+                "AppendEntriesReply") {
+                // do not put code outside this method, will run afterwards
+                protected Boolean match(Object in) {
+                    if (in instanceof AppendEntriesReply) {
+                        AppendEntriesReply reply = (AppendEntriesReply) in;
+                        return reply.isSuccess();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get();
+
+            assertEquals(true, out);
+
+
+        }};
+    }
+
 }
index 5684d66..e5e54d5 100644 (file)
@@ -12,9 +12,8 @@ import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -27,7 +26,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     public void testHandleMessageForUnknownMessage() throws Exception {
         new JavaTestKit(getSystem()) {{
             Leader leader =
-                new Leader(createActorContext(), Collections.EMPTY_LIST);
+                new Leader(createActorContext());
 
             // handle message should return the Leader state when it receives an
             // unknown message
@@ -46,11 +45,15 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                     ActorRef followerActor = getTestActor();
 
-                    List<String> followers = new ArrayList();
+                    MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
 
-                    followers.add(followerActor.path().toString());
+                    Map<String, String> peerAddresses = new HashMap();
 
-                    Leader leader = new Leader(createActorContext(), followers);
+                    peerAddresses.put(followerActor.path().toString(), followerActor.path().toString());
+
+                    actorContext.setPeerAddresses(peerAddresses);
+
+                    Leader leader = new Leader(actorContext);
                     leader.handleMessage(senderActor, new SendHeartBeat());
 
                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
@@ -78,7 +81,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
-        return new Leader(actorContext, Collections.EMPTY_LIST);
+        return new Leader(actorContext);
     }
 
     @Override protected RaftActorContext createActorContext() {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.