Bug:3260-Recovery misses flows installed on single node 72/21372/3
authorKamal Rameshan <kramesha@cisco.com>
Thu, 28 May 2015 20:54:48 +0000 (13:54 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 1 Jun 2015 16:17:43 +0000 (16:17 +0000)
There are 2 bugs which were enountered
1. When akka replays the journal, it replays it from the peristent journal's sequence number present at the time of snapshot success
2. The log entry on which a snapshot capture is triggered does not make it to that snapshot and gets removed from persistent journal as well.
So on recovery, that log entry/data is missing

To fix the first, the snapshotSeqNr() method of UnTypedPersistenActor is overridden, so that akka uses the cached last-sequence-number rather than using its own

To fix the second issue, the capture of snapshot for single node is done after applyState. This ensures that the persistent journal and snapshot are in sync

Also the in-memory journal was replaying all its messages and not honoring the fromSequenceNr, like akka does. So fixed it.

The tests needed to be fixed primarily due to the in-memory journal change.

A new test is added to test out the recovery of single node.

Change-Id: I779d1d6ce9880b19322d831ef5c8696b4c751e3d
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
12 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java

index f8bbf638a07f37c29c3aef64a4d51a963fe3b7f0..0b3fca090cfbfeed1ca89282a406587648848730 100644 (file)
@@ -209,6 +209,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
 
+            if (!hasFollowers()) {
+                // for single node, the capture should happen after the apply state
+                // as we delete messages from the persistent journal which have made it to the snapshot
+                // capturing the snapshot before applying makes the persistent journal and snapshot out of sync
+                // and recovery shows data missing
+                context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry());
+
+                context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+            }
+
         } else if (message instanceof ApplyJournalEntries){
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             if(LOG.isDebugEnabled()) {
@@ -312,6 +322,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return new LeaderStateChanged(memberId, leaderId);
     }
 
+    @Override
+    public long snapshotSequenceNr() {
+        // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
+        // so that we can delete the persistent journal based on the saved sequence-number
+        // However , when akka replays the journal during recovery, it replays it from the sequence number when the snapshot
+        // was saved and not the number we saved.
+        // We would want to override it , by asking akka to use the last-sequence number known to us.
+        return context.getSnapshotManager().getLastSequenceNumber();
+    }
+
     /**
      * When a derived RaftActor needs to persist something it must call
      * persistData.
@@ -336,21 +356,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
             @Override
             public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
-                if(!hasFollowers()){
+                if (!hasFollowers()){
                     // Increment the Commit Index and the Last Applied values
                     raftContext.setCommitIndex(replicatedLogEntry.getIndex());
                     raftContext.setLastApplied(replicatedLogEntry.getIndex());
 
-                    // Apply the state immediately
+                    // Apply the state immediately.
                     self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
 
                     // Send a ApplyJournalEntries message so that we write the fact that we applied
                     // the state to durable storage
                     self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
 
-                    context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
-
                 } else if (clientActor != null) {
+                    context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
+
                     // Send message for replication
                     currentBehavior.handleMessage(getSelf(),
                             new Replicate(clientActor, identifier, replicatedLogEntry));
index 38db03e534fc5423d1b1e72cbaf76f2b399250b1..8cf01f11eb7e95b8944e117d4112600f2c00fd9f 100644 (file)
@@ -127,6 +127,8 @@ class RaftActorRecoverySupport {
         long lastUnappliedIndex = context.getLastApplied() + 1;
 
         if(log.isDebugEnabled()) {
+            // it can happen that lastUnappliedIndex > toIndex, if the AJE is in the persistent journal
+            // but the entry itself has made it to that state and recovered via the snapshot
             log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
                     context.getId(), lastUnappliedIndex, toIndex);
         }
index 8388eaf7436f251c63aa277024bf886f803153cb..0ad1df3c33bb6433e5ddf0afca46f5f4920d2882 100644 (file)
@@ -181,4 +181,10 @@ public interface ReplicatedLog {
      */
     int dataSize();
 
+    /**
+     * We decide if snapshot need to be captured based on the count/memory consumed.
+     * @param replicatedLogEntry
+     */
+    void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry);
+
 }
