Snapshot changes 20/9520/7
authorKamal Rameshan <kramesha@cisco.com>
Thu, 31 Jul 2014 06:00:36 +0000 (23:00 -0700)
committerKamal Rameshan <kramesha@cisco.com>
Fri, 1 Aug 2014 02:30:48 +0000 (19:30 -0700)
Fixed adjusted index logic.
Added tests
Added Reinstate Node in TestDriver.
Changes related to recovery from snapshot
Review changes
Rebased with latest

Tasks to be done:
1. Snapshot capture criteria
2. Handling Snapshot failure
3. Configuration Interface
4. Install snapshot

Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
Reinstate from snapshot + review changes

Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
TestDriver changes

Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
Log interpolation change

Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
Rebased.

Change-Id: Ie0d8341f501fef87f48e8ea6f5d6b6e878d1abe3
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java [new file with mode: 0644]

index 641ec05..90bf121 100644 (file)
@@ -56,10 +56,12 @@ public class ExampleActor extends RaftActor {
             }
 
         } else if (message instanceof PrintState) {
-            LOG.debug("State of the node:"+getId() + " has = "+state.size() + " entries");
+            LOG.debug("State of the node:{} has entries={}, {}",
+                getId(), state.size(), getReplicatedLogState());
 
         } else if (message instanceof PrintRole) {
-            LOG.debug(getId() + " = " + getRaftState());
+            LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),getPeers());
+
         } else {
             super.onReceiveCommand(message);
         }
