Bug 8231: Fix testChangeListenerRegistration failure 08/55108/5
authorTom Pantelis <tompantelis@gmail.com>
Fri, 14 Apr 2017 13:03:51 +0000 (09:03 -0400)
committerTom Pantelis <tompantelis@gmail.com>
Tue, 18 Apr 2017 19:45:12 +0000 (15:45 -0400)
As described in Bug 8231, the sharing of the ListenerTree between the
ShardDataTree and the ShardDataTreeNotificationPublisherActor is
problematic. Therefore the ListenerTree (wrapped by the
DefaultShardDataTreeChangeListenerPublisher) is now owned by the
ShardDataTreeNotificationPublisherActor. On registration, a RegisterListener
messages is sent to the ShardDataTreeNotificationPublisherActor to perform
the on-boarding of the new listener, ie it atomically generates and sends
the initial notification and then adds the listener to the ListenerTree.

This change necessitated some refactoring of the DataChangeListenerSupport
class et al wrt to how the ListenerRegistration is handled. Prior the
ListenerRegistration was passed on creation of the registration actor. This
is now done indirectly by sending a SetRegistration message to the
registration actor via a Consumer callback passed in the RegisterListener
message. When the ListenerRegistration is obtained by the
ShardDataChangePublisherActor, it invokes the Consumer callback.

When a registration is initially delayed due to no leader, the
DelayedListenerRegistration is sent to the registration actor. When the
leader is elected later on, the actual ListenerRegistration is sent and
replaces the DelayedListenerRegistration.

The DOMDataTreeChangeListener registration classes were changed/refactored
similarly.

In addition, the 2 specific registration actor classes were replaced by a
generic reusable DataTreeNotificationListenerRegistrationActor that handles
both listener types. Also the 2 CloseData*ListenerRegistration and
CloseData*ListenerRegistrationReply messages were consolidated.

Change-Id: I79ac76b8044609351e5dd8367b691b589ea35075
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
44 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationActor.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangePublisherActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisherActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnerChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistration.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistrationReply.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeNotificationListenerRegistration.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java with 62% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeNotificationListenerRegistrationReply.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java with 62% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActorTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataTreeChangeListener.java

index 0821951a1a67b7cc30d7c98dae86994119af5cf6..ac7a9337d9ff0a28507d07f75d199a1f1ed82b4c 100644 (file)
@@ -12,31 +12,36 @@ import akka.actor.ActorSelection;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EventListener;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EventListener;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class AbstractDataListenerSupport<L extends EventListener, M extends ListenerRegistrationMessage,
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class AbstractDataListenerSupport<L extends EventListener, M extends ListenerRegistrationMessage,
-        D extends DelayedListenerRegistration<L, M>, R extends ListenerRegistration<L>>
-                extends LeaderLocalDelegateFactory<M, R> {
+        D extends DelayedListenerRegistration<L, M>> extends LeaderLocalDelegateFactory<M> {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private final ArrayList<D> delayedListenerRegistrations = new ArrayList<>();
-    private final ArrayList<D> delayedListenerOnAllRegistrations = new ArrayList<>();
-    private final Collection<ActorSelection> actors = new ArrayList<>();
+    private final Collection<D> delayedListenerRegistrations = ConcurrentHashMap.newKeySet();
+    private final Collection<D> delayedListenerOnAllRegistrations = ConcurrentHashMap.newKeySet();
+    private final Collection<ActorSelection> leaderOnlyListenerActors = ConcurrentHashMap.newKeySet();
+    private final Collection<ActorSelection> allListenerActors = ConcurrentHashMap.newKeySet();
 
     protected AbstractDataListenerSupport(Shard shard) {
         super(shard);
     }
 
 
     protected AbstractDataListenerSupport(Shard shard) {
         super(shard);
     }
 
+    Collection<ActorSelection> getListenerActors() {
+        return new ArrayList<>(allListenerActors);
+    }
+
     @Override
     void onLeadershipChange(boolean isLeader, boolean hasLeader) {
         log.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader);
 
         final EnableNotification msg = new EnableNotification(isLeader);
     @Override
     void onLeadershipChange(boolean isLeader, boolean hasLeader) {
         log.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader);
 
         final EnableNotification msg = new EnableNotification(isLeader);
-        for (ActorSelection dataChangeListener : actors) {
+        for (ActorSelection dataChangeListener : leaderOnlyListenerActors) {
             dataChangeListener.tell(msg, getSelf());
         }
 
             dataChangeListener.tell(msg, getSelf());
         }
 
@@ -46,7 +51,6 @@ abstract class AbstractDataListenerSupport<L extends EventListener, M extends Li
             }
 
             delayedListenerOnAllRegistrations.clear();
             }
 
             delayedListenerOnAllRegistrations.clear();
-            delayedListenerOnAllRegistrations.trimToSize();
         }
 
         if (isLeader) {
         }
 
         if (isLeader) {
@@ -55,7 +59,6 @@ abstract class AbstractDataListenerSupport<L extends EventListener, M extends Li
             }
 
             delayedListenerRegistrations.clear();
             }
 
             delayedListenerRegistrations.clear();
-            delayedListenerRegistrations.trimToSize();
         }
     }
 
         }
     }
 
@@ -63,41 +66,61 @@ abstract class AbstractDataListenerSupport<L extends EventListener, M extends Li
     void onMessage(M message, boolean isLeader, boolean hasLeader) {
         log.debug("{}: {} for {}, leader: {}", persistenceId(), logName(), message.getPath(), isLeader);
 
     void onMessage(M message, boolean isLeader, boolean hasLeader) {
         log.debug("{}: {} for {}, leader: {}", persistenceId(), logName(), message.getPath(), isLeader);
 
-        final ListenerRegistration<L> registration;
+        ActorRef registrationActor = createActor(DataTreeNotificationListenerRegistrationActor.props());
+
         if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
         if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
-            registration = createDelegate(message);
+            doRegistration(message, registrationActor);
         } else {
             log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
         } else {
             log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
-            D delayedReg = newDelayedListenerRegistration(message);
+            D delayedReg = newDelayedListenerRegistration(message, registrationActor);
+            Collection<D> delayedRegList;
             if (message.isRegisterOnAllInstances()) {
             if (message.isRegisterOnAllInstances()) {
-                delayedListenerOnAllRegistrations.add(delayedReg);
+                delayedRegList = delayedListenerOnAllRegistrations;
             } else {
             } else {
-                delayedListenerRegistrations.add(delayedReg);
+                delayedRegList = delayedListenerRegistrations;
             }
 
             }
 
-            registration = delayedReg;
+            delayedRegList.add(delayedReg);
+            registrationActor.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(
+                    delayedReg, () -> delayedRegList.remove(delayedReg)), ActorRef.noSender());
         }
 
         }
 
-        ActorRef registrationActor = newRegistrationActor(registration);
-
         log.debug("{}: {} sending reply, listenerRegistrationPath = {} ", persistenceId(), logName(),
                 registrationActor.path());
 
         tellSender(newRegistrationReplyMessage(registrationActor));
     }
 
         log.debug("{}: {} sending reply, listenerRegistrationPath = {} ", persistenceId(), logName(),
                 registrationActor.path());
 
         tellSender(newRegistrationReplyMessage(registrationActor));
     }
 
+    protected ActorSelection processListenerRegistrationMessage(M message) {
+        final ActorSelection listenerActor = selectActor(message.getListenerActorPath());
+
+        // We have a leader so enable the listener.
+        listenerActor.tell(new EnableNotification(true), getSelf());
+
+        if (!message.isRegisterOnAllInstances()) {
+            // This is a leader-only registration so store a reference to the listener actor so it can be notified
+            // at a later point if notifications should be enabled or disabled.
+            leaderOnlyListenerActors.add(listenerActor);
+        }
+
+        allListenerActors.add(listenerActor);
+
+        return listenerActor;
+    }
+
     protected Logger log() {
         return log;
     }
 
     protected Logger log() {
         return log;
     }
 
-    protected void addListenerActor(ActorSelection actor) {
-        actors.add(actor);
+    protected void removeListenerActor(ActorSelection listenerActor) {
+        allListenerActors.remove(listenerActor);
+        leaderOnlyListenerActors.remove(listenerActor);
     }
 
     }
 
-    protected abstract D newDelayedListenerRegistration(M message);
+    abstract void doRegistration(M message, ActorRef registrationActor);
 
 
-    protected abstract ActorRef newRegistrationActor(ListenerRegistration<L> registration);
+    protected abstract D newDelayedListenerRegistration(M message, ActorRef registrationActor);
 
     protected abstract Object newRegistrationReplyMessage(ActorRef registrationActor);
 
 
     protected abstract Object newRegistrationReplyMessage(ActorRef registrationActor);
 
index b7356d28bcc5dc5331832c883b9d739e2557feac..6b348fb76f904e6de35e105b0bc46cf796aabe32 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 
 import akka.actor.ActorContext;
 import akka.actor.ActorRef;
+import akka.actor.Props;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -28,38 +29,40 @@ abstract class AbstractShardDataTreeNotificationPublisherActorProxy implements S
 
     private final ActorContext actorContext;
     private final String actorName;
 
     private final ActorContext actorContext;
     private final String actorName;
+    private final String logContext;
     private ActorRef notifierActor;
 
     private ActorRef notifierActor;
 
