+ @Test
+ public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ String testName = "testClusteredDataTreeChangeListenerRegistration";
+ final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+ final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+ final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+ Shard.builder().id(followerShardID).
+ datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+ peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+ "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+ final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+ Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+ peerAddresses(Collections.singletonMap(followerShardID.toString(),
+ "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+ leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+ String leaderPath = waitUntilLeader(followerShard);
+ assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+ actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+ followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();