+ @Test
+ public void testClusteredDataChangeListenerRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final String testName = "testClusteredDataChangeListenerRegistration";
+ final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
+ MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
+
+ final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
+ MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
+
+ final TestActorRef<Shard> followerShard = actorFactory
+ .createTestActor(Shard.builder().id(followerShardID)
+ .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
+ .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+ "akka://test/user/" + leaderShardID.toString()))
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+ final TestActorRef<Shard> leaderShard = actorFactory
+ .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
+ .peerAddresses(Collections.singletonMap(followerShardID.toString(),
+ "akka://test/user/" + followerShardID.toString()))
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+ leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+ final String leaderPath = waitUntilLeader(followerShard);
+ assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
+ actorFactory.generateActorId(testName + "-DataChangeListener"));
+
+ followerShard.tell(
+ new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
+ getRef());
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();
+ }
+ };
+ }
+
+ @Test
+ public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
+ dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+ TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+ setupInMemorySnapshotStore();
+
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(testName + "-shard"));
+
+ waitUntilNoLeader(shard);
+
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
+ .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
+
+ listener.waitForChangeEvents();
+ }
+ };
+ }
+
+ @Test
+ public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
+ dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+ TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+ setupInMemorySnapshotStore();
+
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(testName + "-shard"));
+
+ waitUntilNoLeader(shard);
+
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
+ regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+ expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
+
+ shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
+ .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
+
+ listener.expectNoMoreChanges("Received unexpected change after close");
+ }
+ };
+ }
+
+ @Test
+ public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final String testName = "testClusteredDataTreeChangeListenerRegistration";
+ final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
+ MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
+
+ final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
+ MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
+
+ final TestActorRef<Shard> followerShard = actorFactory
+ .createTestActor(Shard.builder().id(followerShardID)
+ .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
+ .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+ "akka://test/user/" + leaderShardID.toString()))
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+ final TestActorRef<Shard> leaderShard = actorFactory
+ .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
+ .peerAddresses(Collections.singletonMap(followerShardID.toString(),
+ "akka://test/user/" + followerShardID.toString()))
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+ leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+ final String leaderPath = waitUntilLeader(followerShard);
+ assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
+ actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+ followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeNotificationListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();
+ }
+ };
+ }
+
+ @Test
+ public void testServerRemoved() throws Exception {
+ final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
+
+ final ActorRef shard = parent.underlyingActor().context().actorOf(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testServerRemoved");
+
+ shard.tell(new ServerRemoved("test"), ActorRef.noSender());
+
+ MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);