-    protected AbstractShardDataTreeNotificationPublisherActorProxy(ActorContext actorContext, String actorName) {
+    protected AbstractShardDataTreeNotificationPublisherActorProxy(ActorContext actorContext, String actorName,
+            String logContext) {
         this.actorContext = actorContext;
         this.actorName = actorName;
         this.actorContext = actorContext;
         this.actorName = actorName;
+        this.logContext = logContext;
     }
 
     }
 
-    protected AbstractShardDataTreeNotificationPublisherActorProxy(
-            AbstractShardDataTreeNotificationPublisherActorProxy other) {
-        this.actorContext = null;
-        this.actorName = null;
-        this.notifierActor = other.getNotifierActor();
+    protected abstract Props props();
+
+    protected final String actorName() {
+        return actorName;
     }
 
     }
 
-    protected abstract ShardDataTreeNotificationPublisher getDelegatePublisher();
+    protected final String logContext() {
+        return logContext;
+    }
 
     @Override
     public void publishChanges(DataTreeCandidate candidate, String logContext) {
 
     @Override
     public void publishChanges(DataTreeCandidate candidate, String logContext) {
-        getNotifierActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications(
-                getDelegatePublisher(), candidate, logContext), ActorRef.noSender());
+        notifierActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications(candidate),
+                ActorRef.noSender());
     }
 
     }
 
-    private ActorRef getNotifierActor() {
+    protected final ActorRef notifierActor() {
         if (notifierActor == null) {
             LOG.debug("Creating actor {}", actorName);
 
             String dispatcher = new Dispatchers(actorContext.system().dispatchers()).getDispatcherPath(
                     Dispatchers.DispatcherType.Notification);
         if (notifierActor == null) {
             LOG.debug("Creating actor {}", actorName);
 
             String dispatcher = new Dispatchers(actorContext.system().dispatchers()).getDispatcherPath(
                     Dispatchers.DispatcherType.Notification);
-            notifierActor = actorContext.actorOf(ShardDataTreeNotificationPublisherActor.props(actorName)
-                    .withDispatcher(dispatcher).withMailbox(
-                            org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX),
-                    actorName);
+            notifierActor = actorContext.actorOf(props().withDispatcher(dispatcher).withMailbox(
+                    org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX), actorName);
         }
 
         return notifierActor;
         }
 
         return notifierActor;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationActor.java
deleted file mode 100644 (file)
index 89062dc..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.japi.Creator;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-public class DataChangeListenerRegistrationActor extends AbstractUntypedActor {
-
-    private final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-        registration;
-
-    public DataChangeListenerRegistrationActor(
-        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
-        this.registration = registration;
-    }
-
-    @Override
-    public void handleReceive(Object message) {
-        if (message instanceof CloseDataChangeListenerRegistration) {
-            closeListenerRegistration();
-        } else {
-            unknownMessage(message);
-        }
-    }
-
-    public static Props props(final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-            NormalizedNode<?, ?>>> registration) {
-        return Props.create(new DataChangeListenerRegistrationCreator(registration));
-    }
-
-    private void closeListenerRegistration() {
-        registration.close();
-
-        if (isValidSender(getSender())) {
-            getSender().tell(CloseDataChangeListenerRegistrationReply.INSTANCE, getSelf());
-        }
-
-        getSelf().tell(PoisonPill.getInstance(), getSelf());
-    }
-
-    private static class DataChangeListenerRegistrationCreator
-                                            implements Creator<DataChangeListenerRegistrationActor> {
-        private static final long serialVersionUID = 1L;
-
-        @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't "
-                + "create remote instances of this actor and thus don't need it to be Serializable.")
-        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-                                                           NormalizedNode<?, ?>>> registration;
-
-        DataChangeListenerRegistrationCreator(
-                ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-                                                             NormalizedNode<?, ?>>> registration) {
-            this.registration = registration;
-        }
-
-        @Override
-        public DataChangeListenerRegistrationActor create() throws Exception {
-            return new DataChangeListenerRegistrationActor(registration);
-        }
-    }
-}
index f0f4b7b9b6aba47481c6c0ea8bc890f8ed478f28..3fe3933fe72c08b84f33b747760ce6e3f6f17aa9 100644 (file)
@@ -15,7 +15,7 @@ import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -86,7 +86,8 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         }
 
         if (sendCloseMessage) {
         }
 
         if (sendCloseMessage) {
-            listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, null);
+            listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+                    ActorRef.noSender());
         }
     }
 
         }
     }
 
@@ -145,7 +146,8 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         }
 
         if (sendCloseMessage) {
         }
 
         if (sendCloseMessage) {
-            listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, ActorRef.noSender());
+            listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+                    ActorRef.noSender());
             listenerRegistrationActor = null;
         }
 
             listenerRegistrationActor = null;
         }
 
index 2e26e6ee36d170d4bb0d3da3ae3d276d8f62305c..6b0d8294d40313f3142cc9024722c827d9ebc41c 100644 (file)
@@ -9,102 +9,41 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 final class DataChangeListenerSupport extends AbstractDataListenerSupport<
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener,
 
 final class DataChangeListenerSupport extends AbstractDataListenerSupport<
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener,
-            DelayedDataChangeListenerRegistration, DataChangeListenerRegistration<
-                    AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
-
-    private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+            DelayedDataChangeListenerRegistration> {
 
     DataChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
 
     DataChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
-    Collection<ActorSelection> getListenerActors() {
-        return new ArrayList<>(listenerActors);
-    }
-
     @Override
     @Override
-    DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            createDelegate(final RegisterChangeListener message) {
-        final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
-
-        // Notify the listener if notifications should be enabled or not
-        // If this shard is the leader then it will enable notifications else
-        // it will not
-        dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
-
-        // Now store a reference to the data change listener so it can be notified
-        // at a later point if notifications should be enabled or disabled
-        if (!message.isRegisterOnAllInstances()) {
-            addListenerActor(dataChangeListenerPath);
-        }
+    void doRegistration(final RegisterChangeListener message, final ActorRef registrationActor) {
+        final ActorSelection listenerActor = processListenerRegistrationMessage(message);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
-                new DataChangeListenerProxy(dataChangeListenerPath);
+                new DataChangeListenerProxy(listenerActor);
 
         log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
 
 
         log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
 
-        Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-                Optional<DataTreeCandidate>> regEntry = getShard().getDataStore().registerChangeListener(
-                        message.getPath(), listener, message.getScope());
-
-        getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue());
-
-        listenerActors.add(dataChangeListenerPath);
-        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            delegate = regEntry.getKey();
-        return new DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-                NormalizedNode<?,?>>>() {
-            @Override
-            public void close() {
-                listenerActors.remove(dataChangeListenerPath);
-                delegate.close();
-            }
-
-            @Override
-            public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
-                return delegate.getInstance();
-            }
-
-            @Override
-            public YangInstanceIdentifier getPath() {
-                return delegate.getPath();
-            }
-
-            @Override
-            public DataChangeScope getScope() {
-                return delegate.getScope();
-            }
-        };
-    }
-
-    @Override
-    protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message) {
-        return new DelayedDataChangeListenerRegistration(message);
+        final ShardDataTree shardDataTree = getShard().getDataStore();
+        shardDataTree.registerDataChangeListener(message.getPath(), listener, message.getScope(),
+                shardDataTree.readCurrentData(), registration -> registrationActor.tell(
+                        new DataTreeNotificationListenerRegistrationActor.SetRegistration(registration, () ->
+                            removeListenerActor(listenerActor)), ActorRef.noSender()));
     }
 
     @Override
     }
 
     @Override
-    protected ActorRef newRegistrationActor(
-            ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
-        return createActor(DataChangeListenerRegistrationActor.props(registration));
+    protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message,
+            ActorRef registrationActor) {
+        return new DelayedDataChangeListenerRegistration(message, registrationActor);
     }
 
     @Override
     }
 
     @Override
index f60e676013ae0d2ed3c9b7ed0aab265da112b4d4..dd280a6c08bc505fc279354f18b200377a6f4765 100644 (file)
@@ -15,7 +15,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import com.google.common.base.Preconditions;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -55,7 +55,8 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
     @Override
     protected synchronized void removeRegistration() {
         if (listenerRegistrationActor != null) {
     @Override
     protected synchronized void removeRegistration() {
         if (listenerRegistrationActor != null) {
-            listenerRegistrationActor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), ActorRef.noSender());
+            listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
+                    ActorRef.noSender());
             listenerRegistrationActor = null;
         }
 
             listenerRegistrationActor = null;
         }
 
@@ -94,7 +95,7 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
         }
 
         // This registration has already been closed, notify the actor
         }
 
         // This registration has already been closed, notify the actor
-        actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
+        actor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), null);
     }
 
     private void doRegistration(final ActorRef shard) {
     }
 
     private void doRegistration(final ActorRef shard) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java