@@ -83,6 +85,7 @@ public class ExampleActor extends RaftActor {
     @Override protected void applySnapshot(Object snapshot) {
         state.clear();
         state.putAll((HashMap) snapshot);
+        LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
     }
 
     @Override public void onReceiveRecover(Object message) {
index c2d0b3a..18c14e7 100644 (file)
@@ -11,14 +11,13 @@ import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This is a test driver for testing akka-raft implementation
  * Its uses ExampleActors and threads to push content(key-vals) to these actors
  * Each ExampleActor can have one or more ClientActors. Each ClientActor spawns
- * a thread and starts push logs to the actor its assignged to.
+ * a thread and starts push logs to the actor its assigned to.
  */
 public class TestDriver {
 
@@ -26,7 +25,8 @@ public class TestDriver {
     private static Map<String, String> allPeers = new HashMap<>();
     private static Map<String, ActorRef> clientActorRefs  = new HashMap<String, ActorRef>();
     private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
-    private static LogGenerator logGenerator = new LogGenerator();;
+    private static LogGenerator logGenerator = new LogGenerator();
+    private int nameCounter = 0;
 
     /**
      * Create nodes, add clients and start logging.
@@ -35,6 +35,7 @@ public class TestDriver {
      *  createNodes:{num}
      *  addNodes:{num}
      *  stopNode:{nodeName}
+     *  reinstateNode:{nodeName}
      *  addClients:{num}
      *  addClientsToNode:{nodeName, num}
      *  startLogging
@@ -83,6 +84,10 @@ public class TestDriver {
                 String[] arr = command.split(":");
                 td.stopNode(arr[1]);
 
+            } else if (command.startsWith("reinstateNode")) {
+                String[] arr = command.split(":");
+                td.reinstateNode(arr[1]);
+
             } else if (command.startsWith("startLogging")) {
                 td.startAllLogging();
 
@@ -108,8 +113,8 @@ public class TestDriver {
 
     public void createNodes(int num) {
         for (int i=0; i < num; i++)  {
-            int rand = getUnusedRandom(num);
-            allPeers.put("example-"+rand, "akka://default/user/example-"+rand);
+            nameCounter = nameCounter + 1;
+            allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
         }
 
         for (String s : allPeers.keySet())  {
@@ -125,9 +130,9 @@ public class TestDriver {
     public void addNodes(int num) {
         Map<String, String> newPeers = new HashMap<>();
         for (int i=0; i < num; i++)  {
-            int rand = getUnusedRandom(num);
-            newPeers.put("example-"+rand, "akka://default/user/example-"+rand);
-            allPeers.put("example-"+rand, "akka://default/user/example-"+rand);
+            nameCounter = nameCounter + 1;
+            newPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
+            allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
 
         }
         Map<String, ActorRef> newActorRefs = new HashMap<String, ActorRef>(num);
@@ -165,7 +170,7 @@ public class TestDriver {
     public void addClientsToNode(String actorName, int num) {
         ActorRef actorRef = actorRefs.get(actorName);
         for (int i=0; i < num; i++) {
-            String clientName = "client-" + i + "-" + actorRef;
+            String clientName = "client-" + i + "-" + actorName;
             clientActorRefs.put(clientName,
                 actorSystem.actorOf(ClientActor.props(actorRef), clientName));
             System.out.println("Added client-node:" + clientName);
@@ -174,11 +179,13 @@ public class TestDriver {
 
     public void stopNode(String actorName) {
         ActorRef actorRef = actorRefs.get(actorName);
-        String clientName = "client-"+actorName;
-        if(clientActorRefs.containsKey(clientName)) {
-            actorSystem.stop(clientActorRefs.get(clientName));
-            clientActorRefs.remove(clientName);
+
+        for (Map.Entry<String,ActorRef> entry : clientActorRefs.entrySet()) {
+            if (entry.getKey().endsWith(actorName)) {
+                actorSystem.stop(entry.getValue());
+            }
         }
+
         actorSystem.stop(actorRef);
         actorRefs.remove(actorName);
 
@@ -187,7 +194,21 @@ public class TestDriver {
         }
 
         allPeers.remove(actorName);
+    }
+
+    public void reinstateNode(String actorName) {
+        String address = "akka://default/user/"+actorName;
+        allPeers.put(actorName, address);
 
+        ActorRef exampleActor = actorSystem.actorOf(ExampleActor.props(actorName, withoutPeer(actorName)), actorName);
+
+        for (ActorRef actor : actorRefs.values()) {
+            actor.tell(new AddRaftPeer(actorName, address), null);
+        }
+
+        actorRefs.put(actorName, exampleActor);
+
+        addClientsToNode(actorName, 1);
     }
 
     public void startAllLogging() {
@@ -232,14 +253,6 @@ public class TestDriver {
         return null;
     }
 
-    private int getUnusedRandom(int num) {
-        int rand = -1;
-        do {
-            rand = (new Random()).nextInt(num * num);
-        } while (allPeers.keySet().contains("example-"+rand));
-
-        return rand;
-    }
 
     private static Map<String, String> withoutPeer(String peerId) {
         Map<String, String> without = new ConcurrentHashMap<>(allPeers);
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
new file mode 100644 (file)
index 0000000..24bfa3d
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract class handling the mapping of
+ * logical LogEntry Index and the physical list index.
+ */
+public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
+
+    protected final List<ReplicatedLogEntry> journal;
+    protected final Object snapshot;
+    protected long snapshotIndex = -1;
+    protected long snapshotTerm = -1;
+
+    public AbstractReplicatedLogImpl(Object state, long snapshotIndex,
+        long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+        this.snapshot = state;
+        this.snapshotIndex = snapshotIndex;
+        this.snapshotTerm = snapshotTerm;
+        this.journal = new ArrayList<>(unAppliedEntries);
+    }
+
+
+    public AbstractReplicatedLogImpl() {
+        this.snapshot = null;
+        this.journal = new ArrayList<>();
+    }
+
+    protected int adjustedIndex(long logEntryIndex) {
+        if(snapshotIndex < 0){
+            return (int) logEntryIndex;
+        }
+        return (int) (logEntryIndex - (snapshotIndex + 1));
+    }
+
+    @Override
+    public ReplicatedLogEntry get(long logEntryIndex) {
+        int adjustedIndex = adjustedIndex(logEntryIndex);
+
+        if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+            // physical index should be less than list size and >= 0
+            return null;
+        }
+
+        return journal.get(adjustedIndex);
+    }
+
+    @Override
+    public ReplicatedLogEntry last() {
+        if (journal.isEmpty()) {
+            return null;
+        }
+        // get the last entry directly from the physical index
+        return journal.get(journal.size() - 1);
+    }
+
+    @Override
+    public long lastIndex() {
+        if (journal.isEmpty()) {
+            // it can happen that after snapshot, all the entries of the
+            // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
+            return snapshotIndex;
+        }
+        return last().getIndex();
+    }
+
+    @Override
+    public long lastTerm() {
+        if (journal.isEmpty()) {
+            // it can happen that after snapshot, all the entries of the
+            // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
+            return snapshotTerm;
+        }
+        return last().getTerm();
+    }
+
+    @Override
+    public void removeFrom(long logEntryIndex) {
+        int adjustedIndex = adjustedIndex(logEntryIndex);
+        if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+            // physical index should be less than list size and >= 0
+            return;
+        }
+        journal.subList(adjustedIndex , journal.size()).clear();
+    }
+
+    @Override
+    public void append(ReplicatedLogEntry replicatedLogEntry) {
+        journal.add(replicatedLogEntry);
+    }
+
+    @Override
+    public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
+        int adjustedIndex = adjustedIndex(logEntryIndex);
+        int size = journal.size();
+        List<ReplicatedLogEntry> entries = new ArrayList<>(100);
+        if (adjustedIndex >= 0 && adjustedIndex < size) {
+            // physical index should be less than list size and >= 0
+            entries.addAll(journal.subList(adjustedIndex, size));
+        }
+        return entries;
+    }
+
+    @Override
+    public long size() {
+       return journal.size();
+    }
+
+    @Override
+    public boolean isPresent(long logEntryIndex) {
+        if (logEntryIndex > lastIndex()) {
+            // if the request logical index is less than the last present in the list
+            return false;
+        }
+        int adjustedIndex = adjustedIndex(logEntryIndex);
+        return (adjustedIndex >= 0);
+    }
+
+    @Override
+    public boolean isInSnapshot(long logEntryIndex) {
+        return logEntryIndex <= snapshotIndex;
+    }
+
+    @Override
+    public Object getSnapshot() {
+        return snapshot;
+    }
+
+    @Override
+    public long getSnapshotIndex() {
+        return snapshotIndex;
+    }
+
+    @Override
+    public long getSnapshotTerm() {
+        return snapshotTerm;
+    }
+
+    @Override
+    public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
+
+    @Override
+    public abstract void removeFromAndPersist(long index);
+}
index b8e9653..36c86f5 100644 (file)
@@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -81,6 +80,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
+    private static final int SNAPSHOT_ENTRY_COUNT = 100000;
+
     /**
      * The current state determines the current behavior of a RaftActor
      * A Raft Actor always starts off in the Follower State
@@ -108,6 +109,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     @Override public void onReceiveRecover(Object message) {
         if (message instanceof SnapshotOffer) {
+            LOG.debug("SnapshotOffer called..");
             SnapshotOffer offer = (SnapshotOffer) message;
             Snapshot snapshot = (Snapshot) offer.snapshot();
 
@@ -116,6 +118,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
             // when we need to install it on a peer
             replicatedLog = new ReplicatedLogImpl(snapshot);
 
+            context.setReplicatedLog(replicatedLog);
+
+            LOG.debug("Applied snapshot to replicatedLog. " +
+                "snapshotIndex={}, snapshotTerm={}, journal-size={}",
+                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
+                replicatedLog.size());
+
             // Apply the snapshot to the actors state
             applySnapshot(snapshot.getState());
 
@@ -127,7 +136,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
             context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
         } else if (message instanceof RecoveryCompleted) {
             LOG.debug(
-                "Last index in log : " + replicatedLog.lastIndex());
+                "RecoveryCompleted - Switching actor to Follower - " +
+                    "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+                    "journal-size={}",
+                replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+                replicatedLog.snapshotTerm, replicatedLog.size());
             currentBehavior = switchBehavior(RaftState.Follower);
         }
     }
@@ -191,6 +204,15 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
+    public java.util.Set<String> getPeers() {
+        return context.getPeerAddresses().keySet();
+    }
+
+    protected String getReplicatedLogState() {
+        return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
+            + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
+            + ", im-mem journal size=" + context.getReplicatedLog().size();
+    }
 
 
     /**
@@ -343,85 +365,33 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
     private void trimPersistentData(long sequenceNumber) {
-        // Trim snapshots
+        // Trim akka snapshots
         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
         // For now guessing that it is ANDed.
         deleteSnapshots(new SnapshotSelectionCriteria(
-            sequenceNumber - 100000, 43200000));
+            sequenceNumber - SNAPSHOT_ENTRY_COUNT, 43200000));
 
-        // Trim journal
+        // Trim akka journal
         deleteMessages(sequenceNumber);
     }
 
 
-    private class ReplicatedLogImpl implements ReplicatedLog {
-        private final List<ReplicatedLogEntry> journal;
-        private final Object snapshot;
-        private long snapshotIndex = -1;
-        private long snapshotTerm = -1;
+    private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
         public ReplicatedLogImpl(Snapshot snapshot) {
-            this.snapshot = snapshot.getState();
-            this.snapshotIndex = snapshot.getLastAppliedIndex();
-            this.snapshotTerm = snapshot.getLastAppliedTerm();
-
-            this.journal = new ArrayList<>(snapshot.getUnAppliedEntries());
+            super(snapshot.getState(),
+                snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+                snapshot.getUnAppliedEntries());
         }
 
         public ReplicatedLogImpl() {
-            this.snapshot = null;
-            this.journal = new ArrayList<>();
-        }
-
-        @Override public ReplicatedLogEntry get(long index) {
-            int adjustedIndex = adjustedIndex(index);
-
-            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
-                return null;
-            }
-
-            return journal.get(adjustedIndex);
-        }
-
-        @Override public ReplicatedLogEntry last() {
-            if (journal.size() == 0) {
-                return null;
-            }
-            return get(journal.size() - 1);
-        }
-
-        @Override public long lastIndex() {
-            if (journal.size() == 0) {
-                return -1;
-            }
-
-            return last().getIndex();
-        }
-
-        @Override public long lastTerm() {
-            if (journal.size() == 0) {
-                return -1;
-            }
-
-            return last().getTerm();
-        }
-
-
-        @Override public void removeFrom(long index) {
-            int adjustedIndex = adjustedIndex(index);
-
-            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
-                return;
-            }
-
-            journal.subList(adjustedIndex , journal.size()).clear();
+            super();
         }
 
+        @Override public void removeFromAndPersist(long logEntryIndex) {
+            int adjustedIndex = adjustedIndex(logEntryIndex);
 
-        @Override public void removeFromAndPersist(long index) {
-            int adjustedIndex = adjustedIndex(index);
-
-            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+            if (adjustedIndex < 0) {
                 return;
             }
 
@@ -435,29 +405,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
                     //FIXME : Doing nothing for now
                 }
             });
-
-
-        }
-
-        @Override public void append(
-            final ReplicatedLogEntry replicatedLogEntry) {
-            journal.add(replicatedLogEntry);
-        }
-
-        @Override public List<ReplicatedLogEntry> getFrom(long index) {
-            int adjustedIndex = adjustedIndex(index);
-
-            List<ReplicatedLogEntry> entries = new ArrayList<>(100);
-            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
-                return entries;
-            }
-
-
-            for (int i = adjustedIndex;
-                 i < journal.size(); i++) {
-                entries.add(journal.get(i));
-            }
-            return entries;
         }
 
         @Override public void appendAndPersist(
@@ -482,20 +429,42 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 new Procedure<ReplicatedLogEntry>() {
                     public void apply(ReplicatedLogEntry evt) throws Exception {
                         // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
-                        if (size() > 100000) {
-                            ReplicatedLogEntry lastAppliedEntry =
-                                get(context.getLastApplied());
+                        if (journal.size() > SNAPSHOT_ENTRY_COUNT) {
+                            LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
+
+                            ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
                             if (lastAppliedEntry != null) {
                                 lastAppliedIndex = lastAppliedEntry.getIndex();
                                 lastAppliedTerm = lastAppliedEntry.getTerm();
                             }
 
-                            saveSnapshot(Snapshot.create(createSnapshot(),
+                            LOG.debug("Snapshot Capture logSize: {}", journal.size());
+                            LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied());
+                            LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
+                            LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+
+                            // create a snapshot object from the state provided and save it
+                            // when snapshot is saved async, SaveSnapshotSuccess is raised.
+                            Snapshot sn = Snapshot.create(createSnapshot(),
                                 getFrom(context.getLastApplied() + 1),
                                 lastIndex(), lastTerm(), lastAppliedIndex,
-                                lastAppliedTerm));
+                                lastAppliedTerm);
+                            saveSnapshot(sn);
+
+                            LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+
+                            //be greedy and remove entries from in-mem journal which are in the snapshot
+                            // and update snapshotIndex and snapshotTerm without waiting for the success,
+                            // TODO: damage-recovery to be done on failure
+                            journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear();
+                            snapshotIndex = lastAppliedIndex;
+                            snapshotTerm = lastAppliedTerm;
+
+                            LOG.info("Removed in-memory snapshotted entries, " +
+                                "adjusted snaphsotIndex:{}" +
+                                "and term:{}", snapshotIndex, lastAppliedTerm);
                         }
                         // Send message for replication
                         if (clientActor != null) {
@@ -509,46 +478,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
             );
         }
 
-        @Override public long size() {
-            return journal.size() + snapshotIndex + 1;
-        }
-
-        @Override public boolean isPresent(long index) {
-            int adjustedIndex = adjustedIndex(index);
-
-            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
-                return false;
-            }
-            return true;
-        }
-
-        @Override public boolean isInSnapshot(long index) {
-            return index <= snapshotIndex;
-        }
-
-        @Override public Object getSnapshot() {
-            return snapshot;
-        }
-
-        @Override public long getSnapshotIndex() {
-            return snapshotIndex;
-        }
-
-        @Override public long getSnapshotTerm() {
-            return snapshotTerm;
-        }
-
-        private int adjustedIndex(long index) {
-            if(snapshotIndex < 0){
-                return (int) index;
-            }
-            return (int) (index - snapshotIndex);
-        }
     }
 
-
-
-
     private static class DeleteEntries implements Serializable {
         private final int fromIndex;
 
@@ -609,6 +540,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
         public long getLastAppliedTerm() {
             return lastAppliedTerm;
         }
+
+        public String getLogMessage() {
+            StringBuilder sb = new StringBuilder();
+            return sb.append("Snapshot={")
+                .append("lastTerm:" + this.getLastTerm()  + ", ")
+                .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
+                .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
+                .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
+                .toString();
+
+        }
     }
 
     private class ElectionTermImpl implements ElectionTerm {
index ae9431a..ec80055 100644 (file)
@@ -85,6 +85,12 @@ public interface RaftActorContext {
      */
     void setLastApplied(long lastApplied);
 
