Bug 8231: Fix testChangeListenerRegistration failure 08/55108/5
authorTom Pantelis <tompantelis@gmail.com>
Fri, 14 Apr 2017 13:03:51 +0000 (09:03 -0400)
committerTom Pantelis <tompantelis@gmail.com>
Tue, 18 Apr 2017 19:45:12 +0000 (15:45 -0400)
As described in Bug 8231, the sharing of the ListenerTree between the
ShardDataTree and the ShardDataTreeNotificationPublisherActor is
problematic. Therefore the ListenerTree (wrapped by the
DefaultShardDataTreeChangeListenerPublisher) is now owned by the
ShardDataTreeNotificationPublisherActor. On registration, a RegisterListener
messages is sent to the ShardDataTreeNotificationPublisherActor to perform
the on-boarding of the new listener, ie it atomically generates and sends
the initial notification and then adds the listener to the ListenerTree.

This change necessitated some refactoring of the DataChangeListenerSupport
class et al wrt to how the ListenerRegistration is handled. Prior the
ListenerRegistration was passed on creation of the registration actor. This
is now done indirectly by sending a SetRegistration message to the
registration actor via a Consumer callback passed in the RegisterListener
message. When the ListenerRegistration is obtained by the
ShardDataChangePublisherActor, it invokes the Consumer callback.

When a registration is initially delayed due to no leader, the
DelayedListenerRegistration is sent to the registration actor. When the
leader is elected later on, the actual ListenerRegistration is sent and
replaces the DelayedListenerRegistration.

The DOMDataTreeChangeListener registration classes were changed/refactored
similarly.

In addition, the 2 specific registration actor classes were replaced by a
generic reusable DataTreeNotificationListenerRegistrationActor that handles
both listener types. Also the 2 CloseData*ListenerRegistration and
CloseData*ListenerRegistrationReply messages were consolidated.

Change-Id: I79ac76b8044609351e5dd8367b691b589ea35075
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
44 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationActor.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangePublisherActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisherActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnerChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistration.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistrationReply.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeNotificationListenerRegistration.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java with 62% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeNotificationListenerRegistrationReply.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java with 62% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActorTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataTreeChangeListener.java

index 0821951..ac7a933 100644 (file)
@@ -12,31 +12,36 @@ import akka.actor.ActorSelection;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EventListener;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class AbstractDataListenerSupport<L extends EventListener, M extends ListenerRegistrationMessage,
-        D extends DelayedListenerRegistration<L, M>, R extends ListenerRegistration<L>>
-                extends LeaderLocalDelegateFactory<M, R> {
+        D extends DelayedListenerRegistration<L, M>> extends LeaderLocalDelegateFactory<M> {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private final ArrayList<D> delayedListenerRegistrations = new ArrayList<>();
-    private final ArrayList<D> delayedListenerOnAllRegistrations = new ArrayList<>();
-    private final Collection<ActorSelection> actors = new ArrayList<>();
+    private final Collection<D> delayedListenerRegistrations = ConcurrentHashMap.newKeySet();
+    private final Collection<D> delayedListenerOnAllRegistrations = ConcurrentHashMap.newKeySet();
+    private final Collection<ActorSelection> leaderOnlyListenerActors = ConcurrentHashMap.newKeySet();
+    private final Collection<ActorSelection> allListenerActors = ConcurrentHashMap.newKeySet();
 
     protected AbstractDataListenerSupport(Shard shard) {
         super(shard);
     }
 
+    Collection<ActorSelection> getListenerActors() {
+        return new ArrayList<>(allListenerActors);
+    }
+
     @Override
     void onLeadershipChange(boolean isLeader, boolean hasLeader) {
         log.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader);
 
         final EnableNotification msg = new EnableNotification(isLeader);
-        for (ActorSelection dataChangeListener : actors) {
+        for (ActorSelection dataChangeListener : leaderOnlyListenerActors) {
             dataChangeListener.tell(msg, getSelf());
         }
 
@@ -46,7 +51,6 @@ abstract class AbstractDataListenerSupport<L extends EventListener, M extends Li
             }
 
             delayedListenerOnAllRegistrations.clear();
-            delayedListenerOnAllRegistrations.trimToSize();
         }
 
         if (isLeader) {
@@ -55,7 +59,6 @@ abstract class AbstractDataListenerSupport<L extends EventListener, M extends Li
             }
 
             delayedListenerRegistrations.clear();
-            delayedListenerRegistrations.trimToSize();
         }
     }
 
@@ -63,41 +66,61 @@ abstract class AbstractDataListenerSupport<L extends EventListener, M extends Li
     void onMessage(M message, boolean isLeader, boolean hasLeader) {
         log.debug("{}: {} for {}, leader: {}", persistenceId(), logName(), message.getPath(), isLeader);
 
-        final ListenerRegistration<L> registration;
+        ActorRef registrationActor = createActor(DataTreeNotificationListenerRegistrationActor.props());
+
         if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
-            registration = createDelegate(message);
+            doRegistration(message, registrationActor);
         } else {
             log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
-            D delayedReg = newDelayedListenerRegistration(message);
+            D delayedReg = newDelayedListenerRegistration(message, registrationActor);
+            Collection<D> delayedRegList;
             if (message.isRegisterOnAllInstances()) {
-                delayedListenerOnAllRegistrations.add(delayedReg);
+                delayedRegList = delayedListenerOnAllRegistrations;
             } else {
-                delayedListenerRegistrations.add(delayedReg);
+                delayedRegList = delayedListenerRegistrations;
             }
 
-            registration = delayedReg;
+            delayedRegList.add(delayedReg);
+            registrationActor.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(
+                    delayedReg, () -> delayedRegList.remove(delayedReg)), ActorRef.noSender());
         }
 
-        ActorRef registrationActor = newRegistrationActor(registration);
-
         log.debug("{}: {} sending reply, listenerRegistrationPath = {} ", persistenceId(), logName(),
                 registrationActor.path());
 
         tellSender(newRegistrationReplyMessage(registrationActor));
     }
 
+    protected ActorSelection processListenerRegistrationMessage(M message) {
+        final ActorSelection listenerActor = selectActor(message.getListenerActorPath());
+
+        // We have a leader so enable the listener.
+        listenerActor.tell(new EnableNotification(true), getSelf());
+
+        if (!message.isRegisterOnAllInstances()) {
+            // This is a leader-only registration so store a reference to the listener actor so it can be notified
+            // at a later point if notifications should be enabled or disabled.
+            leaderOnlyListenerActors.add(listenerActor);
+        }
+
+        allListenerActors.add(listenerActor);
+
+        return listenerActor;
+    }
+
     protected Logger log() {
         return log;
     }
 
