Refactor Register*ListenerReply classes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerRegistrationProxy.java
index acf630e2e95598e71fdbd786da628f3524a29408..2c0be57bc1be9c2ba6831ff05c5c2ebfe3065364 100644 (file)
@@ -11,11 +11,24 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 
 /**
  * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
@@ -24,25 +37,33 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
  * </p>
  */
+@SuppressWarnings("rawtypes")
 public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
+
+    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+    private final String shardName;
+    private final ActorContext actorContext;
+    private ActorRef dataChangeListenerActor;
     private volatile ActorSelection listenerRegistrationActor;
-    private final AsyncDataChangeListener listener;
-    private final ActorRef dataChangeListenerActor;
     private boolean closed = false;
 
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-    DataChangeListenerRegistrationProxy(
-        ActorSelection listenerRegistrationActor,
-        L listener, ActorRef dataChangeListenerActor) {
-        this.listenerRegistrationActor = listenerRegistrationActor;
-        this.listener = listener;
-        this.dataChangeListenerActor = dataChangeListenerActor;
+            DataChangeListenerRegistrationProxy(String shardName, ActorContext actorContext, L listener) {
+        this.shardName = Preconditions.checkNotNull(shardName);
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.listener = Preconditions.checkNotNull(listener);
     }
 
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-    DataChangeListenerRegistrationProxy(
-        L listener, ActorRef dataChangeListenerActor) {
-        this(null, listener, dataChangeListenerActor);
+    @VisibleForTesting
+    ActorSelection getListenerRegistrationActor() {
+        return listenerRegistrationActor;
+    }
+
+    @VisibleForTesting
+    ActorRef getDataChangeListenerActor() {
+        return dataChangeListenerActor;
     }
 
     @Override
@@ -50,40 +71,89 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         return listener;
     }
 
-    public void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+    private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+        if (listenerRegistrationActor == null) {
+            return;
+        }
+
         boolean sendCloseMessage = false;
-        synchronized(this) {
-            if(closed) {
+        synchronized (this) {
+            if (closed) {
                 sendCloseMessage = true;
             } else {
                 this.listenerRegistrationActor = listenerRegistrationActor;
             }
         }
-        if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new
-                CloseDataChangeListenerRegistration().toSerializable(), null);
+
+        if (sendCloseMessage) {
+            listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+                    ActorRef.noSender());
         }
+    }
 
-        this.listenerRegistrationActor = listenerRegistrationActor;
+    public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
+
+        dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+                DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
+
+        Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
+        findFuture.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(Throwable failure, ActorRef shard) {
+                if (failure instanceof LocalShardNotFoundException) {
+                    LOG.debug("No local shard found for {} - DataChangeListener {} at path {} "
+                            + "cannot be registered", shardName, listener, path);
+                } else if (failure != null) {
+                    LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} "
+                            + "cannot be registered: {}", shardName, listener, path, failure);
+                } else {
+                    doRegistration(shard, path, scope);
+                }
+            }
+        }, actorContext.getClientDispatcher());
     }
 
-    public ActorSelection getListenerRegistrationActor() {
-        return listenerRegistrationActor;
+    private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
+            DataChangeScope scope) {
+
+        Future<Object> future = actorContext.executeOperationAsync(shard,
+                new RegisterChangeListener(path, dataChangeListenerActor, scope,
+                    listener instanceof ClusteredDOMDataChangeListener),
+                actorContext.getDatastoreContext().getShardInitializationTimeout());
+
+        future.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object result) {
+                if (failure != null) {
+                    LOG.error("Failed to register DataChangeListener {} at path {}",
+                            listener, path.toString(), failure);
+                } else {
+                    RegisterDataTreeNotificationListenerReply reply = (RegisterDataTreeNotificationListenerReply)result;
+                    setListenerRegistrationActor(actorContext.actorSelection(
+                            reply.getListenerRegistrationPath()));
+                }
+            }
+        }, actorContext.getClientDispatcher());
     }
 
     @Override
     public void close() {
 
         boolean sendCloseMessage;
-        synchronized(this) {
+        synchronized (this) {
             sendCloseMessage = !closed && listenerRegistrationActor != null;
             closed = true;
         }
-        if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new
-                CloseDataChangeListenerRegistration().toSerializable(), null);
+
+        if (sendCloseMessage) {
+            listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+                    ActorRef.noSender());
+            listenerRegistrationActor = null;
         }
 
-        dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
+        if (dataChangeListenerActor != null) {
+            dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            dataChangeListenerActor = null;
+        }
     }
 }