Refactor Register*ListenerReply classes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerRegistrationProxy.java
index 681132e660d9608f3dea75e58eb4a9f3809f8dd6..2c0be57bc1be9c2ba6831ff05c5c2ebfe3065364 100644 (file)
@@ -13,14 +13,16 @@ import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -40,19 +42,18 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
 
     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
 
-    private volatile ActorSelection listenerRegistrationActor;
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-    private ActorRef dataChangeListenerActor;
     private final String shardName;
     private final ActorContext actorContext;
+    private ActorRef dataChangeListenerActor;
+    private volatile ActorSelection listenerRegistrationActor;
     private boolean closed = false;
 
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-                                                              DataChangeListenerRegistrationProxy (
-            String shardName, ActorContext actorContext, L listener) {
-        this.shardName = shardName;
-        this.actorContext = actorContext;
-        this.listener = listener;
+            DataChangeListenerRegistrationProxy(String shardName, ActorContext actorContext, L listener) {
+        this.shardName = Preconditions.checkNotNull(shardName);
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.listener = Preconditions.checkNotNull(listener);
     }
 
     @VisibleForTesting
@@ -71,40 +72,40 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
     }
 
     private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
-        if(listenerRegistrationActor == null) {
+        if (listenerRegistrationActor == null) {
             return;
         }
 
         boolean sendCloseMessage = false;
-        synchronized(this) {
-            if(closed) {
+        synchronized (this) {
+            if (closed) {
                 sendCloseMessage = true;
             } else {
                 this.listenerRegistrationActor = listenerRegistrationActor;
             }
         }
 
-        if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new
-                CloseDataChangeListenerRegistration().toSerializable(), null);
+        if (sendCloseMessage) {
+            listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+                    ActorRef.noSender());
         }
     }
 
     public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
 
         dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-                DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
+                DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
 
         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
         findFuture.onComplete(new OnComplete<ActorRef>() {
             @Override
             public void onComplete(Throwable failure, ActorRef shard) {
-                if(failure instanceof LocalShardNotFoundException) {
-                    LOG.debug("No local shard found for {} - DataChangeListener {} at path {} " +
-                            "cannot be registered", shardName, listener, path);
-                } else if(failure != null) {
-                    LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} " +
-                            "cannot be registered: {}", shardName, listener, path, failure);
+                if (failure instanceof LocalShardNotFoundException) {
+                    LOG.debug("No local shard found for {} - DataChangeListener {} at path {} "
+                            "cannot be registered", shardName, listener, path);
+                } else if (failure != null) {
+                    LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} "
+                            "cannot be registered: {}", shardName, listener, path, failure);
                 } else {
                     doRegistration(shard, path, scope);
                 }
@@ -116,17 +117,18 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
             DataChangeScope scope) {
 
         Future<Object> future = actorContext.executeOperationAsync(shard,
-                new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+                new RegisterChangeListener(path, dataChangeListenerActor, scope,
+                    listener instanceof ClusteredDOMDataChangeListener),
                 actorContext.getDatastoreContext().getShardInitializationTimeout());
 
-        future.onComplete(new OnComplete<Object>(){
+        future.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object result) {
-                if(failure != null) {
+                if (failure != null) {
                     LOG.error("Failed to register DataChangeListener {} at path {}",
                             listener, path.toString(), failure);
                 } else {
-                    RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+                    RegisterDataTreeNotificationListenerReply reply = (RegisterDataTreeNotificationListenerReply)result;
                     setListenerRegistrationActor(actorContext.actorSelection(
                             reply.getListenerRegistrationPath()));
                 }
@@ -138,18 +140,18 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
     public void close() {
 
         boolean sendCloseMessage;
-        synchronized(this) {
+        synchronized (this) {
             sendCloseMessage = !closed && listenerRegistrationActor != null;
             closed = true;
         }
 
-        if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(),
+        if (sendCloseMessage) {
+            listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
                     ActorRef.noSender());
             listenerRegistrationActor = null;
         }
 
-        if(dataChangeListenerActor != null) {
+        if (dataChangeListenerActor != null) {
             dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
             dataChangeListenerActor = null;
         }