+    /**
+     *
+     * @param replicatedLog
+     */
+    public void setReplicatedLog(ReplicatedLog replicatedLog);
+
     /**
      * @return A representation of the log
      */
index 833c8a9..0fbc55d 100644 (file)
@@ -33,7 +33,7 @@ public class RaftActorContextImpl implements RaftActorContext{
 
     private long lastApplied;
 
-    private final ReplicatedLog replicatedLog;
+    private ReplicatedLog replicatedLog;
 
     private final Map<String, String> peerAddresses;
 
@@ -90,6 +90,10 @@ public class RaftActorContextImpl implements RaftActorContext{
         this.lastApplied = lastApplied;
     }
 
+    @Override public void setReplicatedLog(ReplicatedLog replicatedLog) {
+        this.replicatedLog = replicatedLog;
+    }
+
     @Override public ReplicatedLog getReplicatedLog() {
         return replicatedLog;
     }
index db62dfc..c8cd41d 100644 (file)
@@ -126,15 +126,10 @@ public class Follower extends AbstractRaftActorBehavior {
             int addEntriesFrom = 0;
             if (context.getReplicatedLog().size() > 0) {
 
-                // Find the entry up until which the one that is not in the
-                // follower's log
-                for (int i = 0;
-                     i < appendEntries.getEntries()
-                         .size(); i++, addEntriesFrom++) {
-                    ReplicatedLogEntry matchEntry =
-                        appendEntries.getEntries().get(i);
-                    ReplicatedLogEntry newEntry = context.getReplicatedLog()
-                        .get(matchEntry.getIndex());
+                // Find the entry up until which the one that is not in the follower's log
+                for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
+                    ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
+                    ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
 
                     if (newEntry == null) {
                         //newEntry not found in the log
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
new file mode 100644 (file)
index 0000000..ae8e525
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import static org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
+/**
+*
+*/
+public class AbstractReplicatedLogImplTest {
+
+    private MockAbstractReplicatedLogImpl replicatedLogImpl;
+
+    @Before
+    public void setUp() {
+        replicatedLogImpl = new MockAbstractReplicatedLogImpl();
+    }
+
+    @After
+    public void tearDown() {
+        replicatedLogImpl.journal.clear();
+        replicatedLogImpl.setSnapshotIndex(-1);
+        replicatedLogImpl.setSnapshotTerm(-1);
+        replicatedLogImpl = null;
+    }
+
+    @Test
+    public void testIndexOperations() {
+        // create a set of initial entries in the in-memory log
+        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 0, new MockPayload("A")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 1, new MockPayload("B")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(1, 2, new MockPayload("C")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 3, new MockPayload("D")));
+
+        // check if the values returned are correct, with snapshotIndex = -1
+        assertEquals("B", replicatedLogImpl.get(1).getData().toString());
+        assertEquals("D", replicatedLogImpl.last().getData().toString());
+        assertEquals(3, replicatedLogImpl.lastIndex());
+        assertEquals(2, replicatedLogImpl.lastTerm());
+        assertEquals(2, replicatedLogImpl.getFrom(2).size());
+        assertEquals(4, replicatedLogImpl.size());
+        assertTrue(replicatedLogImpl.isPresent(2));
+        assertFalse(replicatedLogImpl.isPresent(4));
+        assertFalse(replicatedLogImpl.isInSnapshot(2));
+
+        // now create a snapshot of 3 entries, with 1 unapplied entry left in the log
+        // It removes the entries which have made it to snapshot
+        // and updates the snapshot index and term
+        Map state = takeSnapshot(3);
+
+        // check the values after the snapshot.
+        // each index value passed in the test is the logical index (log entry index)
+        // which gets mapped to the list's physical index
+        assertEquals("D", replicatedLogImpl.get(3).getData().toString());
+        assertEquals("D", replicatedLogImpl.last().getData().toString());
+        assertNull(replicatedLogImpl.get(1));
+        assertEquals(3, replicatedLogImpl.lastIndex());
+        assertEquals(2, replicatedLogImpl.lastTerm());
+        assertEquals(0, replicatedLogImpl.getFrom(2).size());
+        assertEquals(1, replicatedLogImpl.size());
+        assertFalse(replicatedLogImpl.isPresent(2));
+        assertTrue(replicatedLogImpl.isPresent(3));
+        assertFalse(replicatedLogImpl.isPresent(4));
+        assertTrue(replicatedLogImpl.isInSnapshot(2));
+
+        // append few more entries
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H")));
+
+        // check their values as well
+        assertEquals(5, replicatedLogImpl.size());
+        assertEquals("D", replicatedLogImpl.get(3).getData().toString());
+        assertEquals("E", replicatedLogImpl.get(4).getData().toString());
+        assertEquals("H", replicatedLogImpl.last().getData().toString());
+        assertEquals(3, replicatedLogImpl.lastTerm());
+        assertEquals(7, replicatedLogImpl.lastIndex());
+        assertTrue(replicatedLogImpl.isPresent(7));
+        assertFalse(replicatedLogImpl.isInSnapshot(7));
+        assertEquals(1, replicatedLogImpl.getFrom(7).size());
+        assertEquals(2, replicatedLogImpl.getFrom(6).size());
+
+        // take a second snapshot with 5 entries with 0 unapplied entries left in the log
+        state = takeSnapshot(5);
+
+        assertEquals(0, replicatedLogImpl.size());
+        assertNull(replicatedLogImpl.last());
+        assertNull(replicatedLogImpl.get(7));
+        assertNull(replicatedLogImpl.get(1));
+        assertFalse(replicatedLogImpl.isPresent(7));
+        assertTrue(replicatedLogImpl.isInSnapshot(7));
+        assertEquals(0, replicatedLogImpl.getFrom(7).size());
+        assertEquals(0, replicatedLogImpl.getFrom(6).size());
+
+    }
+
+    // create a snapshot for test
+    public Map takeSnapshot(int numEntries) {
+        Map map = new HashMap(numEntries);
+        List<ReplicatedLogEntry> entries = replicatedLogImpl.getEntriesTill(numEntries);
+        for (ReplicatedLogEntry entry : entries) {
+            map.put(entry.getIndex(), entry.getData().toString());
+        }
+
+        int term = (int) replicatedLogImpl.lastTerm();
+        int lastIndex = (int) entries.get(entries.size() - 1).getIndex();
+        entries.clear();
+        replicatedLogImpl.setSnapshotTerm(term);
+        replicatedLogImpl.setSnapshotIndex(lastIndex);
+
+        return map;
+
+    }
+    class MockAbstractReplicatedLogImpl extends AbstractReplicatedLogImpl {
+        @Override
+        public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry) {
+        }
+
+        @Override
+        public void removeFromAndPersist(long index) {
+        }
+
+        public void setSnapshotIndex(long snapshotIndex) {
+            this.snapshotIndex = snapshotIndex;
+        }
+
+        public void setSnapshotTerm(long snapshotTerm) {
+            this.snapshotTerm = snapshotTerm;
+        }
+
+        public List<ReplicatedLogEntry> getEntriesTill(int index) {
+            return journal.subList(0, index);
+        }
+    }
+}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.