- new ShardTestKit(getSystem()) {{
- String testName = "testClusteredDataChangeListenerRegistration";
- final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
- actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
-
- final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
- actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
-
- final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
- Shard.builder().id(followerShardID).
- datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
- peerAddresses(Collections.singletonMap(leaderShardID.toString(),
- "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
-
- final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
- Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
- peerAddresses(Collections.singletonMap(followerShardID.toString(),
- "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
-
- leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
- String leaderPath = waitUntilLeader(followerShard);
- assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
- actorFactory.generateActorId(testName + "-DataChangeListener"));
-
- followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
- final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterChangeListenerReply.class);
- assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
-
- writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- listener.waitForChangeEvents();
- }};
+ new ShardTestKit(getSystem()) {
+ {
+ final String testName = "testClusteredDataChangeListenerRegistration";
+ final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
+ MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
+
+ final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
+ MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
+
+ final TestActorRef<Shard> followerShard = actorFactory
+ .createTestActor(Shard.builder().id(followerShardID)
+ .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
+ .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+ "akka://test/user/" + leaderShardID.toString()))
+ .schemaContext(SCHEMA_CONTEXT).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+ final TestActorRef<Shard> leaderShard = actorFactory
+ .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
+ .peerAddresses(Collections.singletonMap(followerShardID.toString(),
+ "akka://test/user/" + followerShardID.toString()))
+ .schemaContext(SCHEMA_CONTEXT).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+ leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+ final String leaderPath = waitUntilLeader(followerShard);
+ assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
+ actorFactory.generateActorId(testName + "-DataChangeListener"));
+
+ followerShard.tell(
+ new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
+ getRef());
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+
+ writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();
+ }
+ };
+ }
+
+ @Test
+ public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
+ dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+ TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+ setupInMemorySnapshotStore();
+
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(testName + "-shard"));
+
+ waitUntilNoLeader(shard);
+
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
+ .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
+
+ listener.waitForChangeEvents();
+ }
+ };