-    protected void addListenerActor(ActorSelection actor) {
-        actors.add(actor);
+    protected void removeListenerActor(ActorSelection listenerActor) {
+        allListenerActors.remove(listenerActor);
+        leaderOnlyListenerActors.remove(listenerActor);
     }
 
-    protected abstract D newDelayedListenerRegistration(M message);
+    abstract void doRegistration(M message, ActorRef registrationActor);
 
-    protected abstract ActorRef newRegistrationActor(ListenerRegistration<L> registration);
+    protected abstract D newDelayedListenerRegistration(M message, ActorRef registrationActor);
 
     protected abstract Object newRegistrationReplyMessage(ActorRef registrationActor);
 
index b7356d2..6b348fb 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorContext;
 import akka.actor.ActorRef;
+import akka.actor.Props;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -28,38 +29,40 @@ abstract class AbstractShardDataTreeNotificationPublisherActorProxy implements S
 
     private final ActorContext actorContext;
     private final String actorName;
+    private final String logContext;
     private ActorRef notifierActor;
 
-    protected AbstractShardDataTreeNotificationPublisherActorProxy(ActorContext actorContext, String actorName) {
+    protected AbstractShardDataTreeNotificationPublisherActorProxy(ActorContext actorContext, String actorName,
+            String logContext) {
         this.actorContext = actorContext;
         this.actorName = actorName;
+        this.logContext = logContext;
     }
 
-    protected AbstractShardDataTreeNotificationPublisherActorProxy(
-            AbstractShardDataTreeNotificationPublisherActorProxy other) {
-        this.actorContext = null;
-        this.actorName = null;
-        this.notifierActor = other.getNotifierActor();
+    protected abstract Props props();
+
+    protected final String actorName() {
+        return actorName;
     }
 
-    protected abstract ShardDataTreeNotificationPublisher getDelegatePublisher();
+    protected final String logContext() {
+        return logContext;
+    }
 
     @Override
     public void publishChanges(DataTreeCandidate candidate, String logContext) {
-        getNotifierActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications(
-                getDelegatePublisher(), candidate, logContext), ActorRef.noSender());
+        notifierActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications(candidate),
+                ActorRef.noSender());
     }
 
-    private ActorRef getNotifierActor() {
+    protected final ActorRef notifierActor() {
         if (notifierActor == null) {
             LOG.debug("Creating actor {}", actorName);
 
             String dispatcher = new Dispatchers(actorContext.system().dispatchers()).getDispatcherPath(
                     Dispatchers.DispatcherType.Notification);
-            notifierActor = actorContext.actorOf(ShardDataTreeNotificationPublisherActor.props(actorName)
-                    .withDispatcher(dispatcher).withMailbox(
-                            org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX),
-                    actorName);
+            notifierActor = actorContext.actorOf(props().withDispatcher(dispatcher).withMailbox(
+                    org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX), actorName);
         }
 
         return notifierActor;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationActor.java
deleted file mode 100644 (file)
index 89062dc..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.japi.Creator;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-public class DataChangeListenerRegistrationActor extends AbstractUntypedActor {
-
-    private final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-        registration;
-
-    public DataChangeListenerRegistrationActor(
-        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
-        this.registration = registration;
-    }
-
-    @Override
-    public void handleReceive(Object message) {
-        if (message instanceof CloseDataChangeListenerRegistration) {
-            closeListenerRegistration();
-        } else {
-            unknownMessage(message);
-        }
-    }
-
-    public static Props props(final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-            NormalizedNode<?, ?>>> registration) {
-        return Props.create(new DataChangeListenerRegistrationCreator(registration));
-    }
-
-    private void closeListenerRegistration() {
-        registration.close();
-
-        if (isValidSender(getSender())) {
-            getSender().tell(CloseDataChangeListenerRegistrationReply.INSTANCE, getSelf());
-        }
-
-        getSelf().tell(PoisonPill.getInstance(), getSelf());
-    }
-
-    private static class DataChangeListenerRegistrationCreator
-                                            implements Creator<DataChangeListenerRegistrationActor> {
-        private static final long serialVersionUID = 1L;
-
-        @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't "
-                + "create remote instances of this actor and thus don't need it to be Serializable.")
-        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-                                                           NormalizedNode<?, ?>>> registration;
-
-        DataChangeListenerRegistrationCreator(
-                ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-                                                             NormalizedNode<?, ?>>> registration) {
-            this.registration = registration;
-        }
-
-        @Override
-        public DataChangeListenerRegistrationActor create() throws Exception {
-            return new DataChangeListenerRegistrationActor(registration);
-        }
-    }
-}
index f0f4b7b..3fe3933 100644 (file)
@@ -15,7 +15,7 @@ import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -86,7 +86,8 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         }
 
         if (sendCloseMessage) {
-            listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, null);
+            listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+                    ActorRef.noSender());
         }
     }
 
@@ -145,7 +146,8 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         }
 
         if (sendCloseMessage) {
-            listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, ActorRef.noSender());
+            listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+                    ActorRef.noSender());
             listenerRegistrationActor = null;
         }
 
index 2e26e6e..6b0d829 100644 (file)
@@ -9,102 +9,41 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 final class DataChangeListenerSupport extends AbstractDataListenerSupport<
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener,
-            DelayedDataChangeListenerRegistration, DataChangeListenerRegistration<
-                    AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
-
-    private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+            DelayedDataChangeListenerRegistration> {
 
     DataChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
-    Collection<ActorSelection> getListenerActors() {
-        return new ArrayList<>(listenerActors);
-    }
-
     @Override
-    DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            createDelegate(final RegisterChangeListener message) {
-        final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
-
-        // Notify the listener if notifications should be enabled or not
-        // If this shard is the leader then it will enable notifications else
-        // it will not
-        dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
-
-        // Now store a reference to the data change listener so it can be notified
-        // at a later point if notifications should be enabled or disabled
-        if (!message.isRegisterOnAllInstances()) {
-            addListenerActor(dataChangeListenerPath);
-        }
+    void doRegistration(final RegisterChangeListener message, final ActorRef registrationActor) {
+        final ActorSelection listenerActor = processListenerRegistrationMessage(message);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
-                new DataChangeListenerProxy(dataChangeListenerPath);
+                new DataChangeListenerProxy(listenerActor);
 
         log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
 
-        Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-                Optional<DataTreeCandidate>> regEntry = getShard().getDataStore().registerChangeListener(
-                        message.getPath(), listener, message.getScope());
-
-        getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue());
-
-        listenerActors.add(dataChangeListenerPath);
-        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            delegate = regEntry.getKey();
-        return new DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-                NormalizedNode<?,?>>>() {
-            @Override
-            public void close() {
-                listenerActors.remove(dataChangeListenerPath);
-                delegate.close();
-            }
-
-            @Override
-            public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
-                return delegate.getInstance();
-            }
-
-            @Override
-            public YangInstanceIdentifier getPath() {
-                return delegate.getPath();
-            }
-
-            @Override
-            public DataChangeScope getScope() {
-                return delegate.getScope();
-            }
-        };
-    }
-
-    @Override
-    protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message) {
-        return new DelayedDataChangeListenerRegistration(message);
+        final ShardDataTree shardDataTree = getShard().getDataStore();
+        shardDataTree.registerDataChangeListener(message.getPath(), listener, message.getScope(),
+                shardDataTree.readCurrentData(), registration -> registrationActor.tell(
+                        new DataTreeNotificationListenerRegistrationActor.SetRegistration(registration, () ->
+                            removeListenerActor(listenerActor)), ActorRef.noSender()));
     }
 
     @Override
-    protected ActorRef newRegistrationActor(
-            ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
-        return createActor(DataChangeListenerRegistrationActor.props(registration));
+    protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message,
+            ActorRef registrationActor) {
+        return new DelayedDataChangeListenerRegistration(message, registrationActor);
     }
 
     @Override
index f60e676..dd280a6 100644 (file)
@@ -15,7 +15,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -55,7 +55,8 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
     @Override
     protected synchronized void removeRegistration() {
         if (listenerRegistrationActor != null) {
-            listenerRegistrationActor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), ActorRef.noSender());
+            listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+                    ActorRef.noSender());
             listenerRegistrationActor = null;
         }
 
@@ -94,7 +95,7 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
         }
 
         // This registration has already been closed, notify the actor
-        actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
+        actor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), null);
     }
 
     private void doRegistration(final ActorRef shard) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java