deleted file mode 100644 (file)
index 17deeeb..0000000
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.japi.Creator;
-import com.google.common.base.Preconditions;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-
-/**
- * Actor co-located with a shard. It exists only to terminate the registration when
- * asked to do so via {@link CloseDataTreeChangeListenerRegistration}.
- */
-public final class DataTreeChangeListenerRegistrationActor extends AbstractUntypedActor {
-    private final ListenerRegistration<DOMDataTreeChangeListener> registration;
-
-    public DataTreeChangeListenerRegistrationActor(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
-        this.registration = Preconditions.checkNotNull(registration);
-    }
-
-    @Override
-    protected void handleReceive(Object message) throws Exception {
-        if (message instanceof CloseDataTreeChangeListenerRegistration) {
-            registration.close();
-            if (isValidSender(getSender())) {
-                getSender().tell(CloseDataTreeChangeListenerRegistrationReply.getInstance(), getSelf());
-            }
-
-            getSelf().tell(PoisonPill.getInstance(), getSelf());
-        } else {
-            unknownMessage(message);
-        }
-    }
-
-    public static Props props(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
-        return Props.create(new DataTreeChangeListenerRegistrationCreator(registration));
-    }
-
-    private static final class DataTreeChangeListenerRegistrationCreator
-            implements Creator<DataTreeChangeListenerRegistrationActor> {
-        private static final long serialVersionUID = 1L;
-
-        @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't "
-                + "create remote instances of this actor and thus don't need it to be Serializable.")
-        final ListenerRegistration<DOMDataTreeChangeListener> registration;
-
-        DataTreeChangeListenerRegistrationCreator(ListenerRegistration<DOMDataTreeChangeListener> registration) {
-            this.registration = Preconditions.checkNotNull(registration);
-        }
-
-        @Override
-        public DataTreeChangeListenerRegistrationActor create() {
-            return new DataTreeChangeListenerRegistrationActor(registration);
-        }
-    }
-}
index 9a44b47e7e9257222804ed46c1e08ccaabdbf322..4f70327cb7246e947801902c05a39d2f5c81cc3b 100644 (file)
@@ -9,84 +9,37 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DOMDataTreeChangeListener,
 
 final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DOMDataTreeChangeListener,
-        RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration,
-        ListenerRegistration<DOMDataTreeChangeListener>> {
-
-    private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+        RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration> {
 
     DataTreeChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
 
     DataTreeChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
-    Collection<ActorSelection> getListenerActors() {
-        return new ArrayList<>(listenerActors);
-    }
-
     @Override
     @Override
-    ListenerRegistration<DOMDataTreeChangeListener> createDelegate(
-            final RegisterDataTreeChangeListener message) {
-        final ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
+    void doRegistration(final RegisterDataTreeChangeListener message, final ActorRef registrationActor) {
+        final ActorSelection listenerActor = processListenerRegistrationMessage(message);
 
 
-        // Notify the listener if notifications should be enabled or not
-        // If this shard is the leader then it will enable notifications else
-        // it will not
-        dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
-
-        // Now store a reference to the data change listener so it can be notified
-        // at a later point if notifications should be enabled or disabled
-        if (!message.isRegisterOnAllInstances()) {
-            addListenerActor(dataChangeListenerPath);
-        }
-
-        DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(dataChangeListenerPath);
+        DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(listenerActor);
 
         log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
 
 
         log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
 
-        Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> regEntry =
-                getShard().getDataStore().registerTreeChangeListener(message.getPath(), listener);
-
-        getShard().getDataStore().notifyOfInitialData(message.getPath(),
-                regEntry.getKey().getInstance(), regEntry.getValue());
-
-        listenerActors.add(dataChangeListenerPath);
-        final ListenerRegistration<DOMDataTreeChangeListener> delegate = regEntry.getKey();
-        return new ListenerRegistration<DOMDataTreeChangeListener>() {
-            @Override
-            public DOMDataTreeChangeListener getInstance() {
-                return delegate.getInstance();
-            }
-
-            @Override
-            public void close() {
-                listenerActors.remove(dataChangeListenerPath);
-                delegate.close();
-            }
-        };
+        final ShardDataTree shardDataTree = getShard().getDataStore();
+        shardDataTree.registerTreeChangeListener(message.getPath(),
+                listener, shardDataTree.readCurrentData(), registration -> registrationActor.tell(
+                        new DataTreeNotificationListenerRegistrationActor.SetRegistration(registration, () ->
+                            removeListenerActor(listenerActor)), ActorRef.noSender()));
     }
 
     @Override
     protected DelayedDataTreeListenerRegistration newDelayedListenerRegistration(
     }
 
     @Override
     protected DelayedDataTreeListenerRegistration newDelayedListenerRegistration(
-            RegisterDataTreeChangeListener message) {
-        return new DelayedDataTreeListenerRegistration(message);
-    }
-
-    @Override
-    protected ActorRef newRegistrationActor(ListenerRegistration<DOMDataTreeChangeListener> registration) {
-        return createActor(DataTreeChangeListenerRegistrationActor.props(registration));
+            RegisterDataTreeChangeListener message, ActorRef registrationActor) {
+        return new DelayedDataTreeListenerRegistration(message, registrationActor);
     }
 
     @Override
     }
 
     @Override
index a94773481d91e3e6353f60572a7b3c1fb9e4ff61..98083a0b1c9a7e5932e32d3aba29aa9c8334321a 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -14,6 +16,7 @@ import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeE
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -59,14 +62,26 @@ final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeLi
     }
 
     @Override
     }
 
     @Override
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            DataChangeListenerRegistration<L> registerDataChangeListener(YangInstanceIdentifier path, L listener,
-                    DataChangeScope scope) {
-        return dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
+    public void registerDataChangeListener(YangInstanceIdentifier path,
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+                    onRegistration) {
+        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                registration = dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
+
+        onRegistration.accept(registration);
+
+        if (initialState.isPresent()) {
+            notifySingleListener(path, listener, scope, initialState.get());
+        }
     }
 
     }
 
-    @Override
-    public ShardDataChangeListenerPublisher newInstance() {
-        return new DefaultShardDataChangeListenerPublisher();
+    static void notifySingleListener(final YangInstanceIdentifier path,
+            final AsyncDataChangeListener<YangInstanceIdentifier,NormalizedNode<?, ?>> listener,
+            final DataChangeScope scope, final DataTreeCandidate initialState) {
+        DefaultShardDataChangeListenerPublisher publisher = new DefaultShardDataChangeListenerPublisher();
+        publisher.registerDataChangeListener(path, listener, scope, Optional.absent(), noop -> { });
+        publisher.publishChanges(initialState, "");
     }
 }
     }
 }
index a63859e9e695516488129a496319d1070e9b3804..02326e0f84364ab65e007224a09f6a8217f33f96 100644 (file)
@@ -7,9 +7,11 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Optional;
 import java.util.Collection;
 import java.util.Collection;
+import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
 import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTreeChangePublisher;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
 import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTreeChangePublisher;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -34,11 +36,6 @@ final class DefaultShardDataTreeChangeListenerPublisher extends AbstractDOMStore
         processCandidateTree(candidate);
     }
 
         processCandidateTree(candidate);
     }
 
-    @Override
-    public ShardDataTreeChangeListenerPublisher newInstance() {
-        return new DefaultShardDataTreeChangeListenerPublisher();
-    }
-
     @Override
     protected void notifyListener(AbstractDOMDataTreeChangeListenerRegistration<?> registration,
             Collection<DataTreeCandidate> changes) {
     @Override
     protected void notifyListener(AbstractDOMDataTreeChangeListenerRegistration<?> registration,
             Collection<DataTreeCandidate> changes) {
@@ -51,18 +48,32 @@ final class DefaultShardDataTreeChangeListenerPublisher extends AbstractDOMStore
     }
 
     @Override
     }
 
     @Override
-    public <L extends org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> ListenerRegistration<L>
-            registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
-        final AbstractDOMDataTreeChangeListenerRegistration<DOMDataTreeChangeListener> registration =
-            super.registerTreeChangeListener(treeId, (org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener)
-                changes -> listener.onDataTreeChanged(changes));
+    public void registerTreeChangeListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+        AbstractDOMDataTreeChangeListenerRegistration<org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener>
+            registration = super.registerTreeChangeListener(treeId,
+                (org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener)changes ->
+                    listener.onDataTreeChanged(changes));
+
+        onRegistration.accept(
+            new org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration<
+                    DOMDataTreeChangeListener>(listener) {
+                @Override
+                protected void removeRegistration() {
+                    registration.close();
+                }
+            });
+
+        if (initialState.isPresent()) {
+            notifySingleListener(treeId, listener, initialState.get());
+        }
+    }
 
 
-        return new org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration<L>(
-                listener) {
-            @Override
-            protected void removeRegistration() {
-                registration.close();
-            }
-        };
+    static void notifySingleListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
+            DataTreeCandidate state) {
+        DefaultShardDataTreeChangeListenerPublisher publisher = new DefaultShardDataTreeChangeListenerPublisher();
+        publisher.registerTreeChangeListener(treeId, listener, Optional.absent(), noop -> { });
+        publisher.publishChanges(state, "");
     }
 }
     }
 }