index 8c32eab61df5ae32b77d5e690c3fb84e59352dd7..ab1e23bb94d8a7a9790d3319a8ba977bef1baa36 100644 (file)
@@ -61,6 +61,44 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         appendAndPersist(replicatedLogEntry, null);
     }
 
+    @Override
+    public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
+        long journalSize = replicatedLogEntry.getIndex() + 1;
+        long dataThreshold = context.getTotalMemory() *
+                context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
+        if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+                || getDataSizeForSnapshotCheck() > dataThreshold)) {
+
+            boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+                    currentBehavior.getReplicatedToAllIndex());
+            if (started) {
+                if (!context.hasFollowers()) {
+                    dataSizeSinceLastSnapshot = 0;
+                }
+            }
+        }
+    }
+
+    private long getDataSizeForSnapshotCheck() {
+        long dataSizeForCheck = dataSize();
+        if (!context.hasFollowers()) {
+            // When we do not have followers we do not maintain an in-memory log
+            // due to this the journalSize will never become anything close to the
+            // snapshot batch count. In fact will mostly be 1.
+            // Similarly since the journal's dataSize depends on the entries in the
+            // journal the journal's dataSize will never reach a value close to the
+            // memory threshold.
+            // By maintaining the dataSize outside the journal we are tracking essentially
+            // what we have written to the disk however since we no longer are in
+            // need of doing a snapshot just for the sake of freeing up memory we adjust
+            // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
+            // as if we were maintaining a real snapshot
+            dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
+        }
+        return dataSizeForCheck;
+    }
+
     @Override
     public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
             final Procedure<ReplicatedLogEntry> callback)  {
@@ -84,40 +122,8 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
                     context.getLogger().debug("{}: persist complete {}", context.getId(), replicatedLogEntry);
 
                     int logEntrySize = replicatedLogEntry.size();
-
-                    long dataSizeForCheck = dataSize();
-
                     dataSizeSinceLastSnapshot += logEntrySize;
 
-                    if (!context.hasFollowers()) {
-                        // When we do not have followers we do not maintain an in-memory log
-                        // due to this the journalSize will never become anything close to the
-                        // snapshot batch count. In fact will mostly be 1.
-                        // Similarly since the journal's dataSize depends on the entries in the
-                        // journal the journal's dataSize will never reach a value close to the
-                        // memory threshold.
-                        // By maintaining the dataSize outside the journal we are tracking essentially
-                        // what we have written to the disk however since we no longer are in
-                        // need of doing a snapshot just for the sake of freeing up memory we adjust
-                        // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
-                        // as if we were maintaining a real snapshot
-                        dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
-                    }
-                    long journalSize = replicatedLogEntry.getIndex() + 1;
-                    long dataThreshold = context.getTotalMemory() *
-                            context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
-                    if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
-                            || dataSizeForCheck > dataThreshold)) {
-
-                        boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
-                                currentBehavior.getReplicatedToAllIndex());
-
-                        if(started){
-                            dataSizeSinceLastSnapshot = 0;
-                        }
-                    }
-
                     if (callback != null){
                         callback.apply(replicatedLogEntry);
                     }
index 9a916625c9331413685d6263bfe053930b6795bf..9bbe285c29e863c885c88933c7f82fd7d739a704 100644 (file)
@@ -82,6 +82,10 @@ public class SnapshotManager implements SnapshotState {
         this.createSnapshotProcedure = createSnapshotProcedure;
     }
 
