X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDataChangeListenerRegistrationProxy.java;h=33cf8e9c1d292ca46f47cfb3efd9497ed38a63cc;hb=f41c5e6e6f6e10b36b1e4b1992877e38e718c8fb;hp=6f8bc633c7401cee2750aa7e9ce83bcbb525a8f2;hpb=66a6b6f931af3fcd1ce61263c457304cfbdc2bb5;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
index 6f8bc633c7..33cf8e9c1d 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
@@ -13,10 +13,11 @@ import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.dispatch.OnComplete;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
@@ -30,30 +31,30 @@ import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
/**
- * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
+ * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard.
+ *
*
* Registering a DataChangeListener on the Data Store creates a new instance of the ListenerRegistrationProxy
* The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
- *
*/
@SuppressWarnings("rawtypes")
public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
- private volatile ActorSelection listenerRegistrationActor;
private final AsyncDataChangeListener> listener;
- private ActorRef dataChangeListenerActor;
private final String shardName;
private final ActorContext actorContext;
+ private ActorRef dataChangeListenerActor;
+ private volatile ActorSelection listenerRegistrationActor;
private boolean closed = false;
public >>
- DataChangeListenerRegistrationProxy (
- String shardName, ActorContext actorContext, L listener) {
- this.shardName = shardName;
- this.actorContext = actorContext;
- this.listener = listener;
+ DataChangeListenerRegistrationProxy(final String shardName, final ActorContext actorContext,
+ final L listener) {
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.listener = Preconditions.checkNotNull(listener);
}
@VisibleForTesting
@@ -71,41 +72,41 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
return listener;
}
- private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
- if(listenerRegistrationActor == null) {
+ private void setListenerRegistrationActor(final ActorSelection listenerRegistrationActor) {
+ if (listenerRegistrationActor == null) {
return;
}
boolean sendCloseMessage = false;
- synchronized(this) {
- if(closed) {
+ synchronized (this) {
+ if (closed) {
sendCloseMessage = true;
} else {
this.listenerRegistrationActor = listenerRegistrationActor;
}
}
- if(sendCloseMessage) {
- listenerRegistrationActor.tell(new
- CloseDataChangeListenerRegistration().toSerializable(), null);
+ if (sendCloseMessage) {
+ listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+ ActorRef.noSender());
}
}
public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
dataChangeListenerActor = actorContext.getActorSystem().actorOf(
- DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
+ DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
Future findFuture = actorContext.findLocalShardAsync(shardName);
findFuture.onComplete(new OnComplete() {
@Override
- public void onComplete(Throwable failure, ActorRef shard) {
- if(failure instanceof LocalShardNotFoundException) {
- LOG.debug("No local shard found for {} - DataChangeListener {} at path {} " +
- "cannot be registered", shardName, listener, path);
- } else if(failure != null) {
- LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} " +
- "cannot be registered: {}", shardName, listener, path, failure);
+ public void onComplete(final Throwable failure, final ActorRef shard) {
+ if (failure instanceof LocalShardNotFoundException) {
+ LOG.debug("No local shard found for {} - DataChangeListener {} at path {} "
+ + "cannot be registered", shardName, listener, path);
+ } else if (failure != null) {
+ LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} "
+ + "cannot be registered: {}", shardName, listener, path, failure);
} else {
doRegistration(shard, path, scope);
}
@@ -113,22 +114,22 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
}, actorContext.getClientDispatcher());
}
- private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
- DataChangeScope scope) {
+ private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path,
+ final DataChangeScope scope) {
Future