Merge "Add LeadershipChangeCount to ShardStats"
authorMoiz Raja <moraja@cisco.com>
Sat, 11 Apr 2015 01:13:56 +0000 (01:13 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 11 Apr 2015 01:13:56 +0000 (01:13 +0000)
21 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntries.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ElectionTermImplTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntriesTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/DataPersistenceProviderMonitor.java

index c27ff373865069df16b08eae829e1722ff74a5f8..b4b2afbc4ad602ccd1bc9f50da8641cd0b05f605 100644 (file)
@@ -209,7 +209,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
 
         snapshottedJournal.addAll(snapshotJournalEntries);
-        clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+        snapshotJournalEntries.clear();
 
         previousSnapshotIndex = snapshotIndex;
         setSnapshotIndex(snapshotCapturedIndex);
index 41a807aa355d394f659f488209f65e732646b120..2185ffdef3886bdb75d0123bf981ce60b2f39bb5 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -152,8 +153,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     @Override
     public void handleRecover(Object message) {
         if(raftRecovery == null) {
-            raftRecovery = new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
-                    getRaftActorRecoveryCohort());
+            raftRecovery = newRaftActorRecoverySupport();
         }
 
         boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message);
@@ -175,6 +175,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
+        return new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
+                getRaftActorRecoveryCohort());
+    }
+
     protected void initializeBehavior(){
         changeCurrentBehavior(new Follower(context));
     }
@@ -188,8 +193,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     @Override
     public void handleCommand(Object message) {
         if(snapshotSupport == null) {
-            snapshotSupport = new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
-                    currentBehavior, getRaftActorSnapshotCohort(), self());
+            snapshotSupport = newRaftActorSnapshotMessageSupport();
         }
 
         boolean handled = snapshotSupport.handleSnapshotMessage(message);
@@ -239,6 +243,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
+        return new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
+                currentBehavior, getRaftActorSnapshotCohort());
+    }
+
     private void onGetOnDemandRaftStats() {
         // Debugging message to retrieve raft stats.
 
@@ -562,6 +571,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return getRaftActorContext().hasFollowers();
     }
 
+    /**
+     * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.base.messages.DeleteEntriesTest}
+     *             whose type for fromIndex is long instead of int. This class was kept for backwards
+     *             compatibility with Helium.
+     */
+    @Deprecated
     static class DeleteEntries implements Serializable {
         private static final long serialVersionUID = 1L;
         private final int fromIndex;
index d2c14de73df482d4a3caebc4d6b096019f21fcd9..5dc8361cc43470b72165dfd2c7b51d6372ad3438 100644 (file)
@@ -11,10 +11,10 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
 
@@ -57,6 +57,9 @@ class RaftActorRecoverySupport {
                 onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
             } else if (message instanceof DeleteEntries) {
                 replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
+            } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) {
+                // Handle this message for backwards compatibility with pre-Lithium versions.
+                replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex());
             } else if (message instanceof UpdateElectionTerm) {
                 context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
                         ((UpdateElectionTerm) message).getVotedFor());
index 21c8ffa68e44763a14f0d5865ad03d8104e25d60..790ff89510ab222473a68b8105eab31c32158dd2 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import akka.actor.ActorRef;
 import akka.japi.Procedure;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
@@ -30,23 +29,21 @@ class RaftActorSnapshotMessageSupport {
     private final RaftActorContext context;
     private final RaftActorBehavior currentBehavior;
     private final RaftActorSnapshotCohort cohort;
-    private final ActorRef raftActorRef;
     private final Logger log;
 
     private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
         @Override
         public void apply(Void notUsed) throws Exception {
-            cohort.createSnapshot(raftActorRef);
+            cohort.createSnapshot(context.getActor());
         }
     };
 
     RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
-            RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort, ActorRef raftActorRef) {
+            RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort) {
         this.persistence = persistence;
         this.context = context;
         this.currentBehavior = currentBehavior;
         this.cohort = cohort;
-        this.raftActorRef = raftActorRef;
         this.log = context.getLogger();
     }
 
