+ distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
+ mock(AsyncDataChangeListener.class),
+ AsyncDataBroker.DataChangeScope.BASE);
+
+ assertNotNull(registration);
+
+ assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
+ }
+
+ @Test
+ public void testRegisterChangeListenerWhenSuccessfulReplyReceived() throws Exception {
+ ActorContext actorContext = mock(ActorContext.class);
+
+ distributedDataStore = new DistributedDataStore(actorContext);
+ distributedDataStore.onGlobalContextUpdated(
+ TestModel.createTestContext());
+
+ ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
+
+ // Make Future successful
+ Future f = Futures.successful(new RegisterChangeListenerReply(doNothingActorRef.path()));
+
+ // Setup the mocks
+ ActorSystem actorSystem = mock(ActorSystem.class);
+ ActorSelection actorSelection = mock(ActorSelection.class);
+
+ when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
+ when(actorSystem.dispatcher()).thenReturn(executor);
+ when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
+ when(actorContext.getActorSystem()).thenReturn(actorSystem);
+ when(actorContext
+ .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(f);
+ when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
+
+ ListenerRegistration registration =
+ distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
+ mock(AsyncDataChangeListener.class),
+ AsyncDataBroker.DataChangeScope.BASE);
+
+ assertNotNull(registration);
+
+ assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
+
+ ActorSelection listenerRegistrationActor =
+ ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
+
+ assertNotNull(listenerRegistrationActor);
+
+ assertEquals(actorSelection, listenerRegistrationActor);
+ }
+
+ @Test
+ public void testRegisterChangeListenerWhenSuccessfulReplyFailed() throws Exception {
+ ActorContext actorContext = mock(ActorContext.class);
+
+ distributedDataStore = new DistributedDataStore(actorContext);
+ distributedDataStore.onGlobalContextUpdated(
+ TestModel.createTestContext());
+
+ ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
+
+ // Make Future fail
+ Future f = Futures.failed(new IllegalArgumentException());
+
+ // Setup the mocks
+ ActorSystem actorSystem = mock(ActorSystem.class);
+ ActorSelection actorSelection = mock(ActorSelection.class);
+
+ when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
+ when(actorSystem.dispatcher()).thenReturn(executor);
+ when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
+ when(actorContext.getActorSystem()).thenReturn(actorSystem);
+ when(actorContext
+ .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(f);
+ when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
+
+ ListenerRegistration registration =
+ distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
+ mock(AsyncDataChangeListener.class),
+ AsyncDataBroker.DataChangeScope.BASE);
+
+ assertNotNull(registration);
+
+ assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
+
+ ActorSelection listenerRegistrationActor =
+ ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();