index c49c3de9a831549672a07e35c05268449617da0b..a8fe1002da1f16b86238178b4eb50223259d05a4 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -15,7 +16,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 final class DelayedDataChangeListenerRegistration extends DelayedListenerRegistration<
            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener> {
 
 final class DelayedDataChangeListenerRegistration extends DelayedListenerRegistration<
            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener> {
 
-    DelayedDataChangeListenerRegistration(final RegisterChangeListener registerChangeListener) {
-        super(registerChangeListener);
+    DelayedDataChangeListenerRegistration(final RegisterChangeListener registerChangeListener,
+            final ActorRef registrationActor) {
+        super(registerChangeListener, registrationActor);
     }
     }
-}
\ No newline at end of file
+}
index 35f73086892fa5d596705e21b7bdf75a8f65c6e3..c67115a1f3fb6d90e470d1b8eefaf6aec880232b 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 
@@ -18,8 +19,9 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 final class DelayedDataTreeListenerRegistration
         extends DelayedListenerRegistration<DOMDataTreeChangeListener, RegisterDataTreeChangeListener> {
 
 final class DelayedDataTreeListenerRegistration
         extends DelayedListenerRegistration<DOMDataTreeChangeListener, RegisterDataTreeChangeListener> {
 
-    DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) {
-        super(registerTreeChangeListener);
+    DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener,
+            final ActorRef registrationActor) {
+        super(registerTreeChangeListener, registrationActor);
     }
 }
 
     }
 }
 
