X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=4dbdce0c079fbfa8e0b53b4304817184a73c2cfc;hb=a6af137c30470b86d4bc624d4c48cb686495a182;hp=0bd34aac3ee768eb9c4abc0f872b0e83cf67f547;hpb=ec870dee9bacb971f11bc747b69e84ac37f5d746;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 0bd34aac3e..4dbdce0c07 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -34,10 +34,10 @@ import akka.util.Timeout; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.util.concurrent.Uninterruptibles; -import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -70,17 +70,14 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; -import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; -import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; -import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; @@ -105,8 +102,6 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; @@ -114,10 +109,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; @@ -129,137 +123,6 @@ public class ShardTest extends AbstractShardTest { private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in " + "InMemorySnapshotStore and journal recovery seq number will start from 1"; - @Test - public void testRegisterChangeListener() throws Exception { - new ShardTestKit(getSystem()) { - { - final TestActorRef shard = actorFactory.createTestActor( - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testRegisterChangeListener"); - - waitUntilLeader(shard); - - shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender()); - - final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, - TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener"); - - shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor, - AsyncDataBroker.DataChangeScope.BASE, true), getRef()); - - final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"), - RegisterChangeListenerReply.class); - final String replyPath = reply.getListenerRegistrationPath().toString(); - assertTrue("Incorrect reply path: " + replyPath, - replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*")); - - final YangInstanceIdentifier path = TestModel.TEST_PATH; - writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - listener.waitForChangeEvents(path); - } - }; - } - - @SuppressWarnings("serial") - @Test - public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception { - // This test tests the timing window in which a change listener is registered before the - // shard becomes the leader. We verify that the listener is registered and notified of the - // existing data when the shard becomes the leader. - // For this test, we want to send the RegisterChangeListener message after the shard - // has recovered from persistence and before it becomes the leader. So we subclass - // Shard to override onReceiveCommand and, when the first ElectionTimeout is received, - // we know that the shard has been initialized to a follower and has started the - // election process. The following 2 CountDownLatches are used to coordinate the - // ElectionTimeout with the sending of the RegisterChangeListener message. - final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1); - final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1); - final Creator creator = new Creator() { - boolean firstElectionTimeout = true; - - @Override - public Shard create() throws Exception { - // Use a non persistent provider because this test actually invokes persist on the journal - // this will cause all other messages to not be queued properly after that. - // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when - // it does do a persist) - return new Shard(newShardBuilder()) { - @Override - public void handleCommand(final Object message) { - if (message instanceof ElectionTimeout && firstElectionTimeout) { - // Got the first ElectionTimeout. We don't forward it to the - // base Shard yet until we've sent the RegisterChangeListener - // message. So we signal the onFirstElectionTimeout latch to tell - // the main thread to send the RegisterChangeListener message and - // start a thread to wait on the onChangeListenerRegistered latch, - // which the main thread signals after it has sent the message. - // After the onChangeListenerRegistered is triggered, we send the - // original ElectionTimeout message to proceed with the election. - firstElectionTimeout = false; - final ActorRef self = getSelf(); - new Thread() { - @Override - public void run() { - Uninterruptibles.awaitUninterruptibly( - onChangeListenerRegistered, 5, TimeUnit.SECONDS); - self.tell(message, self); - } - }.start(); - - onFirstElectionTimeout.countDown(); - } else { - super.handleCommand(message); - } - } - }; - } - }; - - setupInMemorySnapshotStore(); - - final YangInstanceIdentifier path = TestModel.TEST_PATH; - final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), - "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener"); - - final TestActorRef shard = actorFactory.createTestActor( - Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), - "testRegisterChangeListenerWhenNotLeaderInitially"); - - new ShardTestKit(getSystem()) { - { - // Wait until the shard receives the first ElectionTimeout - // message. - assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); - - // Now send the RegisterChangeListener and wait for the reply. - shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false), - getRef()); - - final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterChangeListenerReply.class); - assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); - - // Sanity check - verify the shard is not the leader yet. - shard.tell(FindLeader.INSTANCE, getRef()); - final FindLeaderReply findLeadeReply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class); - assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent()); - - // Signal the onChangeListenerRegistered latch to tell the - // thread above to proceed - // with the election process. - onChangeListenerRegistered.countDown(); - - // Wait for the shard to become the leader and notify our - // listener with the existing - // data in the store. - listener.waitForChangeEvents(path); - } - }; - } - @Test public void testRegisterDataTreeChangeListener() throws Exception { new ShardTestKit(getSystem()) { @@ -278,8 +141,8 @@ public class ShardTest extends AbstractShardTest { shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef()); - final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"), - RegisterDataTreeChangeListenerReply.class); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"), + RegisterDataTreeNotificationListenerReply.class); final String replyPath = reply.getListenerRegistrationPath().toString(); assertTrue("Incorrect reply path: " + replyPath, replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*")); @@ -301,21 +164,18 @@ public class ShardTest extends AbstractShardTest { boolean firstElectionTimeout = true; @Override - public Shard create() throws Exception { + public Shard create() { return new Shard(newShardBuilder()) { @Override public void handleCommand(final Object message) { if (message instanceof ElectionTimeout && firstElectionTimeout) { firstElectionTimeout = false; final ActorRef self = getSelf(); - new Thread() { - @Override - public void run() { - Uninterruptibles.awaitUninterruptibly( - onChangeListenerRegistered, 5, TimeUnit.SECONDS); - self.tell(message, self); - } - }.start(); + new Thread(() -> { + Uninterruptibles.awaitUninterruptibly( + onChangeListenerRegistered, 5, TimeUnit.SECONDS); + self.tell(message, self); + }).start(); onFirstElectionTimeout.countDown(); } else { @@ -342,8 +202,8 @@ public class ShardTest extends AbstractShardTest { assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef()); - final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterDataTreeChangeListenerReply.class); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeNotificationListenerReply.class); assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); shard.tell(FindLeader.INSTANCE, getRef()); @@ -405,7 +265,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testPeerAddressResolved() throws Exception { + public void testPeerAddressResolved() { new ShardTestKit(getSystem()) { { final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"), @@ -432,8 +292,8 @@ public class ShardTest extends AbstractShardTest { ShardTestKit.waitUntilLeader(shard); - final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); - store.setSchemaContext(SCHEMA_CONTEXT); + final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, + SCHEMA_CONTEXT); final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier( new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) @@ -473,8 +333,8 @@ public class ShardTest extends AbstractShardTest { ShardTestKit.waitUntilLeader(shard); - final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); - store.setSchemaContext(SCHEMA_CONTEXT); + final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, + SCHEMA_CONTEXT); final DataTreeModification writeMod = store.takeSnapshot().newModification(); final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -571,7 +431,7 @@ public class ShardTest extends AbstractShardTest { } } - void onSuccess(final Object resp) throws Exception { + void onSuccess(final Object resp) { } } @@ -596,7 +456,7 @@ public class ShardTest extends AbstractShardTest { } @Override - void onSuccess(final Object resp) throws Exception { + void onSuccess(final Object resp) { final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(resp); assertEquals("Can commit", true, canCommitReply.getCanCommit()); @@ -673,9 +533,10 @@ public class ShardTest extends AbstractShardTest { final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS); - if (caughtEx.get() != null) { - Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class); - Throwables.propagate(caughtEx.get()); + final Throwable t = caughtEx.get(); + if (t != null) { + Throwables.propagateIfPossible(t, Exception.class); + throw new RuntimeException(t); } assertEquals("Commits complete", true, done); @@ -697,13 +558,13 @@ public class ShardTest extends AbstractShardTest { verifyOuterListEntry(shard, 1); - verifyLastApplied(shard, 2); + verifyLastApplied(shard, 5); } }; } @Test - public void testBatchedModificationsWithNoCommitOnReady() throws Exception { + public void testBatchedModificationsWithNoCommitOnReady() { new ShardTestKit(getSystem()) { { final TestActorRef shard = actorFactory.createTestActor( @@ -756,7 +617,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testBatchedModificationsWithCommitOnReady() throws Exception { + public void testBatchedModificationsWithCommitOnReady() { new ShardTestKit(getSystem()) { { final TestActorRef shard = actorFactory.createTestActor( @@ -809,7 +670,7 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier transactionID = nextTransactionId(); final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION); - batched.setReady(true); + batched.setReady(); batched.setTotalMessagesSent(2); shard.tell(batched, getRef()); @@ -817,15 +678,15 @@ public class ShardTest extends AbstractShardTest { final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); if (failure != null) { - Throwables.propagateIfInstanceOf(failure.cause(), Exception.class); - Throwables.propagate(failure.cause()); + Throwables.propagateIfPossible(failure.cause(), Exception.class); + throw new RuntimeException(failure.cause()); } } }; } @Test - public void testBatchedModificationsWithOperationFailure() throws Exception { + public void testBatchedModificationsWithOperationFailure() { new ShardTestKit(getSystem()) { { final TestActorRef shard = actorFactory.createTestActor( @@ -852,7 +713,7 @@ public class ShardTest extends AbstractShardTest { final Throwable cause = failure.cause(); batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION); - batched.setReady(true); + batched.setReady(); batched.setTotalMessagesSent(2); shard.tell(batched, getRef()); @@ -864,7 +725,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testBatchedModificationsOnTransactionChain() throws Exception { + public void testBatchedModificationsOnTransactionChain() { new ShardTestKit(getSystem()) { { final TestActorRef shard = actorFactory.createTestActor( @@ -927,7 +788,7 @@ public class ShardTest extends AbstractShardTest { private static final long serialVersionUID = 1L; @Override - public Shard create() throws Exception { + public Shard create() { return new Shard(newShardBuilder()) { @Override protected boolean isLeader() { @@ -983,7 +844,8 @@ public class ShardTest extends AbstractShardTest { failure = expectMsgClass(Failure.class); assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); - shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef()); + shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()), + getRef()); failure = expectMsgClass(Failure.class); assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass()); } @@ -991,16 +853,16 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testReadyWithReadWriteImmediateCommit() throws Exception { + public void testReadyWithReadWriteImmediateCommit() { testReadyWithImmediateCommit(true); } @Test - public void testReadyWithWriteOnlyImmediateCommit() throws Exception { + public void testReadyWithWriteOnlyImmediateCommit() { testReadyWithImmediateCommit(false); } - private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception { + private void testReadyWithImmediateCommit(final boolean readWrite) { new ShardTestKit(getSystem()) { { final TestActorRef shard = actorFactory.createTestActor( @@ -1028,7 +890,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testReadyLocalTransactionWithImmediateCommit() throws Exception { + public void testReadyLocalTransactionWithImmediateCommit() { new ShardTestKit(getSystem()) { { final TestActorRef shard = actorFactory.createTestActor( @@ -1048,7 +910,8 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier txId = nextTransactionId(); modification.ready(); - final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true); + final ReadyLocalTransaction readyMessage = + new ReadyLocalTransaction(txId, modification, true, Optional.empty()); shard.tell(readyMessage, getRef()); @@ -1061,7 +924,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception { + public void testReadyLocalTransactionWithThreePhaseCommit() { new ShardTestKit(getSystem()) { { final TestActorRef shard = actorFactory.createTestActor( @@ -1081,7 +944,8 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier txId = nextTransactionId(); modification.ready(); - final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false); + final ReadyLocalTransaction readyMessage = + new ReadyLocalTransaction(txId, modification, false, Optional.empty()); shard.tell(readyMessage, getRef()); @@ -1106,7 +970,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testReadWriteCommitWithPersistenceDisabled() throws Exception { + public void testReadWriteCommitWithPersistenceDisabled() { dataStoreContextBuilder.persistent(false); new ShardTestKit(getSystem()) { { @@ -1157,7 +1021,7 @@ public class ShardTest extends AbstractShardTest { private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception { new ShardTestKit(getSystem()) { { - final TipProducingDataTree dataTree = createDelegatingMockDataTree(); + final DataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCommitWhenTransactionHasModifications-" + readWrite); @@ -1192,6 +1056,10 @@ public class ShardTest extends AbstractShardTest { inOrder.verify(dataTree).prepare(any(DataTreeModification.class)); inOrder.verify(dataTree).commit(any(DataTreeCandidate.class)); + // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into + // the journal + Thread.sleep(HEARTBEAT_MILLIS * 2); + shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef()); final ShardStats shardStats = expectMsgClass(duration, ShardStats.class); @@ -1201,7 +1069,7 @@ public class ShardTest extends AbstractShardTest { // Commit index should advance as we do not have an empty // modification - assertEquals(0, shardStats.getCommitIndex()); + assertEquals(1, shardStats.getCommitIndex()); } }; } @@ -1210,7 +1078,7 @@ public class ShardTest extends AbstractShardTest { public void testCommitPhaseFailure() throws Exception { new ShardTestKit(getSystem()) { { - final TipProducingDataTree dataTree = createDelegatingMockDataTree(); + final DataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCommitPhaseFailure"); @@ -1287,7 +1155,7 @@ public class ShardTest extends AbstractShardTest { public void testPreCommitPhaseFailure() throws Exception { new ShardTestKit(getSystem()) { { - final TipProducingDataTree dataTree = createDelegatingMockDataTree(); + final DataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPreCommitPhaseFailure"); @@ -1355,7 +1223,7 @@ public class ShardTest extends AbstractShardTest { public void testCanCommitPhaseFailure() throws Exception { new ShardTestKit(getSystem()) { { - final TipProducingDataTree dataTree = createDelegatingMockDataTree(); + final DataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCanCommitPhaseFailure"); @@ -1402,7 +1270,7 @@ public class ShardTest extends AbstractShardTest { private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception { new ShardTestKit(getSystem()) { { - final TipProducingDataTree dataTree = createDelegatingMockDataTree(); + final DataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite); @@ -1444,7 +1312,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testAbortWithCommitPending() throws Exception { + public void testAbortWithCommitPending() { new ShardTestKit(getSystem()) { { final Creator creator = () -> new Shard(newShardBuilder()) { @@ -1627,7 +1495,7 @@ public class ShardTest extends AbstractShardTest { // } @Test - public void testTransactionCommitWithPriorExpiredCohortEntries() throws Exception { + public void testTransactionCommitWithPriorExpiredCohortEntries() { dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) { { @@ -1666,7 +1534,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Exception { + public void testTransactionCommitWithSubsequentExpiredCohortEntry() { dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) { { @@ -1705,7 +1573,7 @@ public class ShardTest extends AbstractShardTest { .apply(modification3); modification3.ready(); final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, - true); + true, Optional.empty()); shard.tell(readyMessage, getRef()); // Commit the first Tx. After completing, the second should @@ -1726,7 +1594,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testCanCommitBeforeReadyFailure() throws Exception { + public void testCanCommitBeforeReadyFailure() { new ShardTestKit(getSystem()) { { final TestActorRef shard = actorFactory.createTestActor( @@ -1793,7 +1661,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testAbortAfterReady() throws Exception { + public void testAbortAfterReady() { dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); new ShardTestKit(getSystem()) { { @@ -1838,7 +1706,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testAbortQueuedTransaction() throws Exception { + public void testAbortQueuedTransaction() { new ShardTestKit(getSystem()) { { final TestActorRef shard = actorFactory.createTestActor( @@ -1965,7 +1833,7 @@ public class ShardTest extends AbstractShardTest { } private void awaitAndValidateSnapshot(final NormalizedNode expectedRoot) - throws InterruptedException, IOException { + throws InterruptedException { assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot); @@ -1976,8 +1844,7 @@ public class ShardTest extends AbstractShardTest { savedSnapshot.set(null); } - private void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) - throws IOException { + private void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) { final NormalizedNode actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot() .getRootNode().get(); assertEquals("Root node", expectedRoot, actual); @@ -1989,9 +1856,9 @@ public class ShardTest extends AbstractShardTest { * This test simply verifies that the applySnapShot logic will work. */ @Test - public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException { - final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); - store.setSchemaContext(SCHEMA_CONTEXT); + public void testInMemoryDataTreeRestore() throws DataValidationFailedException { + final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, + SCHEMA_CONTEXT); final DataTreeModification putTransaction = store.takeSnapshot().newModification(); putTransaction.write(TestModel.TEST_PATH, @@ -2020,13 +1887,13 @@ public class ShardTest extends AbstractShardTest { .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build(); final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext) - .schemaContext(SCHEMA_CONTEXT).props(); + .schemaContextProvider(() -> SCHEMA_CONTEXT).props(); final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder() .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build(); final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext) - .schemaContext(SCHEMA_CONTEXT).props(); + .schemaContextProvider(() -> SCHEMA_CONTEXT).props(); new ShardTestKit(getSystem()) { { @@ -2069,7 +1936,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testRegisterRoleChangeListener() throws Exception { + public void testRegisterRoleChangeListener() { new ShardTestKit(getSystem()) { { final TestActorRef shard = actorFactory.createTestActor( @@ -2078,8 +1945,7 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - final TestActorRef listener = - TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props()); shard.tell(new RegisterRoleChangeListener(), listener); @@ -2107,7 +1973,7 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testFollowerInitialSyncStatus() throws Exception { + public void testFollowerInitialSyncStatus() { final TestActorRef shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testFollowerInitialSyncStatus"); @@ -2123,90 +1989,6 @@ public class ShardTest extends AbstractShardTest { assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus()); } - @Test - public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception { - new ShardTestKit(getSystem()) { - { - final String testName = "testClusteredDataChangeListenerWithDelayedRegistration"; - dataStoreContextBuilder.shardElectionTimeoutFactor(1000) - .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); - - final YangInstanceIdentifier path = TestModel.TEST_PATH; - final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), - actorFactory.generateActorId(testName + "-DataChangeListener")); - - setupInMemorySnapshotStore(); - - final TestActorRef shard = actorFactory.createTestActor( - newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), - actorFactory.generateActorId(testName + "-shard")); - - waitUntilNoLeader(shard); - - shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), - getRef()); - final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterChangeListenerReply.class); - assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); - - shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) - .customRaftPolicyImplementation(null).build(), ActorRef.noSender()); - - listener.waitForChangeEvents(); - } - }; - } - - @Test - public void testClusteredDataChangeListenerRegistration() throws Exception { - new ShardTestKit(getSystem()) { - { - final String testName = "testClusteredDataChangeListenerRegistration"; - final ShardIdentifier followerShardID = ShardIdentifier.create("inventory", - MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config"); - - final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory", - MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config"); - - final TestActorRef followerShard = actorFactory - .createTestActor(Shard.builder().id(followerShardID) - .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()) - .peerAddresses(Collections.singletonMap(leaderShardID.toString(), - "akka://test/user/" + leaderShardID.toString())) - .schemaContext(SCHEMA_CONTEXT).props() - .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); - - final TestActorRef leaderShard = actorFactory - .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()) - .peerAddresses(Collections.singletonMap(followerShardID.toString(), - "akka://test/user/" + followerShardID.toString())) - .schemaContext(SCHEMA_CONTEXT).props() - .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); - - leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); - final String leaderPath = waitUntilLeader(followerShard); - assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath); - - final YangInstanceIdentifier path = TestModel.TEST_PATH; - final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), - actorFactory.generateActorId(testName + "-DataChangeListener")); - - followerShard.tell( - new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), - getRef()); - final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterChangeListenerReply.class); - assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); - - writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - listener.waitForChangeEvents(); - } - }; - } - @Test public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception { new ShardTestKit(getSystem()) { @@ -2228,8 +2010,8 @@ public class ShardTest extends AbstractShardTest { waitUntilNoLeader(shard); shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); - final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterDataTreeChangeListenerReply.class); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeNotificationListenerReply.class); assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) @@ -2261,8 +2043,8 @@ public class ShardTest extends AbstractShardTest { waitUntilNoLeader(shard); shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); - final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterDataTreeChangeListenerReply.class); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeNotificationListenerReply.class); assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath()); @@ -2293,14 +2075,14 @@ public class ShardTest extends AbstractShardTest { .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()) .peerAddresses(Collections.singletonMap(leaderShardID.toString(), "akka://test/user/" + leaderShardID.toString())) - .schemaContext(SCHEMA_CONTEXT).props() + .schemaContextProvider(() -> SCHEMA_CONTEXT).props() .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); final TestActorRef leaderShard = actorFactory .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()) .peerAddresses(Collections.singletonMap(followerShardID.toString(), "akka://test/user/" + followerShardID.toString())) - .schemaContext(SCHEMA_CONTEXT).props() + .schemaContextProvider(() -> SCHEMA_CONTEXT).props() .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); @@ -2313,8 +2095,8 @@ public class ShardTest extends AbstractShardTest { actorFactory.generateActorId(testName + "-DataTreeChangeListener")); followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); - final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterDataTreeChangeListenerReply.class); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeNotificationListenerReply.class); assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -2325,8 +2107,9 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testServerRemoved() throws Exception { - final TestActorRef parent = actorFactory.createTestActor(MessageCollectorActor.props()); + public void testServerRemoved() { + final TestActorRef parent = actorFactory.createTestActor(MessageCollectorActor.props() + .withDispatcher(Dispatchers.DefaultDispatcherId())); final ActorRef shard = parent.underlyingActor().context().actorOf( newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),