X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=7dfbd668b811231b4b32edd76a04987c671aba99;hb=8177f76b3e8bd802cc8e7a05ba3f192f219ab0ee;hp=14fc3a12bd9b97bf891586eb3902c502fa18e142;hpb=0d4c11af06567b4692b8894bbe2cac16cb4db0ad;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 14fc3a12bd..7dfbd668b8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -17,7 +17,9 @@ import akka.actor.Props; import akka.dispatch.Dispatchers; import akka.dispatch.OnComplete; import akka.japi.Creator; +import akka.japi.Procedure; import akka.pattern.Patterns; +import akka.persistence.SnapshotSelectionCriteria; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Function; @@ -30,6 +32,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -43,6 +46,7 @@ import org.junit.Test; import org.mockito.InOrder; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; @@ -66,10 +70,15 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -364,13 +373,41 @@ public class ShardTest extends AbstractActorTest { TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplySnapshot"); - NormalizedNodeToNodeCodec codec = - new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT); + InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); + store.onGlobalContextUpdated(SCHEMA_CONTEXT); + + writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + YangInstanceIdentifier root = YangInstanceIdentifier.builder().build(); + NormalizedNode expected = readStore(store, root); + + ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create( + SerializationUtils.serializeNormalizedNode(expected), + Collections.emptyList(), 1, 2, 3, 4)); + + shard.underlyingActor().onReceiveCommand(applySnapshot); + + NormalizedNode actual = readStore(shard, root); + + assertEquals("Root node", expected, actual); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + @Test + public void testApplyHelium2VersionSnapshot() throws Exception { + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), + "testApplySnapshot"); + + NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT); + + InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); + store.onGlobalContextUpdated(SCHEMA_CONTEXT); - writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); YangInstanceIdentifier root = YangInstanceIdentifier.builder().build(); - NormalizedNode expected = readStore(shard, root); + NormalizedNode expected = readStore(store, root); NormalizedNodeMessages.Container encode = codec.encode(expected); @@ -382,7 +419,7 @@ public class ShardTest extends AbstractActorTest { NormalizedNode actual = readStore(shard, root); - assertEquals(expected, actual); + assertEquals("Root node", expected, actual); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } @@ -423,7 +460,6 @@ public class ShardTest extends AbstractActorTest { shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - @SuppressWarnings("serial") @Test public void testRecovery() throws Exception { @@ -432,20 +468,13 @@ public class ShardTest extends AbstractActorTest { InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null); testStore.onGlobalContextUpdated(SCHEMA_CONTEXT); - DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction(); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready(); - commitCohort.preCommit().get(); - commitCohort.commit().get(); + writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction(); - NormalizedNode root = readTx.read(YangInstanceIdentifier.builder().build()).get().get(); + NormalizedNode root = readStore(testStore, YangInstanceIdentifier.builder().build()); InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( - new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode( - root). - getNormalizedNode().toByteString().toByteArray(), - Collections.emptyList(), 0, 1, -1, -1)); + SerializationUtils.serializeNormalizedNode(root), + Collections.emptyList(), 0, 1, -1, -1)); // Set up the InMemoryJournal. @@ -455,31 +484,63 @@ public class ShardTest extends AbstractActorTest { int nListEntries = 16; Set listEntryKeys = new HashSet<>(); - int i = 1; - // Add some of the legacy CompositeModificationPayload - for(; i <= 2; i++) { + // Add some ModificationPayload entries + for(int i = 1; i <= nListEntries; i++) { listEntryKeys.add(Integer.valueOf(i)); YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, - newLegacyPayload(mod))); + newModificationPayload(mod))); } - // Add some of the legacy CompositeModificationByteStringPayload - for(; i <= 5; i++) { + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, + new ApplyJournalEntries(nListEntries)); + + testRecovery(listEntryKeys); + } + + @Test + public void testHelium2VersionRecovery() throws Exception { + + // Set up the InMemorySnapshotStore. + + InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null); + testStore.onGlobalContextUpdated(SCHEMA_CONTEXT); + + writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + NormalizedNode root = readStore(testStore, YangInstanceIdentifier.builder().build()); + + InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( + new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root). + getNormalizedNode().toByteString().toByteArray(), + Collections.emptyList(), 0, 1, -1, -1)); + + // Set up the InMemoryJournal. + + InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload( + new WriteModification(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); + + int nListEntries = 16; + Set listEntryKeys = new HashSet<>(); + int i = 1; + + // Add some CompositeModificationPayload entries + for(; i <= 8; i++) { listEntryKeys.add(Integer.valueOf(i)); YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, - newLegacyByteStringPayload(mod))); + newLegacyPayload(mod))); } - // Add some of the ModificationPayload + // Add some CompositeModificationByteStringPayload entries for(; i <= nListEntries; i++) { listEntryKeys.add(Integer.valueOf(i)); YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) @@ -487,16 +548,22 @@ public class ShardTest extends AbstractActorTest { Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, - newModificationPayload(mod))); + newLegacyByteStringPayload(mod))); } - InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, - new ApplyLogEntries(nListEntries)); + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries)); + + testRecovery(listEntryKeys); + } + private void testRecovery(Set listEntryKeys) throws Exception { // Create the actor and wait for recovery complete. + int nListEntries = listEntryKeys.size(); + final CountDownLatch recoveryComplete = new CountDownLatch(1); + @SuppressWarnings("serial") Creator creator = new Creator() { @Override public Shard create() throws Exception { @@ -1319,19 +1386,54 @@ public class ShardTest extends AbstractActorTest { } @Test - public void testCreateSnapshot() throws IOException, InterruptedException { - testCreateSnapshot(true, "testCreateSnapshot"); + public void testCreateSnapshot() throws Exception { + testCreateSnapshot(true, "testCreateSnapshot"); } @Test - public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException { + public void testCreateSnapshotWithNonPersistentData() throws Exception { testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData"); } @SuppressWarnings("serial") - public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException { - final DatastoreContext dataStoreContext = DatastoreContext.newBuilder(). - shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build(); + public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{ + + final AtomicReference savedSnapshot = new AtomicReference<>(); + class DelegatingPersistentDataProvider implements DataPersistenceProvider { + DataPersistenceProvider delegate; + + DelegatingPersistentDataProvider(DataPersistenceProvider delegate) { + this.delegate = delegate; + } + + @Override + public boolean isRecoveryApplicable() { + return delegate.isRecoveryApplicable(); + } + + @Override + public void persist(T o, Procedure procedure) { + delegate.persist(o, procedure); + } + + @Override + public void saveSnapshot(Object o) { + savedSnapshot.set(o); + delegate.saveSnapshot(o); + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + delegate.deleteSnapshots(criteria); + } + + @Override + public void deleteMessages(long sequenceNumber) { + delegate.deleteMessages(sequenceNumber); + } + } + + dataStoreContextBuilder.persistent(persistent); new ShardTestKit(getSystem()) {{ final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); @@ -1340,6 +1442,18 @@ public class ShardTest extends AbstractActorTest { public Shard create() throws Exception { return new Shard(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT) { + + DelegatingPersistentDataProvider delegating; + + @Override + protected DataPersistenceProvider persistence() { + if(delegating == null) { + delegating = new DelegatingPersistentDataProvider(super.persistence()); + } + + return delegating; + } + @Override protected void commitSnapshot(final long sequenceNumber) { super.commitSnapshot(sequenceNumber); @@ -1354,16 +1468,40 @@ public class ShardTest extends AbstractActorTest { waitUntilLeader(shard); - shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef()); + writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build()); + + CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1); + shard.tell(capture, getRef()); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); + assertTrue("Invalid saved snapshot " + savedSnapshot.get(), + savedSnapshot.get() instanceof Snapshot); + + verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot); + latch.set(new CountDownLatch(1)); - shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef()); + savedSnapshot.set(null); + + shard.tell(capture, getRef()); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); + assertTrue("Invalid saved snapshot " + savedSnapshot.get(), + savedSnapshot.get() instanceof Snapshot); + + verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + private void verifySnapshot(Snapshot snapshot, NormalizedNode expectedRoot) { + + NormalizedNode actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState()); + assertEquals("Root node", expectedRoot, actual); + }}; } @@ -1431,6 +1569,58 @@ public class ShardTest extends AbstractActorTest { } + @Test + public void testOnDatastoreContext() { + new ShardTestKit(getSystem()) {{ + dataStoreContextBuilder.persistent(true); + + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext"); + + assertEquals("isRecoveryApplicable", true, + shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + + waitUntilLeader(shard); + + shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender()); + + assertEquals("isRecoveryApplicable", false, + shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + + shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender()); + + assertEquals("isRecoveryApplicable", true, + shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testRegisterRoleChangeListener() throws Exception { + new ShardTestKit(getSystem()) { + { + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testRegisterRoleChangeListener"); + + waitUntilLeader(shard); + + TestActorRef listener = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + shard.tell(new RegisterRoleChangeListener(), listener); + + // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore + // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary + // sleep. + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + + List allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class); + + assertEquals(1, allMatching.size()); + }}; + } + private NormalizedNode readStore(final InMemoryDOMDataStore store) throws ReadFailedException { DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); @@ -1470,7 +1660,12 @@ public class ShardTest extends AbstractActorTest { static NormalizedNode readStore(final TestActorRef shard, final YangInstanceIdentifier id) throws ExecutionException, InterruptedException { - DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction(); + return readStore(shard.underlyingActor().getDataStore(), id); + } + + public static NormalizedNode readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id) + throws ExecutionException, InterruptedException { + DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); CheckedFuture>, ReadFailedException> future = transaction.read(id); @@ -1483,9 +1678,14 @@ public class ShardTest extends AbstractActorTest { return node; } - private void writeToStore(final TestActorRef shard, final YangInstanceIdentifier id, final NormalizedNode node) - throws ExecutionException, InterruptedException { - DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction(); + static void writeToStore(final TestActorRef shard, final YangInstanceIdentifier id, + final NormalizedNode node) throws ExecutionException, InterruptedException { + writeToStore(shard.underlyingActor().getDataStore(), id, node); + } + + public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id, + final NormalizedNode node) throws ExecutionException, InterruptedException { + DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); transaction.write(id, node);