deleted file mode 100644 (file)
index 17deeeb..0000000
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.japi.Creator;
-import com.google.common.base.Preconditions;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-
-/**
- * Actor co-located with a shard. It exists only to terminate the registration when
- * asked to do so via {@link CloseDataTreeChangeListenerRegistration}.
- */
-public final class DataTreeChangeListenerRegistrationActor extends AbstractUntypedActor {
-    private final ListenerRegistration<DOMDataTreeChangeListener> registration;
-
-    public DataTreeChangeListenerRegistrationActor(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
-        this.registration = Preconditions.checkNotNull(registration);
-    }
-
-    @Override
-    protected void handleReceive(Object message) throws Exception {
-        if (message instanceof CloseDataTreeChangeListenerRegistration) {
-            registration.close();
-            if (isValidSender(getSender())) {
-                getSender().tell(CloseDataTreeChangeListenerRegistrationReply.getInstance(), getSelf());
-            }
-
-            getSelf().tell(PoisonPill.getInstance(), getSelf());
-        } else {
-            unknownMessage(message);
-        }
-    }
-
-    public static Props props(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
-        return Props.create(new DataTreeChangeListenerRegistrationCreator(registration));
-    }
-
-    private static final class DataTreeChangeListenerRegistrationCreator
-            implements Creator<DataTreeChangeListenerRegistrationActor> {
-        private static final long serialVersionUID = 1L;
-
-        @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't "
-                + "create remote instances of this actor and thus don't need it to be Serializable.")
-        final ListenerRegistration<DOMDataTreeChangeListener> registration;
-
-        DataTreeChangeListenerRegistrationCreator(ListenerRegistration<DOMDataTreeChangeListener> registration) {
-            this.registration = Preconditions.checkNotNull(registration);
-        }
-
-        @Override
-        public DataTreeChangeListenerRegistrationActor create() {
-            return new DataTreeChangeListenerRegistrationActor(registration);
-        }
-    }
-}
index 9a44b47..4f70327 100644 (file)
@@ -9,84 +9,37 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DOMDataTreeChangeListener,
-        RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration,
-        ListenerRegistration<DOMDataTreeChangeListener>> {
-
-    private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+        RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration> {
 
     DataTreeChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
-    Collection<ActorSelection> getListenerActors() {
-        return new ArrayList<>(listenerActors);
-    }
-
     @Override
-    ListenerRegistration<DOMDataTreeChangeListener> createDelegate(
-            final RegisterDataTreeChangeListener message) {
-        final ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
+    void doRegistration(final RegisterDataTreeChangeListener message, final ActorRef registrationActor) {
+        final ActorSelection listenerActor = processListenerRegistrationMessage(message);
 
-        // Notify the listener if notifications should be enabled or not
-        // If this shard is the leader then it will enable notifications else
-        // it will not
-        dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
-
-        // Now store a reference to the data change listener so it can be notified
-        // at a later point if notifications should be enabled or disabled
-        if (!message.isRegisterOnAllInstances()) {
-            addListenerActor(dataChangeListenerPath);
-        }
-
-        DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(dataChangeListenerPath);
+        DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(listenerActor);
 
         log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
 
-        Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> regEntry =
-                getShard().getDataStore().registerTreeChangeListener(message.getPath(), listener);
-
-        getShard().getDataStore().notifyOfInitialData(message.getPath(),
-                regEntry.getKey().getInstance(), regEntry.getValue());
-
-        listenerActors.add(dataChangeListenerPath);
-        final ListenerRegistration<DOMDataTreeChangeListener> delegate = regEntry.getKey();
-        return new ListenerRegistration<DOMDataTreeChangeListener>() {
-            @Override
-            public DOMDataTreeChangeListener getInstance() {
-                return delegate.getInstance();
-            }
-
-            @Override
-            public void close() {
-                listenerActors.remove(dataChangeListenerPath);
-                delegate.close();
-            }
-        };
+        final ShardDataTree shardDataTree = getShard().getDataStore();
+        shardDataTree.registerTreeChangeListener(message.getPath(),
+                listener, shardDataTree.readCurrentData(), registration -> registrationActor.tell(
+                        new DataTreeNotificationListenerRegistrationActor.SetRegistration(registration, () ->
+                            removeListenerActor(listenerActor)), ActorRef.noSender()));
     }
 
     @Override
     protected DelayedDataTreeListenerRegistration newDelayedListenerRegistration(
-            RegisterDataTreeChangeListener message) {
-        return new DelayedDataTreeListenerRegistration(message);
-    }
-
-    @Override
-    protected ActorRef newRegistrationActor(ListenerRegistration<DOMDataTreeChangeListener> registration) {
-        return createActor(DataTreeChangeListenerRegistrationActor.props(registration));
+            RegisterDataTreeChangeListener message, ActorRef registrationActor) {
+        return new DelayedDataTreeListenerRegistration(message, registrationActor);
     }
 
     @Override
index a947734..98083a0 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -14,6 +16,7 @@ import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeE
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -59,14 +62,26 @@ final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeLi
     }
 
     @Override
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            DataChangeListenerRegistration<L> registerDataChangeListener(YangInstanceIdentifier path, L listener,
-                    DataChangeScope scope) {
-        return dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
+    public void registerDataChangeListener(YangInstanceIdentifier path,
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+                    onRegistration) {
+        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                registration = dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
+
+        onRegistration.accept(registration);
+
+        if (initialState.isPresent()) {
+            notifySingleListener(path, listener, scope, initialState.get());
+        }
     }
 
-    @Override
-    public ShardDataChangeListenerPublisher newInstance() {
-        return new DefaultShardDataChangeListenerPublisher();
+    static void notifySingleListener(final YangInstanceIdentifier path,
+            final AsyncDataChangeListener<YangInstanceIdentifier,NormalizedNode<?, ?>> listener,
+            final DataChangeScope scope, final DataTreeCandidate initialState) {
+        DefaultShardDataChangeListenerPublisher publisher = new DefaultShardDataChangeListenerPublisher();
+        publisher.registerDataChangeListener(path, listener, scope, Optional.absent(), noop -> { });
+        publisher.publishChanges(initialState, "");
     }
 }
index a63859e..02326e0 100644 (file)
@@ -7,9 +7,11 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Optional;
 import java.util.Collection;
+import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
 import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTreeChangePublisher;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -34,11 +36,6 @@ final class DefaultShardDataTreeChangeListenerPublisher extends AbstractDOMStore
         processCandidateTree(candidate);
     }
 
-    @Override
-    public ShardDataTreeChangeListenerPublisher newInstance() {
-        return new DefaultShardDataTreeChangeListenerPublisher();
-    }
-
     @Override
     protected void notifyListener(AbstractDOMDataTreeChangeListenerRegistration<?> registration,
             Collection<DataTreeCandidate> changes) {
@@ -51,18 +48,32 @@ final class DefaultShardDataTreeChangeListenerPublisher extends AbstractDOMStore
     }
 
     @Override
-    public <L extends org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> ListenerRegistration<L>
-            registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
-        final AbstractDOMDataTreeChangeListenerRegistration<DOMDataTreeChangeListener> registration =
-            super.registerTreeChangeListener(treeId, (org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener)
-                changes -> listener.onDataTreeChanged(changes));
+    public void registerTreeChangeListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+        AbstractDOMDataTreeChangeListenerRegistration<org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener>
+            registration = super.registerTreeChangeListener(treeId,
+                (org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener)changes ->
+                    listener.onDataTreeChanged(changes));
+
+        onRegistration.accept(
+            new org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration<
+                    DOMDataTreeChangeListener>(listener) {
+                @Override
+                protected void removeRegistration() {
+                    registration.close();
+                }
+            });
+
+        if (initialState.isPresent()) {
+            notifySingleListener(treeId, listener, initialState.get());
+        }
+    }
 
-        return new org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration<L>(
-                listener) {
-            @Override
-            protected void removeRegistration() {
-                registration.close();
-            }
-        };
+    static void notifySingleListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
+            DataTreeCandidate state) {
+        DefaultShardDataTreeChangeListenerPublisher publisher = new DefaultShardDataTreeChangeListenerPublisher();
+        publisher.registerTreeChangeListener(treeId, listener, Optional.absent(), noop -> { });
+        publisher.publishChanges(state, "");
     }
 }
