*/
package org.opendaylight.controller.cluster.datastore;
-import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.ExecutionContextExecutor;
RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
Assert.assertEquals("getPath", path, registerMsg.getPath());
Assert.assertEquals("getScope", scope, registerMsg.getScope());
+ Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
- reply(new RegisterChangeListenerReply(getRef().path()));
+ reply(new RegisterChangeListenerReply(getRef()));
for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
proxy.close();
// The listener registration actor should get a Close message
- expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
+ expectMsgClass(timeout, CloseDataChangeListenerRegistration.class);
+
+ // The DataChangeListener actor should be terminated
+ expectMsgClass(timeout, Terminated.class);
+
+ proxy.close();
+
+ expectNoMsg();
+ }};
+ }
+
+ @Test(timeout=10000)
+ public void testSuccessfulRegistrationForClusteredListener() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockClusteredListener =
+ Mockito.mock(ClusteredDOMDataChangeListener.class);
+
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ "shard-1", actorContext, mockClusteredListener);
+
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init(path, scope);
+ }
+
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new LocalShardFound(getRef()));
+
+ RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
+ Assert.assertEquals("getPath", path, registerMsg.getPath());
+ Assert.assertEquals("getScope", scope, registerMsg.getScope());
+ Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
+
+ reply(new RegisterChangeListenerReply(getRef()));
+
+ for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
+ proxy.getListenerRegistrationActor());
+
+ watch(proxy.getDataChangeListenerActor());
+
+ proxy.close();
+
+ // The listener registration actor should get a Close message
+ expectMsgClass(timeout, CloseDataChangeListenerRegistration.class);
// The DataChangeListener actor should be terminated
expectMsgClass(timeout, Terminated.class);
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
- reply(new ActorNotInitialized());
+ reply(new NotInitializedException("not initialized"));
new Within(duration("1 seconds")) {
@Override
"testFailedRegistration");
doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
- MoreExecutors.sameThreadExecutor());
- doReturn(executor).when(mockActorSystem).dispatcher();
+ MoreExecutors.directExecutor());
+
ActorContext actorContext = mock(ActorContext.class);
+ doReturn(executor).when(actorContext).getClientDispatcher();
+
String shardName = "shard-1";
final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
shardName, actorContext, mockListener);
shardName, actorContext, mockListener);
doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
+ doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath();
doReturn(getSystem().actorSelection(getRef().path())).
when(actorContext).actorSelection(getRef().path());
doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
@Override
public Future<Object> answer(InvocationOnMock invocation) {
proxy.close();
- return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path()));
+ return Futures.successful((Object)new RegisterChangeListenerReply(getRef()));
}
};
proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
AsyncDataBroker.DataChangeScope.ONE);
- expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
+ expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.class);
Assert.assertEquals("getListenerRegistrationActor", null,
proxy.getListenerRegistrationActor());