index 8d73bc6155d2c664ba6448a1085bbf85850d9606..18d23aa0cd95fb2270d46ebd3a24b30814bbf959 100644 (file)
@@ -7,33 +7,32 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import java.util.EventListener;
 import javax.annotation.concurrent.GuardedBy;
 import java.util.EventListener;
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 
-abstract class DelayedListenerRegistration<L extends EventListener, M> implements ListenerRegistration<L> {
+abstract class DelayedListenerRegistration<L extends EventListener, M extends ListenerRegistrationMessage>
+        implements ListenerRegistration<L> {
     private final M registrationMessage;
     private final M registrationMessage;
-    private volatile ListenerRegistration<L> delegate;
+    private final ActorRef registrationActor;
 
     @GuardedBy("this")
     private boolean closed;
 
 
     @GuardedBy("this")
     private boolean closed;
 
-    protected DelayedListenerRegistration(M registrationMessage) {
+    protected DelayedListenerRegistration(M registrationMessage, ActorRef registrationActor) {
         this.registrationMessage = registrationMessage;
         this.registrationMessage = registrationMessage;
+        this.registrationActor = registrationActor;
     }
 
     M getRegistrationMessage() {
         return registrationMessage;
     }
 
     }
 
     M getRegistrationMessage() {
         return registrationMessage;
     }
 
-    ListenerRegistration<L> getDelegate() {
-        return delegate;
-    }
-
-    synchronized <R extends ListenerRegistration<L>> void createDelegate(
-            final LeaderLocalDelegateFactory<M, R> factory) {
+    synchronized void createDelegate(final AbstractDataListenerSupport<L, M, ?> support) {
         if (!closed) {
         if (!closed) {
-            this.delegate = factory.createDelegate(registrationMessage);
+            support.doRegistration(registrationMessage, registrationActor);
         }
     }
 
         }
     }
 
@@ -50,11 +49,6 @@ abstract class DelayedListenerRegistration<L extends EventListener, M> implement
 
     @Override
     public synchronized void close() {
 
     @Override
     public synchronized void close() {
-        if (!closed) {
-            closed = true;
-            if (delegate != null) {
-                delegate.close();
-            }
-        }
+        closed = true;
     }
 }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java
deleted file mode 100644 (file)
index cd1b548..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-/**
- * Base class for factories instantiating delegates.
- *
- * @param <M> message type
- * @param <D> delegate type
- */
-abstract class DelegateFactory<M, D> {
-    abstract D createDelegate(M message);
-}
index 0f766295409c73e0305f6d45dd3fbc2df605afce..5f58bc9ef57f61f28326c7e661a23ba57c0bfafd 100644 (file)
@@ -20,7 +20,7 @@ import com.google.common.base.Preconditions;
  * @param <D> delegate type
  * @param <M> message type
  */
  * @param <D> delegate type
  * @param <M> message type
  */
-abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
+abstract class LeaderLocalDelegateFactory<M> {
     private final Shard shard;
 
     protected LeaderLocalDelegateFactory(final Shard shard) {
     private final Shard shard;
 
     protected LeaderLocalDelegateFactory(final Shard shard) {
index 39ee810eb0f3b29a0544e516b56d9e086dc5fb2d..aca69c2a5f5f5956893d247ac9fa01f2e5277d89 100644 (file)
@@ -188,9 +188,9 @@ public class Shard extends RaftActor {
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
         ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
         ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
-                new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
+                new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
         ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
         ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
-                new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
+                new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
         if (builder.getDataTree() != null) {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
                     treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
         if (builder.getDataTree() != null) {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
                     treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
index 87a0c8d64bef09c18676eaca5e0e472c041fe523..3558107f17da6a8b06efc1ded7c15f8ac8e84f87 100644 (file)
@@ -7,11 +7,14 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 /**
  * Interface for a class that generates and publishes notifications for DataChangeListeners.
 
 /**
  * Interface for a class that generates and publishes notifications for DataChangeListeners.
@@ -19,8 +22,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * @author Thomas Pantelis
  */
 interface ShardDataChangeListenerPublisher extends ShardDataTreeNotificationPublisher {
  * @author Thomas Pantelis
  */
 interface ShardDataChangeListenerPublisher extends ShardDataTreeNotificationPublisher {
-    ShardDataChangeListenerPublisher newInstance();
-
-    <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L>
-            registerDataChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope);
+    void registerDataChangeListener(YangInstanceIdentifier path,
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+                onRegistration);
 }
 }
index e3c71830a85df1b8d8d3f0e240257f280c577c0f..a17c603596d1e9b0d8045f521bfb08b23c3953fa 100644 (file)
@@ -8,12 +8,17 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorContext;
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 /**
  * Implementation of ShardDataChangeListenerPublisher that offloads the generation and publication
 
 /**
  * Implementation of ShardDataChangeListenerPublisher that offloads the generation and publication
@@ -25,30 +30,22 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 class ShardDataChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
         implements ShardDataChangeListenerPublisher {
 
 class ShardDataChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
         implements ShardDataChangeListenerPublisher {
 
-    private final ShardDataChangeListenerPublisher delegatePublisher = new DefaultShardDataChangeListenerPublisher();
-
-    ShardDataChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) {
-        super(actorContext, actorName);
-    }
-
-    private ShardDataChangeListenerPublisherActorProxy(ShardDataChangeListenerPublisherActorProxy other) {
-        super(other);
-    }
-
-    @Override
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            DataChangeListenerRegistration<L> registerDataChangeListener(YangInstanceIdentifier path, L listener,
-                    DataChangeScope scope) {
-        return delegatePublisher.registerDataChangeListener(path, listener, scope);
+    ShardDataChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName, String logContext) {
+        super(actorContext, actorName, logContext);
     }
 
     @Override
     }
 
     @Override
-    public ShardDataChangeListenerPublisher newInstance() {
-        return new ShardDataChangeListenerPublisherActorProxy(this);
+    public void registerDataChangeListener(YangInstanceIdentifier path,
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+                onRegistration) {
+        notifierActor().tell(new ShardDataChangePublisherActor.RegisterListener(path, listener, scope, initialState,
+                onRegistration), ActorRef.noSender());
     }
 
     @Override
     }
 
     @Override
-    protected ShardDataTreeNotificationPublisher getDelegatePublisher() {
-        return delegatePublisher;
+    protected Props props() {
+        return ShardDataChangePublisherActor.props(actorName(), logContext());
     }
 }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangePublisherActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangePublisherActor.java
new file mode 100644 (file)
index 0000000..cb7d80d
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.function.Consumer;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Actor used to generate and publish DataChange notifications.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardDataChangePublisherActor
+        extends ShardDataTreeNotificationPublisherActor<ShardDataChangeListenerPublisher> {
+
+    private ShardDataChangePublisherActor(final String name, final String logContext) {
+        super(new DefaultShardDataChangeListenerPublisher(), name, logContext);
+    }
+
+    @Override
+    protected void handleReceive(Object message) {
+        if (message instanceof RegisterListener) {
+            RegisterListener reg = (RegisterListener)message;
+            if (reg.initialState.isPresent()) {
+                DefaultShardDataChangeListenerPublisher.notifySingleListener(reg.path, reg.listener, reg.scope,
+                        reg.initialState.get());
+            }
+
+            publisher().registerDataChangeListener(reg.path, reg.listener, reg.scope, Optional.absent(),
+                    reg.onRegistration);
+        } else {
+            super.handleReceive(message);
+        }
+    }
+
+    static Props props(final String name, final String logContext) {
+        return Props.create(ShardDataChangePublisherActor.class, name, logContext);
+    }
+
+    static class RegisterListener {
+        private final YangInstanceIdentifier path;
+        private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+        private final DataChangeScope scope;
+        private final Optional<DataTreeCandidate> initialState;
+        private final Consumer<ListenerRegistration<
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> onRegistration;
+
+        RegisterListener(final YangInstanceIdentifier path,
+                final AsyncDataChangeListener<YangInstanceIdentifier,NormalizedNode<?, ?>> listener,
+                final DataChangeScope scope, final Optional<DataTreeCandidate> initialState,
+                final Consumer<ListenerRegistration<
+                    AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> onRegistration) {
+            this.path = Preconditions.checkNotNull(path);
+            this.listener = Preconditions.checkNotNull(listener);
+            this.scope = Preconditions.checkNotNull(scope);
+            this.initialState = Preconditions.checkNotNull(initialState);
+            this.onRegistration = Preconditions.checkNotNull(onRegistration);
+        }
+    }
+}
index 9abb00292f1ac682eaaebb3637de0c44c80f9614..f7d98276caa5fc5f995232b7758eb089ed1af1be 100644 (file)
@@ -24,7 +24,6 @@ import com.google.common.primitives.UnsignedLong;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
 import java.io.IOException;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
 import java.io.IOException;
-import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -63,7 +62,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -544,25 +542,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         dataChangeListenerPublisher.publishChanges(candidate, logContext);
     }
 
         dataChangeListenerPublisher.publishChanges(candidate, logContext);
     }
 
-    void notifyOfInitialData(final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-            NormalizedNode<?, ?>>> listenerReg, final Optional<DataTreeCandidate> currentState) {
-        if (currentState.isPresent()) {
-            ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance();
-            localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
-                    listenerReg.getScope());
-            localPublisher.publishChanges(currentState.get(), logContext);
-        }
-    }
-
-    void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
-            final Optional<DataTreeCandidate> currentState) {
-        if (currentState.isPresent()) {
-            ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance();
-            localPublisher.registerTreeChangeListener(path, listener);
-            localPublisher.publishChanges(currentState.get(), logContext);
-        }
-    }
-
     /**
      * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
      * replication callbacks.
     /**
      * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
      * replication callbacks.
@@ -615,29 +594,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
     }
 
         replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
     }
 
-    Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-            Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
-                    final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
-                    final DataChangeScope scope) {
-        DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
-                dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
-
-        return new SimpleEntry<>(reg, readCurrentData());
+    void registerDataChangeListener(YangInstanceIdentifier path,
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+                    onRegistration) {
+        dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration);
     }
 
     }
 
-    private Optional<DataTreeCandidate> readCurrentData() {
+    Optional<DataTreeCandidate> readCurrentData() {
         final Optional<NormalizedNode<?, ?>> currentState =
                 dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
         return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
             YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
     }
 
         final Optional<NormalizedNode<?, ?>> currentState =
                 dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
         return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
             YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
     }
 
-    public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>>
-            registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
-        final ListenerRegistration<DOMDataTreeChangeListener> reg =
-                treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
-
-        return new SimpleEntry<>(reg, readCurrentData());
+    public void registerTreeChangeListener(YangInstanceIdentifier path, DOMDataTreeChangeListener listener,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+        treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
     }
 
     int getQueueSize() {
     }
 
     int getQueueSize() {
index d4a5156d2b1c063ea41586d0bf104b64de42f479..7f20beecd5f142eb9fa1966734de1d90c778e53c 100644 (file)
@@ -7,13 +7,20 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 /**
  * Interface for a class that generates and publishes notifications for DataTreeChangeListeners.
  *
  * @author Thomas Pantelis
  */
 
 /**
  * Interface for a class that generates and publishes notifications for DataTreeChangeListeners.
  *
  * @author Thomas Pantelis
  */
-interface ShardDataTreeChangeListenerPublisher extends ShardDataTreeNotificationPublisher, DOMStoreTreeChangePublisher {
-    ShardDataTreeChangeListenerPublisher newInstance();
+interface ShardDataTreeChangeListenerPublisher extends ShardDataTreeNotificationPublisher {
+    void registerTreeChangeListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration);
 }
 }
index 7196f839e4ca876e7ae67ec519f54b6751d603db..ceaeeccaad8b700a9c262f25cdc7d1f50f590f60 100644 (file)
@@ -8,10 +8,15 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorContext;
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 /**
  * Implementation of ShardDataTreeChangeListenerPublisher that offloads the generation and publication
 
 /**
  * Implementation of ShardDataTreeChangeListenerPublisher that offloads the generation and publication
@@ -23,30 +28,20 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 class ShardDataTreeChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
         implements ShardDataTreeChangeListenerPublisher {
 
 class ShardDataTreeChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
         implements ShardDataTreeChangeListenerPublisher {
 
-    private final ShardDataTreeChangeListenerPublisher delegatePublisher =
-            new DefaultShardDataTreeChangeListenerPublisher();
-
-    ShardDataTreeChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) {
-        super(actorContext, actorName);
-    }
-
-    private ShardDataTreeChangeListenerPublisherActorProxy(ShardDataTreeChangeListenerPublisherActorProxy other) {
-        super(other);
-    }
-
-    @Override
-    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
-            YangInstanceIdentifier treeId, L listener) {
-        return delegatePublisher.registerTreeChangeListener(treeId, listener);
+    ShardDataTreeChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName, String logContext) {
+        super(actorContext, actorName, logContext);
     }
 
     @Override
     }
 
     @Override
-    public ShardDataTreeChangeListenerPublisher newInstance() {
-        return new ShardDataTreeChangeListenerPublisherActorProxy(this);
+    public void registerTreeChangeListener(YangInstanceIdentifier treeId,
+            DOMDataTreeChangeListener listener, Optional<DataTreeCandidate> currentState,
+            Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+        notifierActor().tell(new ShardDataTreeChangePublisherActor.RegisterListener(treeId, listener, currentState,
+                onRegistration), ActorRef.noSender());
     }
 
     @Override
     }
 
     @Override
-    protected ShardDataTreeNotificationPublisher getDelegatePublisher() {
-        return delegatePublisher;
+    protected Props props() {
+        return ShardDataTreeChangePublisherActor.props(actorName(), logContext());
     }
 }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisherActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisherActor.java
new file mode 100644 (file)
index 0000000..f054ccd
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.function.Consumer;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Actor used to generate and publish DataTreeChange notifications.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardDataTreeChangePublisherActor
+        extends ShardDataTreeNotificationPublisherActor<ShardDataTreeChangeListenerPublisher> {
+
+    private ShardDataTreeChangePublisherActor(final String name, final String logContext) {
+        super(new DefaultShardDataTreeChangeListenerPublisher(), name, logContext);
+    }
+
+    @Override
+    protected void handleReceive(Object message) {
+        if (message instanceof RegisterListener) {
+            RegisterListener reg = (RegisterListener)message;
+            if (reg.initialState.isPresent()) {
+                DefaultShardDataTreeChangeListenerPublisher.notifySingleListener(reg.path, reg.listener,
+                        reg.initialState.get());
+            }
+
+            publisher().registerTreeChangeListener(reg.path, reg.listener, Optional.absent(), reg.onRegistration);
+        } else {
+            super.handleReceive(message);
+        }
+    }
+
+    static Props props(final String name, final String logContext) {
+        return Props.create(ShardDataTreeChangePublisherActor.class, name, logContext);
+    }
+
+    static class RegisterListener {
+        private final YangInstanceIdentifier path;
+        private final DOMDataTreeChangeListener listener;
+        private final Optional<DataTreeCandidate> initialState;
+        private final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration;
+
+        RegisterListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+                final Optional<DataTreeCandidate> initialState,
+                final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+            this.path = Preconditions.checkNotNull(path);
+            this.listener = Preconditions.checkNotNull(listener);
+            this.initialState = Preconditions.checkNotNull(initialState);
+            this.onRegistration = Preconditions.checkNotNull(onRegistration);
+        }
+    }
+}
index 76c52c9bb55e4342d0056b81dd2e0748cd64aa6f..ea088308299272f4cbf9cf30cda48fddd454f3ca 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.Props;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
@@ -19,31 +18,43 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
  *
  * @author Thomas Pantelis
  */
  *
  * @author Thomas Pantelis
  */
-public class ShardDataTreeNotificationPublisherActor extends AbstractUntypedActor {
+public class ShardDataTreeNotificationPublisherActor<T extends ShardDataTreeNotificationPublisher>
+        extends AbstractUntypedActor {
+    private final T publisher;
     private final Stopwatch timer = Stopwatch.createUnstarted();
     private final String name;
     private final Stopwatch timer = Stopwatch.createUnstarted();
     private final String name;
+    private final String logContext;
 
 
-    private ShardDataTreeNotificationPublisherActor(String name) {
+    protected ShardDataTreeNotificationPublisherActor(final T publisher, final String name, final String logContext) {
+        this.publisher = publisher;
         this.name = name;
         this.name = name;
+        this.logContext = logContext;
+    }
+
+    protected T publisher() {
+        return publisher;
+    }
+
+    protected String logContext() {
+        return logContext;
     }
 
     @Override
     protected void handleReceive(Object message) {
         if (message instanceof PublishNotifications) {
     }
 
     @Override
     protected void handleReceive(Object message) {
         if (message instanceof PublishNotifications) {
-            PublishNotifications publisher = (PublishNotifications)message;
+            PublishNotifications toPublish = (PublishNotifications)message;
             timer.start();
 
             try {
             timer.start();
 
             try {
-                publisher.publish();
+                publisher.publishChanges(toPublish.candidate, logContext);
             } finally {
                 long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
 
                 if (elapsedTime >= ShardDataTreeNotificationPublisher.PUBLISH_DELAY_THRESHOLD_IN_MS) {
                     LOG.warn("{}: Generation of change events for {} took longer than expected. Elapsed time: {}",
             } finally {
                 long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
 
                 if (elapsedTime >= ShardDataTreeNotificationPublisher.PUBLISH_DELAY_THRESHOLD_IN_MS) {
                     LOG.warn("{}: Generation of change events for {} took longer than expected. Elapsed time: {}",
-                            publisher.logContext, name, timer);
+                            logContext, name, timer);
                 } else {
                 } else {
-                    LOG.debug("{}: Elapsed time for generation of change events for {}: {}", publisher.logContext,
-                            name, timer);
+                    LOG.debug("{}: Elapsed time for generation of change events for {}: {}", logContext, name, timer);
                 }
 
                 timer.reset();
                 }
 
                 timer.reset();
@@ -51,24 +62,11 @@ public class ShardDataTreeNotificationPublisherActor extends AbstractUntypedActo
         }
     }
 
         }
     }
 
-    static Props props(String notificationType) {
-        return Props.create(ShardDataTreeNotificationPublisherActor.class, notificationType);
-    }
-
     static class PublishNotifications {
     static class PublishNotifications {
-        private final ShardDataTreeNotificationPublisher publisher;
         private final DataTreeCandidate candidate;
         private final DataTreeCandidate candidate;
-        private final String logContext;
 
 
-        PublishNotifications(ShardDataTreeNotificationPublisher publisher, DataTreeCandidate candidate,
-                String logContext) {
-            this.publisher = publisher;
+        PublishNotifications(DataTreeCandidate candidate) {
             this.candidate = candidate;
             this.candidate = candidate;
-            this.logContext = logContext;
-        }
-
-        private void publish() {
-            publisher.publishChanges(candidate, logContext);
         }
     }
 }
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActor.java
new file mode 100644 (file)
index 0000000..2a60abb
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Actor co-located with a shard. It exists only to terminate the registration when
+ * asked to do so via {@link CloseDataTreeNotificationListenerRegistration}.
+ */
+public final class DataTreeNotificationListenerRegistrationActor extends AbstractUntypedActor {
+    @VisibleForTesting
+    static long killDelay = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);
+
+    private ListenerRegistration<?> registration;
+    private Runnable onClose;
+    private boolean closed;
+    private Cancellable killSchedule;
+
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (message instanceof CloseDataTreeNotificationListenerRegistration) {
+            closeListenerRegistration();
+            if (isValidSender(getSender())) {
+                getSender().tell(CloseDataTreeNotificationListenerRegistrationReply.getInstance(), getSelf());
+            }
+        } else if (message instanceof SetRegistration) {
+            registration = ((SetRegistration)message).registration;
+            onClose = ((SetRegistration)message).onClose;
+            if (closed) {
+                closeListenerRegistration();
+            }
+        } else {
+            unknownMessage(message);
+        }
+    }
+
+    private void closeListenerRegistration() {
+        closed = true;
+        if (registration != null) {
+            registration.close();
+            onClose.run();
+            registration = null;
+
+            if (killSchedule == null) {
+                killSchedule = getContext().system().scheduler().scheduleOnce(Duration.create(killDelay,
+                        TimeUnit.MILLISECONDS), getSelf(), PoisonPill.getInstance(), getContext().dispatcher(),
+                        ActorRef.noSender());
+            }
+        }
+    }
+
+    public static Props props() {
+        return Props.create(DataTreeNotificationListenerRegistrationActor.class);
+    }
+
+    public static class SetRegistration {
+        private final ListenerRegistration<?> registration;
+        private final Runnable onClose;
+
+        public SetRegistration(final ListenerRegistration<?> registration, final Runnable onClose) {
+            this.registration = Preconditions.checkNotNull(registration);
+            this.onClose = Preconditions.checkNotNull(onClose);
+        }
+    }
+}
index 1989bad81a06acb4c63e53310f7b2693ac717e0a..6094e2469d9aaaf72c35d259c297d3c600673db0 100644 (file)
@@ -12,6 +12,7 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
 
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
 
+import com.google.common.base.Optional;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
@@ -24,7 +25,7 @@ public abstract class AbstractEntityOwnerChangeListener implements DOMDataTreeCh
             .node(ENTITY_OWNER_QNAME).build();
 
     void init(ShardDataTree shardDataTree) {
             .node(ENTITY_OWNER_QNAME).build();
 
     void init(ShardDataTree shardDataTree) {
-        shardDataTree.registerTreeChangeListener(EOS_PATH, this);
+        shardDataTree.registerTreeChangeListener(EOS_PATH, this, Optional.absent(), noop -> { });
     }
 
     protected static String extractOwner(LeafNode<?> ownerLeaf) {
     }
 
     protected static String extractOwner(LeafNode<?> ownerLeaf) {
index 3b4f2d96471a296db82d01f4689c974a4dccb1fd..6b6717c7da16ac0432f8ffc69381a2df010fc1ad 100644 (file)
@@ -14,6 +14,7 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
 
 import akka.actor.ActorRef;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
 
 import akka.actor.ActorRef;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -60,7 +61,7 @@ class CandidateListChangeListener implements DOMDataTreeChangeListener {
     void init(ShardDataTree shardDataTree) {
         shardDataTree.registerTreeChangeListener(YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH)
                 .node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME)
     void init(ShardDataTree shardDataTree) {
         shardDataTree.registerTreeChangeListener(YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH)
                 .node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME)
-                    .node(Candidate.QNAME).node(Candidate.QNAME).build(), this);
+                    .node(Candidate.QNAME).node(Candidate.QNAME).build(), this, Optional.absent(), noop -> { });
     }
 
     @Override
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistration.java
deleted file mode 100644 (file)
index 8c35caa..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-public class CloseDataChangeListenerRegistration {
-    public static final CloseDataChangeListenerRegistration INSTANCE = new CloseDataChangeListenerRegistration();
-
-    private CloseDataChangeListenerRegistration() {
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistrationReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistrationReply.java
deleted file mode 100644 (file)
index 430202a..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-
-public class CloseDataChangeListenerRegistrationReply {
-    public static final CloseDataChangeListenerRegistrationReply INSTANCE =
-            new CloseDataChangeListenerRegistrationReply();
-
-    private CloseDataChangeListenerRegistrationReply() {
-    }
-}
@@ -10,15 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import java.io.ObjectStreamException;
 import java.io.Serializable;
 
 import java.io.ObjectStreamException;
 import java.io.Serializable;
 
-public final class CloseDataTreeChangeListenerRegistration implements Serializable {
+public final class CloseDataTreeNotificationListenerRegistration implements Serializable {
     private static final long serialVersionUID = 1L;
     private static final long serialVersionUID = 1L;
-    private static final CloseDataTreeChangeListenerRegistration INSTANCE =
-            new CloseDataTreeChangeListenerRegistration();
+    private static final CloseDataTreeNotificationListenerRegistration INSTANCE =
+            new CloseDataTreeNotificationListenerRegistration();
 
 
-    private CloseDataTreeChangeListenerRegistration() {
+    private CloseDataTreeNotificationListenerRegistration() {
     }
 
     }
 
-    public static CloseDataTreeChangeListenerRegistration getInstance() {
+    public static CloseDataTreeNotificationListenerRegistration getInstance() {
         return INSTANCE;
     }
 
         return INSTANCE;
     }
 
@@ -10,16 +10,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import java.io.ObjectStreamException;
 import java.io.Serializable;
 
 import java.io.ObjectStreamException;
 import java.io.Serializable;
 
-public final class CloseDataTreeChangeListenerRegistrationReply implements Serializable {
+public final class CloseDataTreeNotificationListenerRegistrationReply implements Serializable {
     private static final long serialVersionUID = 1L;
     private static final long serialVersionUID = 1L;
-    private static final CloseDataTreeChangeListenerRegistrationReply INSTANCE =
-            new CloseDataTreeChangeListenerRegistrationReply();
+    private static final CloseDataTreeNotificationListenerRegistrationReply INSTANCE =
+            new CloseDataTreeNotificationListenerRegistrationReply();
 
 
-    private CloseDataTreeChangeListenerRegistrationReply() {
+    private CloseDataTreeNotificationListenerRegistrationReply() {
         // Use getInstance() instead
     }
 
         // Use getInstance() instead
     }
 
-    public static CloseDataTreeChangeListenerRegistrationReply getInstance() {
+    public static CloseDataTreeNotificationListenerRegistrationReply getInstance() {
         return INSTANCE;
     }
 
         return INSTANCE;
     }
 
index 5cea06b4cb301e691beae65357a3151c7fcc3fa3..3f016a444f26678756d2b4a5dac26ea704011c27 100644 (file)
@@ -7,10 +7,13 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import akka.actor.ActorPath;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public interface ListenerRegistrationMessage {
     YangInstanceIdentifier getPath();
 
     boolean isRegisterOnAllInstances();
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public interface ListenerRegistrationMessage {
     YangInstanceIdentifier getPath();
 
     boolean isRegisterOnAllInstances();
+
+    ActorPath getListenerActorPath();
 }
 }
index f5d86984755beada93ae5ec6406545c7e74ee89d..163525a8ca57f1496e8b932c7851e6a618a9f988 100644 (file)
@@ -15,15 +15,14 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class RegisterChangeListener implements ListenerRegistrationMessage {
     private final YangInstanceIdentifier path;
 
 public class RegisterChangeListener implements ListenerRegistrationMessage {
     private final YangInstanceIdentifier path;
-    private final ActorRef dataChangeListener;
+    private final ActorRef dataChangeListenerActor;
     private final AsyncDataBroker.DataChangeScope scope;
     private final boolean registerOnAllInstances;
 
     private final AsyncDataBroker.DataChangeScope scope;
     private final boolean registerOnAllInstances;
 
-    public RegisterChangeListener(YangInstanceIdentifier path,
-        ActorRef dataChangeListener,
-        AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) {
+    public RegisterChangeListener(YangInstanceIdentifier path, ActorRef dataChangeListenerActor,
+            AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) {
         this.path = path;
         this.path = path;
-        this.dataChangeListener = dataChangeListener;
+        this.dataChangeListenerActor = dataChangeListenerActor;
         this.scope = scope;
         this.registerOnAllInstances = registerOnAllInstances;
     }
         this.scope = scope;
         this.registerOnAllInstances = registerOnAllInstances;
     }
@@ -37,8 +36,9 @@ public class RegisterChangeListener implements ListenerRegistrationMessage {
         return scope;
     }
 
         return scope;
     }
 
-    public ActorPath getDataChangeListenerPath() {
-        return dataChangeListener.path();
+    @Override
+    public ActorPath getListenerActorPath() {
+        return dataChangeListenerActor.path();
     }
 
     @Override
     }
 
     @Override
index c9c0c0ceea914631c426304dea984971765a36b5..abf072b640aa08b57855cd9eb5dba34d8089af7f 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import java.io.Externalizable;
 import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import java.io.Externalizable;
@@ -42,8 +43,9 @@ public final class RegisterDataTreeChangeListener implements Externalizable, Lis
         return path;
     }
 
         return path;
     }
 
-    public ActorRef getDataTreeChangeListenerPath() {
-        return dataTreeChangeListenerPath;
+    @Override
+    public ActorPath getListenerActorPath() {
+        return dataTreeChangeListenerPath.path();
     }
 
     @Override
     }
 
     @Override
index 1c733cc1ffe393911566e49c1f0197cd63e9769a..17a55b9767a152379e9f02f5ad0740a22c563535 100644 (file)
@@ -30,7 +30,7 @@ import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
@@ -114,7 +114,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
                 proxy.close();
 
                 // The listener registration actor should get a Close message
                 proxy.close();
 
                 // The listener registration actor should get a Close message
-                expectMsgClass(timeout, CloseDataChangeListenerRegistration.class);
+                expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
 
                 // The DataChangeListener actor should be terminated
                 expectMsgClass(timeout, Terminated.class);
 
                 // The DataChangeListener actor should be terminated
                 expectMsgClass(timeout, Terminated.class);
@@ -174,7 +174,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
                 proxy.close();
 
                 // The listener registration actor should get a Close message
                 proxy.close();
 
                 // The listener registration actor should get a Close message
-                expectMsgClass(timeout, CloseDataChangeListenerRegistration.class);
+                expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
 
                 // The DataChangeListener actor should be terminated
                 expectMsgClass(timeout, Terminated.class);
 
                 // The DataChangeListener actor should be terminated
                 expectMsgClass(timeout, Terminated.class);
@@ -326,7 +326,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
                 proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
                         AsyncDataBroker.DataChangeScope.ONE);
 
                 proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
                         AsyncDataBroker.DataChangeScope.ONE);
 
-                expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.class);
+                expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class);
 
                 Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
                 proxy.close();
 
                 Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
                 proxy.close();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java
deleted file mode 100644 (file)
index d23cfea..0000000
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-public class DataChangeListenerRegistrationTest extends AbstractActorTest {
-    private static final InMemoryDOMDataStore STORE = new InMemoryDOMDataStore("OPER",
-            MoreExecutors.newDirectExecutorService());
-
-    static {
-        STORE.onGlobalContextUpdated(TestModel.createTestContext());
-    }
-
-    @Test
-    public void testOnReceiveCloseListenerRegistration() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                final Props props = DataChangeListenerRegistrationActor.props(STORE.registerChangeListener(
-                        TestModel.TEST_PATH, noOpDataChangeListener(), AsyncDataBroker.DataChangeScope.BASE));
-                final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
-
-                new Within(duration("1 seconds")) {
-                    @Override
-                    protected void run() {
-
-                        subject.tell(CloseDataChangeListenerRegistration.INSTANCE, getRef());
-
-                        final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                            // do not put code outside this method, will run
-                            // afterwards
-                            @Override
-                            protected String match(final Object in) {
-                                if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.class)) {
-                                    return "match";
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
-
-                        assertEquals("match", out);
-
-                        expectNoMsg();
-                    }
-
-                };
-            }
-        };
-    }
-
-    private static AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
-        return change -> {
-        };
-    }
-}
index e6b20a720673e9ed3a7b8d5ebce77b3fe705a610..6f2f99b24ef7a088586e39ce7334a4cc3e8d4d61 100644 (file)
@@ -29,7 +29,7 @@ import org.junit.Test;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
@@ -93,7 +93,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 proxy.close();
 
                 // The listener registration actor should get a Close message
                 proxy.close();
 
                 // The listener registration actor should get a Close message
-                expectMsgClass(timeout, CloseDataTreeChangeListenerRegistration.class);
+                expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
 
                 // The DataChangeListener actor should be terminated
                 expectMsgClass(timeout, Terminated.class);
 
                 // The DataChangeListener actor should be terminated
                 expectMsgClass(timeout, Terminated.class);
@@ -277,7 +277,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
 
                 proxy.init(shardName);
 
 
                 proxy.init(shardName);
 
-                expectMsgClass(duration("5 seconds"), CloseDataTreeChangeListenerRegistration.class);
+                expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class);
 
                 Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
             }
 
                 Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
             }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActorTest.java
deleted file mode 100644 (file)
index 9259a61..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-
-public class DataTreeChangeListenerRegistrationActorTest extends AbstractActorTest {
-    private static final InMemoryDOMDataStore STORE = new InMemoryDOMDataStore("OPER",
-            MoreExecutors.newDirectExecutorService());
-
-    static {
-        STORE.onGlobalContextUpdated(TestModel.createTestContext());
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Test
-    public void testOnReceiveCloseListenerRegistration() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                final ListenerRegistration mockListenerReg = Mockito.mock(ListenerRegistration.class);
-                final Props props = DataTreeChangeListenerRegistrationActor.props(mockListenerReg);
-                final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
-
-                subject.tell(CloseDataTreeChangeListenerRegistration.getInstance(), getRef());
-
-                expectMsgClass(duration("1 second"), CloseDataTreeChangeListenerRegistrationReply.class);
-
-                Mockito.verify(mockListenerReg).close();
-            }
-        };
-    }
-}
index 00f0f4bdf0502095aefad3858ffa18f63625e1a6..1426e4a243f4a35cdc4de136581ee1f1a26a7d45 100644 (file)
@@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
@@ -92,8 +92,8 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
 
         listener.reset(1);
         JavaTestKit kit = new JavaTestKit(getSystem());
 
         listener.reset(1);
         JavaTestKit kit = new JavaTestKit(getSystem());
-        entry.getValue().tell(CloseDataTreeChangeListenerRegistration.getInstance(), kit.getRef());
-        kit.expectMsgClass(JavaTestKit.duration("5 seconds"), CloseDataTreeChangeListenerRegistrationReply.class);
+        entry.getValue().tell(CloseDataTreeNotificationListenerRegistration.getInstance(), kit.getRef());
+        kit.expectMsgClass(JavaTestKit.duration("5 seconds"), CloseDataTreeNotificationListenerRegistrationReply.class);
 
         writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
         listener.verifyNoNotifiedData(TEST_PATH);
 
         writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
         listener.verifyNoNotifiedData(TEST_PATH);
index 31e36cf679ea58454643bfebdf0eb71ca142e9ec..e3aae92f9c2c200d0598600e88dea2cabf9cd4dc 100644 (file)
@@ -63,6 +63,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardData
 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@@ -117,6 +118,8 @@ public class DistributedDataStoreIntegrationTest {
 
     @Before
     public void setUp() throws IOException {
 
     @Before
     public void setUp() throws IOException {
+        InMemorySnapshotStore.clear();
+        InMemoryJournal.clear();
         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
         Cluster.get(system).join(member1Address);
         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
         Cluster.get(system).join(member1Address);
@@ -1167,6 +1170,10 @@ public class DistributedDataStoreIntegrationTest {
 
                     assertNotNull("registerChangeListener returned null", listenerReg);
 
 
                     assertNotNull("registerChangeListener returned null", listenerReg);
 
+                    IntegrationTestKit.verifyShardState(dataStore, "test-1",
+                        state -> assertEquals("getDataChangeListenerActors", 1,
+                                state.getDataChangeListenerActors().size()));
+
                     // Wait for the initial notification
                     listener.waitForChangeEvents(TestModel.TEST_PATH);
                     listener.reset(2);
                     // Wait for the initial notification
                     listener.waitForChangeEvents(TestModel.TEST_PATH);
                     listener.reset(2);
@@ -1184,6 +1191,63 @@ public class DistributedDataStoreIntegrationTest {
                     listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
                     listenerReg.close();
 
                     listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
                     listenerReg.close();
 
+                    IntegrationTestKit.verifyShardState(dataStore, "test-1",
+                        state -> assertEquals("getDataChangeListenerActors", 0,
+                                state.getDataChangeListenerActors().size()));
+
+                    testWriteTransaction(dataStore,
+                            YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
+                            ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
+
+                    listener.expectNoMoreChanges("Received unexpected change after close");
+                }
+            }
+        };
+    }
+
+    @Test
+    public void testDataTreeChangeListenerRegistration() throws Exception {
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+            {
+                try (final AbstractDataStore dataStore = setupAbstractDataStore(
+                        testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
+
+                    testWriteTransaction(dataStore, TestModel.TEST_PATH,
+                            ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+                    final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+
+                    ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
+                            .registerTreeChangeListener(TestModel.TEST_PATH, listener);
+
+                    assertNotNull("registerTreeChangeListener returned null", listenerReg);
+
+                    IntegrationTestKit.verifyShardState(dataStore, "test-1",
+                        state -> assertEquals("getTreeChangeListenerActors", 1,
+                                state.getTreeChangeListenerActors().size()));
+
+                    // Wait for the initial notification
+                    listener.waitForChangeEvents(TestModel.TEST_PATH);
+                    listener.reset(2);
+
+                    // Write 2 updates.
+                    testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+                            ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+                    YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                            .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
+                    testWriteTransaction(dataStore, listPath,
+                            ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+
+                    // Wait for the 2 updates.
+                    listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
+                    listenerReg.close();
+
+                    IntegrationTestKit.verifyShardState(dataStore, "test-1",
+                        state -> assertEquals("getTreeChangeListenerActors", 0,
+                                state.getTreeChangeListenerActors().size()));
+
                     testWriteTransaction(dataStore,
                             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
                     testWriteTransaction(dataStore,
                             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
index 42d0337ba9bfd116eb26d2b35013e1c5c4af186b..3ca0f9c7aad2b096754d04af4cd607bd950895b8 100644 (file)
@@ -174,7 +174,8 @@ public class ShardDataTreeTest extends AbstractTest {
         immediatePayloadReplication(shardDataTree, mockShard);
 
         DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
         immediatePayloadReplication(shardDataTree, mockShard);
 
         DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
-        shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener);
+        shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener,
+                Optional.absent(), noop -> { });
 
         addCar(shardDataTree, "optima");
 
 
         addCar(shardDataTree, "optima");
 
index 45239ac63fdff808d8bb4ddceaf70d84f7eb7752..0bd34aac3ee768eb9c4abc0f872b0e83cf67f547 100644 (file)
@@ -59,6 +59,8 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -2122,10 +2124,10 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
     }
 
     @Test
-    public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
+    public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
             {
         new ShardTestKit(getSystem()) {
             {
-                final String testName = "testClusteredDataChangeListenerDelayedRegistration";
+                final String testName = "testClusteredDataChangeListenerWithDelayedRegistration";
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
@@ -2206,10 +2208,10 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
     }
 
     @Test
-    public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+    public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
             {
         new ShardTestKit(getSystem()) {
             {
-                final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+                final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
@@ -2238,6 +2240,43 @@ public class ShardTest extends AbstractShardTest {
         };
     }
 
         };
     }
 
+    @Test
+    public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
+        new ShardTestKit(getSystem()) {
+            {
+                final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
+                dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
+                        .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+                final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                        TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+                setupInMemorySnapshotStore();
+
+                final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                        newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        actorFactory.generateActorId(testName + "-shard"));
+
+                waitUntilNoLeader(shard);
+
+                shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+                final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                        RegisterDataTreeChangeListenerReply.class);
+                assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+                final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
+                regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+                expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
+
+                shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
+                        .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
+
+                listener.expectNoMoreChanges("Received unexpected change after close");
+            }
+        };
+    }
+
     @Test
     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
     @Test
     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
         new ShardTestKit(getSystem()) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java
new file mode 100644 (file)
index 0000000..12428e9
--- /dev/null
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors;
+
+import static org.mockito.Mockito.timeout;
+
+import akka.actor.ActorRef;
+import akka.testkit.JavaTestKit;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+public class DataTreeNotificationListenerRegistrationActorTest extends AbstractActorTest {
+    @Mock
+    private ListenerRegistration<?> mockListenerReg;
+
+    @Mock
+    private Runnable mockOnClose;
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+        DataTreeNotificationListenerRegistrationActor.killDelay = 100;
+    }
+
+    @Test
+    public void testOnReceiveCloseListenerRegistrationAfterSetRegistration() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(),
+                        "testOnReceiveCloseListenerRegistrationAfterSetRegistration");
+                watch(subject);
+
+                subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg,
+                        mockOnClose), ActorRef.noSender());
+                subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+
+                expectMsgClass(duration("5 second"), CloseDataTreeNotificationListenerRegistrationReply.class);
+
+                Mockito.verify(mockListenerReg, timeout(5000)).close();
+                Mockito.verify(mockOnClose, timeout(5000)).run();
+
+                expectTerminated(duration("5 second"), subject);
+            }
+        };
+    }
+
+    @Test
+    public void testOnReceiveCloseListenerRegistrationBeforeSetRegistration() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(),
+                        "testOnReceiveSetRegistrationAfterPriorClose");
+                watch(subject);
+
+                subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
+                expectMsgClass(duration("5 second"), CloseDataTreeNotificationListenerRegistrationReply.class);
+
+                subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg,
+                        mockOnClose), ActorRef.noSender());
+
+                Mockito.verify(mockListenerReg, timeout(5000)).close();
+                Mockito.verify(mockOnClose, timeout(5000)).run();
+
+                expectTerminated(duration("5 second"), subject);
+            }
+        };
+    }
+
+    @Test
+    public void testOnReceiveSetRegistrationAfterPriorClose() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                DataTreeNotificationListenerRegistrationActor.killDelay = 1000;
+                final ListenerRegistration<?> mockListenerReg2 = Mockito.mock(ListenerRegistration.class);
+                final Runnable mockOnClose2 = Mockito.mock(Runnable.class);
+
+                final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(),
+                        "testOnReceiveSetRegistrationAfterPriorClose");
+                watch(subject);
+
+                subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg,
+                        mockOnClose), ActorRef.noSender());
+                subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
+                subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg2,
+                        mockOnClose2), ActorRef.noSender());
+
+                Mockito.verify(mockListenerReg, timeout(5000)).close();
+                Mockito.verify(mockOnClose, timeout(5000)).run();
+                Mockito.verify(mockListenerReg2, timeout(5000)).close();
+                Mockito.verify(mockOnClose2, timeout(5000)).run();
+
+                expectTerminated(duration("5 second"), subject);
+            }
+        };
+    }
+}
index 4be431554118484f6760a0e78f72f0f82bfba86d..b414e0f269f9b19ea868a05e61d0692f9a5f573d 100644 (file)
@@ -11,6 +11,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Arrays;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Arrays;
@@ -23,6 +24,9 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
@@ -54,12 +58,46 @@ public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
         }
     }
 
         }
     }
 
