From: Kamal Rameshan Date: Thu, 28 May 2015 20:54:48 +0000 (-0700) Subject: Bug:3260-Recovery misses flows installed on single node X-Git-Tag: release/lithium~86 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=refs%2Fchanges%2F72%2F21372%2F3 Bug:3260-Recovery misses flows installed on single node There are 2 bugs which were enountered 1. When akka replays the journal, it replays it from the peristent journal's sequence number present at the time of snapshot success 2. The log entry on which a snapshot capture is triggered does not make it to that snapshot and gets removed from persistent journal as well. So on recovery, that log entry/data is missing To fix the first, the snapshotSeqNr() method of UnTypedPersistenActor is overridden, so that akka uses the cached last-sequence-number rather than using its own To fix the second issue, the capture of snapshot for single node is done after applyState. This ensures that the persistent journal and snapshot are in sync Also the in-memory journal was replaying all its messages and not honoring the fromSequenceNr, like akka does. So fixed it. The tests needed to be fixed primarily due to the in-memory journal change. A new test is added to test out the recovery of single node. Change-Id: I779d1d6ce9880b19322d831ef5c8696b4c751e3d Signed-off-by: Kamal Rameshan --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index f8bbf638a0..0b3fca090c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -209,6 +209,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { applyState(applyState.getClientActor(), applyState.getIdentifier(), applyState.getReplicatedLogEntry().getData()); + if (!hasFollowers()) { + // for single node, the capture should happen after the apply state + // as we delete messages from the persistent journal which have made it to the snapshot + // capturing the snapshot before applying makes the persistent journal and snapshot out of sync + // and recovery shows data missing + context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry()); + + context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior); + } + } else if (message instanceof ApplyJournalEntries){ ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; if(LOG.isDebugEnabled()) { @@ -312,6 +322,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return new LeaderStateChanged(memberId, leaderId); } + @Override + public long snapshotSequenceNr() { + // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal, + // so that we can delete the persistent journal based on the saved sequence-number + // However , when akka replays the journal during recovery, it replays it from the sequence number when the snapshot + // was saved and not the number we saved. + // We would want to override it , by asking akka to use the last-sequence number known to us. + return context.getSnapshotManager().getLastSequenceNumber(); + } + /** * When a derived RaftActor needs to persist something it must call * persistData. @@ -336,21 +356,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure() { @Override public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception { - if(!hasFollowers()){ + if (!hasFollowers()){ // Increment the Commit Index and the Last Applied values raftContext.setCommitIndex(replicatedLogEntry.getIndex()); raftContext.setLastApplied(replicatedLogEntry.getIndex()); - // Apply the state immediately + // Apply the state immediately. self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self()); // Send a ApplyJournalEntries message so that we write the fact that we applied // the state to durable storage self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self()); - context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior); - } else if (clientActor != null) { + context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry); + // Send message for replication currentBehavior.handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry)); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index 38db03e534..8cf01f11eb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -127,6 +127,8 @@ class RaftActorRecoverySupport { long lastUnappliedIndex = context.getLastApplied() + 1; if(log.isDebugEnabled()) { + // it can happen that lastUnappliedIndex > toIndex, if the AJE is in the persistent journal + // but the entry itself has made it to that state and recovered via the snapshot log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}", context.getId(), lastUnappliedIndex, toIndex); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 8388eaf743..0ad1df3c33 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -181,4 +181,10 @@ public interface ReplicatedLog { */ int dataSize(); + /** + * We decide if snapshot need to be captured based on the count/memory consumed. + * @param replicatedLogEntry + */ + void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry); + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index 8c32eab61d..ab1e23bb94 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -61,6 +61,44 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { appendAndPersist(replicatedLogEntry, null); } + @Override + public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) { + long journalSize = replicatedLogEntry.getIndex() + 1; + long dataThreshold = context.getTotalMemory() * + context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; + + if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 + || getDataSizeForSnapshotCheck() > dataThreshold)) { + + boolean started = context.getSnapshotManager().capture(replicatedLogEntry, + currentBehavior.getReplicatedToAllIndex()); + if (started) { + if (!context.hasFollowers()) { + dataSizeSinceLastSnapshot = 0; + } + } + } + } + + private long getDataSizeForSnapshotCheck() { + long dataSizeForCheck = dataSize(); + if (!context.hasFollowers()) { + // When we do not have followers we do not maintain an in-memory log + // due to this the journalSize will never become anything close to the + // snapshot batch count. In fact will mostly be 1. + // Similarly since the journal's dataSize depends on the entries in the + // journal the journal's dataSize will never reach a value close to the + // memory threshold. + // By maintaining the dataSize outside the journal we are tracking essentially + // what we have written to the disk however since we no longer are in + // need of doing a snapshot just for the sake of freeing up memory we adjust + // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often + // as if we were maintaining a real snapshot + dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER; + } + return dataSizeForCheck; + } + @Override public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry, final Procedure callback) { @@ -84,40 +122,8 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { context.getLogger().debug("{}: persist complete {}", context.getId(), replicatedLogEntry); int logEntrySize = replicatedLogEntry.size(); - - long dataSizeForCheck = dataSize(); - dataSizeSinceLastSnapshot += logEntrySize; - if (!context.hasFollowers()) { - // When we do not have followers we do not maintain an in-memory log - // due to this the journalSize will never become anything close to the - // snapshot batch count. In fact will mostly be 1. - // Similarly since the journal's dataSize depends on the entries in the - // journal the journal's dataSize will never reach a value close to the - // memory threshold. - // By maintaining the dataSize outside the journal we are tracking essentially - // what we have written to the disk however since we no longer are in - // need of doing a snapshot just for the sake of freeing up memory we adjust - // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often - // as if we were maintaining a real snapshot - dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER; - } - long journalSize = replicatedLogEntry.getIndex() + 1; - long dataThreshold = context.getTotalMemory() * - context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; - - if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 - || dataSizeForCheck > dataThreshold)) { - - boolean started = context.getSnapshotManager().capture(replicatedLogEntry, - currentBehavior.getReplicatedToAllIndex()); - - if(started){ - dataSizeSinceLastSnapshot = 0; - } - } - if (callback != null){ callback.apply(replicatedLogEntry); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 9a916625c9..9bbe285c29 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -82,6 +82,10 @@ public class SnapshotManager implements SnapshotState { this.createSnapshotProcedure = createSnapshotProcedure; } + public long getLastSequenceNumber() { + return lastSequenceNumber; + } + @VisibleForTesting public CaptureSnapshot getCaptureSnapshot() { return captureSnapshot; @@ -319,6 +323,7 @@ public class SnapshotManager implements SnapshotState { @Override public void commit(long sequenceNumber) { + LOG.debug("Snapshot success sequence number:", sequenceNumber); context.getReplicatedLog().snapshotCommit(); context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria( sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); @@ -365,6 +370,8 @@ public class SnapshotManager implements SnapshotState { this.term = -1L; if (!hasFollowers) { if(lastLogEntry != null) { + // since we have persisted the last-log-entry to persistent journal before the capture, + // we would want to snapshot from this entry. index = lastLogEntry.getIndex(); term = lastLogEntry.getTerm(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index d175289af5..afec6a5d3d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -306,5 +306,11 @@ public class AbstractReplicatedLogImplTest { @Override public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure callback) { } + + @Override + public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) { + } + + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 01ff6ce14f..e19b521610 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -243,7 +243,8 @@ public class MockRaftActorContext implements RaftActorContext { } public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl { - @Override public void appendAndPersist( + @Override + public void appendAndPersist( ReplicatedLogEntry replicatedLogEntry) { append(replicatedLogEntry); } @@ -253,6 +254,10 @@ public class MockRaftActorContext implements RaftActorContext { return -1; } + @Override + public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) { + } + @Override public void removeFromAndPersist(long index) { removeFrom(index); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java new file mode 100644 index 0000000000..a934586bbb --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import static org.junit.Assert.assertEquals; +import akka.actor.ActorRef; +import akka.persistence.SaveSnapshotSuccess; +import akka.testkit.TestActorRef; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recovery Integration Test for single node + */ +public class RecoveryIntegrationSingleNodeTest extends AbstractRaftActorIntegrationTest { + + static final Logger LOG = LoggerFactory.getLogger(RecoveryIntegrationSingleNodeTest.class); + + @Before + public void setup() { + leaderConfigParams = newLeaderConfigParams(); + } + + + @Test + public void testJournalReplayAfterSnapshotWithSingleNode() throws Exception { + + String persistenceId = factory.generateActorId("singleNode"); + TestActorRef singleNodeActorRef = newTestRaftActor(persistenceId, + ImmutableMap.builder().build(), leaderConfigParams); + + waitUntilLeader(singleNodeActorRef); + + ActorRef singleNodeCollectorActor = singleNodeActorRef.underlyingActor().collectorActor(); + RaftActorContext singleNodeContext = singleNodeActorRef.underlyingActor().getRaftActorContext(); + + + MockRaftActorContext.MockPayload payload0 = sendPayloadData(singleNodeActorRef, "zero"); + MockRaftActorContext.MockPayload payload1 = sendPayloadData(singleNodeActorRef, "one"); + MockRaftActorContext.MockPayload payload2 = sendPayloadData(singleNodeActorRef, "two"); + + MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 3); + + // this should trigger a snapshot + MockRaftActorContext.MockPayload payload3 = sendPayloadData(singleNodeActorRef, "three"); + + MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 4); + + //add 2 more + MockRaftActorContext.MockPayload payload4 = sendPayloadData(singleNodeActorRef, "four"); + MockRaftActorContext.MockPayload payload5 = sendPayloadData(singleNodeActorRef, "five"); + + + // Wait for snapshot complete. + MessageCollectorActor.expectFirstMatching(singleNodeCollectorActor, SaveSnapshotSuccess.class); + + MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 6); + + assertEquals("Last applied", 5, singleNodeContext.getLastApplied()); + + assertEquals("Incorrect State after snapshot success is received ", + Lists.newArrayList(payload0, payload1, payload2, payload3, payload4, payload5), singleNodeActorRef.underlyingActor().getState()); + + // we get 2 log entries (4 and 5 indexes) and 3 ApplyJournalEntries (for 3, 4, and 5 indexes) + assertEquals(5, InMemoryJournal.get(persistenceId).size()); + + List persistedSnapshots = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); + assertEquals(1, persistedSnapshots.size()); + + List snapshottedState = (List)MockRaftActor.toObject(persistedSnapshots.get(0).getState()); + assertEquals("Incorrect Snapshot", Lists.newArrayList(payload0, payload1, payload2, payload3), snapshottedState); + + //recovery logic starts + killActor(singleNodeActorRef); + + singleNodeActorRef = newTestRaftActor(persistenceId, + ImmutableMap.builder().build(), leaderConfigParams); + + singleNodeActorRef.underlyingActor().waitForRecoveryComplete(); + + assertEquals("Incorrect State after Recovery ", + Lists.newArrayList(payload0, payload1, payload2, payload3, payload4, payload5), singleNodeActorRef.underlyingActor().getState()); + + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java index fbe6fc4501..51cff356f2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.reset; @@ -50,9 +49,6 @@ public class ReplicatedLogImplTest { @Mock private RaftActorBehavior mockBehavior; - @Mock - private SnapshotManager mockSnapshotManager; - private RaftActorContext context; private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); @@ -62,12 +58,7 @@ public class ReplicatedLogImplTest { context = new RaftActorContextImpl(null, null, "test", new ElectionTermImpl(mockPersistence, "test", LOG), - -1, -1, Collections.emptyMap(), configParams, mockPersistence, LOG) { - @Override - public SnapshotManager getSnapshotManager() { - return mockSnapshotManager; - } - }; + -1, -1, Collections.emptyMap(), configParams, mockPersistence, LOG); } private void verifyPersist(Object message) throws Exception { @@ -103,7 +94,6 @@ public class ReplicatedLogImplTest { verifyPersist(logEntry); verify(mockCallback).apply(same(logEntry)); - verifyNoMoreInteractions(mockSnapshotManager); assertEquals("size", 2, log.size()); } @@ -122,13 +112,11 @@ public class ReplicatedLogImplTest { log.appendAndPersist(logEntry1); verifyPersist(logEntry1); - verifyNoMoreInteractions(mockSnapshotManager); reset(mockPersistence); log.appendAndPersist(logEntry2); verifyPersist(logEntry2); - verify(mockSnapshotManager).capture(same(logEntry2), eq(1L)); assertEquals("size", 2, log.size()); } @@ -149,22 +137,16 @@ public class ReplicatedLogImplTest { int dataSize = 600; MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize)); - doReturn(true).when(mockSnapshotManager).capture(same(logEntry), eq(1L)); - log.appendAndPersist(logEntry); verifyPersist(logEntry); - verify(mockSnapshotManager).capture(same(logEntry), eq(1L)); - - reset(mockPersistence, mockSnapshotManager); + reset(mockPersistence); logEntry = new MockReplicatedLogEntry(1, 3, new MockPayload("3", 5)); log.appendAndPersist(logEntry); verifyPersist(logEntry); - verifyNoMoreInteractions(mockSnapshotManager); - assertEquals("size", 2, log.size()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java index 0e2449e4b8..7142c47829 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java @@ -139,8 +139,8 @@ public class InMemoryJournal extends AsyncWriteJournal { } @Override - public Future doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, - long toSequenceNr, long max, final Procedure replayCallback) { + public Future doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr, + final long toSequenceNr, long max, final Procedure replayCallback) { return Futures.future(new Callable() { @Override public Void call() throws Exception { @@ -150,16 +150,18 @@ public class InMemoryJournal extends AsyncWriteJournal { } Map journal = journals.get(persistenceId); - if(journal == null) { + if (journal == null) { return null; } synchronized (journal) { for (Map.Entry entry : journal.entrySet()) { - PersistentRepr persistentMessage = - new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, - false, null, null); - replayCallback.apply(persistentMessage); + if (entry.getKey() >= fromSequenceNr && entry.getKey() <= toSequenceNr) { + PersistentRepr persistentMessage = + new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, + false, null, null); + replayCallback.apply(persistentMessage); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 49cfcbc9c2..c8953be5c4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -121,6 +121,31 @@ import scala.concurrent.duration.FiniteDuration; public class ShardTest extends AbstractShardTest { private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars"); + private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1"; + + final CountDownLatch recoveryComplete = new CountDownLatch(1); + + protected Props newShardPropsWithRecoveryComplete() { + + Creator creator = new Creator() { + @Override + public Shard create() throws Exception { + return new Shard(shardID, Collections.emptyMap(), + newDatastoreContext(), SCHEMA_CONTEXT) { + @Override + protected void onRecoveryComplete() { + try { + super.onRecoveryComplete(); + } finally { + recoveryComplete.countDown(); + } + } + }; + } + }; + return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); + } + @Test public void testRegisterChangeListener() throws Exception { new ShardTestKit(getSystem()) {{ @@ -497,7 +522,9 @@ public class ShardTest extends AbstractShardTest { @Test public void testApplyStateWithCandidatePayload() throws Exception { - TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState"); + TestActorRef shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState"); + + recoveryComplete.await(5, TimeUnit.SECONDS); NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node); @@ -542,8 +569,10 @@ public class ShardTest extends AbstractShardTest { final DataTreeModification writeMod = source.takeSnapshot().newModification(); writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA); + // Set up the InMemoryJournal. - InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod))); + InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod))); int nListEntries = 16; Set listEntryKeys = new HashSet<>(); @@ -558,11 +587,11 @@ public class ShardTest extends AbstractShardTest { final DataTreeModification mod = source.takeSnapshot().newModification(); mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1, payloadForModification(source, mod))); } - InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyJournalEntries(nListEntries)); testRecovery(listEntryKeys); @@ -576,7 +605,9 @@ public class ShardTest extends AbstractShardTest { // Set up the InMemoryJournal. - InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload( + InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA); + + InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload( new WriteModification(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); @@ -590,11 +621,11 @@ public class ShardTest extends AbstractShardTest { .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1, newModificationPayload(mod))); } - InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyJournalEntries(nListEntries)); testRecovery(listEntryKeys); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index a2309be48f..f1f96d4a7d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -166,9 +166,12 @@ public class PreLithiumShardTest extends AbstractShardTest { getNormalizedNode().toByteString().toByteArray(), Collections.emptyList(), 0, 1, -1, -1)); + InMemoryJournal.addEntry(shardID.toString(), 0, new String("Dummy data as snapshot sequence number is " + + "set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1")); + // Set up the InMemoryJournal. - InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload( + InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newLegacyPayload( new WriteModification(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); @@ -183,7 +186,7 @@ public class PreLithiumShardTest extends AbstractShardTest { .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1, newLegacyPayload(mod))); } @@ -194,11 +197,11 @@ public class PreLithiumShardTest extends AbstractShardTest { .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1, newLegacyByteStringPayload(mod))); } - InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries)); + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyLogEntries(nListEntries)); testRecovery(listEntryKeys); }