X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDataChangeListenerSupport.java;h=57e20059e69cfed0229a15ae67f642b3a385d684;hb=8dfdfb5627c0434a4d253945a8f590f9c66f4777;hp=f4b6bcc9fd56801a2ea077151a947627e2099595;hpb=65f9c2ce82b354a6b3e022be309783886b5d2184;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java index f4b6bcc9fd..57e20059e6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java @@ -10,10 +10,15 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.Collections; import java.util.Map.Entry; +import java.util.Set; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; @@ -26,14 +31,20 @@ final class DataChangeListenerSupport extends AbstractDataListenerSupport< DelayedDataChangeListenerRegistration, DataChangeListenerRegistration< AsyncDataChangeListener>>> { + private final Set listenerActors = Sets.newConcurrentHashSet(); + DataChangeListenerSupport(final Shard shard) { super(shard); } + Collection getListenerActors() { + return Collections.unmodifiableCollection(listenerActors); + } + @Override - Entry>>, - Optional> createDelegate(final RegisterChangeListener message) { - ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath()); + DataChangeListenerRegistration>> + createDelegate(final RegisterChangeListener message) { + final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath()); // Notify the listener if notifications should be enabled or not // If this shard is the leader then it will enable notifications else @@ -57,7 +68,32 @@ final class DataChangeListenerSupport extends AbstractDataListenerSupport< getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue()); - return regEntry; + listenerActors.add(dataChangeListenerPath); + final DataChangeListenerRegistration>> + delegate = regEntry.getKey(); + return new DataChangeListenerRegistration>>() { + @Override + public void close() { + listenerActors.remove(dataChangeListenerPath); + delegate.close(); + } + + @Override + public AsyncDataChangeListener> getInstance() { + return delegate.getInstance(); + } + + @Override + public YangInstanceIdentifier getPath() { + return delegate.getPath(); + } + + @Override + public DataChangeScope getScope() { + return delegate.getScope(); + } + }; } @Override