index 5a77b9aea3ac4b008af17e760ac5a0f53349220e..1cfe1538699b9748557b385c92c7f81e2b536756 100644 (file)
@@ -11,7 +11,7 @@ import akka.japi.Procedure;
 import java.util.Collections;
 import java.util.List;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
 /**
@@ -27,7 +27,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
     private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
         @Override
-        public void apply(DeleteEntries param) {
+        public void apply(DeleteEntries notUsed) {
         }
     };
 
@@ -56,7 +56,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         // FIXME: Maybe this should be done after the command is saved
         long adjustedIndex = removeFrom(logEntryIndex);
         if(adjustedIndex >= 0) {
-            persistence.persist(new DeleteEntries((int)adjustedIndex), deleteProcedure);
+            persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
         }
     }
 
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntries.java
new file mode 100644 (file)
index 0000000..97742c0
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
+
+/**
+ * Internal message that is stored in the akka's persistent journal to delete journal entries.
+ *
+ * @author Thomas Pantelis
+ */
+public class DeleteEntries implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final long fromIndex;
+
+    public DeleteEntries(long fromIndex) {
+        this.fromIndex = fromIndex;
+    }
+
+    public long getFromIndex() {
+        return fromIndex;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("DeleteEntries [fromIndex=").append(fromIndex).append("]");
+        return builder.toString();
+    }
+}
index 3c6c8281fb734b0d378963c831c2f462fbcc3182..1289ed7f905f23919c32068a91fe0c29db0f3214 100644 (file)
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.RaftActorTest.MockRaftActor;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -184,7 +183,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     }
 
     protected void waitUntilLeader(ActorRef actorRef) {
-        RaftActorTest.RaftActorTestKit.waitUntilLeader(actorRef);
+        RaftActorTestKit.waitUntilLeader(actorRef);
     }
 
     protected TestActorRef<TestRaftActor> newTestRaftActor(String id, Map<String, String> peerAddresses,
index c99f253657734cee8112f42e85b93e7feeebf215..d175289af56c0226a938ccae5cc3d37632d3d2ec 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -39,6 +40,35 @@ public class AbstractReplicatedLogImplTest {
 
     }
 
+    @Test
+    public void testEmptyLog() {
+        replicatedLogImpl = new MockAbstractReplicatedLogImpl();
+
+        assertEquals("size", 0, replicatedLogImpl.size());
+        assertEquals("dataSize", 0, replicatedLogImpl.dataSize());
+        assertEquals("getSnapshotIndex", -1, replicatedLogImpl.getSnapshotIndex());
+        assertEquals("getSnapshotTerm", -1, replicatedLogImpl.getSnapshotTerm());
+        assertEquals("lastIndex", -1, replicatedLogImpl.lastIndex());
+        assertEquals("lastTerm", -1, replicatedLogImpl.lastTerm());
+        assertEquals("isPresent", false, replicatedLogImpl.isPresent(0));
+        assertEquals("isInSnapshot", false, replicatedLogImpl.isInSnapshot(0));
+        Assert.assertNull("get(0)", replicatedLogImpl.get(0));
+        Assert.assertNull("last", replicatedLogImpl.last());
+
+        List<ReplicatedLogEntry> list = replicatedLogImpl.getFrom(0, 1);
+        assertEquals("getFrom size", 0, list.size());
+
+        assertEquals("removeFrom", -1, replicatedLogImpl.removeFrom(1));
+
+        replicatedLogImpl.setSnapshotIndex(2);
+        replicatedLogImpl.setSnapshotTerm(1);
+
+        assertEquals("getSnapshotIndex", 2, replicatedLogImpl.getSnapshotIndex());
+        assertEquals("getSnapshotTerm", 1, replicatedLogImpl.getSnapshotTerm());
+        assertEquals("lastIndex", 2, replicatedLogImpl.lastIndex());
+        assertEquals("lastTerm", 1, replicatedLogImpl.lastTerm());
+    }
+
     @Test
     public void testIndexOperations() {
 
@@ -133,24 +163,67 @@ public class AbstractReplicatedLogImplTest {
         replicatedLogImpl.snapshotPreCommit(-1, -1);
         assertEquals(8, replicatedLogImpl.size());
         assertEquals(-1, replicatedLogImpl.getSnapshotIndex());
+        assertEquals(-1, replicatedLogImpl.getSnapshotTerm());
 
-        replicatedLogImpl.snapshotPreCommit(4, 3);
+        replicatedLogImpl.snapshotPreCommit(4, 2);
         assertEquals(3, replicatedLogImpl.size());
         assertEquals(4, replicatedLogImpl.getSnapshotIndex());
+        assertEquals(2, replicatedLogImpl.getSnapshotTerm());
 
         replicatedLogImpl.snapshotPreCommit(6, 3);
         assertEquals(1, replicatedLogImpl.size());
         assertEquals(6, replicatedLogImpl.getSnapshotIndex());
+        assertEquals(3, replicatedLogImpl.getSnapshotTerm());
 
         replicatedLogImpl.snapshotPreCommit(7, 3);
         assertEquals(0, replicatedLogImpl.size());
         assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+        assertEquals(3, replicatedLogImpl.getSnapshotTerm());
 
         //running it again on an empty list should not throw exception
         replicatedLogImpl.snapshotPreCommit(7, 3);
         assertEquals(0, replicatedLogImpl.size());
         assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+        assertEquals(3, replicatedLogImpl.getSnapshotTerm());
+    }
+
+    @Test
+    public void testSnapshotCommit() {
+
+        replicatedLogImpl.snapshotPreCommit(1, 1);
+
+        replicatedLogImpl.snapshotCommit();
+
+        assertEquals("size", 2, replicatedLogImpl.size());
+        assertEquals("dataSize", 2, replicatedLogImpl.dataSize());
+        assertEquals("getSnapshotIndex", 1, replicatedLogImpl.getSnapshotIndex());
+        assertEquals("getSnapshotTerm", 1, replicatedLogImpl.getSnapshotTerm());
+        assertEquals("lastIndex", 3, replicatedLogImpl.lastIndex());
+        assertEquals("lastTerm", 2, replicatedLogImpl.lastTerm());
+
+        Assert.assertNull("get(0)", replicatedLogImpl.get(0));
+        Assert.assertNull("get(1)", replicatedLogImpl.get(1));
+        Assert.assertNotNull("get(2)", replicatedLogImpl.get(2));
+        Assert.assertNotNull("get(3)", replicatedLogImpl.get(3));
+    }
+
+    @Test
+    public void testSnapshotRollback() {
 
+        replicatedLogImpl.snapshotPreCommit(1, 1);
+
+        assertEquals("size", 2, replicatedLogImpl.size());
+        assertEquals("getSnapshotIndex", 1, replicatedLogImpl.getSnapshotIndex());
+        assertEquals("getSnapshotTerm", 1, replicatedLogImpl.getSnapshotTerm());
+
+        replicatedLogImpl.snapshotRollback();
+
+        assertEquals("size", 4, replicatedLogImpl.size());
+        assertEquals("dataSize", 4, replicatedLogImpl.dataSize());
+        assertEquals("getSnapshotIndex", -1, replicatedLogImpl.getSnapshotIndex());
+        assertEquals("getSnapshotTerm", -1, replicatedLogImpl.getSnapshotTerm());
+        Assert.assertNotNull("get(0)", replicatedLogImpl.get(0));
+        Assert.assertNotNull("get(3)", replicatedLogImpl.get(3));
     }
 
     @Test
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ElectionTermImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ElectionTermImplTest.java
new file mode 100644 (file)
index 0000000..da49718
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import akka.japi.Procedure;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for ElectionTermImpl.
+ *
+ * @author Thomas Pantelis
+ */
+public class ElectionTermImplTest {
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
+
+    @Mock
+    private DataPersistenceProvider mockPersistence;
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void testUpdateAndPersist() throws Exception {
+        ElectionTermImpl impl = new ElectionTermImpl(mockPersistence, "test", LOG);
+
+        impl.updateAndPersist(10, "member-1");
+
+        assertEquals("getCurrentTerm", 10, impl.getCurrentTerm());
+        assertEquals("getVotedFor", "member-1", impl.getVotedFor());
+
+        ArgumentCaptor<Object> message = ArgumentCaptor.forClass(Object.class);
+        ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
+        verify(mockPersistence).persist(message.capture(), procedure.capture());
+
+        assertEquals("Message type", UpdateElectionTerm.class, message.getValue().getClass());
+        UpdateElectionTerm update = (UpdateElectionTerm)message.getValue();
+        assertEquals("getCurrentTerm", 10, update.getCurrentTerm());
+        assertEquals("getVotedFor", "member-1", update.getVotedFor());
+
+        procedure.getValue().apply(null);
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
new file mode 100644 (file)
index 0000000..53110b3
--- /dev/null
@@ -0,0 +1,260 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
+
+    final RaftActor actorDelegate;
+    final RaftActorRecoveryCohort recoveryCohortDelegate;
+    final RaftActorSnapshotCohort snapshotCohortDelegate;
+    private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+    private final List<Object> state;
+    private ActorRef roleChangeNotifier;
+    private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
+    private RaftActorRecoverySupport raftActorRecoverySupport;
+    private RaftActorSnapshotMessageSupport snapshotMessageSupport;
+
+    public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
+        private static final long serialVersionUID = 1L;
+        private final Map<String, String> peerAddresses;
+        private final String id;
+        private final Optional<ConfigParams> config;
+        private final DataPersistenceProvider dataPersistenceProvider;
+        private final ActorRef roleChangeNotifier;
+        private RaftActorSnapshotMessageSupport snapshotMessageSupport;
+
+        private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
+            Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
+            ActorRef roleChangeNotifier) {
+            this.peerAddresses = peerAddresses;
+            this.id = id;
+            this.config = config;
+            this.dataPersistenceProvider = dataPersistenceProvider;
+            this.roleChangeNotifier = roleChangeNotifier;
+        }
+
+        @Override
+        public MockRaftActor create() throws Exception {
+            MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
+                dataPersistenceProvider);
+            mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
+            mockRaftActor.snapshotMessageSupport = snapshotMessageSupport;
+            return mockRaftActor;
+        }
+    }
+
+    public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+                         DataPersistenceProvider dataPersistenceProvider) {
+        super(id, peerAddresses, config);
+        state = new ArrayList<>();
+        this.actorDelegate = mock(RaftActor.class);
+        this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
+        this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
+        if(dataPersistenceProvider == null){
+            setPersistence(true);
+        } else {
+            setPersistence(dataPersistenceProvider);
+        }
+    }
+
+    public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
+        raftActorRecoverySupport = support;
+    }
+
+    @Override
+    public RaftActorRecoverySupport newRaftActorRecoverySupport() {
+        return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
+    }
+
+    @Override
+    protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
+        return snapshotMessageSupport != null ? snapshotMessageSupport : super.newRaftActorSnapshotMessageSupport();
+    }
+
+    public void waitForRecoveryComplete() {
+        try {
+            assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void waitForInitializeBehaviorComplete() {
+        try {
+            assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+
+    public void waitUntilLeader(){
+        for(int i = 0;i < 10; i++){
+            if(isLeader()){
+                break;
+            }
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    public List<Object> getState() {
+        return state;
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+            Optional<ConfigParams> config){
+        return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+            Optional<ConfigParams> config, RaftActorSnapshotMessageSupport snapshotMessageSupport){
+        MockRaftActorCreator creator = new MockRaftActorCreator(peerAddresses, id, config, null, null);
+        creator.snapshotMessageSupport = snapshotMessageSupport;
+        return Props.create(creator);
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+                              Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
+        return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+        Optional<ConfigParams> config, ActorRef roleChangeNotifier){
+        return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+                              Optional<ConfigParams> config, ActorRef roleChangeNotifier,
+                              DataPersistenceProvider dataPersistenceProvider){
+        return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
+    }
+
+
+    @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+        actorDelegate.applyState(clientActor, identifier, data);
+        LOG.info("{}: applyState called", persistenceId());
+    }
+
+    @Override
+    @Nonnull
+    protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+        return this;
+    }
+
+    @Override
+    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+        return this;
+    }
+
+    @Override
+    public void startLogRecoveryBatch(int maxBatchSize) {
+    }
+
+    @Override
+    public void appendRecoveredLogEntry(Payload data) {
+        state.add(data);
+    }
+
+    @Override
+    public void applyCurrentLogRecoveryBatch() {
+    }
+
+    @Override
+    protected void onRecoveryComplete() {
+        actorDelegate.onRecoveryComplete();
+        recoveryComplete.countDown();
+    }
+
+    @Override
+    protected void initializeBehavior() {
+        super.initializeBehavior();
+        initializeBehaviorComplete.countDown();
+    }
+
+    @Override
+    public void applyRecoverySnapshot(byte[] bytes) {
+        recoveryCohortDelegate.applyRecoverySnapshot(bytes);
+        try {
+            Object data = toObject(bytes);
+            if (data instanceof List) {
+                state.addAll((List<?>) data);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void createSnapshot(ActorRef actorRef) {
+        LOG.info("{}: createSnapshot called", persistenceId());
+        snapshotCohortDelegate.createSnapshot(actorRef);
+    }
+
+    @Override
+    public void applySnapshot(byte [] snapshot) {
+        LOG.info("{}: applySnapshot called", persistenceId());
+        snapshotCohortDelegate.applySnapshot(snapshot);
+    }
+
+    @Override
+    protected void onStateChanged() {
+        actorDelegate.onStateChanged();
+    }
+
+    @Override
+    protected Optional<ActorRef> getRoleChangeNotifier() {
+        return Optional.fromNullable(roleChangeNotifier);
+    }
+
+    @Override public String persistenceId() {
+        return this.getId();
+    }
+
+    private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
+        Object obj = null;
+        ByteArrayInputStream bis = null;
+        ObjectInputStream ois = null;
+        try {
+            bis = new ByteArrayInputStream(bs);
+            ois = new ObjectInputStream(bis);
+            obj = ois.readObject();
+        } finally {
+            if (bis != null) {
+                bis.close();
+            }
+            if (ois != null) {
+                ois.close();
+            }
+        }
+        return obj;
+    }
+
+    public ReplicatedLog getReplicatedLog(){
+        return this.getRaftActorContext().getReplicatedLog();
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
new file mode 100644 (file)
index 0000000..7420fc4
--- /dev/null
@@ -0,0 +1,311 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotOffer;
+import java.util.Arrays;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for RaftActorRecoverySupport.
+ *
+ * @author Thomas Pantelis
+ */
+public class RaftActorRecoverySupportTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
+
+    @Mock
+    private DataPersistenceProvider mockPersistence;
+
+    @Mock
+    private RaftActorBehavior mockBehavior;
+
+    @Mock
+    private RaftActorRecoveryCohort mockCohort;
+
+    private RaftActorRecoverySupport support;
+
+    private RaftActorContext context;
+    private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+
+        context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistence, "test", LOG),
+                -1, -1, Collections.<String,String>emptyMap(), configParams, LOG);
+
+        support = new RaftActorRecoverySupport(mockPersistence, context , mockBehavior, mockCohort);
+
+        doReturn(true).when(mockPersistence).isRecoveryApplicable();
+
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior));
+    }
+
+    private void sendMessageToSupport(Object message) {
+        sendMessageToSupport(message, false);
+    }
+
+    private void sendMessageToSupport(Object message, boolean expComplete) {
+        boolean complete = support.handleRecoveryMessage(message);
+        assertEquals("complete", expComplete, complete);
+    }
+
+    @Test
+    public void testOnReplicatedLogEntry() {
+        MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1", 5));
+
+        sendMessageToSupport(logEntry);
+
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Journal data size", 5, context.getReplicatedLog().dataSize());
+        assertEquals("Last index", 1, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", -1, context.getLastApplied());
+        assertEquals("Commit index", -1, context.getCommitIndex());
+        assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
+    }
+
+    @Test
+    public void testOnApplyJournalEntries() {
+        configParams.setJournalRecoveryLogBatchSize(5);
+
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                2, new MockRaftActorContext.MockPayload("2")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                3, new MockRaftActorContext.MockPayload("3")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                4, new MockRaftActorContext.MockPayload("4")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                5, new MockRaftActorContext.MockPayload("5")));
+
+        sendMessageToSupport(new ApplyJournalEntries(2));
+
+        assertEquals("Last applied", 2, context.getLastApplied());
+        assertEquals("Commit index", 2, context.getCommitIndex());
+
+        sendMessageToSupport(new ApplyJournalEntries(4));
+
+        assertEquals("Last applied", 4, context.getLastApplied());
+        assertEquals("Last applied", 4, context.getLastApplied());
+
+        sendMessageToSupport(new ApplyJournalEntries(5));
+
+        assertEquals("Last index", 5, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", 5, context.getLastApplied());
+        assertEquals("Commit index", 5, context.getCommitIndex());
+        assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
+
+        InOrder inOrder = Mockito.inOrder(mockCohort);
+        inOrder.verify(mockCohort).startLogRecoveryBatch(5);
+
+        for(int i = 0; i < replicatedLog.size() - 1; i++) {
+            inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
+        }
+
+        inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
+        inOrder.verify(mockCohort).startLogRecoveryBatch(5);
+        inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(replicatedLog.size() - 1).getData());
+
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    @Test
+    public void testOnApplyLogEntries() {
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+
+        sendMessageToSupport(new ApplyLogEntries(0));
+
+        assertEquals("Last applied", 0, context.getLastApplied());
+        assertEquals("Commit index", 0, context.getCommitIndex());
+    }
+
+    @Test
+    public void testOnSnapshotOffer() {
+
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                2, new MockRaftActorContext.MockPayload("2")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                3, new MockRaftActorContext.MockPayload("3")));
+
+        byte[] snapshotBytes = {1,2,3,4,5};
+
+        ReplicatedLogEntry unAppliedEntry1 = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                4, new MockRaftActorContext.MockPayload("4", 4));
+
+        ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                5, new MockRaftActorContext.MockPayload("5", 5));
+
+        int lastAppliedDuringSnapshotCapture = 3;
+        int lastIndexDuringSnapshotCapture = 5;
+
+        Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
+                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
+
+        SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+
+        sendMessageToSupport(snapshotOffer);
+
+        assertEquals("Journal log size", 2, context.getReplicatedLog().size());
+        assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
+        assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
+        assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
+        assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
+
+        verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
+    }
+
+    @Test
+    public void testOnRecoveryCompletedWithRemainingBatch() {
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+
+        sendMessageToSupport(new ApplyJournalEntries(1));
+
+        sendMessageToSupport(RecoveryCompleted.getInstance(), true);
+
+        assertEquals("Last applied", 1, context.getLastApplied());
+        assertEquals("Commit index", 1, context.getCommitIndex());
+
+        InOrder inOrder = Mockito.inOrder(mockCohort);
+        inOrder.verify(mockCohort).startLogRecoveryBatch(anyInt());
+
+        for(int i = 0; i < replicatedLog.size(); i++) {
+            inOrder.verify(mockCohort).appendRecoveredLogEntry(replicatedLog.get(i).getData());
+        }
+
+        inOrder.verify(mockCohort).applyCurrentLogRecoveryBatch();
+
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    @Test
+    public void testOnRecoveryCompletedWithNoRemainingBatch() {
+        sendMessageToSupport(RecoveryCompleted.getInstance(), true);
+
+        verifyNoMoreInteractions(mockCohort);
+    }
+
+    @Test
+    public void testOnDeprecatedDeleteEntries() {
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                2, new MockRaftActorContext.MockPayload("2")));
+
+        sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1));
+
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
+    }
+
+    @Test
+    public void testOnDeleteEntries() {
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                0, new MockRaftActorContext.MockPayload("0")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1")));
+        replicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                2, new MockRaftActorContext.MockPayload("2")));
+
+        sendMessageToSupport(new DeleteEntries(1));
+
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
+    }
+
+    @Test
+    public void testUpdateElectionTerm() {
+
+        sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
+
+        assertEquals("Current term", 5, context.getTermInformation().getCurrentTerm());
+        assertEquals("Voted For", "member2", context.getTermInformation().getVotedFor());
+    }
+
+    @Test
+    public void testRecoveryWithPersistenceDisabled() {
+        doReturn(false).when(mockPersistence).isRecoveryApplicable();
+
+        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
+
+        sendMessageToSupport(snapshotOffer);
+
+        sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                4, new MockRaftActorContext.MockPayload("4")));
+        sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1,
+                5, new MockRaftActorContext.MockPayload("5")));
+
+        sendMessageToSupport(new ApplyJournalEntries(4));
+
+        sendMessageToSupport(new DeleteEntries(5));
+
+        sendMessageToSupport(new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(5));
+
+        assertEquals("Journal log size", 0, context.getReplicatedLog().size());
+        assertEquals("Last index", -1, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", -1, context.getLastApplied());
+        assertEquals("Commit index", -1, context.getCommitIndex());
+        assertEquals("Snapshot term", -1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", -1, context.getReplicatedLog().getSnapshotIndex());
+
+        sendMessageToSupport(new UpdateElectionTerm(5, "member2"));
+
+        assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
+        assertEquals("Voted For", null, context.getTermInformation().getVotedFor());
+
+        sendMessageToSupport(RecoveryCompleted.getInstance(), true);
+
+        verifyNoMoreInteractions(mockCohort);
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
new file mode 100644 (file)
index 0000000..86b90d8
--- /dev/null
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotMetadata;
+import java.util.Arrays;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for RaftActorSnapshotMessageSupport.
+ *
+ * @author Thomas Pantelis
+ */
+public class RaftActorSnapshotMessageSupportTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
+
+    @Mock
+    private DataPersistenceProvider mockPersistence;
+
+    @Mock
+    private RaftActorBehavior mockBehavior;
+
+    @Mock
+    private RaftActorSnapshotCohort mockCohort;
+
+    @Mock
+    private SnapshotManager mockSnapshotManager;
+
+    @Mock
+    ActorRef mockRaftActorRef;
+
+    private RaftActorSnapshotMessageSupport support;
+
+    private RaftActorContext context;
+    private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+
+        context = new RaftActorContextImpl(mockRaftActorRef, null, "test",
+                new ElectionTermImpl(mockPersistence, "test", LOG),
+                -1, -1, Collections.<String,String>emptyMap(), configParams, LOG) {
+            @Override
+            public SnapshotManager getSnapshotManager() {
+                return mockSnapshotManager;
+            }
+        };
+
+        support = new RaftActorSnapshotMessageSupport(mockPersistence, context, mockBehavior, mockCohort);
+
+        doReturn(true).when(mockPersistence).isRecoveryApplicable();
+
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior));
+    }
+
+    private void sendMessageToSupport(Object message) {
+        sendMessageToSupport(message, true);
+    }
+
+    private void sendMessageToSupport(Object message, boolean expHandled) {
+        boolean handled = support.handleSnapshotMessage(message);
+        assertEquals("complete", expHandled, handled);
+    }
+
+    @Test
+    public void testOnApplySnapshot() {
+
+        ReplicatedLog replicatedLog = context.getReplicatedLog();
+        replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1")));
+
+        byte[] snapshotBytes = {1,2,3,4,5};
+
+        ReplicatedLogEntry unAppliedEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
+
+        long lastAppliedDuringSnapshotCapture = 1;
+        long lastIndexDuringSnapshotCapture = 2;
+
+        Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry),
+                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
+
+        sendMessageToSupport(new ApplySnapshot(snapshot));
+
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
+        assertEquals("Commit index", -1, context.getCommitIndex());
+        assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
+
+        verify(mockCohort).applySnapshot(snapshotBytes);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void testOnCaptureSnapshot() throws Exception {
+
+        sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1));
+
+        ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
+        verify(mockSnapshotManager).create(procedure.capture());
+
+        procedure.getValue().apply(null);
+
+        verify(mockCohort).createSnapshot(same(mockRaftActorRef));
+    }
+
+    @Test
+    public void testOnCaptureSnapshotReply() {
+
+        byte[] snapshot = {1,2,3,4,5};
+        sendMessageToSupport(new CaptureSnapshotReply(snapshot));
+
+        verify(mockSnapshotManager).persist(same(mockPersistence), same(snapshot), same(mockBehavior), anyLong());
+    }
+
+    @Test
+    public void testOnSaveSnapshotSuccess() {
+
+        long sequenceNumber = 100;
+        sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L)));
+
+        verify(mockSnapshotManager).commit(mockPersistence, sequenceNumber);
+    }
+
+    @Test
+    public void testOnSaveSnapshotFailure() {
+
+        sendMessageToSupport(new SaveSnapshotFailure(new SnapshotMetadata("foo", 100, 1234L),
+                new Throwable("mock")));
+
+        verify(mockSnapshotManager).rollback();
+    }
+
+    @Test
+    public void testOnCommitSnapshot() {
+
+        sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
+
+        verify(mockSnapshotManager).commit(mockPersistence, -1);
+    }
+
+    @Test
+    public void testUnhandledMessage() {
+
+        sendMessageToSupport("unhandled", false);
+    }
+}
index 14bfd1d348b69dc76332fc35ec8f8f94dd80e8db..5062f8f6e0c6d5875e949dcc38afeda5297f5827 100644 (file)
@@ -2,43 +2,32 @@ package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
-import akka.japi.Creator;
 import akka.japi.Procedure;
