Usage of Collections.unmodifiableCollection is unsafe
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerRegistrationProxy.java
index b2ae060c3d30c219615432ad59d0b495f93e4294..738d256369eaadfb668bf6c08c79ba54b0de39d8 100644 (file)
@@ -8,12 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.dispatch.OnComplete;
-import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -22,12 +22,12 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
 import scala.concurrent.Future;
 
 /**
@@ -42,21 +42,19 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
 
     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
 
-    public static final Timeout REGISTER_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
-
-    private volatile ActorSelection listenerRegistrationActor;
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-    private ActorRef dataChangeListenerActor;
     private final String shardName;
     private final ActorContext actorContext;
+    private ActorRef dataChangeListenerActor;
+    private volatile ActorSelection listenerRegistrationActor;
     private boolean closed = false;
 
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
                                                               DataChangeListenerRegistrationProxy (
             String shardName, ActorContext actorContext, L listener) {
-        this.shardName = shardName;
-        this.actorContext = actorContext;
-        this.listener = listener;
+        this.shardName = Preconditions.checkNotNull(shardName);
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.listener = Preconditions.checkNotNull(listener);
     }
 
     @VisibleForTesting
@@ -89,17 +87,16 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         }
 
         if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new
-                CloseDataChangeListenerRegistration().toSerializable(), null);
+            listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, null);
         }
     }
 
     public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
 
         dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-                DataChangeListener.props(listener));
+                DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
 
-        Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName, REGISTER_TIMEOUT);
+        Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
         findFuture.onComplete(new OnComplete<ActorRef>() {
             @Override
             public void onComplete(Throwable failure, ActorRef shard) {
@@ -113,15 +110,16 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                     doRegistration(shard, path, scope);
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
             DataChangeScope scope) {
 
         Future<Object> future = actorContext.executeOperationAsync(shard,
-                new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
-                REGISTER_TIMEOUT);
+                new RegisterChangeListener(path, dataChangeListenerActor, scope,
+                    listener instanceof ClusteredDOMDataChangeListener),
+                actorContext.getDatastoreContext().getShardInitializationTimeout());
 
         future.onComplete(new OnComplete<Object>(){
             @Override
@@ -135,7 +133,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                             reply.getListenerRegistrationPath()));
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -148,8 +146,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         }
 
         if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(),
-                    ActorRef.noSender());
+            listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, ActorRef.noSender());
             listenerRegistrationActor = null;
         }