- new ShardTestKit(getSystem()) {{
- final String testName = "testClusteredDataChangeListenerRegistration";
- final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
- MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
-
- final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
- MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
-
- final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
- Shard.builder().id(followerShardID).
- datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
- peerAddresses(Collections.singletonMap(leaderShardID.toString(),
- "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
-
- final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
- Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
- peerAddresses(Collections.singletonMap(followerShardID.toString(),
- "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
-
- leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
- final String leaderPath = waitUntilLeader(followerShard);
- assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
- actorFactory.generateActorId(testName + "-DataChangeListener"));
-
- followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
- final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterChangeListenerReply.class);
- assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
-
- writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- listener.waitForChangeEvents();
- }};
+ new ShardTestKit(getSystem()) {
+ {
+ final String testName = "testClusteredDataChangeListenerRegistration";
+ final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
+ MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
+
+ final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
+ MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
+
+ final TestActorRef<Shard> followerShard = actorFactory
+ .createTestActor(Shard.builder().id(followerShardID)
+ .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
+ .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+ "akka://test/user/" + leaderShardID.toString()))
+ .schemaContext(SCHEMA_CONTEXT).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+ final TestActorRef<Shard> leaderShard = actorFactory
+ .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
+ .peerAddresses(Collections.singletonMap(followerShardID.toString(),
+ "akka://test/user/" + followerShardID.toString()))
+ .schemaContext(SCHEMA_CONTEXT).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+ leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+ final String leaderPath = waitUntilLeader(followerShard);
+ assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ actorFactory.generateActorId(testName + "-DataChangeListener"));
+
+ followerShard.tell(
+ new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
+ getRef());
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+ writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();
+ }
+ };