-import akka.pattern.Patterns;
-import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotOffer;
-import akka.persistence.SnapshotSelectionCriteria;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -46,17 +35,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.annotation.Nonnull;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
-import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
@@ -64,22 +48,18 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntrie
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
-import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
-import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 public class RaftActorTest extends AbstractActorTest {
@@ -98,275 +78,6 @@ public class RaftActorTest extends AbstractActorTest {
         InMemorySnapshotStore.clear();
     }
 
-    public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
-
-        private final RaftActor actorDelegate;
-        private final RaftActorRecoveryCohort recoveryCohortDelegate;
-        private final RaftActorSnapshotCohort snapshotCohortDelegate;
-        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
-        private final List<Object> state;
-        private ActorRef roleChangeNotifier;
-        private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
-
-        public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
-            private static final long serialVersionUID = 1L;
-            private final Map<String, String> peerAddresses;
-            private final String id;
-            private final Optional<ConfigParams> config;
-            private final DataPersistenceProvider dataPersistenceProvider;
-            private final ActorRef roleChangeNotifier;
-
-            private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
-                Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
-                ActorRef roleChangeNotifier) {
-                this.peerAddresses = peerAddresses;
-                this.id = id;
-                this.config = config;
-                this.dataPersistenceProvider = dataPersistenceProvider;
-                this.roleChangeNotifier = roleChangeNotifier;
-            }
-
-            @Override
-            public MockRaftActor create() throws Exception {
-                MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
-                    dataPersistenceProvider);
-                mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
-                return mockRaftActor;
-            }
-        }
-
-        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
-                             DataPersistenceProvider dataPersistenceProvider) {
-            super(id, peerAddresses, config);
-            state = new ArrayList<>();
-            this.actorDelegate = mock(RaftActor.class);
-            this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
-            this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
-            if(dataPersistenceProvider == null){
-                setPersistence(true);
-            } else {
-                setPersistence(dataPersistenceProvider);
-            }
-        }
-
-        public void waitForRecoveryComplete() {
-            try {
-                assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-
-        public void waitForInitializeBehaviorComplete() {
-            try {
-                assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-
-
-        public void waitUntilLeader(){
-            for(int i = 0;i < 10; i++){
-                if(isLeader()){
-                    break;
-                }
-                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-            }
-        }
-
-        public List<Object> getState() {
-            return state;
-        }
-
-        public static Props props(final String id, final Map<String, String> peerAddresses,
-                Optional<ConfigParams> config){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
-        }
-
-        public static Props props(final String id, final Map<String, String> peerAddresses,
-                                  Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
-        }
-
-        public static Props props(final String id, final Map<String, String> peerAddresses,
-            Optional<ConfigParams> config, ActorRef roleChangeNotifier){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
-        }
-
-        public static Props props(final String id, final Map<String, String> peerAddresses,
-                                  Optional<ConfigParams> config, ActorRef roleChangeNotifier,
-                                  DataPersistenceProvider dataPersistenceProvider){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
-        }
-
-
-        @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
-            actorDelegate.applyState(clientActor, identifier, data);
-            LOG.info("{}: applyState called", persistenceId());
-        }
-
-        @Override
-        @Nonnull
-        protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
-            return this;
-        }
-
-        @Override
-        protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
-            return this;
-        }
-
-        @Override
-        public void startLogRecoveryBatch(int maxBatchSize) {
-        }
-
-        @Override
-        public void appendRecoveredLogEntry(Payload data) {
-            state.add(data);
-        }
-
-        @Override
-        public void applyCurrentLogRecoveryBatch() {
-        }
-
-        @Override
-        protected void onRecoveryComplete() {
-            actorDelegate.onRecoveryComplete();
-            recoveryComplete.countDown();
-        }
-
-        @Override
-        protected void initializeBehavior() {
-            super.initializeBehavior();
-            initializeBehaviorComplete.countDown();
-        }
-
-        @Override
-        public void applyRecoverySnapshot(byte[] bytes) {
-            recoveryCohortDelegate.applyRecoverySnapshot(bytes);
-            try {
-                Object data = toObject(bytes);
-                if (data instanceof List) {
-                    state.addAll((List<?>) data);
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-
-        @Override
-        public void createSnapshot(ActorRef actorRef) {
-            LOG.info("{}: createSnapshot called", persistenceId());
-            snapshotCohortDelegate.createSnapshot(actorRef);
-        }
-
-        @Override
-        public void applySnapshot(byte [] snapshot) {
-            LOG.info("{}: applySnapshot called", persistenceId());
-            snapshotCohortDelegate.applySnapshot(snapshot);
-        }
-
-        @Override
-        protected void onStateChanged() {
-            actorDelegate.onStateChanged();
-        }
-
-        @Override
-        protected Optional<ActorRef> getRoleChangeNotifier() {
-            return Optional.fromNullable(roleChangeNotifier);
-        }
-
-        @Override public String persistenceId() {
-            return this.getId();
-        }
-
-        private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
-            Object obj = null;
-            ByteArrayInputStream bis = null;
-            ObjectInputStream ois = null;
-            try {
-                bis = new ByteArrayInputStream(bs);
-                ois = new ObjectInputStream(bis);
-                obj = ois.readObject();
-            } finally {
-                if (bis != null) {
-                    bis.close();
-                }
-                if (ois != null) {
-                    ois.close();
-                }
-            }
-            return obj;
-        }
-
-        public ReplicatedLog getReplicatedLog(){
-            return this.getRaftActorContext().getReplicatedLog();
-        }
-    }
-
-
-    public static class RaftActorTestKit extends JavaTestKit {
-        private final ActorRef raftActor;
-
-        public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
-            super(actorSystem);
-
-            raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
-                    Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
-
-        }
-
-
-        public ActorRef getRaftActor() {
-            return raftActor;
-        }
-
-        public boolean waitForLogMessage(final Class<?> logEventClass, String message){
-            // Wait for a specific log message to show up
-            return
-                new JavaTestKit.EventFilter<Boolean>(logEventClass
-                ) {
-                    @Override
-                    protected Boolean run() {
-                        return true;
-                    }
-                }.from(raftActor.path().toString())
-                    .message(message)
-                    .occurrences(1).exec();
-
-
-        }
-
-        protected void waitUntilLeader(){
-            waitUntilLeader(raftActor);
-        }
-
-        public static void waitUntilLeader(ActorRef actorRef) {
-            FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
-            for(int i = 0; i < 20 * 5; i++) {
-                Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
-                try {
-                    FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
-                    if(resp.getLeaderActor() != null) {
-                        return;
-                    }
-                } catch(TimeoutException e) {
-                } catch(Exception e) {
-                    System.err.println("FindLeader threw ex");
-                    e.printStackTrace();
-                }
-
-
-                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-            }
-
-            Assert.fail("Leader not found for actorRef " + actorRef.path());
-        }
-
-    }
-
-
     @Test
     public void testConstruction() {
         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
@@ -379,18 +90,20 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRaftActorRecovery() throws Exception {
+    public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
         new JavaTestKit(getSystem()) {{
             String persistenceId = factory.generateActorId("follower-");
 
             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
             // Set the heartbeat interval high to essentially disable election otherwise the test
             // may fail if the actor is switched to Leader and the commitIndex is set to the last
             // log entry.
             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
-                    Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
+                    ImmutableMap.<String, String>builder().put("member1", "address").build(),
+                    Optional.<ConfigParams>of(config)), persistenceId);
 
             watch(followerActor);
 
@@ -446,276 +159,147 @@ public class RaftActorTest extends AbstractActorTest {
                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
                             Optional.<ConfigParams>of(config)));
 
-            ref.underlyingActor().waitForRecoveryComplete();
+            MockRaftActor mockRaftActor = ref.underlyingActor();
+
+            mockRaftActor.waitForRecoveryComplete();
 
-            RaftActorContext context = ref.underlyingActor().getRaftActorContext();
+            RaftActorContext context = mockRaftActor.getRaftActorContext();
             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
                     context.getReplicatedLog().size());
             assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
-            assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
+            assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
+
+            mockRaftActor.waitForInitializeBehaviorComplete();
+
+            assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
         }};
     }
 
     @Test
