+ @Test
+ public void testFailedRegistration() {
+ new JavaTestKit(getSystem()) {{
+ ActorSystem mockActorSystem = mock(ActorSystem.class);
+
+ ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class),
+ "testFailedRegistration");
+ doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
+ ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
+ MoreExecutors.sameThreadExecutor());
+ doReturn(executor).when(mockActorSystem).dispatcher();
+
+ ActorContext actorContext = mock(ActorContext.class);
+
+ String shardName = "shard-1";
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ shardName, actorContext, mockListener);
+
+ doReturn(mockActorSystem).when(actorContext).getActorSystem();
+ doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
+ doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
+ doReturn(Futures.failed(new RuntimeException("mock"))).
+ when(actorContext).executeOperationAsync(any(ActorRef.class),
+ any(Object.class), any(Timeout.class));
+
+ proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
+ AsyncDataBroker.DataChangeScope.ONE);
+
+ Assert.assertEquals("getListenerRegistrationActor", null,
+ proxy.getListenerRegistrationActor());
+ }};
+ }
+
+ @Test
+ public void testCloseBeforeRegistration() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = mock(ActorContext.class);
+
+ String shardName = "shard-1";
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ shardName, actorContext, mockListener);
+
+ doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
+ doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(getSystem().actorSelection(getRef().path())).
+ when(actorContext).actorSelection(getRef().path());
+ doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
+ doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
+
+ Answer<Future<Object>> answer = new Answer<Future<Object>>() {
+ @Override
+ public Future<Object> answer(InvocationOnMock invocation) {
+ proxy.close();
+ return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path()));
+ }
+ };
+
+ doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
+ any(Object.class), any(Timeout.class));
+
+ proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
+ AsyncDataBroker.DataChangeScope.ONE);
+
+ expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
+
+ Assert.assertEquals("getListenerRegistrationActor", null,
+ proxy.getListenerRegistrationActor());
+ }};