Remove ListenerRegistration protobuff messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerRegistrationProxy.java
index acf630e2e95598e71fdbd786da628f3524a29408..534beead0ed4d256c4626870fbee54582add8d9f 100644 (file)
@@ -11,11 +11,23 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 
 /**
  * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
@@ -24,25 +36,34 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
  * </p>
  */
+@SuppressWarnings("rawtypes")
 public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
+
     private volatile ActorSelection listenerRegistrationActor;
-    private final AsyncDataChangeListener listener;
-    private final ActorRef dataChangeListenerActor;
+    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+    private ActorRef dataChangeListenerActor;
+    private final String shardName;
+    private final ActorContext actorContext;
     private boolean closed = false;
 
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-    DataChangeListenerRegistrationProxy(
-        ActorSelection listenerRegistrationActor,
-        L listener, ActorRef dataChangeListenerActor) {
-        this.listenerRegistrationActor = listenerRegistrationActor;
+                                                              DataChangeListenerRegistrationProxy (
+            String shardName, ActorContext actorContext, L listener) {
+        this.shardName = shardName;
+        this.actorContext = actorContext;
         this.listener = listener;
-        this.dataChangeListenerActor = dataChangeListenerActor;
     }
 
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-    DataChangeListenerRegistrationProxy(
-        L listener, ActorRef dataChangeListenerActor) {
-        this(null, listener, dataChangeListenerActor);
+    @VisibleForTesting
+    ActorSelection getListenerRegistrationActor() {
+        return listenerRegistrationActor;
+    }
+
+    @VisibleForTesting
+    ActorRef getDataChangeListenerActor() {
+        return dataChangeListenerActor;
     }
 
     @Override
@@ -50,7 +71,11 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         return listener;
     }
 
-    public void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+    private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+        if(listenerRegistrationActor == null) {
+            return;
+        }
+
         boolean sendCloseMessage = false;
         synchronized(this) {
             if(closed) {
@@ -59,16 +84,55 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                 this.listenerRegistrationActor = listenerRegistrationActor;
             }
         }
+
         if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new
-                CloseDataChangeListenerRegistration().toSerializable(), null);
+            listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, null);
         }
+    }
 
-        this.listenerRegistrationActor = listenerRegistrationActor;
+    public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
+
+        dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+                DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
+
+        Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
+        findFuture.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(Throwable failure, ActorRef shard) {
+                if(failure instanceof LocalShardNotFoundException) {
+                    LOG.debug("No local shard found for {} - DataChangeListener {} at path {} " +
+                            "cannot be registered", shardName, listener, path);
+                } else if(failure != null) {
+                    LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} " +
+                            "cannot be registered: {}", shardName, listener, path, failure);
+                } else {
+                    doRegistration(shard, path, scope);
+                }
+            }
+        }, actorContext.getClientDispatcher());
     }
 
-    public ActorSelection getListenerRegistrationActor() {
-        return listenerRegistrationActor;
+    private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
+            DataChangeScope scope) {
+
+        Future<Object> future = actorContext.executeOperationAsync(shard,
+                new RegisterChangeListener(path, dataChangeListenerActor, scope,
+                    listener instanceof ClusteredDOMDataChangeListener),
+                actorContext.getDatastoreContext().getShardInitializationTimeout());
+
+        future.onComplete(new OnComplete<Object>(){
+            @Override
+            public void onComplete(Throwable failure, Object result) {
+                if(failure != null) {
+                    LOG.error("Failed to register DataChangeListener {} at path {}",
+                            listener, path.toString(), failure);
+                } else {
+                    RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+                    setListenerRegistrationActor(actorContext.actorSelection(
+                            reply.getListenerRegistrationPath()));
+                }
+            }
+        }, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -79,11 +143,15 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
             sendCloseMessage = !closed && listenerRegistrationActor != null;
             closed = true;
         }
+
         if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new
-                CloseDataChangeListenerRegistration().toSerializable(), null);
+            listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, ActorRef.noSender());
+            listenerRegistrationActor = null;
         }
 
-        dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
+        if(dataChangeListenerActor != null) {
+            dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            dataChangeListenerActor = null;
+        }
     }
 }