BUG 1735 Registering a data change listener should be asynchronous
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerRegistrationProxy.java
index 89cc9695251d18b2efc747b0de2b2882431a8de5..acf630e2e95598e71fdbd786da628f3524a29408 100644 (file)
@@ -8,11 +8,13 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 /**
@@ -23,15 +25,24 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * </p>
  */
 public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
-    private final ActorSelection listenerRegistrationActor;
+    private volatile ActorSelection listenerRegistrationActor;
     private final AsyncDataChangeListener listener;
+    private final ActorRef dataChangeListenerActor;
+    private boolean closed = false;
 
-    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
+    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
     DataChangeListenerRegistrationProxy(
         ActorSelection listenerRegistrationActor,
-        L listener) {
+        L listener, ActorRef dataChangeListenerActor) {
         this.listenerRegistrationActor = listenerRegistrationActor;
         this.listener = listener;
+        this.dataChangeListenerActor = dataChangeListenerActor;
+    }
+
+    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+    DataChangeListenerRegistrationProxy(
+        L listener, ActorRef dataChangeListenerActor) {
+        this(null, listener, dataChangeListenerActor);
     }
 
     @Override
@@ -39,8 +50,40 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         return listener;
     }
 
+    public void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+        boolean sendCloseMessage = false;
+        synchronized(this) {
+            if(closed) {
+                sendCloseMessage = true;
+            } else {
+                this.listenerRegistrationActor = listenerRegistrationActor;
+            }
+        }
+        if(sendCloseMessage) {
+            listenerRegistrationActor.tell(new
+                CloseDataChangeListenerRegistration().toSerializable(), null);
+        }
+
+        this.listenerRegistrationActor = listenerRegistrationActor;
+    }
+
+    public ActorSelection getListenerRegistrationActor() {
+        return listenerRegistrationActor;
+    }
+
     @Override
     public void close() {
-        listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration(), null);
+
+        boolean sendCloseMessage;
+        synchronized(this) {
+            sendCloseMessage = !closed && listenerRegistrationActor != null;
+            closed = true;
+        }
+        if(sendCloseMessage) {
+            listenerRegistrationActor.tell(new
+                CloseDataChangeListenerRegistration().toSerializable(), null);
+        }
+
+        dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
     }
 }