X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=d2a08495cdf1eda71f4319560bf09cc36fe9ba2a;hb=8e1d3c4f9001fbc8a5d3d3bea57916c5099078b2;hp=46fdfacbe956497db88acba4e9d2de831aa6991e;hpb=4944f54d4e1fc24404d55e4ab74b6de212844dcd;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 46fdfacbe9..d2a08495cd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -7,6 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.endsWith; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -52,7 +55,6 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -91,30 +93,31 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.messages.Payload; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.data.tree.api.DataTree; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -169,7 +172,7 @@ public class ShardTest extends AbstractShardTest { public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception { final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1); final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1); - final Creator creator = new Creator() { + final Creator creator = new Creator<>() { boolean firstElectionTimeout = true; @Override @@ -202,8 +205,8 @@ public class ShardTest extends AbstractShardTest { final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path), "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener"); - final TestActorRef shard = actorFactory.createTestActor( - Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), + final TestActorRef shard = actorFactory.createTestActor(Props.create(Shard.class, + new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration"); final ShardTestKit testKit = new ShardTestKit(getSystem()); @@ -240,8 +243,7 @@ public class ShardTest extends AbstractShardTest { CreateTransactionReply.class); final String path = reply.getTransactionPath().toString(); - assertTrue("Unexpected transaction path " + path, path.startsWith(String.format( - "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:", + assertThat(path, containsString(String.format("/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:", shardID.getShardName(), shardID.getMemberName().getName()))); } @@ -259,8 +261,8 @@ public class ShardTest extends AbstractShardTest { CreateTransactionReply.class); final String path = reply.getTransactionPath().toString(); - assertTrue("Unexpected transaction path " + path, path.startsWith(String.format( - "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:", + assertThat(path, containsString(String.format( + "/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:", shardID.getShardName(), shardID.getMemberName().getName()))); } @@ -292,15 +294,17 @@ public class ShardTest extends AbstractShardTest { final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, SCHEMA_CONTEXT); - final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) - .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild( - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build(); + final ContainerNode container = Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)) + .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)) + .build()) + .build(); writeToStore(store, TestModel.TEST_PATH, container); - final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY; - final NormalizedNode expected = readStore(store, root); + final YangInstanceIdentifier root = YangInstanceIdentifier.empty(); + final NormalizedNode expected = readStore(store, root); final Snapshot snapshot = Snapshot.create(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)), Collections.emptyList(), 1, 2, 3, 4, -1, null, null); @@ -344,7 +348,7 @@ public class ShardTest extends AbstractShardTest { while (sw.elapsed(TimeUnit.SECONDS) <= 5) { Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS); - final NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); + final NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); if (actual != null) { assertEquals("Applied state", node, actual); return; @@ -373,7 +377,7 @@ public class ShardTest extends AbstractShardTest { // Add some ModificationPayload entries for (int i = 1; i <= nListEntries; i++) { - listEntryKeys.add(Integer.valueOf(i)); + listEntryKeys.add(i); final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); @@ -389,7 +393,7 @@ public class ShardTest extends AbstractShardTest { InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyJournalEntries(nListEntries)); - testRecovery(listEntryKeys); + testRecovery(listEntryKeys, true); } @Test @@ -479,7 +483,9 @@ public class ShardTest extends AbstractShardTest { ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef()); final ReadyTransactionReply readyReply = ReadyTransactionReply .fromSerializable(testKit.expectMsgClass(duration, ReadyTransactionReply.class)); - assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); + + String pathSuffix = shard.path().toString().replaceFirst("akka://test", ""); + assertThat(readyReply.getCohortPath(), endsWith(pathSuffix)); // Send the CanCommitTransaction message for the first Tx. shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef()); @@ -680,9 +686,10 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier transactionID = nextTransactionId(); - final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + final ContainerNode invalidData = Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")) + .build(); BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION); batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData)); @@ -749,7 +756,7 @@ public class ShardTest extends AbstractShardTest { // Verify data in the data store. - final NormalizedNode actualNode = readStore(shard, path); + final NormalizedNode actualNode = readStore(shard, path); assertEquals("Stored node", containerNode, actualNode); } @@ -757,7 +764,7 @@ public class ShardTest extends AbstractShardTest { public void testOnBatchedModificationsWhenNotLeader() { final AtomicBoolean overrideLeaderCalls = new AtomicBoolean(); final ShardTestKit testKit = new ShardTestKit(getSystem()); - final Creator creator = new Creator() { + final Creator creator = new Creator<>() { private static final long serialVersionUID = 1L; @Override @@ -777,8 +784,8 @@ public class ShardTest extends AbstractShardTest { } }; - final TestActorRef shard = actorFactory.createTestActor(Props - .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), + final TestActorRef shard = actorFactory.createTestActor(Props.create(Shard.class, + new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testOnBatchedModificationsWhenNotLeader"); ShardTestKit.waitUntilLeader(shard); @@ -839,7 +846,7 @@ public class ShardTest extends AbstractShardTest { ShardTestKit.waitUntilLeader(shard); final TransactionIdentifier transactionID = nextTransactionId(); - final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); if (readWrite) { shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH, containerNode, true), testKit.getRef()); @@ -850,7 +857,7 @@ public class ShardTest extends AbstractShardTest { testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class); - final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); + final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); } @@ -883,7 +890,7 @@ public class ShardTest extends AbstractShardTest { testKit.expectMsgClass(CommitTransactionReply.class); - final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH); + final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH); assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode); } @@ -928,7 +935,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef()); testKit.expectMsgClass(CommitTransactionReply.class); - final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH); + final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH); assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode); } @@ -947,7 +954,7 @@ public class ShardTest extends AbstractShardTest { final Duration duration = Duration.ofSeconds(5); final TransactionIdentifier transactionID = nextTransactionId(); - final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false), testKit.getRef()); testKit.expectMsgClass(duration, ReadyTransactionReply.class); @@ -964,7 +971,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef()); testKit.expectMsgClass(duration, CommitTransactionReply.class); - final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); + final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); } @@ -1084,7 +1091,7 @@ public class ShardTest extends AbstractShardTest { // Wait for the 2nd Tx to complete the canCommit phase. final CountDownLatch latch = new CountDownLatch(1); - canCommitFuture.onComplete(new OnComplete() { + canCommitFuture.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object resp) { latch.countDown(); @@ -1154,7 +1161,7 @@ public class ShardTest extends AbstractShardTest { // Wait for the 2nd Tx to complete the canCommit phase. final CountDownLatch latch = new CountDownLatch(1); - canCommitFuture.onComplete(new OnComplete() { + canCommitFuture.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object resp) { latch.countDown(); @@ -1182,7 +1189,7 @@ public class ShardTest extends AbstractShardTest { final Duration duration = Duration.ofSeconds(5); final TransactionIdentifier transactionID1 = nextTransactionId(); - doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")) + doThrow(new DataValidationFailedException(YangInstanceIdentifier.empty(), "mock canCommit failure")) .doNothing().when(dataTree).validate(any(DataTreeModification.class)); shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, @@ -1223,7 +1230,7 @@ public class ShardTest extends AbstractShardTest { ShardTestKit.waitUntilLeader(shard); - doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")) + doThrow(new DataValidationFailedException(YangInstanceIdentifier.empty(), "mock canCommit failure")) .doNothing().when(dataTree).validate(any(DataTreeModification.class)); final Duration duration = Duration.ofSeconds(5); @@ -1260,8 +1267,7 @@ public class ShardTest extends AbstractShardTest { final ShardTestKit testKit = new ShardTestKit(getSystem()); final Creator creator = () -> new Shard(newShardBuilder()) { @Override - void persistPayload(final Identifier id, final Payload payload, - final boolean batchHint) { + void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) { // Simulate an AbortTransaction message occurring during // replication, after // persisting and before finishing the commit to the @@ -1272,8 +1278,8 @@ public class ShardTest extends AbstractShardTest { } }; - final TestActorRef shard = actorFactory.createTestActor(Props - .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), + final TestActorRef shard = actorFactory.createTestActor(Props.create(Shard.class, + new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortWithCommitPending"); ShardTestKit.waitUntilLeader(shard); @@ -1292,7 +1298,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef()); testKit.expectMsgClass(duration, CommitTransactionReply.class); - final NormalizedNode node = readStore(shard, TestModel.TEST_PATH); + final NormalizedNode node = readStore(shard, TestModel.TEST_PATH); // Since we're simulating an abort occurring during replication // and before finish commit, @@ -1360,7 +1366,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef()); testKit.expectMsgClass(duration, CommitTransactionReply.class); - final NormalizedNode node = readStore(shard, listNodePath); + final NormalizedNode node = readStore(shard, listNodePath); assertNotNull(listNodePath + " not found", node); } @@ -1520,7 +1526,7 @@ public class ShardTest extends AbstractShardTest { testKit.expectMsgClass(duration, CommitTransactionReply.class); - final NormalizedNode node = readStore(shard, TestModel.TEST2_PATH); + final NormalizedNode node = readStore(shard, TestModel.TEST2_PATH); assertNotNull(TestModel.TEST2_PATH + " not found", node); } @@ -1705,9 +1711,9 @@ public class ShardTest extends AbstractShardTest { dataStoreContextBuilder.persistent(persistent); - class TestShard extends Shard { + final class TestShard extends Shard { - protected TestShard(final AbstractBuilder builder) { + TestShard(final AbstractBuilder builder) { super(builder); setPersistence(new TestPersistentDataProvider(super.persistence())); } @@ -1730,14 +1736,13 @@ public class ShardTest extends AbstractShardTest { final Creator creator = () -> new TestShard(newShardBuilder()); - final TestActorRef shard = actorFactory.createTestActor(Props - .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), - shardActorName); + final TestActorRef shard = actorFactory.createTestActor(Props.create(Shard.class, + new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), shardActorName); ShardTestKit.waitUntilLeader(shard); writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - final NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.EMPTY); + final NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.empty()); // Trigger creation of a snapshot by ensuring final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext(); @@ -1749,7 +1754,7 @@ public class ShardTest extends AbstractShardTest { } private static void awaitAndValidateSnapshot(final AtomicReference latch, - final AtomicReference savedSnapshot, final NormalizedNode expectedRoot) + final AtomicReference savedSnapshot, final NormalizedNode expectedRoot) throws InterruptedException { assertTrue("Snapshot saved", latch.get().await(5, TimeUnit.SECONDS)); @@ -1761,9 +1766,9 @@ public class ShardTest extends AbstractShardTest { savedSnapshot.set(null); } - private static void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) { - final NormalizedNode actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot().getRootNode().get(); - assertEquals("Root node", expectedRoot, actual); + private static void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) { + assertEquals("Root node", expectedRoot, + ((ShardSnapshotState)snapshot.getState()).getSnapshot().getRootNode().orElseThrow()); } /** @@ -1780,16 +1785,16 @@ public class ShardTest extends AbstractShardTest { commitTransaction(store, putTransaction); - final NormalizedNode expected = readStore(store, YangInstanceIdentifier.EMPTY); + final NormalizedNode expected = readStore(store, YangInstanceIdentifier.empty()); final DataTreeModification writeTransaction = store.takeSnapshot().newModification(); - writeTransaction.delete(YangInstanceIdentifier.EMPTY); - writeTransaction.write(YangInstanceIdentifier.EMPTY, expected); + writeTransaction.delete(YangInstanceIdentifier.empty()); + writeTransaction.write(YangInstanceIdentifier.empty(), expected); commitTransaction(store, writeTransaction); - final NormalizedNode actual = readStore(store, YangInstanceIdentifier.EMPTY); + final NormalizedNode actual = readStore(store, YangInstanceIdentifier.empty()); assertEquals(expected, actual); } @@ -1856,7 +1861,7 @@ public class ShardTest extends AbstractShardTest { ShardLeaderStateChanged.class); assertTrue("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent()); assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(), - leaderStateChanged.getLocalShardDataTree().get()); + leaderStateChanged.getLocalShardDataTree().orElseThrow()); MessageCollectorActor.clearMessages(listener);