X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=76745a442ca0e9d09100e447e7c6a25ce54d6162;hp=6cfde54af73e9e147867bf4c4c9f41be6c2dd6c8;hb=20a32e6459fd1e27e7669bf1ebc7742b96787b94;hpb=de64c6bbf2d5aeb51f4036f9dd606a9bf6f71afb diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 6cfde54af7..76745a442c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -59,6 +59,8 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; @@ -69,9 +71,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; -import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; -import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -112,10 +113,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; @@ -146,8 +146,8 @@ public class ShardTest extends AbstractShardTest { shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); - final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"), - RegisterChangeListenerReply.class); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"), + RegisterDataTreeNotificationListenerReply.class); final String replyPath = reply.getListenerRegistrationPath().toString(); assertTrue("Incorrect reply path: " + replyPath, replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*")); @@ -197,14 +197,11 @@ public class ShardTest extends AbstractShardTest { // original ElectionTimeout message to proceed with the election. firstElectionTimeout = false; final ActorRef self = getSelf(); - new Thread() { - @Override - public void run() { - Uninterruptibles.awaitUninterruptibly( - onChangeListenerRegistered, 5, TimeUnit.SECONDS); - self.tell(message, self); - } - }.start(); + new Thread(() -> { + Uninterruptibles.awaitUninterruptibly( + onChangeListenerRegistered, 5, TimeUnit.SECONDS); + self.tell(message, self); + }).start(); onFirstElectionTimeout.countDown(); } else { @@ -236,8 +233,8 @@ public class ShardTest extends AbstractShardTest { shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef()); - final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterChangeListenerReply.class); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeNotificationListenerReply.class); assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); // Sanity check - verify the shard is not the leader yet. @@ -276,8 +273,8 @@ public class ShardTest extends AbstractShardTest { shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef()); - final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"), - RegisterDataTreeChangeListenerReply.class); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"), + RegisterDataTreeNotificationListenerReply.class); final String replyPath = reply.getListenerRegistrationPath().toString(); assertTrue("Incorrect reply path: " + replyPath, replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*")); @@ -306,14 +303,11 @@ public class ShardTest extends AbstractShardTest { if (message instanceof ElectionTimeout && firstElectionTimeout) { firstElectionTimeout = false; final ActorRef self = getSelf(); - new Thread() { - @Override - public void run() { - Uninterruptibles.awaitUninterruptibly( - onChangeListenerRegistered, 5, TimeUnit.SECONDS); - self.tell(message, self); - } - }.start(); + new Thread(() -> { + Uninterruptibles.awaitUninterruptibly( + onChangeListenerRegistered, 5, TimeUnit.SECONDS); + self.tell(message, self); + }).start(); onFirstElectionTimeout.countDown(); } else { @@ -340,8 +334,8 @@ public class ShardTest extends AbstractShardTest { assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef()); - final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterDataTreeChangeListenerReply.class); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeNotificationListenerReply.class); assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); shard.tell(FindLeader.INSTANCE, getRef()); @@ -373,8 +367,9 @@ public class ShardTest extends AbstractShardTest { CreateTransactionReply.class); final String path = reply.getTransactionPath().toString(); - assertTrue("Unexpected transaction path " + path, path - .startsWith("akka://test/user/testCreateTransaction/shard-member-1:ShardTransactionTest@0:")); + assertTrue("Unexpected transaction path " + path, path.startsWith(String.format( + "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:", + shardID.getShardName(), shardID.getMemberName().getName()))); } }; } @@ -394,8 +389,9 @@ public class ShardTest extends AbstractShardTest { CreateTransactionReply.class); final String path = reply.getTransactionPath().toString(); - assertTrue("Unexpected transaction path " + path, path.startsWith( - "akka://test/user/testCreateTransactionOnChain/shard-member-1:ShardTransactionTest@0:")); + assertTrue("Unexpected transaction path " + path, path.startsWith(String.format( + "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:", + shardID.getShardName(), shardID.getMemberName().getName()))); } }; } @@ -428,8 +424,8 @@ public class ShardTest extends AbstractShardTest { ShardTestKit.waitUntilLeader(shard); - final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); - store.setSchemaContext(SCHEMA_CONTEXT); + final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, + SCHEMA_CONTEXT); final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier( new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) @@ -469,8 +465,8 @@ public class ShardTest extends AbstractShardTest { ShardTestKit.waitUntilLeader(shard); - final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); - store.setSchemaContext(SCHEMA_CONTEXT); + final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, + SCHEMA_CONTEXT); final DataTreeModification writeMod = store.takeSnapshot().newModification(); final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -622,7 +618,6 @@ public class ShardTest extends AbstractShardTest { final ReadyTransactionReply readyReply = ReadyTransactionReply .fromSerializable(expectMsgClass(duration, ReadyTransactionReply.class)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); - // Send the CanCommitTransaction message for the first Tx. shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); @@ -670,9 +665,10 @@ public class ShardTest extends AbstractShardTest { final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS); - if (caughtEx.get() != null) { - Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class); - Throwables.propagate(caughtEx.get()); + final Throwable t = caughtEx.get(); + if (t != null) { + Throwables.propagateIfPossible(t, Exception.class); + throw new RuntimeException(t); } assertEquals("Commits complete", true, done); @@ -694,7 +690,7 @@ public class ShardTest extends AbstractShardTest { verifyOuterListEntry(shard, 1); - verifyLastApplied(shard, 2); + verifyLastApplied(shard, 5); } }; } @@ -814,8 +810,8 @@ public class ShardTest extends AbstractShardTest { final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); if (failure != null) { - Throwables.propagateIfInstanceOf(failure.cause(), Exception.class); - Throwables.propagate(failure.cause()); + Throwables.propagateIfPossible(failure.cause(), Exception.class); + throw new RuntimeException(failure.cause()); } } }; @@ -1154,7 +1150,7 @@ public class ShardTest extends AbstractShardTest { private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception { new ShardTestKit(getSystem()) { { - final TipProducingDataTree dataTree = createDelegatingMockDataTree(); + final DataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCommitWhenTransactionHasModifications-" + readWrite); @@ -1189,6 +1185,10 @@ public class ShardTest extends AbstractShardTest { inOrder.verify(dataTree).prepare(any(DataTreeModification.class)); inOrder.verify(dataTree).commit(any(DataTreeCandidate.class)); + // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into + // the journal + Thread.sleep(HEARTBEAT_MILLIS * 2); + shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef()); final ShardStats shardStats = expectMsgClass(duration, ShardStats.class); @@ -1198,7 +1198,7 @@ public class ShardTest extends AbstractShardTest { // Commit index should advance as we do not have an empty // modification - assertEquals(0, shardStats.getCommitIndex()); + assertEquals(1, shardStats.getCommitIndex()); } }; } @@ -1207,7 +1207,7 @@ public class ShardTest extends AbstractShardTest { public void testCommitPhaseFailure() throws Exception { new ShardTestKit(getSystem()) { { - final TipProducingDataTree dataTree = createDelegatingMockDataTree(); + final DataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCommitPhaseFailure"); @@ -1284,7 +1284,7 @@ public class ShardTest extends AbstractShardTest { public void testPreCommitPhaseFailure() throws Exception { new ShardTestKit(getSystem()) { { - final TipProducingDataTree dataTree = createDelegatingMockDataTree(); + final DataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPreCommitPhaseFailure"); @@ -1352,7 +1352,7 @@ public class ShardTest extends AbstractShardTest { public void testCanCommitPhaseFailure() throws Exception { new ShardTestKit(getSystem()) { { - final TipProducingDataTree dataTree = createDelegatingMockDataTree(); + final DataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCanCommitPhaseFailure"); @@ -1399,7 +1399,7 @@ public class ShardTest extends AbstractShardTest { private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception { new ShardTestKit(getSystem()) { { - final TipProducingDataTree dataTree = createDelegatingMockDataTree(); + final DataTree dataTree = createDelegatingMockDataTree(); final TestActorRef shard = actorFactory.createTestActor( newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite); @@ -1987,8 +1987,8 @@ public class ShardTest extends AbstractShardTest { */ @Test public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException { - final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); - store.setSchemaContext(SCHEMA_CONTEXT); + final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, + SCHEMA_CONTEXT); final DataTreeModification putTransaction = store.takeSnapshot().newModification(); putTransaction.write(TestModel.TEST_PATH, @@ -2017,13 +2017,13 @@ public class ShardTest extends AbstractShardTest { .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build(); final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext) - .schemaContext(SCHEMA_CONTEXT).props(); + .schemaContextProvider(() -> SCHEMA_CONTEXT).props(); final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder() .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build(); final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext) - .schemaContext(SCHEMA_CONTEXT).props(); + .schemaContextProvider(() -> SCHEMA_CONTEXT).props(); new ShardTestKit(getSystem()) { { @@ -2075,8 +2075,7 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - final TestActorRef listener = - TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props()); shard.tell(new RegisterRoleChangeListener(), listener); @@ -2121,10 +2120,10 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testClusteredDataChangeListenerDelayedRegistration() throws Exception { + public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception { new ShardTestKit(getSystem()) { { - final String testName = "testClusteredDataChangeListenerDelayedRegistration"; + final String testName = "testClusteredDataChangeListenerWithDelayedRegistration"; dataStoreContextBuilder.shardElectionTimeoutFactor(1000) .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); @@ -2143,8 +2142,8 @@ public class ShardTest extends AbstractShardTest { shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); - final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterChangeListenerReply.class); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeNotificationListenerReply.class); assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) @@ -2171,14 +2170,14 @@ public class ShardTest extends AbstractShardTest { .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()) .peerAddresses(Collections.singletonMap(leaderShardID.toString(), "akka://test/user/" + leaderShardID.toString())) - .schemaContext(SCHEMA_CONTEXT).props() + .schemaContextProvider(() -> SCHEMA_CONTEXT).props() .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); final TestActorRef leaderShard = actorFactory .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()) .peerAddresses(Collections.singletonMap(followerShardID.toString(), "akka://test/user/" + followerShardID.toString())) - .schemaContext(SCHEMA_CONTEXT).props() + .schemaContextProvider(() -> SCHEMA_CONTEXT).props() .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); @@ -2193,9 +2192,9 @@ public class ShardTest extends AbstractShardTest { followerShard.tell( new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); - final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterChangeListenerReply.class); - assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeNotificationListenerReply.class); + assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -2205,10 +2204,10 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception { + public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception { new ShardTestKit(getSystem()) { { - final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration"; + final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration"; dataStoreContextBuilder.shardElectionTimeoutFactor(1000) .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); @@ -2225,8 +2224,8 @@ public class ShardTest extends AbstractShardTest { waitUntilNoLeader(shard); shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); - final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterDataTreeChangeListenerReply.class); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeNotificationListenerReply.class); assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) @@ -2237,6 +2236,43 @@ public class ShardTest extends AbstractShardTest { }; } + @Test + public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception { + new ShardTestKit(getSystem()) { + { + final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed"; + dataStoreContextBuilder.shardElectionTimeoutFactor(1000) + .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); + + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0); + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, + TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener")); + + setupInMemorySnapshotStore(); + + final TestActorRef shard = actorFactory.createTestActor( + newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(testName + "-shard")); + + waitUntilNoLeader(shard); + + shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeNotificationListenerReply.class); + assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); + + final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath()); + regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef()); + expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class); + + shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) + .customRaftPolicyImplementation(null).build(), ActorRef.noSender()); + + listener.expectNoMoreChanges("Received unexpected change after close"); + } + }; + } + @Test public void testClusteredDataTreeChangeListenerRegistration() throws Exception { new ShardTestKit(getSystem()) { @@ -2253,14 +2289,14 @@ public class ShardTest extends AbstractShardTest { .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()) .peerAddresses(Collections.singletonMap(leaderShardID.toString(), "akka://test/user/" + leaderShardID.toString())) - .schemaContext(SCHEMA_CONTEXT).props() + .schemaContextProvider(() -> SCHEMA_CONTEXT).props() .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); final TestActorRef leaderShard = actorFactory .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()) .peerAddresses(Collections.singletonMap(followerShardID.toString(), "akka://test/user/" + followerShardID.toString())) - .schemaContext(SCHEMA_CONTEXT).props() + .schemaContextProvider(() -> SCHEMA_CONTEXT).props() .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); @@ -2273,8 +2309,8 @@ public class ShardTest extends AbstractShardTest { actorFactory.generateActorId(testName + "-DataTreeChangeListener")); followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); - final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterDataTreeChangeListenerReply.class); + final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeNotificationListenerReply.class); assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -2286,7 +2322,8 @@ public class ShardTest extends AbstractShardTest { @Test public void testServerRemoved() throws Exception { - final TestActorRef parent = actorFactory.createTestActor(MessageCollectorActor.props()); + final TestActorRef parent = actorFactory.createTestActor(MessageCollectorActor.props() + .withDispatcher(Dispatchers.DefaultDispatcherId())); final ActorRef shard = parent.underlyingActor().context().actorOf( newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),