-    public void waitForChangeEvents() {
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public void waitForChangeEvents(YangInstanceIdentifier... expPaths) {
         boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS);
         if (!done) {
             fail(String.format("Missing change notifications. Expected: %d. Actual: %d",
                     expChangeEventCount, expChangeEventCount - changeLatch.getCount()));
         }
         boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS);
         if (!done) {
             fail(String.format("Missing change notifications. Expected: %d. Actual: %d",
                     expChangeEventCount, expChangeEventCount - changeLatch.getCount()));
         }
+
+        for (int i = 0; i < expPaths.length; i++) {
+            final DataTreeCandidate candidate = changeList.get(i);
+            final Optional<NormalizedNode<?, ?>> maybeDataAfter = candidate.getRootNode().getDataAfter();
+            if (!maybeDataAfter.isPresent()) {
+                fail(String.format("Change %d does not contain data after. Actual: %s", i + 1,
+                        candidate.getRootNode()));
+            }
+
+            final NormalizedNode<?, ?> dataAfter = maybeDataAfter.get();
+            final Optional<YangInstanceIdentifier> relativePath = expPaths[i].relativeTo(candidate.getRootPath());
+            if (!relativePath.isPresent()) {
+                assertEquals(String.format("Change %d does not contain %s. Actual: %s", i + 1, expPaths[i],
+                        dataAfter), expPaths[i].getLastPathArgument(), dataAfter.getIdentifier());
+            } else {
+                NormalizedNode<?, ?> nextChild = dataAfter;
+                for (PathArgument pathArg: relativePath.get().getPathArguments()) {
+                    boolean found = false;
+                    if (nextChild instanceof NormalizedNodeContainer) {
+                        Optional<NormalizedNode<?, ?>> maybeChild = ((NormalizedNodeContainer)nextChild)
+                                .getChild(pathArg);
+                        if (maybeChild.isPresent()) {
+                            found = true;
+                            nextChild = maybeChild.get();
+                        }
+                    }
+
+                    if (!found) {
+                        fail(String.format("Change %d does not contain %s. Actual: %s", i + 1, expPaths[i], dataAfter));
+                    }
+                }
+            }
+        }
     }
 
     public void verifyNotifiedData(YangInstanceIdentifier... paths) {
     }
 
     public void verifyNotifiedData(YangInstanceIdentifier... paths) {