index c49c3de..a8fe100 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -15,7 +16,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 final class DelayedDataChangeListenerRegistration extends DelayedListenerRegistration<
            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener> {
 
-    DelayedDataChangeListenerRegistration(final RegisterChangeListener registerChangeListener) {
-        super(registerChangeListener);
+    DelayedDataChangeListenerRegistration(final RegisterChangeListener registerChangeListener,
+            final ActorRef registrationActor) {
+        super(registerChangeListener, registrationActor);
     }
-}
\ No newline at end of file
+}
index 35f7308..c67115a 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 
@@ -18,8 +19,9 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 final class DelayedDataTreeListenerRegistration
         extends DelayedListenerRegistration<DOMDataTreeChangeListener, RegisterDataTreeChangeListener> {
 
-    DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) {
-        super(registerTreeChangeListener);
+    DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener,
+            final ActorRef registrationActor) {
+        super(registerTreeChangeListener, registrationActor);
     }
 }
 
index 8d73bc6..18d23aa 100644 (file)
@@ -7,33 +7,32 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import java.util.EventListener;
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 
-abstract class DelayedListenerRegistration<L extends EventListener, M> implements ListenerRegistration<L> {
+abstract class DelayedListenerRegistration<L extends EventListener, M extends ListenerRegistrationMessage>
+        implements ListenerRegistration<L> {
     private final M registrationMessage;
-    private volatile ListenerRegistration<L> delegate;
+    private final ActorRef registrationActor;
 
     @GuardedBy("this")
     private boolean closed;
 
-    protected DelayedListenerRegistration(M registrationMessage) {
+    protected DelayedListenerRegistration(M registrationMessage, ActorRef registrationActor) {
         this.registrationMessage = registrationMessage;
+        this.registrationActor = registrationActor;
     }
 
     M getRegistrationMessage() {
         return registrationMessage;
     }
 
-    ListenerRegistration<L> getDelegate() {
-        return delegate;
-    }
-
-    synchronized <R extends ListenerRegistration<L>> void createDelegate(
-            final LeaderLocalDelegateFactory<M, R> factory) {
+    synchronized void createDelegate(final AbstractDataListenerSupport<L, M, ?> support) {
         if (!closed) {
-            this.delegate = factory.createDelegate(registrationMessage);
+            support.doRegistration(registrationMessage, registrationActor);
         }
     }
 
@@ -50,11 +49,6 @@ abstract class DelayedListenerRegistration<L extends EventListener, M> implement
 
     @Override
     public synchronized void close() {
-        if (!closed) {
-            closed = true;
-            if (delegate != null) {
-                delegate.close();
-            }
-        }
+        closed = true;
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java
deleted file mode 100644 (file)
index cd1b548..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-/**
- * Base class for factories instantiating delegates.
- *
- * @param <M> message type
- * @param <D> delegate type
- */
-abstract class DelegateFactory<M, D> {
-    abstract D createDelegate(M message);
-}
index 0f76629..5f58bc9 100644 (file)
@@ -20,7 +20,7 @@ import com.google.common.base.Preconditions;
  * @param <D> delegate type
  * @param <M> message type
  */
-abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
+abstract class LeaderLocalDelegateFactory<M> {
     private final Shard shard;
 
     protected LeaderLocalDelegateFactory(final Shard shard) {
index 39ee810..aca69c2 100644 (file)
@@ -188,9 +188,9 @@ public class Shard extends RaftActor {
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
         ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
-                new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
+                new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
         ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
-                new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
+                new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
         if (builder.getDataTree() != null) {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
                     treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
index 87a0c8d..3558107 100644 (file)
@@ -7,11 +7,14 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 /**
  * Interface for a class that generates and publishes notifications for DataChangeListeners.
@@ -19,8 +22,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * @author Thomas Pantelis
  */
 interface ShardDataChangeListenerPublisher extends ShardDataTreeNotificationPublisher {
-    ShardDataChangeListenerPublisher newInstance();
-
-    <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L>
-            registerDataChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope);
+    void registerDataChangeListener(YangInstanceIdentifier path,
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+                onRegistration);
 }
index e3c7183..a17c603 100644 (file)
@@ -8,12 +8,17 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 /**
  * Implementation of ShardDataChangeListenerPublisher that offloads the generation and publication
@@ -25,30 +30,22 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 class ShardDataChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
         implements ShardDataChangeListenerPublisher {
 
-    private final ShardDataChangeListenerPublisher delegatePublisher = new DefaultShardDataChangeListenerPublisher();
-
-    ShardDataChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) {
-        super(actorContext, actorName);
-    }
-
-    private ShardDataChangeListenerPublisherActorProxy(ShardDataChangeListenerPublisherActorProxy other) {
-        super(other);
-    }
-
-    @Override
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            DataChangeListenerRegistration<L> registerDataChangeListener(YangInstanceIdentifier path, L listener,
-                    DataChangeScope scope) {
-        return delegatePublisher.registerDataChangeListener(path, listener, scope);
+    ShardDataChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName, String logContext) {
+        super(actorContext, actorName, logContext);
     }
 
     @Override
-    public ShardDataChangeListenerPublisher newInstance() {
-        return new ShardDataChangeListenerPublisherActorProxy(this);
+    public void registerDataChangeListener(YangInstanceIdentifier path,
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+                onRegistration) {
+        notifierActor().tell(new ShardDataChangePublisherActor.RegisterListener(path, listener, scope, initialState,
+                onRegistration), ActorRef.noSender());
     }
 
     @Override
-    protected ShardDataTreeNotificationPublisher getDelegatePublisher() {
-        return delegatePublisher;
+    protected Props props() {
+        return ShardDataChangePublisherActor.props(actorName(), logContext());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangePublisherActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangePublisherActor.java
new file mode 100644 (file)
index 0000000..cb7d80d
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.function.Consumer;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Actor used to generate and publish DataChange notifications.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardDataChangePublisherActor
+        extends ShardDataTreeNotificationPublisherActor<ShardDataChangeListenerPublisher> {
+
+    private ShardDataChangePublisherActor(final String name, final String logContext) {
+        super(new DefaultShardDataChangeListenerPublisher(), name, logContext);
+    }
+
+    @Override
+    protected void handleReceive(Object message) {
+        if (message instanceof RegisterListener) {
+            RegisterListener reg = (RegisterListener)message;
+            if (reg.initialState.isPresent()) {
+                DefaultShardDataChangeListenerPublisher.notifySingleListener(reg.path, reg.listener, reg.scope,
+                        reg.initialState.get());
+            }
+
+            publisher().registerDataChangeListener(reg.path, reg.listener, reg.scope, Optional.absent(),
+                    reg.onRegistration);
+        } else {
+            super.handleReceive(message);
+        }
+    }
+
+    static Props props(final String name, final String logContext) {
+        return Props.create(ShardDataChangePublisherActor.class, name, logContext);
+    }
+
+    static class RegisterListener {
+        private final YangInstanceIdentifier path;
+        private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+        private final DataChangeScope scope;
+        private final Optional<DataTreeCandidate> initialState;
+        private final Consumer<ListenerRegistration<
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> onRegistration;
+
+        RegisterListener(final YangInstanceIdentifier path,
+                final AsyncDataChangeListener<YangInstanceIdentifier,NormalizedNode<?, ?>> listener,
+                final DataChangeScope scope, final Optional<DataTreeCandidate> initialState,
+                final Consumer<ListenerRegistration<
+                    AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> onRegistration) {
+            this.path = Preconditions.checkNotNull(path);
+            this.listener = Preconditions.checkNotNull(listener);
+            this.scope = Preconditions.checkNotNull(scope);
+            this.initialState = Preconditions.checkNotNull(initialState);
+            this.onRegistration = Preconditions.checkNotNull(onRegistration);
+        }
+    }
+}
index 9abb002..f7d9827 100644 (file)
@@ -24,7 +24,6 @@ import com.google.common.primitives.UnsignedLong;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
 import java.io.IOException;
-import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -63,7 +62,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -544,25 +542,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         dataChangeListenerPublisher.publishChanges(candidate, logContext);
     }
 
-    void notifyOfInitialData(final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-            NormalizedNode<?, ?>>> listenerReg, final Optional<DataTreeCandidate> currentState) {
-        if (currentState.isPresent()) {
-            ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance();
-            localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
-                    listenerReg.getScope());
-            localPublisher.publishChanges(currentState.get(), logContext);
-        }
-    }
-
-    void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
-            final Optional<DataTreeCandidate> currentState) {
-        if (currentState.isPresent()) {
-            ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance();
-            localPublisher.registerTreeChangeListener(path, listener);
-            localPublisher.publishChanges(currentState.get(), logContext);
-        }
-    }
-
     /**
      * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
      * replication callbacks.
@@ -615,29 +594,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
     }
 
-    Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-            Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
-                    final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
-                    final DataChangeScope scope) {
-        DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
-                dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
-
-        return new SimpleEntry<>(reg, readCurrentData());
+    void registerDataChangeListener(YangInstanceIdentifier path,
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+                    onRegistration) {
+        dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration);
     }
 
-    private Optional<DataTreeCandidate> readCurrentData() {
+    Optional<DataTreeCandidate> readCurrentData() {
         final Optional<NormalizedNode<?, ?>> currentState =
                 dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
         return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
             YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
     }
 
-    public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>>
-            registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
-        final ListenerRegistration<DOMDataTreeChangeListener> reg =
-                treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
-
-        return new SimpleEntry<>(reg, readCurrentData());
+    public void registerTreeChangeListener(YangInstanceIdentifier path, DOMDataTreeChangeListener listener,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+        treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
     }
 
     int getQueueSize() {
index d4a5156..7f20bee 100644 (file)
@@ -7,13 +7,20 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 /**
  * Interface for a class that generates and publishes notifications for DataTreeChangeListeners.
  *
  * @author Thomas Pantelis
  */
-interface ShardDataTreeChangeListenerPublisher extends ShardDataTreeNotificationPublisher, DOMStoreTreeChangePublisher {
-    ShardDataTreeChangeListenerPublisher newInstance();
+interface ShardDataTreeChangeListenerPublisher extends ShardDataTreeNotificationPublisher {
+    void registerTreeChangeListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration);
 }
index 7196f83..ceaeecc 100644 (file)
@@ -8,10 +8,15 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 /**
  * Implementation of ShardDataTreeChangeListenerPublisher that offloads the generation and publication
@@ -23,30 +28,20 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 class ShardDataTreeChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
         implements ShardDataTreeChangeListenerPublisher {
 
-    private final ShardDataTreeChangeListenerPublisher delegatePublisher =
-            new DefaultShardDataTreeChangeListenerPublisher();
-
-    ShardDataTreeChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) {
-        super(actorContext, actorName);
-    }
-
-    private ShardDataTreeChangeListenerPublisherActorProxy(ShardDataTreeChangeListenerPublisherActorProxy other) {
-        super(other);
-    }
-
-    @Override
-    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
-            YangInstanceIdentifier treeId, L listener) {
-        return delegatePublisher.registerTreeChangeListener(treeId, listener);
+    ShardDataTreeChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName, String logContext) {
+        super(actorContext, actorName, logContext);
     }
 
     @Override
-    public ShardDataTreeChangeListenerPublisher newInstance() {
-        return new ShardDataTreeChangeListenerPublisherActorProxy(this);
+    public void registerTreeChangeListener(YangInstanceIdentifier treeId,
+            DOMDataTreeChangeListener listener, Optional<DataTreeCandidate> currentState,
+            Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+        notifierActor().tell(new ShardDataTreeChangePublisherActor.RegisterListener(treeId, listener, currentState,
+                onRegistration), ActorRef.noSender());
     }
 
     @Override
-    protected ShardDataTreeNotificationPublisher getDelegatePublisher() {
-        return delegatePublisher;
+    protected Props props() {
+        return ShardDataTreeChangePublisherActor.props(actorName(), logContext());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisherActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisherActor.java
new file mode 100644 (file)
index 0000000..f054ccd
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.function.Consumer;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Actor used to generate and publish DataTreeChange notifications.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardDataTreeChangePublisherActor
+        extends ShardDataTreeNotificationPublisherActor<ShardDataTreeChangeListenerPublisher> {
+
+    private ShardDataTreeChangePublisherActor(final String name, final String logContext) {
+        super(new DefaultShardDataTreeChangeListenerPublisher(), name, logContext);
+    }
+
+    @Override
+    protected void handleReceive(Object message) {
+        if (message instanceof RegisterListener) {
+            RegisterListener reg = (RegisterListener)message;
+            if (reg.initialState.isPresent()) {
+                DefaultShardDataTreeChangeListenerPublisher.notifySingleListener(reg.path, reg.listener,
+                        reg.initialState.get());
+            }
+
+            publisher().registerTreeChangeListener(reg.path, reg.listener, Optional.absent(), reg.onRegistration);
+        } else {
+            super.handleReceive(message);
+        }
+    }
+
+    static Props props(final String name, final String logContext) {
+        return Props.create(ShardDataTreeChangePublisherActor.class, name, logContext);
+    }
+
+    static class RegisterListener {
+        private final YangInstanceIdentifier path;
+        private final DOMDataTreeChangeListener listener;
+        private final Optional<DataTreeCandidate> initialState;
+        private final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration;
+
+        RegisterListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+                final Optional<DataTreeCandidate> initialState,
+                final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+            this.path = Preconditions.checkNotNull(path);
+            this.listener = Preconditions.checkNotNull(listener);
+            this.initialState = Preconditions.checkNotNull(initialState);
+            this.onRegistration = Preconditions.checkNotNull(onRegistration);
+        }
+    }
+}
index 76c52c9..ea08830 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.Props;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
@@ -19,31 +18,43 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
  *
  * @author Thomas Pantelis
  */
-public class ShardDataTreeNotificationPublisherActor extends AbstractUntypedActor {
+public class ShardDataTreeNotificationPublisherActor<T extends ShardDataTreeNotificationPublisher>
+        extends AbstractUntypedActor {
+    private final T publisher;
     private final Stopwatch timer = Stopwatch.createUnstarted();
     private final String name;
+    private final String logContext;
 
-    private ShardDataTreeNotificationPublisherActor(String name) {
+    protected ShardDataTreeNotificationPublisherActor(final T publisher, final String name, final String logContext) {
+        this.publisher = publisher;
         this.name = name;
+        this.logContext = logContext;
+    }
+
+    protected T publisher() {
+        return publisher;
+    }
+
+    protected String logContext() {
+        return logContext;
     }
 
     @Override
     protected void handleReceive(Object message) {
         if (message instanceof PublishNotifications) {
-            PublishNotifications publisher = (PublishNotifications)message;
+            PublishNotifications toPublish = (PublishNotifications)message;
             timer.start();
 
             try {
-                publisher.publish();
+                publisher.publishChanges(toPublish.candidate, logContext);
             } finally {
                 long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
 
                 if (elapsedTime >= ShardDataTreeNotificationPublisher.PUBLISH_DELAY_THRESHOLD_IN_MS) {
                     LOG.warn("{}: Generation of change events for {} took longer than expected. Elapsed time: {}",
-                            publisher.logContext, name, timer);
+                            logContext, name, timer);
                 } else {
-                    LOG.debug("{}: Elapsed time for generation of change events for {}: {}", publisher.logContext,
-                            name, timer);
+                    LOG.debug("{}: Elapsed time for generation of change events for {}: {}", logContext, name, timer);
                 }
 
                 timer.reset();
@@ -51,24 +62,11 @@ public class ShardDataTreeNotificationPublisherActor extends AbstractUntypedActo
         }
     }
 
-    static Props props(String notificationType) {
-        return Props.create(ShardDataTreeNotificationPublisherActor.class, notificationType);
-    }
-
     static class PublishNotifications {
-        private final ShardDataTreeNotificationPublisher publisher;
         private final DataTreeCandidate candidate;
-        private final String logContext;
 
-        PublishNotifications(ShardDataTreeNotificationPublisher publisher, DataTreeCandidate candidate,
-                String logContext) {
-            this.publisher = publisher;
+        PublishNotifications(DataTreeCandidate candidate) {
             this.candidate = candidate;
-            this.logContext = logContext;
-        }
-
-        private void publish() {
-            publisher.publishChanges(candidate, logContext);
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActor.java
new file mode 100644 (file)
index 0000000..2a60abb
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Actor co-located with a shard. It exists only to terminate the registration when
+ * asked to do so via {@link CloseDataTreeNotificationListenerRegistration}.
+ */
+public final class DataTreeNotificationListenerRegistrationActor extends AbstractUntypedActor {
+    @VisibleForTesting
+    static long killDelay = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);
+
+    private ListenerRegistration<?> registration;
+    private Runnable onClose;
+    private boolean closed;
+    private Cancellable killSchedule;
+
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (message instanceof CloseDataTreeNotificationListenerRegistration) {
+            closeListenerRegistration();
+            if (isValidSender(getSender())) {
+                getSender().tell(CloseDataTreeNotificationListenerRegistrationReply.getInstance(), getSelf());
+            }
+        } else if (message instanceof SetRegistration) {
+            registration = ((SetRegistration)message).registration;
+            onClose = ((SetRegistration)message).onClose;
+            if (closed) {
+                closeListenerRegistration();
+            }
+        } else {
+            unknownMessage(message);
+        }
+    }
+
+    private void closeListenerRegistration() {
+        closed = true;
+        if (registration != null) {
+            registration.close();
+            onClose.run();
+            registration = null;
+
+            if (killSchedule == null) {
+                killSchedule = getContext().system().scheduler().scheduleOnce(Duration.create(killDelay,
+                        TimeUnit.MILLISECONDS), getSelf(), PoisonPill.getInstance(), getContext().dispatcher(),
+                        ActorRef.noSender());
+            }
+        }
+    }
+
+    public static Props props() {
+        return Props.create(DataTreeNotificationListenerRegistrationActor.class);
+    }
+
+    public static class SetRegistration {
+        private final ListenerRegistration<?> registration;
+        private final Runnable onClose;
+
+        public SetRegistration(final ListenerRegistration<?> registration, final Runnable onClose) {
+            this.registration = Preconditions.checkNotNull(registration);
+            this.onClose = Preconditions.checkNotNull(onClose);
+        }
+    }
+}
index 1989bad..6094e24 100644 (file)
@@ -12,6 +12,7 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
 
+import com.google.common.base.Optional;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
@@ -24,7 +25,7 @@ public abstract class AbstractEntityOwnerChangeListener implements DOMDataTreeCh
             .node(ENTITY_OWNER_QNAME).build();
 
     void init(ShardDataTree shardDataTree) {
-        shardDataTree.registerTreeChangeListener(EOS_PATH, this);
+        shardDataTree.registerTreeChangeListener(EOS_PATH, this, Optional.absent(), noop -> { });
     }
 
     protected static String extractOwner(LeafNode<?> ownerLeaf) {
index 3b4f2d9..6b6717c 100644 (file)
@@ -14,6 +14,7 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
 
 import akka.actor.ActorRef;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -60,7 +61,7 @@ class CandidateListChangeListener implements DOMDataTreeChangeListener {
     void init(ShardDataTree shardDataTree) {
         shardDataTree.registerTreeChangeListener(YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH)
                 .node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME)
-                    .node(Candidate.QNAME).node(Candidate.QNAME).build(), this);
+                    .node(Candidate.QNAME).node(Candidate.QNAME).build(), this, Optional.absent(), noop -> { });
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistration.java
deleted file mode 100644 (file)
index 8c35caa..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-public class CloseDataChangeListenerRegistration {
-    public static final CloseDataChangeListenerRegistration INSTANCE = new CloseDataChangeListenerRegistration();
-
-    private CloseDataChangeListenerRegistration() {
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistrationReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistrationReply.java
deleted file mode 100644 (file)
index 430202a..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-
-public class CloseDataChangeListenerRegistrationReply {
-    public static final CloseDataChangeListenerRegistrationReply INSTANCE =
-            new CloseDataChangeListenerRegistrationReply();
-
-    private CloseDataChangeListenerRegistrationReply() {
-    }
-}
@@ -10,15 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import java.io.ObjectStreamException;
 import java.io.Serializable;
 
-public final class CloseDataTreeChangeListenerRegistration implements Serializable {
+public final class CloseDataTreeNotificationListenerRegistration implements Serializable {
     private static final long serialVersionUID = 1L;
-    private static final CloseDataTreeChangeListenerRegistration INSTANCE =
-            new CloseDataTreeChangeListenerRegistration();
+    private static final CloseDataTreeNotificationListenerRegistration INSTANCE =
+            new CloseDataTreeNotificationListenerRegistration();
 
-    private CloseDataTreeChangeListenerRegistration() {
+    private CloseDataTreeNotificationListenerRegistration() {
     }
 
-    public static CloseDataTreeChangeListenerRegistration getInstance() {
+    public static CloseDataTreeNotificationListenerRegistration getInstance() {
         return INSTANCE;
     }
 
@@ -10,16 +10,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import java.io.ObjectStreamException;
 import java.io.Serializable;
 
-public final class CloseDataTreeChangeListenerRegistrationReply implements Serializable {
+public final class CloseDataTreeNotificationListenerRegistrationReply implements Serializable {
     private static final long serialVersionUID = 1L;
-    private static final CloseDataTreeChangeListenerRegistrationReply INSTANCE =
-            new CloseDataTreeChangeListenerRegistrationReply();
+    private static final CloseDataTreeNotificationListenerRegistrationReply INSTANCE =
+            new CloseDataTreeNotificationListenerRegistrationReply();
 
-    private CloseDataTreeChangeListenerRegistrationReply() {
+    private CloseDataTreeNotificationListenerRegistrationReply() {
         // Use getInstance() instead
     }
 
-    public static CloseDataTreeChangeListenerRegistrationReply getInstance() {
+    public static CloseDataTreeNotificationListenerRegistrationReply getInstance() {
         return INSTANCE;
     }
 
index 5cea06b..3f016a4 100644 (file)
@@ -7,10 +7,13 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import akka.actor.ActorPath;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public interface ListenerRegistrationMessage {
     YangInstanceIdentifier getPath();
 
     boolean isRegisterOnAllInstances();
+
+    ActorPath getListenerActorPath();
 }
index f5d8698..163525a 100644 (file)
@@ -15,15 +15,14 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class RegisterChangeListener implements ListenerRegistrationMessage {
     private final YangInstanceIdentifier path;
-    private final ActorRef dataChangeListener;
+    private final ActorRef dataChangeListenerActor;
     private final AsyncDataBroker.DataChangeScope scope;
     private final boolean registerOnAllInstances;
 
-    public RegisterChangeListener(YangInstanceIdentifier path,
-        ActorRef dataChangeListener,
-        AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) {
+    public RegisterChangeListener(YangInstanceIdentifier path, ActorRef dataChangeListenerActor,
+            AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) {
         this.path = path;
-        this.dataChangeListener = dataChangeListener;
+        this.dataChangeListenerActor = dataChangeListenerActor;
         this.scope = scope;
         this.registerOnAllInstances = registerOnAllInstances;
     }
@@ -37,8 +36,9 @@ public class RegisterChangeListener implements ListenerRegistrationMessage {
         return scope;
     }
 
-    public ActorPath getDataChangeListenerPath() {
-        return dataChangeListener.path();
+    @Override
+    public ActorPath getListenerActorPath() {
+        return dataChangeListenerActor.path();
     }
 
     @Override
index c9c0c0c..abf072b 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import java.io.Externalizable;
@@ -42,8 +43,9 @@ public final class RegisterDataTreeChangeListener implements Externalizable, Lis
         return path;
     }
 
-    public ActorRef getDataTreeChangeListenerPath() {
-        return dataTreeChangeListenerPath;
+    @Override
+    public ActorPath getListenerActorPath() {
+        return dataTreeChangeListenerPath.path();
     }
 
     @Override
index 1c733cc..17a55b9 100644 (file)
@@ -30,7 +30,7 @@ import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
@@ -114,7 +114,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
                 proxy.close();
 
                 // The listener registration actor should get a Close message
-                expectMsgClass(timeout, CloseDataChangeListenerRegistration.class);
+                expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
 
                 // The DataChangeListener actor should be terminated
                 expectMsgClass(timeout, Terminated.class);
@@ -174,7 +174,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
                 proxy.close();
 
                 // The listener registration actor should get a Close message
-                expectMsgClass(timeout, CloseDataChangeListenerRegistration.class);
+                expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
 
                 // The DataChangeListener actor should be terminated
                 expectMsgClass(timeout, Terminated.class);
@@ -326,7 +326,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
                 proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
                         AsyncDataBroker.DataChangeScope.ONE);
 
-                expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.class);
+                expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class);
 
                 Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
                 proxy.close();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java
deleted file mode 100644 (file)
index d23cfea..0000000
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-public class DataChangeListenerRegistrationTest extends AbstractActorTest {
-    private static final InMemoryDOMDataStore STORE = new InMemoryDOMDataStore("OPER",
-            MoreExecutors.newDirectExecutorService());
-
-    static {
-        STORE.onGlobalContextUpdated(TestModel.createTestContext());
-    }
-
-    @Test
-    public void testOnReceiveCloseListenerRegistration() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                final Props props = DataChangeListenerRegistrationActor.props(STORE.registerChangeListener(
-                        TestModel.TEST_PATH, noOpDataChangeListener(), AsyncDataBroker.DataChangeScope.BASE));
-                final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
-
-                new Within(duration("1 seconds")) {
-                    @Override
-                    protected void run() {
-
-                        subject.tell(CloseDataChangeListenerRegistration.INSTANCE, getRef());
-
-                        final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                            // do not put code outside this method, will run
-                            // afterwards
-                            @Override
-                            protected String match(final Object in) {
-                                if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.class)) {
-                                    return "match";
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
-
-                        assertEquals("match", out);
-
-                        expectNoMsg();
-                    }
-
-                };
-            }
-        };
-    }
-
-    private static AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
-        return change -> {
-        };
-    }
-}
index e6b20a7..6f2f99b 100644 (file)
@@ -29,7 +29,7 @@ import org.junit.Test;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
@@ -93,7 +93,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 proxy.close();
 
                 // The listener registration actor should get a Close message
-                expectMsgClass(timeout, CloseDataTreeChangeListenerRegistration.class);
+                expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
 
                 // The DataChangeListener actor should be terminated
                 expectMsgClass(timeout, Terminated.class);
@@ -277,7 +277,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
 
                 proxy.init(shardName);
 
-                expectMsgClass(duration("5 seconds"), CloseDataTreeChangeListenerRegistration.class);
+                expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class);
 
                 Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
             }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActorTest.java
deleted file mode 100644 (file)
index 9259a61..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-
-public class DataTreeChangeListenerRegistrationActorTest extends AbstractActorTest {
-    private static final InMemoryDOMDataStore STORE = new InMemoryDOMDataStore("OPER",
-            MoreExecutors.newDirectExecutorService());
-
-    static {
-        STORE.onGlobalContextUpdated(TestModel.createTestContext());
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Test
-    public void testOnReceiveCloseListenerRegistration() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                final ListenerRegistration mockListenerReg = Mockito.mock(ListenerRegistration.class);
-                final Props props = DataTreeChangeListenerRegistrationActor.props(mockListenerReg);
-                final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
-
-                subject.tell(CloseDataTreeChangeListenerRegistration.getInstance(), getRef());
-
-                expectMsgClass(duration("1 second"), CloseDataTreeChangeListenerRegistrationReply.class);
-
-                Mockito.verify(mockListenerReg).close();
-            }
-        };
-    }
-}
index 00f0f4b..1426e4a 100644 (file)
@@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
@@ -92,8 +92,8 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
 
         listener.reset(1);
         JavaTestKit kit = new JavaTestKit(getSystem());
-        entry.getValue().tell(CloseDataTreeChangeListenerRegistration.getInstance(), kit.getRef());
-        kit.expectMsgClass(JavaTestKit.duration("5 seconds"), CloseDataTreeChangeListenerRegistrationReply.class);
+        entry.getValue().tell(CloseDataTreeNotificationListenerRegistration.getInstance(), kit.getRef());
+        kit.expectMsgClass(JavaTestKit.duration("5 seconds"), CloseDataTreeNotificationListenerRegistrationReply.class);
 
         writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
         listener.verifyNoNotifiedData(TEST_PATH);
index 31e36cf..e3aae92 100644 (file)
@@ -63,6 +63,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardData
 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@@ -117,6 +118,8 @@ public class DistributedDataStoreIntegrationTest {
 
     @Before
     public void setUp() throws IOException {
+        InMemorySnapshotStore.clear();
+        InMemoryJournal.clear();
         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
         Cluster.get(system).join(member1Address);
@@ -1167,6 +1170,10 @@ public class DistributedDataStoreIntegrationTest {
 
                     assertNotNull("registerChangeListener returned null", listenerReg);
 
+                    IntegrationTestKit.verifyShardState(dataStore, "test-1",
+                        state -> assertEquals("getDataChangeListenerActors", 1,
+                                state.getDataChangeListenerActors().size()));
+
                     // Wait for the initial notification
                     listener.waitForChangeEvents(TestModel.TEST_PATH);
                     listener.reset(2);
@@ -1184,6 +1191,63 @@ public class DistributedDataStoreIntegrationTest {
                     listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
                     listenerReg.close();
 
+                    IntegrationTestKit.verifyShardState(dataStore, "test-1",
+                        state -> assertEquals("getDataChangeListenerActors", 0,
+                                state.getDataChangeListenerActors().size()));
+
+                    testWriteTransaction(dataStore,
+                            YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
+                            ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
+
+                    listener.expectNoMoreChanges("Received unexpected change after close");
+                }
+            }
+        };
+    }
+
+    @Test
+    public void testDataTreeChangeListenerRegistration() throws Exception {
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+            {
+                try (final AbstractDataStore dataStore = setupAbstractDataStore(
+                        testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
+
+                    testWriteTransaction(dataStore, TestModel.TEST_PATH,
+                            ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+                    final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+
+                    ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
+                            .registerTreeChangeListener(TestModel.TEST_PATH, listener);
+
+                    assertNotNull("registerTreeChangeListener returned null", listenerReg);
+
+                    IntegrationTestKit.verifyShardState(dataStore, "test-1",
+                        state -> assertEquals("getTreeChangeListenerActors", 1,
+                                state.getTreeChangeListenerActors().size()));
+
+                    // Wait for the initial notification
+                    listener.waitForChangeEvents(TestModel.TEST_PATH);
+                    listener.reset(2);
+
+                    // Write 2 updates.
+                    testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+                            ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+                    YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                            .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
+                    testWriteTransaction(dataStore, listPath,
+                            ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+
+                    // Wait for the 2 updates.
+                    listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
+                    listenerReg.close();
+
+                    IntegrationTestKit.verifyShardState(dataStore, "test-1",
+                        state -> assertEquals("getTreeChangeListenerActors", 0,
+                                state.getTreeChangeListenerActors().size()));
+
                     testWriteTransaction(dataStore,
                             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
index 42d0337..3ca0f9c 100644 (file)
@@ -174,7 +174,8 @@ public class ShardDataTreeTest extends AbstractTest {
         immediatePayloadReplication(shardDataTree, mockShard);
 
         DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
-        shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener);
+        shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener,
+                Optional.absent(), noop -> { });
 
         addCar(shardDataTree, "optima");
 
index 45239ac..0bd34aa 100644 (file)
@@ -59,6 +59,8 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -2122,10 +2124,10 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
+    public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final String testName = "testClusteredDataChangeListenerDelayedRegistration";
+                final String testName = "testClusteredDataChangeListenerWithDelayedRegistration";
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
@@ -2206,10 +2208,10 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+    public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
             {
-                final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+                final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
@@ -2238,6 +2240,43 @@ public class ShardTest extends AbstractShardTest {
         };
     }
 
+    @Test
+    public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
+        new ShardTestKit(getSystem()) {
+            {
+                final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
+                dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
+                        .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+                final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                        TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+                setupInMemorySnapshotStore();
+
+                final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                        newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        actorFactory.generateActorId(testName + "-shard"));
+
+                waitUntilNoLeader(shard);
+
+                shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeChangeListenerReply.class);
+                assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+                final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
+                regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+                expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
+
+                shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
+                        .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
+
+                listener.expectNoMoreChanges("Received unexpected change after close");
+            }
+        };
+    }
+
     @Test
     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java
new file mode 100644 (file)
index 0000000..12428e9
--- /dev/null
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors;
+
+import static org.mockito.Mockito.timeout;
+
+import akka.actor.ActorRef;
+import akka.testkit.JavaTestKit;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+public class DataTreeNotificationListenerRegistrationActorTest extends AbstractActorTest {
+    @Mock
+    private ListenerRegistration<?> mockListenerReg;
+
+    @Mock
+    private Runnable mockOnClose;
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+        DataTreeNotificationListenerRegistrationActor.killDelay = 100;
+    }
+
+    @Test
+    public void testOnReceiveCloseListenerRegistrationAfterSetRegistration() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(),
+                        "testOnReceiveCloseListenerRegistrationAfterSetRegistration");
+                watch(subject);
+
+                subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg,
+                        mockOnClose), ActorRef.noSender());
+                subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+
+                expectMsgClass(duration("5 second"), CloseDataTreeNotificationListenerRegistrationReply.class);
+
+                Mockito.verify(mockListenerReg, timeout(5000)).close();
+                Mockito.verify(mockOnClose, timeout(5000)).run();
+
+                expectTerminated(duration("5 second"), subject);
+            }
+        };
+    }
+
+    @Test
+    public void testOnReceiveCloseListenerRegistrationBeforeSetRegistration() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(),
+                        "testOnReceiveSetRegistrationAfterPriorClose");
+                watch(subject);
+
+                subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+                expectMsgClass(duration("5 second"), CloseDataTreeNotificationListenerRegistrationReply.class);
+
+                subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg,
+                        mockOnClose), ActorRef.noSender());
+
+                Mockito.verify(mockListenerReg, timeout(5000)).close();
+                Mockito.verify(mockOnClose, timeout(5000)).run();
+
+                expectTerminated(duration("5 second"), subject);
+            }
+        };
+    }
+
+    @Test
+    public void testOnReceiveSetRegistrationAfterPriorClose() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                DataTreeNotificationListenerRegistrationActor.killDelay = 1000;
+                final ListenerRegistration<?> mockListenerReg2 = Mockito.mock(ListenerRegistration.class);
+                final Runnable mockOnClose2 = Mockito.mock(Runnable.class);
+
+                final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(),
+                        "testOnReceiveSetRegistrationAfterPriorClose");
+                watch(subject);
+
+                subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg,
+                        mockOnClose), ActorRef.noSender());
+                subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
+                subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg2,
+                        mockOnClose2), ActorRef.noSender());
+
+                Mockito.verify(mockListenerReg, timeout(5000)).close();
+                Mockito.verify(mockOnClose, timeout(5000)).run();
+                Mockito.verify(mockListenerReg2, timeout(5000)).close();
+                Mockito.verify(mockOnClose2, timeout(5000)).run();
+
+                expectTerminated(duration("5 second"), subject);
+            }
+        };
+    }
+}
index 4be4315..b414e0f 100644 (file)
@@ -11,6 +11,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Arrays;
@@ -23,6 +24,9 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
@@ -54,12 +58,46 @@ public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
         }
     }
 
