BUG 2676 : Use notification-dispatcher for DataChangeListener actors
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerRegistrationProxy.java
index b2ae060c3d30c219615432ad59d0b495f93e4294..681132e660d9608f3dea75e58eb4a9f3809f8dd6 100644 (file)
@@ -8,12 +8,11 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.dispatch.OnComplete;
-import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -27,7 +26,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
 import scala.concurrent.Future;
 
 /**
@@ -42,8 +40,6 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
 
     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
 
-    public static final Timeout REGISTER_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
-
     private volatile ActorSelection listenerRegistrationActor;
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
     private ActorRef dataChangeListenerActor;
@@ -97,9 +93,9 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
     public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
 
         dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-                DataChangeListener.props(listener));
+                DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
 
-        Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName, REGISTER_TIMEOUT);
+        Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
         findFuture.onComplete(new OnComplete<ActorRef>() {
             @Override
             public void onComplete(Throwable failure, ActorRef shard) {
@@ -113,7 +109,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                     doRegistration(shard, path, scope);
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
@@ -121,7 +117,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
 
         Future<Object> future = actorContext.executeOperationAsync(shard,
                 new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
-                REGISTER_TIMEOUT);
+                actorContext.getDatastoreContext().getShardInitializationTimeout());
 
         future.onComplete(new OnComplete<Object>(){
             @Override
@@ -135,7 +131,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                             reply.getListenerRegistrationPath()));
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     @Override