+    public long getLastSequenceNumber() {
+        return lastSequenceNumber;
+    }
+
     @VisibleForTesting
     public CaptureSnapshot getCaptureSnapshot() {
         return captureSnapshot;
@@ -319,6 +323,7 @@ public class SnapshotManager implements SnapshotState {
 
         @Override
         public void commit(long sequenceNumber) {
+            LOG.debug("Snapshot success sequence number:", sequenceNumber);
             context.getReplicatedLog().snapshotCommit();
             context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
                     sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
@@ -365,6 +370,8 @@ public class SnapshotManager implements SnapshotState {
             this.term = -1L;
             if (!hasFollowers) {
                 if(lastLogEntry != null) {
+                    // since we have persisted the last-log-entry to persistent journal before the capture,
+                    // we would want to snapshot from this entry.
                     index = lastLogEntry.getIndex();
                     term = lastLogEntry.getTerm();
                 }
index d175289af56c0226a938ccae5cc3d37632d3d2ec..afec6a5d3ddb7c61c7effd7d67bdb9aa32cc2026 100644 (file)
@@ -306,5 +306,11 @@ public class AbstractReplicatedLogImplTest {
         @Override
         public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
         }
+
+        @Override
+        public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
+        }
+
+
     }
 }
index 01ff6ce14f4eb44930f13dc5661ad7c6ab9c82fc..e19b5216103757bafada5d9659c913c9de79c3e5 100644 (file)
@@ -243,7 +243,8 @@ public class MockRaftActorContext implements RaftActorContext {
     }
 
     public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
-        @Override public void appendAndPersist(
+        @Override
+        public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
             append(replicatedLogEntry);
         }
@@ -253,6 +254,10 @@ public class MockRaftActorContext implements RaftActorContext {
             return -1;
         }
 
+        @Override
+        public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
+        }
+
         @Override public void removeFromAndPersist(long index) {
             removeFrom(index);
         }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java
new file mode 100644 (file)
index 0000000..a934586
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import akka.actor.ActorRef;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Recovery Integration Test for single node
+ */
+public class RecoveryIntegrationSingleNodeTest extends AbstractRaftActorIntegrationTest {
+
+    static final Logger LOG = LoggerFactory.getLogger(RecoveryIntegrationSingleNodeTest.class);
+
+    @Before
+    public void setup() {
+        leaderConfigParams = newLeaderConfigParams();
+    }
+
+
+    @Test
+    public void testJournalReplayAfterSnapshotWithSingleNode() throws Exception {
+
+        String persistenceId = factory.generateActorId("singleNode");
+        TestActorRef<AbstractRaftActorIntegrationTest.TestRaftActor> singleNodeActorRef = newTestRaftActor(persistenceId,
+                ImmutableMap.<String, String>builder().build(), leaderConfigParams);
+
+        waitUntilLeader(singleNodeActorRef);
+
+        ActorRef singleNodeCollectorActor = singleNodeActorRef.underlyingActor().collectorActor();
+        RaftActorContext singleNodeContext = singleNodeActorRef.underlyingActor().getRaftActorContext();
+
+
+        MockRaftActorContext.MockPayload payload0 = sendPayloadData(singleNodeActorRef, "zero");
+        MockRaftActorContext.MockPayload payload1 = sendPayloadData(singleNodeActorRef, "one");
+        MockRaftActorContext.MockPayload payload2 = sendPayloadData(singleNodeActorRef, "two");
+
+        MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 3);
+
+        // this should trigger a snapshot
+        MockRaftActorContext.MockPayload payload3 = sendPayloadData(singleNodeActorRef, "three");
+
+        MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 4);
+
+        //add 2 more
+        MockRaftActorContext.MockPayload payload4 = sendPayloadData(singleNodeActorRef, "four");
+        MockRaftActorContext.MockPayload payload5 = sendPayloadData(singleNodeActorRef, "five");
+
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(singleNodeCollectorActor, SaveSnapshotSuccess.class);
+
+        MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 6);
+
+        assertEquals("Last applied", 5, singleNodeContext.getLastApplied());
+
+        assertEquals("Incorrect State after snapshot success is received ",
+                Lists.newArrayList(payload0, payload1, payload2, payload3, payload4, payload5), singleNodeActorRef.underlyingActor().getState());
+
+        // we get 2 log entries (4 and 5 indexes) and 3 ApplyJournalEntries (for 3, 4, and 5 indexes)
+        assertEquals(5, InMemoryJournal.get(persistenceId).size());
+
+        List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
+        assertEquals(1, persistedSnapshots.size());
+
+        List<Object> snapshottedState = (List<Object>)MockRaftActor.toObject(persistedSnapshots.get(0).getState());
+        assertEquals("Incorrect Snapshot", Lists.newArrayList(payload0, payload1, payload2, payload3), snapshottedState);
+
+        //recovery logic starts
+        killActor(singleNodeActorRef);
+
+        singleNodeActorRef = newTestRaftActor(persistenceId,
+                ImmutableMap.<String, String>builder().build(), leaderConfigParams);
+
+        singleNodeActorRef.underlyingActor().waitForRecoveryComplete();
+
+        assertEquals("Incorrect State after Recovery ",
+                Lists.newArrayList(payload0, payload1, payload2, payload3, payload4, payload5), singleNodeActorRef.underlyingActor().getState());
+
+    }
+}
index fbe6fc4501f6b6b62b60490036e005e8ca065207..51cff356f2cdda165231df6b14375e776a1c715f 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.reset;
@@ -50,9 +49,6 @@ public class ReplicatedLogImplTest {
     @Mock
     private RaftActorBehavior mockBehavior;
 
-    @Mock
-    private SnapshotManager mockSnapshotManager;
-
     private RaftActorContext context;
     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
 
@@ -62,12 +58,7 @@ public class ReplicatedLogImplTest {
 
         context = new RaftActorContextImpl(null, null, "test",
                 new ElectionTermImpl(mockPersistence, "test", LOG),
-                -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG)  {
-            @Override
-            public SnapshotManager getSnapshotManager() {
-                return mockSnapshotManager;
-            }
-        };
+                -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
     }
 
     private void verifyPersist(Object message) throws Exception {
@@ -103,7 +94,6 @@ public class ReplicatedLogImplTest {
         verifyPersist(logEntry);
 
         verify(mockCallback).apply(same(logEntry));
-        verifyNoMoreInteractions(mockSnapshotManager);
 
         assertEquals("size", 2, log.size());
     }
@@ -122,13 +112,11 @@ public class ReplicatedLogImplTest {
         log.appendAndPersist(logEntry1);
         verifyPersist(logEntry1);
 
-        verifyNoMoreInteractions(mockSnapshotManager);
         reset(mockPersistence);
 
         log.appendAndPersist(logEntry2);
         verifyPersist(logEntry2);
 
-        verify(mockSnapshotManager).capture(same(logEntry2), eq(1L));
 
         assertEquals("size", 2, log.size());
     }
@@ -149,22 +137,16 @@ public class ReplicatedLogImplTest {
         int dataSize = 600;
         MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize));
 
-        doReturn(true).when(mockSnapshotManager).capture(same(logEntry), eq(1L));
-
         log.appendAndPersist(logEntry);
         verifyPersist(logEntry);
 
-        verify(mockSnapshotManager).capture(same(logEntry), eq(1L));
-
-        reset(mockPersistence, mockSnapshotManager);
+        reset(mockPersistence);
 
         logEntry = new MockReplicatedLogEntry(1, 3, new MockPayload("3", 5));
 
         log.appendAndPersist(logEntry);
         verifyPersist(logEntry);
 
-        verifyNoMoreInteractions(mockSnapshotManager);
-
         assertEquals("size", 2, log.size());
     }
 