-    public void waitForChangeEvents() {
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public void waitForChangeEvents(YangInstanceIdentifier... expPaths) {
         boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS);
         if (!done) {
             fail(String.format("Missing change notifications. Expected: %d. Actual: %d",
                     expChangeEventCount, expChangeEventCount - changeLatch.getCount()));
         }
+
+        for (int i = 0; i < expPaths.length; i++) {
+            final DataTreeCandidate candidate = changeList.get(i);
+            final Optional<NormalizedNode<?, ?>> maybeDataAfter = candidate.getRootNode().getDataAfter();
+            if (!maybeDataAfter.isPresent()) {
+                fail(String.format("Change %d does not contain data after. Actual: %s", i + 1,
+                        candidate.getRootNode()));
+            }
+
+            final NormalizedNode<?, ?> dataAfter = maybeDataAfter.get();
+            final Optional<YangInstanceIdentifier> relativePath = expPaths[i].relativeTo(candidate.getRootPath());
+            if (!relativePath.isPresent()) {
+                assertEquals(String.format("Change %d does not contain %s. Actual: %s", i + 1, expPaths[i],
+                        dataAfter), expPaths[i].getLastPathArgument(), dataAfter.getIdentifier());
+            } else {
+                NormalizedNode<?, ?> nextChild = dataAfter;
+                for (PathArgument pathArg: relativePath.get().getPathArguments()) {
+                    boolean found = false;
+                    if (nextChild instanceof NormalizedNodeContainer) {
+                        Optional<NormalizedNode<?, ?>> maybeChild = ((NormalizedNodeContainer)nextChild)
+                                .getChild(pathArg);
+                        if (maybeChild.isPresent()) {
+                            found = true;
+                            nextChild = maybeChild.get();
+                        }
+                    }
+
+                    if (!found) {
+                        fail(String.format("Change %d does not contain %s. Actual: %s", i + 1, expPaths[i], dataAfter));
+                    }
+                }
+            }
+        }
     }
 
     public void verifyNotifiedData(YangInstanceIdentifier... paths) {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.