+ };
+ }
+
+ @Test(timeout = 10000)
+ public void testSuccessfulRegistrationForClusteredListener() {
+ new JavaTestKit(getSystem()) {
+ {
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockClusteredListener =
+ Mockito.mock(ClusteredDOMDataChangeListener.class);
+
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ "shard-1", actorContext, mockClusteredListener);
+
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+ new Thread(() -> proxy.init(path, scope)).start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new LocalShardFound(getRef()));
+
+ RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
+ Assert.assertEquals("getPath", path, registerMsg.getPath());
+ Assert.assertEquals("getScope", scope, registerMsg.getScope());
+ Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
+
+ reply(new RegisterDataTreeNotificationListenerReply(getRef()));
+
+ for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }