BUG-3381: Capture Snapshot on recovery if journal is not empty 21/27721/7
authorevvy <dhiraviam.natarajan@gmail.com>
Mon, 26 Oct 2015 15:06:20 +0000 (20:36 +0530)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 30 Oct 2015 00:47:55 +0000 (20:47 -0400)
Change-Id: Ib1068cb6d4848d151039887b51458399ff421178
Signed-off-by: evvy <dhiraviam.natarajan@gmail.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 0154c89..a043c69 100644 (file)
@@ -173,6 +173,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             initializeBehavior();
 
             raftRecovery = null;
+
+            if (context.getReplicatedLog().size() > 0) {
+                self().tell(new InitiateCaptureSnapshot(), self());
+                LOG.info("Snapshot capture initiated after recovery");
+            } else {
+                LOG.info("Snapshot capture NOT initiated after recovery, journal empty");
+            }
         }
     }
 
index 51d6478..4b43095 100644 (file)
@@ -21,6 +21,7 @@ import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.timeout;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -105,6 +106,7 @@ public class RaftActorTest extends AbstractActorTest {
         kit.waitUntilLeader();
     }
 
+
     @Test
     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
@@ -943,6 +945,40 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testRaftActorOnRecoverySnapshot() throws Exception {
+        TEST_LOG.info("testRaftActorOnRecoverySnapshot");
+
+        new JavaTestKit(getSystem()) {{
+                String persistenceId = factory.generateActorId("follower-");
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                // Set the heartbeat interval high to essentially disable election otherwise the test
+                // may fail if the actor is switched to Leader
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
+
+                // Create mock ReplicatedLogEntry
+                ReplicatedLogEntry replLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,1,
+                        new MockRaftActorContext.MockPayload("F", 1));
+
+                InMemoryJournal.addEntry(persistenceId, 1, replLogEntry);
+
+                TestActorRef<MockRaftActor> ref = factory.createTestActor(
+                        MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
+
+                MockRaftActor mockRaftActor = ref.underlyingActor();
+
+                mockRaftActor.waitForRecoveryComplete();
+
+                mockRaftActor.waitForInitializeBehaviorComplete();
+
+                verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
+            }};
+    }
+
     @Test
     public void testSwitchBehavior(){
         String persistenceId = factory.generateActorId("leader-");
index f235d88..949889a 100644 (file)
@@ -186,9 +186,9 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         // Verify the persisted snapshot in the leader. This should reflect the advanced snapshot index as
         // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
-        assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
-        List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
+        assertEquals("Persisted snapshots size", 2, persistedSnapshots.size());
+        verifySnapshot("Persisted", persistedSnapshots.get(1), initialTerm, 2, currentTerm, 3);
+        List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(1).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
 
index 144f0f5..3a7b458 100644 (file)
@@ -2329,7 +2329,6 @@ public class ShardTest extends AbstractShardTest {
                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
 
             waitUntilLeader(shard);
-
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
@@ -2337,35 +2336,35 @@ public class ShardTest extends AbstractShardTest {
             // Trigger creation of a snapshot by ensuring
             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
-
-            assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
-
-            assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
-                    savedSnapshot.get() instanceof Snapshot);
-
-            verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
-
-            latch.set(new CountDownLatch(1));
-            savedSnapshot.set(null);
+            awaitAndValidateSnapshot(expectedRoot);
 
             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
+            awaitAndValidateSnapshot(expectedRoot);
 
-            assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }
 
-            assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
-                    savedSnapshot.get() instanceof Snapshot);
+            private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
+                                              ) throws InterruptedException {
+                System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
+                assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
-            verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+                assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+                        savedSnapshot.get() instanceof Snapshot);
 
-            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-        }
+                verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+
+                latch.set(new CountDownLatch(1));
+                savedSnapshot.set(null);
+            }
 
-        private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
+            private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
 
-            final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
-            assertEquals("Root node", expectedRoot, actual);
+                final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+                assertEquals("Root node", expectedRoot, actual);
 
-        }};
+           }
+        };
     }
 
     /**

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.