BUG 2134 : Make persistence configurable at the datastore level
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
index c15c9198bd17c2a3da0fa54b017379a5e8971b13..69af77c7496b26ed02033d97fd8572025814cc9f 100644 (file)
@@ -7,19 +7,29 @@ import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.event.Logging;
 import akka.japi.Creator;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotOffer;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import org.junit.After;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
 import scala.concurrent.duration.FiniteDuration;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -32,7 +42,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.mock;
 
 public class RaftActorTest extends AbstractActorTest {
 
@@ -45,30 +58,42 @@ public class RaftActorTest extends AbstractActorTest {
 
     public static class MockRaftActor extends RaftActor {
 
+        private final DataPersistenceProvider dataPersistenceProvider;
+
         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
             private final Map<String, String> peerAddresses;
             private final String id;
             private final Optional<ConfigParams> config;
+            private final DataPersistenceProvider dataPersistenceProvider;
 
             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
-                    Optional<ConfigParams> config) {
+                    Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
                 this.peerAddresses = peerAddresses;
                 this.id = id;
                 this.config = config;
+                this.dataPersistenceProvider = dataPersistenceProvider;
             }
 
             @Override
             public MockRaftActor create() throws Exception {
-                return new MockRaftActor(id, peerAddresses, config);
+                return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
             }
         }
 
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+        private final CountDownLatch applyRecoverySnapshot = new CountDownLatch(1);
+        private final CountDownLatch applyStateLatch = new CountDownLatch(1);
+
         private final List<Object> state;
 
-        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config) {
+        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
+            if(dataPersistenceProvider == null){
+                this.dataPersistenceProvider = new PersistentDataProvider();
+            } else {
+                this.dataPersistenceProvider = dataPersistenceProvider;
+            }
         }
 
         public void waitForRecoveryComplete() {
@@ -79,16 +104,27 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
+        public CountDownLatch getApplyRecoverySnapshotLatch(){
+            return applyRecoverySnapshot;
+        }
+
         public List<Object> getState() {
             return state;
         }
 
         public static Props props(final String id, final Map<String, String> peerAddresses,
                 Optional<ConfigParams> config){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config));
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
         }
 
+        public static Props props(final String id, final Map<String, String> peerAddresses,
+                                  Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
+        }
+
+
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+            applyStateLatch.countDown();
         }
 
         @Override
@@ -111,6 +147,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         @Override
         protected void applyRecoverySnapshot(ByteString snapshot) {
+            applyRecoverySnapshot.countDown();
             try {
                 Object data = toObject(snapshot);
                 System.out.println("!!!!!applyRecoverySnapshot: "+data);
@@ -123,7 +160,6 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         @Override protected void createSnapshot() {
-            throw new UnsupportedOperationException("createSnapshot");
         }
 
         @Override protected void applySnapshot(ByteString snapshot) {
@@ -132,6 +168,11 @@ public class RaftActorTest extends AbstractActorTest {
         @Override protected void onStateChanged() {
         }
 
+        @Override
+        protected DataPersistenceProvider persistence() {
+            return this.dataPersistenceProvider;
+        }
+
         @Override public String persistenceId() {
             return this.getId();
         }
@@ -155,6 +196,9 @@ public class RaftActorTest extends AbstractActorTest {
             return obj;
         }
 
+        public ReplicatedLog getReplicatedLog(){
+            return this.getRaftActorContext().getReplicatedLog();
+        }
 
     }
 
@@ -294,6 +338,343 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    /**
+     * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
+     * process recovery messages
+     *
+     * @throws Exception
+     */
+
+    @Test
+    public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+                Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+                        Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
+
+                CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
+
+                assertEquals("apply recovery snapshot", true, applyRecoverySnapshotLatch.await(5, TimeUnit.SECONDS));
+
+                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
+
+                ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
+
+                assertEquals("add replicated log entry", 1, replicatedLog.size());
+
+                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
+
+                assertEquals("add replicated log entry", 2, replicatedLog.size());
+
+                mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+
+                assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
+
+                // The snapshot had 4 items + we added 2 more items during the test
+                // We start removing from 5 and we should get 1 item in the replicated log
+                mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
+
+                assertEquals("remove log entries", 1, replicatedLog.size());
+
+                mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
+
+                assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+                assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
+
+                mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
+
+                mockRaftActor.waitForRecoveryComplete();
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }};
+    }
+
+    /**
+     * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
+     * not process recovery messages
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+                Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+                        Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
+
+                mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
+
+                CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
+
+                assertEquals("apply recovery snapshot", false, applyRecoverySnapshotLatch.await(1, TimeUnit.SECONDS));
+
+                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
+
+                ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
+
+                assertEquals("add replicated log entry", 0, replicatedLog.size());
+
+                mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
+
+                assertEquals("add replicated log entry", 0, replicatedLog.size());
+
+                mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+
+                assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
+
+                mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
+
+                assertEquals("remove log entries", 0, replicatedLog.size());
+
+                mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
+
+                assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+                assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
+
+                mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
+
+                mockRaftActor.waitForRecoveryComplete();
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+            }};
+    }
+
+
+    @Test
+    public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                CountDownLatch persistLatch = new CountDownLatch(1);
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
+
+                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                CountDownLatch persistLatch = new CountDownLatch(1);
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class)));
+
+                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                CountDownLatch persistLatch = new CountDownLatch(2);
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
+
+                mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
+
+                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testApplyLogEntriesCallsDataPersistence() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testApplyLogEntriesCallsDataPersistence";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                CountDownLatch persistLatch = new CountDownLatch(1);
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
+
+                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                CountDownLatch persistLatch = new CountDownLatch(1);
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+                dataPersistenceProviderMonitor.setSaveSnapshotLatch(persistLatch);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
+
+                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+
+                assertEquals("Save Snapshot called", true, persistLatch.await(5, TimeUnit.SECONDS));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                CountDownLatch deleteMessagesLatch = new CountDownLatch(1);
+                CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1);
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+                dataPersistenceProviderMonitor.setDeleteMessagesLatch(deleteMessagesLatch);
+                dataPersistenceProviderMonitor.setDeleteSnapshotsLatch(deleteSnapshotsLatch);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
+
+                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+
+                mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
+
+                assertEquals("Delete Messages called", true, deleteMessagesLatch.await(5, TimeUnit.SECONDS));
+
+                assertEquals("Delete Snapshots called", true, deleteSnapshotsLatch.await(5, TimeUnit.SECONDS));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
     private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;