-    public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
+    public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
         new JavaTestKit(getSystem()) {{
-            String persistenceId = factory.generateActorId("leader-");
+            String persistenceId = factory.generateActorId("follower-");
 
             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-            // Setup the persisted journal with some entries
-            ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
-                    new MockRaftActorContext.MockPayload("zero"));
-            ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
-                    new MockRaftActorContext.MockPayload("oen"));
-            ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
-                    new MockRaftActorContext.MockPayload("two"));
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-            long seqNr = 1;
-            InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
-            InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
-            InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
-            InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
+            TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
+                    ImmutableMap.<String, String>builder().put("member1", "address").build(),
+                    Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
 
-            int lastAppliedToState = 1;
-            int lastIndex = 2;
+            MockRaftActor mockRaftActor = ref.underlyingActor();
 
-            //reinstate the actor
-            TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
-                            Optional.<ConfigParams>of(config)));
+            mockRaftActor.waitForRecoveryComplete();
 
-            leaderActor.underlyingActor().waitForRecoveryComplete();
+            mockRaftActor.waitForInitializeBehaviorComplete();
 
-            RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
-            assertEquals("Journal log size", 3, context.getReplicatedLog().size());
-            assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
-            assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
-            assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+            assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
         }};
     }
 
-    /**
-     * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
-     * process recovery messages
-     *
-     * @throws Exception
-     */
-
     @Test
