Remove ListenerRegistration protobuff messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerRegistrationProxy.java
index 06f3afc57cb19d13dfd75448ce59dcf1a1e6bf39..534beead0ed4d256c4626870fbee54582add8d9f 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -20,12 +21,12 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
 import scala.concurrent.Future;
 
 /**
@@ -85,15 +86,14 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         }
 
         if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new
-                CloseDataChangeListenerRegistration().toSerializable(), null);
+            listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, null);
         }
     }
 
     public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
 
         dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-                DataChangeListener.props(listener));
+                DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
 
         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
         findFuture.onComplete(new OnComplete<ActorRef>() {
@@ -109,14 +109,15 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                     doRegistration(shard, path, scope);
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
             DataChangeScope scope) {
 
         Future<Object> future = actorContext.executeOperationAsync(shard,
-                new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+                new RegisterChangeListener(path, dataChangeListenerActor, scope,
+                    listener instanceof ClusteredDOMDataChangeListener),
                 actorContext.getDatastoreContext().getShardInitializationTimeout());
 
         future.onComplete(new OnComplete<Object>(){
@@ -131,7 +132,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                             reply.getListenerRegistrationPath()));
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -144,8 +145,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         }
 
         if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(),
-                    ActorRef.noSender());
+            listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, ActorRef.noSender());
             listenerRegistrationActor = null;
         }