From: Tom Pantelis Date: Sat, 7 Mar 2015 04:25:57 +0000 (-0500) Subject: Refactor ShardTest X-Git-Tag: release/lithium~402^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=b9508ff048e882f4227abbaa9b6ac991bb45f9ab Refactor ShardTest Refactored the helium compatibility tests in ShardTest into a new PreLithiumShardTest class to keep them separate. This will make it easier to remove the Helium test code when it's no longer needed. Since it needs the same setup as ShardTest and some of the utility methods, I moved all the setup and utility methods to an AbstractShardTest to avoid duplication. This also makes ShardTest smaller and easier to follow. I also moved the ShardTransactionHeliumBackwardsCompatibilityTest to the compat package. Change-Id: Ie46441adbd0be4c4a655f5db640519705a6ced29 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index e704e42465..94be1b0dc1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -981,7 +981,7 @@ public class Shard extends RaftActor { } @VisibleForTesting - InMemoryDOMDataStore getDataStore() { + public InMemoryDOMDataStore getDataStore() { return store; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java new file mode 100644 index 0000000000..2c41b001f8 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -0,0 +1,292 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.japi.Creator; +import akka.testkit.TestActorRef; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +/** + * Abstract base for shard unit tests. + * + * @author Thomas Pantelis + */ +public abstract class AbstractShardTest extends AbstractActorTest{ + protected static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext(); + + private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger(); + + protected final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build(); + + protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder(). + shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000). + shardHeartbeatIntervalInMillis(100); + + @Before + public void setUp() { + InMemorySnapshotStore.clear(); + InMemoryJournal.clear(); + } + + @After + public void tearDown() { + InMemorySnapshotStore.clear(); + InMemoryJournal.clear(); + } + + protected DatastoreContext newDatastoreContext() { + return dataStoreContextBuilder.build(); + } + + protected Props newShardProps() { + return Shard.props(shardID, Collections.emptyMap(), + newDatastoreContext(), SCHEMA_CONTEXT); + } + + protected void testRecovery(Set listEntryKeys) throws Exception { + // Create the actor and wait for recovery complete. + + int nListEntries = listEntryKeys.size(); + + final CountDownLatch recoveryComplete = new CountDownLatch(1); + + @SuppressWarnings("serial") + Creator creator = new Creator() { + @Override + public Shard create() throws Exception { + return new Shard(shardID, Collections.emptyMap(), + newDatastoreContext(), SCHEMA_CONTEXT) { + @Override + protected void onRecoveryComplete() { + try { + super.onRecoveryComplete(); + } finally { + recoveryComplete.countDown(); + } + } + }; + } + }; + + TestActorRef shard = TestActorRef.create(getSystem(), + Props.create(new DelegatingShardCreator(creator)), "testRecovery"); + + assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS)); + + // Verify data in the data store. + + NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); + assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", + outerList.getValue() instanceof Iterable); + for(Object entry: (Iterable) outerList.getValue()) { + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", + entry instanceof MapEntryNode); + MapEntryNode mapEntry = (MapEntryNode)entry; + Optional> idLeaf = + mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); + assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); + Object value = idLeaf.get().getValue(); + assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value, + listEntryKeys.remove(value)); + } + + if(!listEntryKeys.isEmpty()) { + fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " + + listEntryKeys); + } + + assertEquals("Last log index", nListEntries, + shard.underlyingActor().getShardMBean().getLastLogIndex()); + assertEquals("Commit index", nListEntries, + shard.underlyingActor().getShardMBean().getCommitIndex()); + assertEquals("Last applied", nListEntries, + shard.underlyingActor().getShardMBean().getLastApplied()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + protected void verifyLastLogIndex(TestActorRef shard, long expectedValue) { + for(int i = 0; i < 20 * 5; i++) { + long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex(); + if(lastLogIndex == expectedValue) { + break; + } + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex()); + } + + protected NormalizedNode readStore(final InMemoryDOMDataStore store) throws ReadFailedException { + DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); + CheckedFuture>, ReadFailedException> read = + transaction.read(YangInstanceIdentifier.builder().build()); + + Optional> optional = read.checkedGet(); + + NormalizedNode normalizedNode = optional.get(); + + transaction.close(); + + return normalizedNode; + } + + protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, + final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode data, + final MutableCompositeModification modification) { + return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null); + } + + protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, + final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode data, + final MutableCompositeModification modification, + final Function> preCommit) { + + DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction(); + tx.write(path, data); + DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, tx.ready(), preCommit); + + modification.addModification(new WriteModification(path, data)); + + return cohort; + } + + protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName, + final DOMStoreThreePhaseCommitCohort actual) { + return createDelegatingMockCohort(cohortName, actual, null); + } + + protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName, + final DOMStoreThreePhaseCommitCohort actual, + final Function> preCommit) { + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName); + + doAnswer(new Answer>() { + @Override + public ListenableFuture answer(final InvocationOnMock invocation) { + return actual.canCommit(); + } + }).when(cohort).canCommit(); + + doAnswer(new Answer>() { + @Override + public ListenableFuture answer(final InvocationOnMock invocation) throws Throwable { + return actual.preCommit(); + } + }).when(cohort).preCommit(); + + doAnswer(new Answer>() { + @Override + public ListenableFuture answer(final InvocationOnMock invocation) throws Throwable { + return actual.commit(); + } + }).when(cohort).commit(); + + doAnswer(new Answer>() { + @Override + public ListenableFuture answer(final InvocationOnMock invocation) throws Throwable { + return actual.abort(); + } + }).when(cohort).abort(); + + return cohort; + } + + public static NormalizedNode readStore(final TestActorRef shard, final YangInstanceIdentifier id) + throws ExecutionException, InterruptedException { + return readStore(shard.underlyingActor().getDataStore(), id); + } + + public static NormalizedNode readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id) + throws ExecutionException, InterruptedException { + DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); + + CheckedFuture>, ReadFailedException> future = + transaction.read(id); + + Optional> optional = future.get(); + NormalizedNode node = optional.isPresent()? optional.get() : null; + + transaction.close(); + + return node; + } + + public static void writeToStore(final TestActorRef shard, final YangInstanceIdentifier id, + final NormalizedNode node) throws ExecutionException, InterruptedException { + writeToStore(shard.underlyingActor().getDataStore(), id, node); + } + + public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id, + final NormalizedNode node) throws ExecutionException, InterruptedException { + DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); + + transaction.write(id, node); + + DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + commitCohort.preCommit().get(); + commitCohort.commit().get(); + } + + @SuppressWarnings("serial") + public static final class DelegatingShardCreator implements Creator { + private final Creator delegate; + + DelegatingShardCreator(final Creator delegate) { + this.delegate = delegate; + } + + @Override + public Shard create() throws Exception { + return delegate.create(); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index d930b2519e..f38b510947 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -5,8 +5,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -24,7 +22,6 @@ import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -38,16 +35,10 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -67,7 +58,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; @@ -79,7 +69,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -87,19 +76,13 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -108,46 +91,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -public class ShardTest extends AbstractActorTest { - - private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext(); - - private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger(); - - private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1") - .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build(); - - private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder(). - shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000). - shardHeartbeatIntervalInMillis(100); - - @Before - public void setUp() { - Builder newBuilder = DatastoreContext.newBuilder(); - InMemorySnapshotStore.clear(); - InMemoryJournal.clear(); - } - - @After - public void tearDown() { - InMemorySnapshotStore.clear(); - InMemoryJournal.clear(); - } - - private DatastoreContext newDatastoreContext() { - return dataStoreContextBuilder.build(); - } - - private Props newShardProps() { - return Shard.props(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT); - } - +public class ShardTest extends AbstractShardTest { @Test public void testRegisterChangeListener() throws Exception { new ShardTestKit(getSystem()) {{ @@ -395,36 +343,6 @@ public class ShardTest extends AbstractActorTest { shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - @Test - public void testApplyHelium2VersionSnapshot() throws Exception { - TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), - "testApplySnapshot"); - - NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT); - - InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); - store.onGlobalContextUpdated(SCHEMA_CONTEXT); - - writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - YangInstanceIdentifier root = YangInstanceIdentifier.builder().build(); - NormalizedNode expected = readStore(store, root); - - NormalizedNodeMessages.Container encode = codec.encode(expected); - - ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create( - encode.getNormalizedNode().toByteString().toByteArray(), - Collections.emptyList(), 1, 2, 3, 4)); - - shard.underlyingActor().onReceiveCommand(applySnapshot); - - NormalizedNode actual = readStore(shard, root); - - assertEquals("Root node", expected, actual); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } - @Test public void testApplyState() throws Exception { @@ -443,24 +361,6 @@ public class ShardTest extends AbstractActorTest { shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - @Test - public void testApplyStateLegacy() throws Exception { - - TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy"); - - NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - - ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, - newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node)))); - - shard.underlyingActor().onReceiveCommand(applyState); - - NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); - assertEquals("Applied state", node, actual); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } - @Test public void testRecovery() throws Exception { @@ -479,7 +379,7 @@ public class ShardTest extends AbstractActorTest { // Set up the InMemoryJournal. - InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload( + InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload( new WriteModification(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); @@ -503,141 +403,6 @@ public class ShardTest extends AbstractActorTest { testRecovery(listEntryKeys); } - @Test - public void testHelium2VersionRecovery() throws Exception { - - // Set up the InMemorySnapshotStore. - - InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null); - testStore.onGlobalContextUpdated(SCHEMA_CONTEXT); - - writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - NormalizedNode root = readStore(testStore, YangInstanceIdentifier.builder().build()); - - InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( - new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root). - getNormalizedNode().toByteString().toByteArray(), - Collections.emptyList(), 0, 1, -1, -1)); - - // Set up the InMemoryJournal. - - InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload( - new WriteModification(TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); - - int nListEntries = 16; - Set listEntryKeys = new HashSet<>(); - int i = 1; - - // Add some CompositeModificationPayload entries - for(; i <= 8; i++) { - listEntryKeys.add(Integer.valueOf(i)); - YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); - Modification mod = new MergeModification(path, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, - newLegacyPayload(mod))); - } - - // Add some CompositeModificationByteStringPayload entries - for(; i <= nListEntries; i++) { - listEntryKeys.add(Integer.valueOf(i)); - YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); - Modification mod = new MergeModification(path, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, - newLegacyByteStringPayload(mod))); - } - - InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries)); - - testRecovery(listEntryKeys); - } - - private void testRecovery(Set listEntryKeys) throws Exception { - // Create the actor and wait for recovery complete. - - int nListEntries = listEntryKeys.size(); - - final CountDownLatch recoveryComplete = new CountDownLatch(1); - - @SuppressWarnings("serial") - Creator creator = new Creator() { - @Override - public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT) { - @Override - protected void onRecoveryComplete() { - try { - super.onRecoveryComplete(); - } finally { - recoveryComplete.countDown(); - } - } - }; - } - }; - - TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(creator)), "testRecovery"); - - assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS)); - - // Verify data in the data store. - - NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); - assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", - outerList.getValue() instanceof Iterable); - for(Object entry: (Iterable) outerList.getValue()) { - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", - entry instanceof MapEntryNode); - MapEntryNode mapEntry = (MapEntryNode)entry; - Optional> idLeaf = - mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); - assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); - Object value = idLeaf.get().getValue(); - assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value, - listEntryKeys.remove(value)); - } - - if(!listEntryKeys.isEmpty()) { - fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " + - listEntryKeys); - } - - assertEquals("Last log index", nListEntries, - shard.underlyingActor().getShardMBean().getLastLogIndex()); - assertEquals("Commit index", nListEntries, - shard.underlyingActor().getShardMBean().getCommitIndex()); - assertEquals("Last applied", nListEntries, - shard.underlyingActor().getShardMBean().getLastApplied()); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } - - private CompositeModificationPayload newLegacyPayload(final Modification... mods) { - MutableCompositeModification compMod = new MutableCompositeModification(); - for(Modification mod: mods) { - compMod.addModification(mod); - } - - return new CompositeModificationPayload(compMod.toSerializable()); - } - - private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) { - MutableCompositeModification compMod = new MutableCompositeModification(); - for(Modification mod: mods) { - compMod.addModification(mod); - } - - return new CompositeModificationByteStringPayload(compMod.toSerializable()); - } - private ModificationPayload newModificationPayload(final Modification... mods) throws IOException { MutableCompositeModification compMod = new MutableCompositeModification(); for(Modification mod: mods) { @@ -647,59 +412,6 @@ public class ShardTest extends AbstractActorTest { return new ModificationPayload(compMod); } - private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, - final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode data, - final MutableCompositeModification modification) { - return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null); - } - - private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, - final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode data, - final MutableCompositeModification modification, - final Function> preCommit) { - - DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction(); - tx.write(path, data); - final DOMStoreThreePhaseCommitCohort realCohort = tx.ready(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName); - - doAnswer(new Answer>() { - @Override - public ListenableFuture answer(final InvocationOnMock invocation) { - return realCohort.canCommit(); - } - }).when(cohort).canCommit(); - - doAnswer(new Answer>() { - @Override - public ListenableFuture answer(final InvocationOnMock invocation) throws Throwable { - if(preCommit != null) { - return preCommit.apply(realCohort); - } else { - return realCohort.preCommit(); - } - } - }).when(cohort).preCommit(); - - doAnswer(new Answer>() { - @Override - public ListenableFuture answer(final InvocationOnMock invocation) throws Throwable { - return realCohort.commit(); - } - }).when(cohort).commit(); - - doAnswer(new Answer>() { - @Override - public ListenableFuture answer(final InvocationOnMock invocation) throws Throwable { - return realCohort.abort(); - } - }).when(cohort).abort(); - - modification.addModification(new WriteModification(path, data)); - - return cohort; - } - @SuppressWarnings({ "unchecked" }) @Test public void testConcurrentThreePhaseCommits() throws Throwable { @@ -887,18 +599,6 @@ public class ShardTest extends AbstractActorTest { }}; } - private void verifyLastLogIndex(TestActorRef shard, long expectedValue) { - for(int i = 0; i < 20 * 5; i++) { - long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex(); - if(lastLogIndex == expectedValue) { - break; - } - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - } - - assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex()); - } - @Test public void testCommitWithPersistenceDisabled() throws Throwable { dataStoreContextBuilder.persistent(false); @@ -1567,7 +1267,6 @@ public class ShardTest extends AbstractActorTest { shard2.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; - } @Test @@ -1640,21 +1339,6 @@ public class ShardTest extends AbstractActorTest { shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - - private NormalizedNode readStore(final InMemoryDOMDataStore store) throws ReadFailedException { - DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); - CheckedFuture>, ReadFailedException> read = - transaction.read(YangInstanceIdentifier.builder().build()); - - Optional> optional = read.checkedGet(); - - NormalizedNode normalizedNode = optional.get(); - - transaction.close(); - - return normalizedNode; - } - private void commitTransaction(final DOMStoreWriteTransaction transaction) { DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); ListenableFuture future = @@ -1666,64 +1350,4 @@ public class ShardTest extends AbstractActorTest { } catch (InterruptedException | ExecutionException e) { } } - - private AsyncDataChangeListener> noOpDataChangeListener() { - return new AsyncDataChangeListener>() { - @Override - public void onDataChanged( - final AsyncDataChangeEvent> change) { - - } - }; - } - - static NormalizedNode readStore(final TestActorRef shard, final YangInstanceIdentifier id) - throws ExecutionException, InterruptedException { - return readStore(shard.underlyingActor().getDataStore(), id); - } - - public static NormalizedNode readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id) - throws ExecutionException, InterruptedException { - DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); - - CheckedFuture>, ReadFailedException> future = - transaction.read(id); - - Optional> optional = future.get(); - NormalizedNode node = optional.isPresent()? optional.get() : null; - - transaction.close(); - - return node; - } - - static void writeToStore(final TestActorRef shard, final YangInstanceIdentifier id, - final NormalizedNode node) throws ExecutionException, InterruptedException { - writeToStore(shard.underlyingActor().getDataStore(), id, node); - } - - public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id, - final NormalizedNode node) throws ExecutionException, InterruptedException { - DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); - - transaction.write(id, node); - - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); - commitCohort.preCommit().get(); - commitCohort.commit().get(); - } - - @SuppressWarnings("serial") - private static final class DelegatingShardCreator implements Creator { - private final Creator delegate; - - DelegatingShardCreator(final Creator delegate) { - this.delegate = delegate; - } - - @Override - public Shard create() throws Exception { - return delegate.create(); - } - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java index fa15db6949..2d1b14c269 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java @@ -13,6 +13,8 @@ import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import akka.util.Timeout; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.Assert; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; @@ -21,12 +23,9 @@ import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -class ShardTestKit extends JavaTestKit { +public class ShardTestKit extends JavaTestKit { - ShardTestKit(ActorSystem actorSystem) { + protected ShardTestKit(ActorSystem actorSystem) { super(actorSystem); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java new file mode 100644 index 0000000000..ded5226d30 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -0,0 +1,394 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.compat; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.inOrder; +import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; +import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import akka.dispatch.Dispatchers; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.testkit.TestActorRef; +import akka.util.Timeout; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.MoreExecutors; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Test; +import org.mockito.InOrder; +import org.opendaylight.controller.cluster.datastore.AbstractShardTest; +import org.opendaylight.controller.cluster.datastore.Shard; +import org.opendaylight.controller.cluster.datastore.ShardTestKit; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; +import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +/** + * Unit tests for backwards compatibility with pre-Lithium versions. + * + * @author Thomas Pantelis + */ +public class PreLithiumShardTest extends AbstractShardTest { + + private CompositeModificationPayload newLegacyPayload(final Modification... mods) { + MutableCompositeModification compMod = new MutableCompositeModification(); + for(Modification mod: mods) { + compMod.addModification(mod); + } + + return new CompositeModificationPayload(compMod.toSerializable()); + } + + private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) { + MutableCompositeModification compMod = new MutableCompositeModification(); + for(Modification mod: mods) { + compMod.addModification(mod); + } + + return new CompositeModificationByteStringPayload(compMod.toSerializable()); + } + + private ModificationPayload newModificationPayload(final Modification... mods) throws IOException { + MutableCompositeModification compMod = new MutableCompositeModification(); + for(Modification mod: mods) { + compMod.addModification(mod); + } + + return new ModificationPayload(compMod); + } + + @Test + public void testApplyHelium2VersionSnapshot() throws Exception { + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), + "testApplyHelium2VersionSnapshot"); + + NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT); + + InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); + store.onGlobalContextUpdated(SCHEMA_CONTEXT); + + writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + YangInstanceIdentifier root = YangInstanceIdentifier.builder().build(); + NormalizedNode expected = readStore(store, root); + + NormalizedNodeMessages.Container encode = codec.encode(expected); + + ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create( + encode.getNormalizedNode().toByteString().toByteArray(), + Collections.emptyList(), 1, 2, 3, 4)); + + shard.underlyingActor().onReceiveCommand(applySnapshot); + + NormalizedNode actual = readStore(shard, root); + + assertEquals("Root node", expected, actual); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + @Test + public void testHelium2VersionApplyStateLegacy() throws Exception { + + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testHelium2VersionApplyStateLegacy"); + + NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, + newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node)))); + + shard.underlyingActor().onReceiveCommand(applyState); + + NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); + assertEquals("Applied state", node, actual); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + @Test + public void testHelium2VersionRecovery() throws Exception { + + // Set up the InMemorySnapshotStore. + + InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null); + testStore.onGlobalContextUpdated(SCHEMA_CONTEXT); + + writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + NormalizedNode root = readStore(testStore, YangInstanceIdentifier.builder().build()); + + InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( + new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root). + getNormalizedNode().toByteString().toByteArray(), + Collections.emptyList(), 0, 1, -1, -1)); + + // Set up the InMemoryJournal. + + InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload( + new WriteModification(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); + + int nListEntries = 16; + Set listEntryKeys = new HashSet<>(); + int i = 1; + + // Add some CompositeModificationPayload entries + for(; i <= 8; i++) { + listEntryKeys.add(Integer.valueOf(i)); + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); + Modification mod = new MergeModification(path, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); + InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + newLegacyPayload(mod))); + } + + // Add some CompositeModificationByteStringPayload entries + for(; i <= nListEntries; i++) { + listEntryKeys.add(Integer.valueOf(i)); + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); + Modification mod = new MergeModification(path, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); + InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + newLegacyByteStringPayload(mod))); + } + + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries)); + + testRecovery(listEntryKeys); + } + + @SuppressWarnings({ "unchecked" }) + @Test + public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testConcurrentThreePhaseCommits"); + + waitUntilLeader(shard); + + // Setup 3 simulated transactions with mock cohorts backed by real cohorts. + + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), + modification2); + + String transactionID3 = "tx3"; + MutableCompositeModification modification3 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), + modification3); + + long timeoutSec = 5; + final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); + final Timeout timeout = new Timeout(duration); + + // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( + expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); + + // Send the CanCommitTransaction message for the first Tx. + + shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + // Send the ForwardedReadyTransaction for the next 2 Tx's. + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + cohort3, modification3, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + + // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and + // processed after the first Tx completes. + + Future canCommitFuture1 = Patterns.ask(shard, + new CanCommitTransaction(transactionID2).toSerializable(), timeout); + + Future canCommitFuture2 = Patterns.ask(shard, + new CanCommitTransaction(transactionID3).toSerializable(), timeout); + + // Send the CommitTransaction message for the first Tx. After it completes, it should + // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd. + + shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + // Wait for the next 2 Tx's to complete. + + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch commitLatch = new CountDownLatch(2); + + class OnFutureComplete extends OnComplete { + private final Class expRespType; + + OnFutureComplete(final Class expRespType) { + this.expRespType = expRespType; + } + + @Override + public void onComplete(final Throwable error, final Object resp) { + if(error != null) { + caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error)); + } else { + try { + assertEquals("Commit response type", expRespType, resp.getClass()); + onSuccess(resp); + } catch (Exception e) { + caughtEx.set(e); + } + } + } + + void onSuccess(final Object resp) throws Exception { + } + } + + class OnCommitFutureComplete extends OnFutureComplete { + OnCommitFutureComplete() { + super(CommitTransactionReply.SERIALIZABLE_CLASS); + } + + @Override + public void onComplete(final Throwable error, final Object resp) { + super.onComplete(error, resp); + commitLatch.countDown(); + } + } + + class OnCanCommitFutureComplete extends OnFutureComplete { + private final String transactionID; + + OnCanCommitFutureComplete(final String transactionID) { + super(CanCommitTransactionReply.SERIALIZABLE_CLASS); + this.transactionID = transactionID; + } + + @Override + void onSuccess(final Object resp) throws Exception { + CanCommitTransactionReply canCommitReply = + CanCommitTransactionReply.fromSerializable(resp); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + Future commitFuture = Patterns.ask(shard, + new CommitTransaction(transactionID).toSerializable(), timeout); + commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher()); + } + } + + canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), + getSystem().dispatcher()); + + canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), + getSystem().dispatcher()); + + boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS); + + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Commits complete", true, done); + + InOrder inOrder = inOrder(cohort1, cohort2, cohort3); + inOrder.verify(cohort1).canCommit(); + inOrder.verify(cohort1).preCommit(); + inOrder.verify(cohort1).commit(); + inOrder.verify(cohort2).canCommit(); + inOrder.verify(cohort2).preCommit(); + inOrder.verify(cohort2).commit(); + inOrder.verify(cohort3).canCommit(); + inOrder.verify(cohort3).preCommit(); + inOrder.verify(cohort3).commit(); + + // Verify data in the data store. + + NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); + assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", + outerList.getValue() instanceof Iterable); + Object entry = ((Iterable)outerList.getValue()).iterator().next(); + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", + entry instanceof MapEntryNode); + MapEntryNode mapEntry = (MapEntryNode)entry; + Optional> idLeaf = + mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); + assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); + assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); + + verifyLastLogIndex(shard, 2); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java similarity index 94% rename from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java rename to opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java index 1d1b08b5f8..2a29d2c089 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.cluster.datastore; +package org.opendaylight.controller.cluster.datastore.compat; import akka.actor.ActorRef; import akka.actor.ActorSelection; @@ -16,6 +16,13 @@ import akka.testkit.TestActorRef; import java.util.Collections; import org.junit.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.Shard; +import org.opendaylight.controller.cluster.datastore.ShardTest; +import org.opendaylight.controller.cluster.datastore.ShardTestKit; +import org.opendaylight.controller.cluster.datastore.TransactionProxy; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; @@ -46,7 +53,6 @@ import scala.concurrent.duration.FiniteDuration; */ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest { - @SuppressWarnings("unchecked") @Test public void testTransactionCommit() throws Exception { new ShardTestKit(getSystem()) {{