index 0e2449e4b8e62d7d8cd4af3d4bd33407136460e2..7142c478295ad5869873253059bffbfa8f2a7466 100644 (file)
@@ -139,8 +139,8 @@ public class InMemoryJournal extends AsyncWriteJournal {
     }
 
     @Override
-    public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
-            long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
+    public Future<Void> doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr,
+            final long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
         return Futures.future(new Callable<Void>() {
             @Override
             public Void call() throws Exception {
@@ -150,16 +150,18 @@ public class InMemoryJournal extends AsyncWriteJournal {
                 }
 
                 Map<Long, Object> journal = journals.get(persistenceId);
-                if(journal == null) {
+                if (journal == null) {
                     return null;
                 }
 
                 synchronized (journal) {
                     for (Map.Entry<Long,Object> entry : journal.entrySet()) {
-                        PersistentRepr persistentMessage =
-                                new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
-                                        false, null, null);
-                        replayCallback.apply(persistentMessage);
+                        if (entry.getKey() >= fromSequenceNr && entry.getKey() <= toSequenceNr) {
+                            PersistentRepr persistentMessage =
+                                    new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
+                                            false, null, null);
+                            replayCallback.apply(persistentMessage);
+                        }
                     }
                 }
 
index 49cfcbc9c257a31a5e7ad51585ebda185d8f9fd5..c8953be5c4562e422f9fe56f6155e806c3717540 100644 (file)
@@ -121,6 +121,31 @@ import scala.concurrent.duration.FiniteDuration;
 public class ShardTest extends AbstractShardTest {
     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
 
+    private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
+
+    final CountDownLatch recoveryComplete = new CountDownLatch(1);
+
+    protected Props newShardPropsWithRecoveryComplete() {
+
+        Creator<Shard> creator = new Creator<Shard>() {
+            @Override
+            public Shard create() throws Exception {
+                return new Shard(shardID, Collections.<String,String>emptyMap(),
+                        newDatastoreContext(), SCHEMA_CONTEXT) {
+                    @Override
+                    protected void onRecoveryComplete() {
+                        try {
+                            super.onRecoveryComplete();
+                        } finally {
+                            recoveryComplete.countDown();
+                        }
+                    }
+                };
+            }
+        };
+        return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
+    }
+
     @Test
     public void testRegisterChangeListener() throws Exception {
         new ShardTestKit(getSystem()) {{
@@ -497,7 +522,9 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testApplyStateWithCandidatePayload() throws Exception {
 
-        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
+
+        recoveryComplete.await(5,  TimeUnit.SECONDS);
 
         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
@@ -542,8 +569,10 @@ public class ShardTest extends AbstractShardTest {
         final DataTreeModification writeMod = source.takeSnapshot().newModification();
         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
+        InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
+
         // Set up the InMemoryJournal.
-        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
+        InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
 
         int nListEntries = 16;
         Set<Integer> listEntryKeys = new HashSet<>();
@@ -558,11 +587,11 @@ public class ShardTest extends AbstractShardTest {
             final DataTreeModification mod = source.takeSnapshot().newModification();
             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
 
-            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+            InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
                 payloadForModification(source, mod)));
         }
 
-        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
                 new ApplyJournalEntries(nListEntries));
 
         testRecovery(listEntryKeys);
@@ -576,7 +605,9 @@ public class ShardTest extends AbstractShardTest {
 
         // Set up the InMemoryJournal.
 
-        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
+        InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
+
+        InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
                   new WriteModification(TestModel.OUTER_LIST_PATH,
                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
 
@@ -590,11 +621,11 @@ public class ShardTest extends AbstractShardTest {
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
             Modification mod = new MergeModification(path,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
-            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+            InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
                     newModificationPayload(mod)));
         }
 
-        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
                 new ApplyJournalEntries(nListEntries));
 
         testRecovery(listEntryKeys);
index a2309be48f4e09f5b102418611bf7d035943a589..f1f96d4a7d7acc431e61d8e5688156b5286bd83d 100644 (file)
@@ -166,9 +166,12 @@ public class PreLithiumShardTest extends AbstractShardTest {
                                 getNormalizedNode().toByteString().toByteArray(),
                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
 
+        InMemoryJournal.addEntry(shardID.toString(), 0, new String("Dummy data as snapshot sequence number is " +
+                "set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1"));
+
         // Set up the InMemoryJournal.
 
-        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
+        InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
                   new WriteModification(TestModel.OUTER_LIST_PATH,
                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
 
@@ -183,7 +186,7 @@ public class PreLithiumShardTest extends AbstractShardTest {
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
             Modification mod = new MergeModification(path,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
-            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+            InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
                     newLegacyPayload(mod)));
         }
 
@@ -194,11 +197,11 @@ public class PreLithiumShardTest extends AbstractShardTest {
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
             Modification mod = new MergeModification(path,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
-            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+            InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
                     newLegacyByteStringPayload(mod)));
         }
 
-        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyLogEntries(nListEntries));
 
         testRecovery(listEntryKeys);
     }