-    public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
+    public void testRaftActorForwardsToRaftActorRecoverySupport() {
+        String persistenceId = factory.generateActorId("leader-");
 
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
 
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
+        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
+                Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
 
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+        MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
-                // Wait for akka's recovery to complete so it doesn't interfere.
-                mockRaftActor.waitForRecoveryComplete();
+        // Wait for akka's recovery to complete so it doesn't interfere.
+        mockRaftActor.waitForRecoveryComplete();
 
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
+        RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
+        mockRaftActor.setRaftActorRecoverySupport(mockSupport );
 
-                Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
-                        Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
+        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
+        mockRaftActor.handleRecover(snapshotOffer);
 
-                mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
+        MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, new MockRaftActorContext.MockPayload("1", 5));
+        mockRaftActor.handleRecover(logEntry);
 
-                verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
+        ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
+        mockRaftActor.handleRecover(applyJournalEntries);
 
-                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
+        ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
+        mockRaftActor.handleRecover(applyLogEntries);
 
-                ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
+        DeleteEntries deleteEntries = new DeleteEntries(1);
+        mockRaftActor.handleRecover(deleteEntries);
 
-                assertEquals("add replicated log entry", 1, replicatedLog.size());
+        org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
+                new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
+        mockRaftActor.handleRecover(deprecatedDeleteEntries);
 
