Bug 8231: Fix testChangeListenerRegistration failure
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerSupport.java
index 2e26e6ee36d170d4bb0d3da3ae3d276d8f62305c..6b0d8294d40313f3142cc9024722c827d9ebc41c 100644 (file)
@@ -9,102 +9,41 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 final class DataChangeListenerSupport extends AbstractDataListenerSupport<
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener,
-            DelayedDataChangeListenerRegistration, DataChangeListenerRegistration<
-                    AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
-
-    private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+            DelayedDataChangeListenerRegistration> {
 
     DataChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
-    Collection<ActorSelection> getListenerActors() {
-        return new ArrayList<>(listenerActors);
-    }
-
     @Override
-    DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            createDelegate(final RegisterChangeListener message) {
-        final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
-
-        // Notify the listener if notifications should be enabled or not
-        // If this shard is the leader then it will enable notifications else
-        // it will not
-        dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
-
-        // Now store a reference to the data change listener so it can be notified
-        // at a later point if notifications should be enabled or disabled
-        if (!message.isRegisterOnAllInstances()) {
-            addListenerActor(dataChangeListenerPath);
-        }
+    void doRegistration(final RegisterChangeListener message, final ActorRef registrationActor) {
+        final ActorSelection listenerActor = processListenerRegistrationMessage(message);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
-                new DataChangeListenerProxy(dataChangeListenerPath);
+                new DataChangeListenerProxy(listenerActor);
 
         log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
 
-        Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-                Optional<DataTreeCandidate>> regEntry = getShard().getDataStore().registerChangeListener(
-                        message.getPath(), listener, message.getScope());
-
-        getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue());
-
-        listenerActors.add(dataChangeListenerPath);
-        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            delegate = regEntry.getKey();
-        return new DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-                NormalizedNode<?,?>>>() {
-            @Override
-            public void close() {
-                listenerActors.remove(dataChangeListenerPath);
-                delegate.close();
-            }
-
-            @Override
-            public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
-                return delegate.getInstance();
-            }
-
-            @Override
-            public YangInstanceIdentifier getPath() {
-                return delegate.getPath();
-            }
-
-            @Override
-            public DataChangeScope getScope() {
-                return delegate.getScope();
-            }
-        };
-    }
-
-    @Override
-    protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message) {
-        return new DelayedDataChangeListenerRegistration(message);
+        final ShardDataTree shardDataTree = getShard().getDataStore();
+        shardDataTree.registerDataChangeListener(message.getPath(), listener, message.getScope(),
+                shardDataTree.readCurrentData(), registration -> registrationActor.tell(
+                        new DataTreeNotificationListenerRegistrationActor.SetRegistration(registration, () ->
+                            removeListenerActor(listenerActor)), ActorRef.noSender()));
     }
 
     @Override
-    protected ActorRef newRegistrationActor(
-            ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
-        return createActor(DataChangeListenerRegistrationActor.props(registration));
+    protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message,
+            ActorRef registrationActor) {
+        return new DelayedDataChangeListenerRegistration(message, registrationActor);
     }
 
     @Override