X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDataChangeListenerRegistrationProxy.java;h=2c0be57bc1be9c2ba6831ff05c5c2ebfe3065364;hb=90735c91ce3390a731afc446b4e7314c544ab9d6;hp=acf630e2e95598e71fdbd786da628f3524a29408;hpb=dd281c0e33267296ad3babbffd03e1122cdb127e;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
index acf630e2e9..2c0be57bc1 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
@@ -11,11 +11,24 @@ package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
/**
* ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
@@ -24,25 +37,33 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
* The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
*
*/
+@SuppressWarnings("rawtypes")
public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
+
+ private final AsyncDataChangeListener> listener;
+ private final String shardName;
+ private final ActorContext actorContext;
+ private ActorRef dataChangeListenerActor;
private volatile ActorSelection listenerRegistrationActor;
- private final AsyncDataChangeListener listener;
- private final ActorRef dataChangeListenerActor;
private boolean closed = false;
public >>
- DataChangeListenerRegistrationProxy(
- ActorSelection listenerRegistrationActor,
- L listener, ActorRef dataChangeListenerActor) {
- this.listenerRegistrationActor = listenerRegistrationActor;
- this.listener = listener;
- this.dataChangeListenerActor = dataChangeListenerActor;
+ DataChangeListenerRegistrationProxy(String shardName, ActorContext actorContext, L listener) {
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.listener = Preconditions.checkNotNull(listener);
}
- public >>
- DataChangeListenerRegistrationProxy(
- L listener, ActorRef dataChangeListenerActor) {
- this(null, listener, dataChangeListenerActor);
+ @VisibleForTesting
+ ActorSelection getListenerRegistrationActor() {
+ return listenerRegistrationActor;
+ }
+
+ @VisibleForTesting
+ ActorRef getDataChangeListenerActor() {
+ return dataChangeListenerActor;
}
@Override
@@ -50,40 +71,89 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
return listener;
}
- public void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+ private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+ if (listenerRegistrationActor == null) {
+ return;
+ }
+
boolean sendCloseMessage = false;
- synchronized(this) {
- if(closed) {
+ synchronized (this) {
+ if (closed) {
sendCloseMessage = true;
} else {
this.listenerRegistrationActor = listenerRegistrationActor;
}
}
- if(sendCloseMessage) {
- listenerRegistrationActor.tell(new
- CloseDataChangeListenerRegistration().toSerializable(), null);
+
+ if (sendCloseMessage) {
+ listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+ ActorRef.noSender());
}
+ }
- this.listenerRegistrationActor = listenerRegistrationActor;
+ public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
+
+ dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+ DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
+
+ Future findFuture = actorContext.findLocalShardAsync(shardName);
+ findFuture.onComplete(new OnComplete() {
+ @Override
+ public void onComplete(Throwable failure, ActorRef shard) {
+ if (failure instanceof LocalShardNotFoundException) {
+ LOG.debug("No local shard found for {} - DataChangeListener {} at path {} "
+ + "cannot be registered", shardName, listener, path);
+ } else if (failure != null) {
+ LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} "
+ + "cannot be registered: {}", shardName, listener, path, failure);
+ } else {
+ doRegistration(shard, path, scope);
+ }
+ }
+ }, actorContext.getClientDispatcher());
}
- public ActorSelection getListenerRegistrationActor() {
- return listenerRegistrationActor;
+ private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
+ DataChangeScope scope) {
+
+ Future