-                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
+        UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
+        mockRaftActor.handleRecover(updateElectionTerm);
 
-                assertEquals("add replicated log entry", 2, replicatedLog.size());
-
-                mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
-
-                assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
-
-                // The snapshot had 4 items + we added 2 more items during the test
-                // We start removing from 5 and we should get 1 item in the replicated log
-                mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
-
-                assertEquals("remove log entries", 1, replicatedLog.size());
-
-                mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
-
-                assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
-                assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
-
-                mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
-
-            }};
+        verify(mockSupport).handleRecoveryMessage(same(snapshotOffer));
+        verify(mockSupport).handleRecoveryMessage(same(logEntry));
+        verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries));
+        verify(mockSupport).handleRecoveryMessage(same(applyLogEntries));
+        verify(mockSupport).handleRecoveryMessage(same(deleteEntries));
+        verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries));
+        verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm));
     }
 
-    /**
-     * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
-     * not process recovery messages
-     *
-     * @throws Exception
-     */
     @Test
-    public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                // Wait for akka's recovery to complete so it doesn't interfere.
-                mockRaftActor.waitForRecoveryComplete();
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
-
-                Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
-                        Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
-
-                mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
-
-                verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
-
-                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
-
-                ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
-
-                assertEquals("add replicated log entry", 0, replicatedLog.size());
-
-                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
-
-                assertEquals("add replicated log entry", 0, replicatedLog.size());
-
-                mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
+    public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
+        String persistenceId = factory.generateActorId("leader-");
 
-                assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
 
-                mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
+        config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-                assertEquals("remove log entries", 0, replicatedLog.size());
+        RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
 
-                mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
+        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
+                Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), mockSupport), persistenceId);
 
-                assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
-                assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
+        MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
-                mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
-            }};
-    }
+        // Wait for akka's recovery to complete so it doesn't interfere.
+        mockRaftActor.waitForRecoveryComplete();
 
+        ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
+        mockRaftActor.handleCommand(applySnapshot);
 
-    @Test
-    public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
+        CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1);
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
+        mockRaftActor.handleCommand(captureSnapshot);
 
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
+        mockRaftActor.handleCommand(captureSnapshotReply);
 
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
+        mockRaftActor.handleCommand(saveSnapshotSuccess);
 
-                CountDownLatch persistLatch = new CountDownLatch(1);
-                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
-                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+        SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
+        mockRaftActor.handleCommand(saveSnapshotFailure);
 
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
+        mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
 
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-
-                mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
-
-                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
-            }
-        };
-    }
-
-    @Test
-    public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-
-                MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
-
-                mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
-
-                verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
-            }
-        };
-    }
-
-    @Test
-    public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-
-                mockRaftActor.waitUntilLeader();
-
-                mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
-
-                mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
-
-                verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
-            }
-        };
+        verify(mockSupport).handleSnapshotMessage(same(applySnapshot));
+        verify(mockSupport).handleSnapshotMessage(same(captureSnapshot));
+        verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
+        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
+        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
+        verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
     }
 
     @Test
@@ -748,112 +332,6 @@ public class RaftActorTest extends AbstractActorTest {
         };
     }
 
-    @Test
-    public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                        MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
-                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
-
-                RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
-
-                raftActorContext.getSnapshotManager().capture(
-                        new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
-                                new MockRaftActorContext.MockPayload("D")), -1);
-
-                mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
-
-                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-
-                verify(dataPersistenceProvider).saveSnapshot(anyObject());
-
-            }
-        };
-    }
-
-    @Test
-    public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-                MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
-
-                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
-                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
-                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
-                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
-                mockRaftActor.getReplicatedLog().append(lastEntry);
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
-
-                RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
-                mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
-
-                long replicatedToAllIndex = 1;
-
-                mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
-
-                verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
-
-                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-
-                mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
-
-                verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
-
-                verify(dataPersistenceProvider).deleteMessages(100);
-
-                assertEquals(3, mockRaftActor.getReplicatedLog().size());
-                assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
-
-                assertNotNull(mockRaftActor.getReplicatedLog().get(2));
-                assertNotNull(mockRaftActor.getReplicatedLog().get(3));
-                assertNotNull(mockRaftActor.getReplicatedLog().get(4));
-
-                // Index 2 will not be in the log because it was removed due to snapshotting
-                assertNull(mockRaftActor.getReplicatedLog().get(1));
-                assertNull(mockRaftActor.getReplicatedLog().get(0));
-
-            }
-        };
-    }
-
     @Test
     public void testApplyState() throws Exception {
 
@@ -885,106 +363,6 @@ public class RaftActorTest extends AbstractActorTest {
         };
     }
 
-    @Test
-    public void testApplySnapshot() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-
-                ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
-
-                oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
-                oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
-                oldReplicatedLog.append(
-                        new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
-                                mock(Payload.class)));
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
-
-                Snapshot snapshot = mock(Snapshot.class);
-
-                doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
-
-                doReturn(3L).when(snapshot).getLastAppliedIndex();
-
-                mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
-
-                verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState()));
-
-                assertTrue("The replicatedLog should have changed",
-                        oldReplicatedLog != mockRaftActor.getReplicatedLog());
-
-                assertEquals("lastApplied should be same as in the snapshot",
-                        (Long) 3L, mockRaftActor.getLastApplied());
-
-                assertEquals(0, mockRaftActor.getReplicatedLog().size());
-
-            }
-        };
-    }
-
-    @Test
-    public void testSaveSnapshotFailure() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                String persistenceId = factory.generateActorId("leader-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
-
-                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
-
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-
-                ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
-
-                RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
-
-                mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
-
-                raftActorContext.getSnapshotManager().capture(
-                        new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
-                                new MockRaftActorContext.MockPayload("D")), 1);
-
-                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-
-                mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
-                        new Exception()));
-
-                assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
-                        mockRaftActor.getReplicatedLog().getSnapshotIndex());
-
-            }
-        };
-    }
-
     @Test
     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
         new JavaTestKit(getSystem()) {{
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTestKit.java
new file mode 100644 (file)
index 0000000..3e747e3
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.Assert;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+public class RaftActorTestKit extends JavaTestKit {
+    private final ActorRef raftActor;
+
+    public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
+        super(actorSystem);
+
+        raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
+                Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
+
+    }
+
+
+    public ActorRef getRaftActor() {
+        return raftActor;
+    }
+
+    public boolean waitForLogMessage(final Class<?> logEventClass, String message){
+        // Wait for a specific log message to show up
+        return
+            new JavaTestKit.EventFilter<Boolean>(logEventClass
+            ) {
+                @Override
+                protected Boolean run() {
+                    return true;
+                }
+            }.from(raftActor.path().toString())
+                .message(message)
+                .occurrences(1).exec();
+
+
+    }
+
+    protected void waitUntilLeader(){
+        waitUntilLeader(raftActor);
+    }
+
+    public static void waitUntilLeader(ActorRef actorRef) {
+        FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
+        for(int i = 0; i < 20 * 5; i++) {
+            Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
+            try {
+                FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
+                if(resp.getLeaderActor() != null) {
+                    return;
+                }
+            } catch(TimeoutException e) {
+            } catch(Exception e) {
+                System.err.println("FindLeader threw ex");
+                e.printStackTrace();
+            }
+
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.fail("Leader not found for actorRef " + actorRef.path());
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java
new file mode 100644 (file)
index 0000000..72ccae7
--- /dev/null
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import akka.japi.Procedure;
+import com.google.common.base.Supplier;
+import java.util.Collections;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.internal.matchers.Same;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for ReplicatedLogImpl.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReplicatedLogImplTest {
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
+
+    @Mock
+    private DataPersistenceProvider mockPersistence;
+
+    @Mock
+    private RaftActorBehavior mockBehavior;
+
+    @Mock
+    private SnapshotManager mockSnapshotManager;
+
+    private RaftActorContext context;
+    private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+
+        context = new RaftActorContextImpl(null, null, "test",
+                new ElectionTermImpl(mockPersistence, "test", LOG),
+                -1, -1, Collections.<String,String>emptyMap(), configParams, LOG)  {
+            @Override
+            public SnapshotManager getSnapshotManager() {
+                return mockSnapshotManager;
+            }
+        };
+    }
+
+    private void verifyPersist(Object message) throws Exception {
+        verifyPersist(message, new Same(message));
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void verifyPersist(Object message, Matcher<?> matcher) throws Exception {
+        ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
+        verify(mockPersistence).persist(Matchers.argThat(matcher), procedure.capture());
+
+        procedure.getValue().apply(message);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAppendAndPersistExpectingNoCapture() throws Exception {
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+
+        MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
+
+        log.appendAndPersist(logEntry);
+
+        verifyPersist(logEntry);
+
+        assertEquals("size", 1, log.size());
+
+        reset(mockPersistence);
+
+        Procedure<ReplicatedLogEntry> mockCallback = Mockito.mock(Procedure.class);
+        log.appendAndPersist(logEntry, mockCallback);
+
+        verifyPersist(logEntry);
+
+        verify(mockCallback).apply(same(logEntry));
+        verifyNoMoreInteractions(mockSnapshotManager);
+
+        assertEquals("size", 2, log.size());
+    }
+
+    @Test
+    public void testAppendAndPersistExpectingCaptureDueToJournalCount() throws Exception {
+        configParams.setSnapshotBatchCount(2);
+
+        doReturn(1L).when(mockBehavior).getReplicatedToAllIndex();
+
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+
+        MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
+        MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3"));
+
+        log.appendAndPersist(logEntry1);
+        verifyPersist(logEntry1);
+
+        verifyNoMoreInteractions(mockSnapshotManager);
+        reset(mockPersistence);
+
+        log.appendAndPersist(logEntry2);
+        verifyPersist(logEntry2);
+
+        verify(mockSnapshotManager).capture(same(logEntry2), eq(1L));
+
+        assertEquals("size", 2, log.size());
+    }
+
+    @Test
+    public void testAppendAndPersistExpectingCaptureDueToDataSize() throws Exception {
+        doReturn(1L).when(mockBehavior).getReplicatedToAllIndex();
+
+        context.setTotalMemoryRetriever(new Supplier<Long>() {
+            @Override
+            public Long get() {
+                return 100L;
+            }
+        });
+
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+
+        int dataSize = 600;
+        MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize));
+
+        doReturn(true).when(mockSnapshotManager).capture(same(logEntry), eq(1L));
+
+        log.appendAndPersist(logEntry);
+        verifyPersist(logEntry);
+
+        verify(mockSnapshotManager).capture(same(logEntry), eq(1L));
+
+        reset(mockPersistence, mockSnapshotManager);
+
+        logEntry = new MockReplicatedLogEntry(1, 3, new MockPayload("3", 5));
+
+        log.appendAndPersist(logEntry);
+        verifyPersist(logEntry);
+
+        verifyNoMoreInteractions(mockSnapshotManager);
+
+        assertEquals("size", 2, log.size());
+    }
+
+    @Test
+    public void testRemoveFromAndPersist() throws Exception {
+
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+
+        log.append(new MockReplicatedLogEntry(1, 0, new MockPayload("0")));
+        log.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1")));
+        log.append(new MockReplicatedLogEntry(1, 2, new MockPayload("2")));
+
+        log.removeFromAndPersist(1);
+
+        DeleteEntries deleteEntries = new DeleteEntries(1);
+        verifyPersist(deleteEntries, match(deleteEntries));
+
+        assertEquals("size", 1, log.size());
+
+        reset(mockPersistence);
+
+        log.removeFromAndPersist(1);
+
+        verifyNoMoreInteractions(mockPersistence);
+    }
+
+    public Matcher<DeleteEntries> match(final DeleteEntries actual){
+        return new BaseMatcher<DeleteEntries>() {
+            @Override
+            public boolean matches(Object o) {
+                DeleteEntries other = (DeleteEntries) o;
+                return actual.getFromIndex() == other.getFromIndex();
+            }
+
+            @Override
+            public void describeTo(Description description) {
+                description.appendText("DeleteEntries: fromIndex: " + actual.getFromIndex());
+            }
+        };
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntriesTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/base/messages/DeleteEntriesTest.java
new file mode 100644 (file)
index 0000000..55d2bcc
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for DeleteEntries.
+ *
+ * @author Thomas Pantelis
+ */
+public class DeleteEntriesTest {
+
+    @Test
+    public void testSerialization() {
+
+        DeleteEntries deleteEntries = new DeleteEntries(11);
+
+        DeleteEntries clone = (DeleteEntries) SerializationUtils.clone(deleteEntries);
+
+        Assert.assertEquals("getFromIndex", 11, clone.getFromIndex());
+    }
+}
index db4bf3143898f2e73ecb03d7f29b28fbb1165b82..730310e22e2f37d5153c2c918ea5b458b3531fe2 100644 (file)
@@ -52,4 +52,8 @@ public interface DataPersistenceProvider {
      */
     void deleteMessages(long sequenceNumber);
 
+    /**
+     * Returns the last sequence number contained in the journal.
+     */
+    long getLastSequenceNumber();
 }
index c74236bb479cd8372e52526438ce5906f23ee3b9..e27fa26aebb3953f6a0ddd35c056a812a00e833c 100644 (file)
@@ -54,4 +54,9 @@ public class DelegatingPersistentDataProvider implements DataPersistenceProvider
     public void deleteMessages(long sequenceNumber) {
         delegate.deleteMessages(sequenceNumber);
     }
+
+    @Override
+    public long getLastSequenceNumber() {
+        return delegate.getLastSequenceNumber();
+    }
 }
index fed81177a1436bbb047574043f587cd3b5b0a013..d1af58f18b426c19d7c70ef0d0a44734948e4b72 100644 (file)
@@ -43,4 +43,9 @@ public class NonPersistentDataProvider implements DataPersistenceProvider {
     @Override
     public void deleteMessages(long sequenceNumber) {
     }
+
+    @Override
+    public long getLastSequenceNumber() {
+        return -1;
+    }
 }
\ No newline at end of file
index f130a1f27e0381f86ea5bb02d997b0a291bd6e17..4ccd5f4d29cc39a695772f8e36a800d078fcde79 100644 (file)
@@ -47,4 +47,9 @@ public class PersistentDataProvider implements DataPersistenceProvider {
     public void deleteMessages(long sequenceNumber) {
         persistentActor.deleteMessages(sequenceNumber);
     }
+
+    @Override
+    public long getLastSequenceNumber() {
+        return persistentActor.lastSequenceNr();
+    }
 }
\ No newline at end of file
index 33d405639538cac69c3554851c1b99e74fd5aeaa..bad9fc31f42c09bc54b3ec632ed25e06061c0f37 100644 (file)
@@ -10,9 +10,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
-
 import java.util.concurrent.CountDownLatch;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 
 /**
  * This class is intended for testing purposes. It just triggers CountDownLatch's in each method.
@@ -65,4 +64,9 @@ public class DataPersistenceProviderMonitor implements DataPersistenceProvider {
     public void setDeleteMessagesLatch(CountDownLatch deleteMessagesLatch) {
         this.deleteMessagesLatch = deleteMessagesLatch;
     }
+
+    @Override
+    public long getLastSequenceNumber() {
+        return -1;
+    }
 }