From ec870dee9bacb971f11bc747b69e84ac37f5d746 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 14 Apr 2017 09:03:51 -0400 Subject: [PATCH] Bug 8231: Fix testChangeListenerRegistration failure As described in Bug 8231, the sharing of the ListenerTree between the ShardDataTree and the ShardDataTreeNotificationPublisherActor is problematic. Therefore the ListenerTree (wrapped by the DefaultShardDataTreeChangeListenerPublisher) is now owned by the ShardDataTreeNotificationPublisherActor. On registration, a RegisterListener messages is sent to the ShardDataTreeNotificationPublisherActor to perform the on-boarding of the new listener, ie it atomically generates and sends the initial notification and then adds the listener to the ListenerTree. This change necessitated some refactoring of the DataChangeListenerSupport class et al wrt to how the ListenerRegistration is handled. Prior the ListenerRegistration was passed on creation of the registration actor. This is now done indirectly by sending a SetRegistration message to the registration actor via a Consumer callback passed in the RegisterListener message. When the ListenerRegistration is obtained by the ShardDataChangePublisherActor, it invokes the Consumer callback. When a registration is initially delayed due to no leader, the DelayedListenerRegistration is sent to the registration actor. When the leader is elected later on, the actual ListenerRegistration is sent and replaces the DelayedListenerRegistration. The DOMDataTreeChangeListener registration classes were changed/refactored similarly. In addition, the 2 specific registration actor classes were replaced by a generic reusable DataTreeNotificationListenerRegistrationActor that handles both listener types. Also the 2 CloseData*ListenerRegistration and CloseData*ListenerRegistrationReply messages were consolidated. Change-Id: I79ac76b8044609351e5dd8367b691b589ea35075 Signed-off-by: Tom Pantelis --- .../AbstractDataListenerSupport.java | 65 +++++++---- ...taTreeNotificationPublisherActorProxy.java | 31 ++--- .../DataChangeListenerRegistrationActor.java | 77 ------------- .../DataChangeListenerRegistrationProxy.java | 8 +- .../datastore/DataChangeListenerSupport.java | 87 +++----------- .../DataTreeChangeListenerProxy.java | 7 +- ...taTreeChangeListenerRegistrationActor.java | 67 ----------- .../DataTreeChangeListenerSupport.java | 71 ++---------- ...faultShardDataChangeListenerPublisher.java | 29 +++-- ...tShardDataTreeChangeListenerPublisher.java | 47 +++++--- ...DelayedDataChangeListenerRegistration.java | 8 +- .../DelayedDataTreeListenerRegistration.java | 6 +- .../DelayedListenerRegistration.java | 26 ++--- .../cluster/datastore/DelegateFactory.java | 18 --- .../datastore/LeaderLocalDelegateFactory.java | 2 +- .../controller/cluster/datastore/Shard.java | 4 +- .../ShardDataChangeListenerPublisher.java | 14 ++- ...DataChangeListenerPublisherActorProxy.java | 37 +++--- .../ShardDataChangePublisherActor.java | 73 ++++++++++++ .../cluster/datastore/ShardDataTree.java | 47 ++------ .../ShardDataTreeChangeListenerPublisher.java | 13 ++- ...TreeChangeListenerPublisherActorProxy.java | 33 +++--- .../ShardDataTreeChangePublisherActor.java | 65 +++++++++++ ...ardDataTreeNotificationPublisherActor.java | 42 ++++--- ...NotificationListenerRegistrationActor.java | 82 +++++++++++++ .../AbstractEntityOwnerChangeListener.java | 3 +- .../CandidateListChangeListener.java | 3 +- .../CloseDataChangeListenerRegistration.java | 16 --- ...seDataChangeListenerRegistrationReply.java | 18 --- ...TreeNotificationListenerRegistration.java} | 10 +- ...otificationListenerRegistrationReply.java} | 10 +- .../messages/ListenerRegistrationMessage.java | 3 + .../messages/RegisterChangeListener.java | 14 +-- .../RegisterDataTreeChangeListener.java | 6 +- ...taChangeListenerRegistrationProxyTest.java | 8 +- .../DataChangeListenerRegistrationTest.java | 76 ------------ .../DataTreeChangeListenerProxyTest.java | 6 +- ...eeChangeListenerRegistrationActorTest.java | 47 -------- .../DataTreeChangeListenerSupportTest.java | 8 +- .../DistributedDataStoreIntegrationTest.java | 64 +++++++++++ .../cluster/datastore/ShardDataTreeTest.java | 3 +- .../cluster/datastore/ShardTest.java | 47 +++++++- ...ficationListenerRegistrationActorTest.java | 108 ++++++++++++++++++ .../utils/MockDataTreeChangeListener.java | 40 ++++++- 44 files changed, 764 insertions(+), 685 deletions(-) delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationActor.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangePublisherActor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisherActor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActor.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistration.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistrationReply.java rename opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/{CloseDataTreeChangeListenerRegistration.java => CloseDataTreeNotificationListenerRegistration.java} (62%) rename opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/{CloseDataTreeChangeListenerRegistrationReply.java => CloseDataTreeNotificationListenerRegistrationReply.java} (62%) delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActorTest.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java index 0821951a1a..ac7a9337d9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java @@ -12,31 +12,36 @@ import akka.actor.ActorSelection; import java.util.ArrayList; import java.util.Collection; import java.util.EventListener; +import java.util.concurrent.ConcurrentHashMap; +import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage; -import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; abstract class AbstractDataListenerSupport, R extends ListenerRegistration> - extends LeaderLocalDelegateFactory { + D extends DelayedListenerRegistration> extends LeaderLocalDelegateFactory { private final Logger log = LoggerFactory.getLogger(getClass()); - private final ArrayList delayedListenerRegistrations = new ArrayList<>(); - private final ArrayList delayedListenerOnAllRegistrations = new ArrayList<>(); - private final Collection actors = new ArrayList<>(); + private final Collection delayedListenerRegistrations = ConcurrentHashMap.newKeySet(); + private final Collection delayedListenerOnAllRegistrations = ConcurrentHashMap.newKeySet(); + private final Collection leaderOnlyListenerActors = ConcurrentHashMap.newKeySet(); + private final Collection allListenerActors = ConcurrentHashMap.newKeySet(); protected AbstractDataListenerSupport(Shard shard) { super(shard); } + Collection getListenerActors() { + return new ArrayList<>(allListenerActors); + } + @Override void onLeadershipChange(boolean isLeader, boolean hasLeader) { log.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader); final EnableNotification msg = new EnableNotification(isLeader); - for (ActorSelection dataChangeListener : actors) { + for (ActorSelection dataChangeListener : leaderOnlyListenerActors) { dataChangeListener.tell(msg, getSelf()); } @@ -46,7 +51,6 @@ abstract class AbstractDataListenerSupport registration; + ActorRef registrationActor = createActor(DataTreeNotificationListenerRegistrationActor.props()); + if (hasLeader && message.isRegisterOnAllInstances() || isLeader) { - registration = createDelegate(message); + doRegistration(message, registrationActor); } else { log.debug("{}: Shard is not the leader - delaying registration", persistenceId()); - D delayedReg = newDelayedListenerRegistration(message); + D delayedReg = newDelayedListenerRegistration(message, registrationActor); + Collection delayedRegList; if (message.isRegisterOnAllInstances()) { - delayedListenerOnAllRegistrations.add(delayedReg); + delayedRegList = delayedListenerOnAllRegistrations; } else { - delayedListenerRegistrations.add(delayedReg); + delayedRegList = delayedListenerRegistrations; } - registration = delayedReg; + delayedRegList.add(delayedReg); + registrationActor.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration( + delayedReg, () -> delayedRegList.remove(delayedReg)), ActorRef.noSender()); } - ActorRef registrationActor = newRegistrationActor(registration); - log.debug("{}: {} sending reply, listenerRegistrationPath = {} ", persistenceId(), logName(), registrationActor.path()); tellSender(newRegistrationReplyMessage(registrationActor)); } + protected ActorSelection processListenerRegistrationMessage(M message) { + final ActorSelection listenerActor = selectActor(message.getListenerActorPath()); + + // We have a leader so enable the listener. + listenerActor.tell(new EnableNotification(true), getSelf()); + + if (!message.isRegisterOnAllInstances()) { + // This is a leader-only registration so store a reference to the listener actor so it can be notified + // at a later point if notifications should be enabled or disabled. + leaderOnlyListenerActors.add(listenerActor); + } + + allListenerActors.add(listenerActor); + + return listenerActor; + } + protected Logger log() { return log; } - protected void addListenerActor(ActorSelection actor) { - actors.add(actor); + protected void removeListenerActor(ActorSelection listenerActor) { + allListenerActors.remove(listenerActor); + leaderOnlyListenerActors.remove(listenerActor); } - protected abstract D newDelayedListenerRegistration(M message); + abstract void doRegistration(M message, ActorRef registrationActor); - protected abstract ActorRef newRegistrationActor(ListenerRegistration registration); + protected abstract D newDelayedListenerRegistration(M message, ActorRef registrationActor); protected abstract Object newRegistrationReplyMessage(ActorRef registrationActor); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java index b7356d28bc..6b348fb76f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorContext; import akka.actor.ActorRef; +import akka.actor.Props; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; @@ -28,38 +29,40 @@ abstract class AbstractShardDataTreeNotificationPublisherActorProxy implements S private final ActorContext actorContext; private final String actorName; + private final String logContext; private ActorRef notifierActor; - protected AbstractShardDataTreeNotificationPublisherActorProxy(ActorContext actorContext, String actorName) { + protected AbstractShardDataTreeNotificationPublisherActorProxy(ActorContext actorContext, String actorName, + String logContext) { this.actorContext = actorContext; this.actorName = actorName; + this.logContext = logContext; } - protected AbstractShardDataTreeNotificationPublisherActorProxy( - AbstractShardDataTreeNotificationPublisherActorProxy other) { - this.actorContext = null; - this.actorName = null; - this.notifierActor = other.getNotifierActor(); + protected abstract Props props(); + + protected final String actorName() { + return actorName; } - protected abstract ShardDataTreeNotificationPublisher getDelegatePublisher(); + protected final String logContext() { + return logContext; + } @Override public void publishChanges(DataTreeCandidate candidate, String logContext) { - getNotifierActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications( - getDelegatePublisher(), candidate, logContext), ActorRef.noSender()); + notifierActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications(candidate), + ActorRef.noSender()); } - private ActorRef getNotifierActor() { + protected final ActorRef notifierActor() { if (notifierActor == null) { LOG.debug("Creating actor {}", actorName); String dispatcher = new Dispatchers(actorContext.system().dispatchers()).getDispatcherPath( Dispatchers.DispatcherType.Notification); - notifierActor = actorContext.actorOf(ShardDataTreeNotificationPublisherActor.props(actorName) - .withDispatcher(dispatcher).withMailbox( - org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX), - actorName); + notifierActor = actorContext.actorOf(props().withDispatcher(dispatcher).withMailbox( + org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX), actorName); } return notifierActor; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationActor.java deleted file mode 100644 index 89062dc83c..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationActor.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore; - -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.japi.Creator; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; - -public class DataChangeListenerRegistrationActor extends AbstractUntypedActor { - - private final ListenerRegistration>> - registration; - - public DataChangeListenerRegistrationActor( - ListenerRegistration>> registration) { - this.registration = registration; - } - - @Override - public void handleReceive(Object message) { - if (message instanceof CloseDataChangeListenerRegistration) { - closeListenerRegistration(); - } else { - unknownMessage(message); - } - } - - public static Props props(final ListenerRegistration>> registration) { - return Props.create(new DataChangeListenerRegistrationCreator(registration)); - } - - private void closeListenerRegistration() { - registration.close(); - - if (isValidSender(getSender())) { - getSender().tell(CloseDataChangeListenerRegistrationReply.INSTANCE, getSelf()); - } - - getSelf().tell(PoisonPill.getInstance(), getSelf()); - } - - private static class DataChangeListenerRegistrationCreator - implements Creator { - private static final long serialVersionUID = 1L; - - @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't " - + "create remote instances of this actor and thus don't need it to be Serializable.") - final ListenerRegistration>> registration; - - DataChangeListenerRegistrationCreator( - ListenerRegistration>> registration) { - this.registration = registration; - } - - @Override - public DataChangeListenerRegistrationActor create() throws Exception { - return new DataChangeListenerRegistrationActor(registration); - } - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java index f0f4b7b9b6..3fe3933fe7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java @@ -15,7 +15,7 @@ import akka.dispatch.OnComplete; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -86,7 +86,8 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration } if (sendCloseMessage) { - listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, null); + listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), + ActorRef.noSender()); } } @@ -145,7 +146,8 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration } if (sendCloseMessage) { - listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, ActorRef.noSender()); + listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), + ActorRef.noSender()); listenerRegistrationActor = null; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java index 2e26e6ee36..6b0d8294d4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java @@ -9,102 +9,41 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import com.google.common.base.Optional; -import com.google.common.collect.Sets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map.Entry; -import java.util.Set; -import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; +import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; -import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; final class DataChangeListenerSupport extends AbstractDataListenerSupport< AsyncDataChangeListener>, RegisterChangeListener, - DelayedDataChangeListenerRegistration, DataChangeListenerRegistration< - AsyncDataChangeListener>>> { - - private final Set listenerActors = Sets.newConcurrentHashSet(); + DelayedDataChangeListenerRegistration> { DataChangeListenerSupport(final Shard shard) { super(shard); } - Collection getListenerActors() { - return new ArrayList<>(listenerActors); - } - @Override - DataChangeListenerRegistration>> - createDelegate(final RegisterChangeListener message) { - final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath()); - - // Notify the listener if notifications should be enabled or not - // If this shard is the leader then it will enable notifications else - // it will not - dataChangeListenerPath.tell(new EnableNotification(true), getSelf()); - - // Now store a reference to the data change listener so it can be notified - // at a later point if notifications should be enabled or disabled - if (!message.isRegisterOnAllInstances()) { - addListenerActor(dataChangeListenerPath); - } + void doRegistration(final RegisterChangeListener message, final ActorRef registrationActor) { + final ActorSelection listenerActor = processListenerRegistrationMessage(message); AsyncDataChangeListener> listener = - new DataChangeListenerProxy(dataChangeListenerPath); + new DataChangeListenerProxy(listenerActor); log().debug("{}: Registering for path {}", persistenceId(), message.getPath()); - Entry>>, - Optional> regEntry = getShard().getDataStore().registerChangeListener( - message.getPath(), listener, message.getScope()); - - getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue()); - - listenerActors.add(dataChangeListenerPath); - final DataChangeListenerRegistration>> - delegate = regEntry.getKey(); - return new DataChangeListenerRegistration>>() { - @Override - public void close() { - listenerActors.remove(dataChangeListenerPath); - delegate.close(); - } - - @Override - public AsyncDataChangeListener> getInstance() { - return delegate.getInstance(); - } - - @Override - public YangInstanceIdentifier getPath() { - return delegate.getPath(); - } - - @Override - public DataChangeScope getScope() { - return delegate.getScope(); - } - }; - } - - @Override - protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message) { - return new DelayedDataChangeListenerRegistration(message); + final ShardDataTree shardDataTree = getShard().getDataStore(); + shardDataTree.registerDataChangeListener(message.getPath(), listener, message.getScope(), + shardDataTree.readCurrentData(), registration -> registrationActor.tell( + new DataTreeNotificationListenerRegistrationActor.SetRegistration(registration, () -> + removeListenerActor(listenerActor)), ActorRef.noSender())); } @Override - protected ActorRef newRegistrationActor( - ListenerRegistration>> registration) { - return createActor(DataChangeListenerRegistrationActor.props(registration)); + protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message, + ActorRef registrationActor) { + return new DelayedDataChangeListenerRegistration(message, registrationActor); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java index f60e676013..dd280a6c08 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java @@ -15,7 +15,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -55,7 +55,8 @@ final class DataTreeChangeListenerProxy ext @Override protected synchronized void removeRegistration() { if (listenerRegistrationActor != null) { - listenerRegistrationActor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), ActorRef.noSender()); + listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), + ActorRef.noSender()); listenerRegistrationActor = null; } @@ -94,7 +95,7 @@ final class DataTreeChangeListenerProxy ext } // This registration has already been closed, notify the actor - actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null); + actor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), null); } private void doRegistration(final ActorRef shard) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java deleted file mode 100644 index 17deeeb4dc..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore; - -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.japi.Creator; -import com.google.common.base.Preconditions; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply; -import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.yangtools.concepts.ListenerRegistration; - -/** - * Actor co-located with a shard. It exists only to terminate the registration when - * asked to do so via {@link CloseDataTreeChangeListenerRegistration}. - */ -public final class DataTreeChangeListenerRegistrationActor extends AbstractUntypedActor { - private final ListenerRegistration registration; - - public DataTreeChangeListenerRegistrationActor(final ListenerRegistration registration) { - this.registration = Preconditions.checkNotNull(registration); - } - - @Override - protected void handleReceive(Object message) throws Exception { - if (message instanceof CloseDataTreeChangeListenerRegistration) { - registration.close(); - if (isValidSender(getSender())) { - getSender().tell(CloseDataTreeChangeListenerRegistrationReply.getInstance(), getSelf()); - } - - getSelf().tell(PoisonPill.getInstance(), getSelf()); - } else { - unknownMessage(message); - } - } - - public static Props props(final ListenerRegistration registration) { - return Props.create(new DataTreeChangeListenerRegistrationCreator(registration)); - } - - private static final class DataTreeChangeListenerRegistrationCreator - implements Creator { - private static final long serialVersionUID = 1L; - - @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't " - + "create remote instances of this actor and thus don't need it to be Serializable.") - final ListenerRegistration registration; - - DataTreeChangeListenerRegistrationCreator(ListenerRegistration registration) { - this.registration = Preconditions.checkNotNull(registration); - } - - @Override - public DataTreeChangeListenerRegistrationActor create() { - return new DataTreeChangeListenerRegistrationActor(registration); - } - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java index 9a44b47e7e..4f70327cb7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java @@ -9,84 +9,37 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import com.google.common.base.Optional; -import com.google.common.collect.Sets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map.Entry; -import java.util.Set; -import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; +import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport> { - - private final Set listenerActors = Sets.newConcurrentHashSet(); + RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration> { DataTreeChangeListenerSupport(final Shard shard) { super(shard); } - Collection getListenerActors() { - return new ArrayList<>(listenerActors); - } - @Override - ListenerRegistration createDelegate( - final RegisterDataTreeChangeListener message) { - final ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath()); + void doRegistration(final RegisterDataTreeChangeListener message, final ActorRef registrationActor) { + final ActorSelection listenerActor = processListenerRegistrationMessage(message); - // Notify the listener if notifications should be enabled or not - // If this shard is the leader then it will enable notifications else - // it will not - dataChangeListenerPath.tell(new EnableNotification(true), getSelf()); - - // Now store a reference to the data change listener so it can be notified - // at a later point if notifications should be enabled or disabled - if (!message.isRegisterOnAllInstances()) { - addListenerActor(dataChangeListenerPath); - } - - DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(dataChangeListenerPath); + DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(listenerActor); log().debug("{}: Registering for path {}", persistenceId(), message.getPath()); - Entry, Optional> regEntry = - getShard().getDataStore().registerTreeChangeListener(message.getPath(), listener); - - getShard().getDataStore().notifyOfInitialData(message.getPath(), - regEntry.getKey().getInstance(), regEntry.getValue()); - - listenerActors.add(dataChangeListenerPath); - final ListenerRegistration delegate = regEntry.getKey(); - return new ListenerRegistration() { - @Override - public DOMDataTreeChangeListener getInstance() { - return delegate.getInstance(); - } - - @Override - public void close() { - listenerActors.remove(dataChangeListenerPath); - delegate.close(); - } - }; + final ShardDataTree shardDataTree = getShard().getDataStore(); + shardDataTree.registerTreeChangeListener(message.getPath(), + listener, shardDataTree.readCurrentData(), registration -> registrationActor.tell( + new DataTreeNotificationListenerRegistrationActor.SetRegistration(registration, () -> + removeListenerActor(listenerActor)), ActorRef.noSender())); } @Override protected DelayedDataTreeListenerRegistration newDelayedListenerRegistration( - RegisterDataTreeChangeListener message) { - return new DelayedDataTreeListenerRegistration(message); - } - - @Override - protected ActorRef newRegistrationActor(ListenerRegistration registration) { - return createActor(DataTreeChangeListenerRegistrationActor.props(registration)); + RegisterDataTreeChangeListener message, ActorRef registrationActor) { + return new DelayedDataTreeListenerRegistration(message, registrationActor); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java index a94773481d..98083a0b1c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore; +import com.google.common.base.Optional; +import java.util.function.Consumer; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; @@ -14,6 +16,7 @@ import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeE import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.util.concurrent.NotificationManager; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -59,14 +62,26 @@ final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeLi } @Override - public >> - DataChangeListenerRegistration registerDataChangeListener(YangInstanceIdentifier path, L listener, - DataChangeScope scope) { - return dataChangeListenerTree.registerDataChangeListener(path, listener, scope); + public void registerDataChangeListener(YangInstanceIdentifier path, + AsyncDataChangeListener> listener, DataChangeScope scope, + Optional initialState, + Consumer>>> + onRegistration) { + final DataChangeListenerRegistration>> + registration = dataChangeListenerTree.registerDataChangeListener(path, listener, scope); + + onRegistration.accept(registration); + + if (initialState.isPresent()) { + notifySingleListener(path, listener, scope, initialState.get()); + } } - @Override - public ShardDataChangeListenerPublisher newInstance() { - return new DefaultShardDataChangeListenerPublisher(); + static void notifySingleListener(final YangInstanceIdentifier path, + final AsyncDataChangeListener> listener, + final DataChangeScope scope, final DataTreeCandidate initialState) { + DefaultShardDataChangeListenerPublisher publisher = new DefaultShardDataChangeListenerPublisher(); + publisher.registerDataChangeListener(path, listener, scope, Optional.absent(), noop -> { }); + publisher.publishChanges(initialState, ""); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java index a63859e9e6..02326e0f84 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java @@ -7,9 +7,11 @@ */ package org.opendaylight.controller.cluster.datastore; +import com.google.common.base.Optional; import java.util.Collection; +import java.util.function.Consumer; import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration; import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTreeChangePublisher; import org.opendaylight.yangtools.concepts.ListenerRegistration; @@ -34,11 +36,6 @@ final class DefaultShardDataTreeChangeListenerPublisher extends AbstractDOMStore processCandidateTree(candidate); } - @Override - public ShardDataTreeChangeListenerPublisher newInstance() { - return new DefaultShardDataTreeChangeListenerPublisher(); - } - @Override protected void notifyListener(AbstractDOMDataTreeChangeListenerRegistration registration, Collection changes) { @@ -51,18 +48,32 @@ final class DefaultShardDataTreeChangeListenerPublisher extends AbstractDOMStore } @Override - public ListenerRegistration - registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) { - final AbstractDOMDataTreeChangeListenerRegistration registration = - super.registerTreeChangeListener(treeId, (org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener) - changes -> listener.onDataTreeChanged(changes)); + public void registerTreeChangeListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener, + Optional initialState, + Consumer> onRegistration) { + AbstractDOMDataTreeChangeListenerRegistration + registration = super.registerTreeChangeListener(treeId, + (org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener)changes -> + listener.onDataTreeChanged(changes)); + + onRegistration.accept( + new org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration< + DOMDataTreeChangeListener>(listener) { + @Override + protected void removeRegistration() { + registration.close(); + } + }); + + if (initialState.isPresent()) { + notifySingleListener(treeId, listener, initialState.get()); + } + } - return new org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration( - listener) { - @Override - protected void removeRegistration() { - registration.close(); - } - }; + static void notifySingleListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener, + DataTreeCandidate state) { + DefaultShardDataTreeChangeListenerPublisher publisher = new DefaultShardDataTreeChangeListenerPublisher(); + publisher.registerTreeChangeListener(treeId, listener, Optional.absent(), noop -> { }); + publisher.publishChanges(state, ""); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java index c49c3de9a8..a8fe1002da 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -15,7 +16,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; final class DelayedDataChangeListenerRegistration extends DelayedListenerRegistration< AsyncDataChangeListener>, RegisterChangeListener> { - DelayedDataChangeListenerRegistration(final RegisterChangeListener registerChangeListener) { - super(registerChangeListener); + DelayedDataChangeListenerRegistration(final RegisterChangeListener registerChangeListener, + final ActorRef registrationActor) { + super(registerChangeListener, registrationActor); } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java index 35f7308689..c67115a1f3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; @@ -18,8 +19,9 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; final class DelayedDataTreeListenerRegistration extends DelayedListenerRegistration { - DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) { - super(registerTreeChangeListener); + DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener, + final ActorRef registrationActor) { + super(registerTreeChangeListener, registrationActor); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java index 8d73bc6155..18d23aa0cd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java @@ -7,33 +7,32 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; import java.util.EventListener; import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage; import org.opendaylight.yangtools.concepts.ListenerRegistration; -abstract class DelayedListenerRegistration implements ListenerRegistration { +abstract class DelayedListenerRegistration + implements ListenerRegistration { private final M registrationMessage; - private volatile ListenerRegistration delegate; + private final ActorRef registrationActor; @GuardedBy("this") private boolean closed; - protected DelayedListenerRegistration(M registrationMessage) { + protected DelayedListenerRegistration(M registrationMessage, ActorRef registrationActor) { this.registrationMessage = registrationMessage; + this.registrationActor = registrationActor; } M getRegistrationMessage() { return registrationMessage; } - ListenerRegistration getDelegate() { - return delegate; - } - - synchronized > void createDelegate( - final LeaderLocalDelegateFactory factory) { + synchronized void createDelegate(final AbstractDataListenerSupport support) { if (!closed) { - this.delegate = factory.createDelegate(registrationMessage); + support.doRegistration(registrationMessage, registrationActor); } } @@ -50,11 +49,6 @@ abstract class DelayedListenerRegistration implement @Override public synchronized void close() { - if (!closed) { - closed = true; - if (delegate != null) { - delegate.close(); - } - } + closed = true; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java deleted file mode 100644 index cd1b548b8e..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore; - -/** - * Base class for factories instantiating delegates. - * - * @param message type - * @param delegate type - */ -abstract class DelegateFactory { - abstract D createDelegate(M message); -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java index 0f76629540..5f58bc9ef5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java @@ -20,7 +20,7 @@ import com.google.common.base.Preconditions; * @param delegate type * @param message type */ -abstract class LeaderLocalDelegateFactory extends DelegateFactory { +abstract class LeaderLocalDelegateFactory { private final Shard shard; protected LeaderLocalDelegateFactory(final Shard shard) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 39ee810eb0..aca69c2a5f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -188,9 +188,9 @@ public class Shard extends RaftActor { LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher = - new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"); + new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name); ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher = - new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"); + new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name); if (builder.getDataTree() != null) { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(), treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java index 87a0c8d64b..3558107f17 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java @@ -7,11 +7,14 @@ */ package org.opendaylight.controller.cluster.datastore; +import com.google.common.base.Optional; +import java.util.function.Consumer; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; /** * Interface for a class that generates and publishes notifications for DataChangeListeners. @@ -19,8 +22,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; * @author Thomas Pantelis */ interface ShardDataChangeListenerPublisher extends ShardDataTreeNotificationPublisher { - ShardDataChangeListenerPublisher newInstance(); - - >> DataChangeListenerRegistration - registerDataChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope); + void registerDataChangeListener(YangInstanceIdentifier path, + AsyncDataChangeListener> listener, DataChangeScope scope, + Optional initialState, + Consumer>>> + onRegistration); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java index e3c71830a8..a17c603596 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java @@ -8,12 +8,17 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorContext; +import akka.actor.ActorRef; +import akka.actor.Props; +import com.google.common.base.Optional; +import java.util.function.Consumer; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; /** * Implementation of ShardDataChangeListenerPublisher that offloads the generation and publication @@ -25,30 +30,22 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; class ShardDataChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy implements ShardDataChangeListenerPublisher { - private final ShardDataChangeListenerPublisher delegatePublisher = new DefaultShardDataChangeListenerPublisher(); - - ShardDataChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) { - super(actorContext, actorName); - } - - private ShardDataChangeListenerPublisherActorProxy(ShardDataChangeListenerPublisherActorProxy other) { - super(other); - } - - @Override - public >> - DataChangeListenerRegistration registerDataChangeListener(YangInstanceIdentifier path, L listener, - DataChangeScope scope) { - return delegatePublisher.registerDataChangeListener(path, listener, scope); + ShardDataChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName, String logContext) { + super(actorContext, actorName, logContext); } @Override - public ShardDataChangeListenerPublisher newInstance() { - return new ShardDataChangeListenerPublisherActorProxy(this); + public void registerDataChangeListener(YangInstanceIdentifier path, + AsyncDataChangeListener> listener, DataChangeScope scope, + Optional initialState, + Consumer>>> + onRegistration) { + notifierActor().tell(new ShardDataChangePublisherActor.RegisterListener(path, listener, scope, initialState, + onRegistration), ActorRef.noSender()); } @Override - protected ShardDataTreeNotificationPublisher getDelegatePublisher() { - return delegatePublisher; + protected Props props() { + return ShardDataChangePublisherActor.props(actorName(), logContext()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangePublisherActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangePublisherActor.java new file mode 100644 index 0000000000..cb7d80dbdc --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangePublisherActor.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Props; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.function.Consumer; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; + +/** + * Actor used to generate and publish DataChange notifications. + * + * @author Thomas Pantelis + */ +public class ShardDataChangePublisherActor + extends ShardDataTreeNotificationPublisherActor { + + private ShardDataChangePublisherActor(final String name, final String logContext) { + super(new DefaultShardDataChangeListenerPublisher(), name, logContext); + } + + @Override + protected void handleReceive(Object message) { + if (message instanceof RegisterListener) { + RegisterListener reg = (RegisterListener)message; + if (reg.initialState.isPresent()) { + DefaultShardDataChangeListenerPublisher.notifySingleListener(reg.path, reg.listener, reg.scope, + reg.initialState.get()); + } + + publisher().registerDataChangeListener(reg.path, reg.listener, reg.scope, Optional.absent(), + reg.onRegistration); + } else { + super.handleReceive(message); + } + } + + static Props props(final String name, final String logContext) { + return Props.create(ShardDataChangePublisherActor.class, name, logContext); + } + + static class RegisterListener { + private final YangInstanceIdentifier path; + private final AsyncDataChangeListener> listener; + private final DataChangeScope scope; + private final Optional initialState; + private final Consumer>>> onRegistration; + + RegisterListener(final YangInstanceIdentifier path, + final AsyncDataChangeListener> listener, + final DataChangeScope scope, final Optional initialState, + final Consumer>>> onRegistration) { + this.path = Preconditions.checkNotNull(path); + this.listener = Preconditions.checkNotNull(listener); + this.scope = Preconditions.checkNotNull(scope); + this.initialState = Preconditions.checkNotNull(initialState); + this.onRegistration = Preconditions.checkNotNull(onRegistration); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 9abb00292f..f7d98276ca 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -24,7 +24,6 @@ import com.google.common.primitives.UnsignedLong; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -63,7 +62,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -544,25 +542,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { dataChangeListenerPublisher.publishChanges(candidate, logContext); } - void notifyOfInitialData(final DataChangeListenerRegistration>> listenerReg, final Optional currentState) { - if (currentState.isPresent()) { - ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance(); - localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(), - listenerReg.getScope()); - localPublisher.publishChanges(currentState.get(), logContext); - } - } - - void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, - final Optional currentState) { - if (currentState.isPresent()) { - ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance(); - localPublisher.registerTreeChangeListener(path, listener); - localPublisher.publishChanges(currentState.get(), logContext); - } - } - /** * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled * replication callbacks. @@ -615,29 +594,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback); } - Entry>>, - Optional> registerChangeListener(final YangInstanceIdentifier path, - final AsyncDataChangeListener> listener, - final DataChangeScope scope) { - DataChangeListenerRegistration>> reg = - dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope); - - return new SimpleEntry<>(reg, readCurrentData()); + void registerDataChangeListener(YangInstanceIdentifier path, + AsyncDataChangeListener> listener, DataChangeScope scope, + Optional initialState, + Consumer>>> + onRegistration) { + dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration); } - private Optional readCurrentData() { + Optional readCurrentData() { final Optional> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY); return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode( YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.absent(); } - public Entry, Optional> - registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) { - final ListenerRegistration reg = - treeChangeListenerPublisher.registerTreeChangeListener(path, listener); - - return new SimpleEntry<>(reg, readCurrentData()); + public void registerTreeChangeListener(YangInstanceIdentifier path, DOMDataTreeChangeListener listener, + Optional initialState, + Consumer> onRegistration) { + treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration); } int getQueueSize() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java index d4a5156d2b..7f20beecd5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java @@ -7,13 +7,20 @@ */ package org.opendaylight.controller.cluster.datastore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; +import com.google.common.base.Optional; +import java.util.function.Consumer; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; /** * Interface for a class that generates and publishes notifications for DataTreeChangeListeners. * * @author Thomas Pantelis */ -interface ShardDataTreeChangeListenerPublisher extends ShardDataTreeNotificationPublisher, DOMStoreTreeChangePublisher { - ShardDataTreeChangeListenerPublisher newInstance(); +interface ShardDataTreeChangeListenerPublisher extends ShardDataTreeNotificationPublisher { + void registerTreeChangeListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener, + Optional initialState, + Consumer> onRegistration); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java index 7196f839e4..ceaeeccaad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java @@ -8,10 +8,15 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorContext; +import akka.actor.ActorRef; +import akka.actor.Props; +import com.google.common.base.Optional; +import java.util.function.Consumer; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; /** * Implementation of ShardDataTreeChangeListenerPublisher that offloads the generation and publication @@ -23,30 +28,20 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; class ShardDataTreeChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy implements ShardDataTreeChangeListenerPublisher { - private final ShardDataTreeChangeListenerPublisher delegatePublisher = - new DefaultShardDataTreeChangeListenerPublisher(); - - ShardDataTreeChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) { - super(actorContext, actorName); - } - - private ShardDataTreeChangeListenerPublisherActorProxy(ShardDataTreeChangeListenerPublisherActorProxy other) { - super(other); - } - - @Override - public ListenerRegistration registerTreeChangeListener( - YangInstanceIdentifier treeId, L listener) { - return delegatePublisher.registerTreeChangeListener(treeId, listener); + ShardDataTreeChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName, String logContext) { + super(actorContext, actorName, logContext); } @Override - public ShardDataTreeChangeListenerPublisher newInstance() { - return new ShardDataTreeChangeListenerPublisherActorProxy(this); + public void registerTreeChangeListener(YangInstanceIdentifier treeId, + DOMDataTreeChangeListener listener, Optional currentState, + Consumer> onRegistration) { + notifierActor().tell(new ShardDataTreeChangePublisherActor.RegisterListener(treeId, listener, currentState, + onRegistration), ActorRef.noSender()); } @Override - protected ShardDataTreeNotificationPublisher getDelegatePublisher() { - return delegatePublisher; + protected Props props() { + return ShardDataTreeChangePublisherActor.props(actorName(), logContext()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisherActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisherActor.java new file mode 100644 index 0000000000..f054ccdae8 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisherActor.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Props; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.function.Consumer; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; + +/** + * Actor used to generate and publish DataTreeChange notifications. + * + * @author Thomas Pantelis + */ +public class ShardDataTreeChangePublisherActor + extends ShardDataTreeNotificationPublisherActor { + + private ShardDataTreeChangePublisherActor(final String name, final String logContext) { + super(new DefaultShardDataTreeChangeListenerPublisher(), name, logContext); + } + + @Override + protected void handleReceive(Object message) { + if (message instanceof RegisterListener) { + RegisterListener reg = (RegisterListener)message; + if (reg.initialState.isPresent()) { + DefaultShardDataTreeChangeListenerPublisher.notifySingleListener(reg.path, reg.listener, + reg.initialState.get()); + } + + publisher().registerTreeChangeListener(reg.path, reg.listener, Optional.absent(), reg.onRegistration); + } else { + super.handleReceive(message); + } + } + + static Props props(final String name, final String logContext) { + return Props.create(ShardDataTreeChangePublisherActor.class, name, logContext); + } + + static class RegisterListener { + private final YangInstanceIdentifier path; + private final DOMDataTreeChangeListener listener; + private final Optional initialState; + private final Consumer> onRegistration; + + RegisterListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, + final Optional initialState, + final Consumer> onRegistration) { + this.path = Preconditions.checkNotNull(path); + this.listener = Preconditions.checkNotNull(listener); + this.initialState = Preconditions.checkNotNull(initialState); + this.onRegistration = Preconditions.checkNotNull(onRegistration); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java index 76c52c9bb5..ea08830829 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java @@ -7,7 +7,6 @@ */ package org.opendaylight.controller.cluster.datastore; -import akka.actor.Props; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; @@ -19,31 +18,43 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; * * @author Thomas Pantelis */ -public class ShardDataTreeNotificationPublisherActor extends AbstractUntypedActor { +public class ShardDataTreeNotificationPublisherActor + extends AbstractUntypedActor { + private final T publisher; private final Stopwatch timer = Stopwatch.createUnstarted(); private final String name; + private final String logContext; - private ShardDataTreeNotificationPublisherActor(String name) { + protected ShardDataTreeNotificationPublisherActor(final T publisher, final String name, final String logContext) { + this.publisher = publisher; this.name = name; + this.logContext = logContext; + } + + protected T publisher() { + return publisher; + } + + protected String logContext() { + return logContext; } @Override protected void handleReceive(Object message) { if (message instanceof PublishNotifications) { - PublishNotifications publisher = (PublishNotifications)message; + PublishNotifications toPublish = (PublishNotifications)message; timer.start(); try { - publisher.publish(); + publisher.publishChanges(toPublish.candidate, logContext); } finally { long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS); if (elapsedTime >= ShardDataTreeNotificationPublisher.PUBLISH_DELAY_THRESHOLD_IN_MS) { LOG.warn("{}: Generation of change events for {} took longer than expected. Elapsed time: {}", - publisher.logContext, name, timer); + logContext, name, timer); } else { - LOG.debug("{}: Elapsed time for generation of change events for {}: {}", publisher.logContext, - name, timer); + LOG.debug("{}: Elapsed time for generation of change events for {}: {}", logContext, name, timer); } timer.reset(); @@ -51,24 +62,11 @@ public class ShardDataTreeNotificationPublisherActor extends AbstractUntypedActo } } - static Props props(String notificationType) { - return Props.create(ShardDataTreeNotificationPublisherActor.class, notificationType); - } - static class PublishNotifications { - private final ShardDataTreeNotificationPublisher publisher; private final DataTreeCandidate candidate; - private final String logContext; - PublishNotifications(ShardDataTreeNotificationPublisher publisher, DataTreeCandidate candidate, - String logContext) { - this.publisher = publisher; + PublishNotifications(DataTreeCandidate candidate) { this.candidate = candidate; - this.logContext = logContext; - } - - private void publish() { - publisher.publishChanges(candidate, logContext); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActor.java new file mode 100644 index 0000000000..2a60abbc46 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActor.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors; + +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.PoisonPill; +import akka.actor.Props; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import scala.concurrent.duration.Duration; + +/** + * Actor co-located with a shard. It exists only to terminate the registration when + * asked to do so via {@link CloseDataTreeNotificationListenerRegistration}. + */ +public final class DataTreeNotificationListenerRegistrationActor extends AbstractUntypedActor { + @VisibleForTesting + static long killDelay = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS); + + private ListenerRegistration registration; + private Runnable onClose; + private boolean closed; + private Cancellable killSchedule; + + @Override + protected void handleReceive(Object message) throws Exception { + if (message instanceof CloseDataTreeNotificationListenerRegistration) { + closeListenerRegistration(); + if (isValidSender(getSender())) { + getSender().tell(CloseDataTreeNotificationListenerRegistrationReply.getInstance(), getSelf()); + } + } else if (message instanceof SetRegistration) { + registration = ((SetRegistration)message).registration; + onClose = ((SetRegistration)message).onClose; + if (closed) { + closeListenerRegistration(); + } + } else { + unknownMessage(message); + } + } + + private void closeListenerRegistration() { + closed = true; + if (registration != null) { + registration.close(); + onClose.run(); + registration = null; + + if (killSchedule == null) { + killSchedule = getContext().system().scheduler().scheduleOnce(Duration.create(killDelay, + TimeUnit.MILLISECONDS), getSelf(), PoisonPill.getInstance(), getContext().dispatcher(), + ActorRef.noSender()); + } + } + } + + public static Props props() { + return Props.create(DataTreeNotificationListenerRegistrationActor.class); + } + + public static class SetRegistration { + private final ListenerRegistration registration; + private final Runnable onClose; + + public SetRegistration(final ListenerRegistration registration, final Runnable onClose) { + this.registration = Preconditions.checkNotNull(registration); + this.onClose = Preconditions.checkNotNull(onClose); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnerChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnerChangeListener.java index 1989bad81a..6094e2469d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnerChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnerChangeListener.java @@ -12,6 +12,7 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME; +import com.google.common.base.Optional; import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType; @@ -24,7 +25,7 @@ public abstract class AbstractEntityOwnerChangeListener implements DOMDataTreeCh .node(ENTITY_OWNER_QNAME).build(); void init(ShardDataTree shardDataTree) { - shardDataTree.registerTreeChangeListener(EOS_PATH, this); + shardDataTree.registerTreeChangeListener(EOS_PATH, this, Optional.absent(), noop -> { }); } protected static String extractOwner(LeafNode ownerLeaf) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java index 3b4f2d9647..6b6717c7da 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListener.java @@ -14,6 +14,7 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME; import akka.actor.ActorRef; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collection; @@ -60,7 +61,7 @@ class CandidateListChangeListener implements DOMDataTreeChangeListener { void init(ShardDataTree shardDataTree) { shardDataTree.registerTreeChangeListener(YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH) .node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME) - .node(Candidate.QNAME).node(Candidate.QNAME).build(), this); + .node(Candidate.QNAME).node(Candidate.QNAME).build(), this, Optional.absent(), noop -> { }); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistration.java deleted file mode 100644 index 8c35caa2c1..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistration.java +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore.messages; - -public class CloseDataChangeListenerRegistration { - public static final CloseDataChangeListenerRegistration INSTANCE = new CloseDataChangeListenerRegistration(); - - private CloseDataChangeListenerRegistration() { - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistrationReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistrationReply.java deleted file mode 100644 index 430202af42..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistrationReply.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore.messages; - - -public class CloseDataChangeListenerRegistrationReply { - public static final CloseDataChangeListenerRegistrationReply INSTANCE = - new CloseDataChangeListenerRegistrationReply(); - - private CloseDataChangeListenerRegistrationReply() { - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeNotificationListenerRegistration.java similarity index 62% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeNotificationListenerRegistration.java index dfcc894ffe..3e968a4eab 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeNotificationListenerRegistration.java @@ -10,15 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages; import java.io.ObjectStreamException; import java.io.Serializable; -public final class CloseDataTreeChangeListenerRegistration implements Serializable { +public final class CloseDataTreeNotificationListenerRegistration implements Serializable { private static final long serialVersionUID = 1L; - private static final CloseDataTreeChangeListenerRegistration INSTANCE = - new CloseDataTreeChangeListenerRegistration(); + private static final CloseDataTreeNotificationListenerRegistration INSTANCE = + new CloseDataTreeNotificationListenerRegistration(); - private CloseDataTreeChangeListenerRegistration() { + private CloseDataTreeNotificationListenerRegistration() { } - public static CloseDataTreeChangeListenerRegistration getInstance() { + public static CloseDataTreeNotificationListenerRegistration getInstance() { return INSTANCE; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeNotificationListenerRegistrationReply.java similarity index 62% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeNotificationListenerRegistrationReply.java index a7717b69e0..1a6a485b5f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeNotificationListenerRegistrationReply.java @@ -10,16 +10,16 @@ package org.opendaylight.controller.cluster.datastore.messages; import java.io.ObjectStreamException; import java.io.Serializable; -public final class CloseDataTreeChangeListenerRegistrationReply implements Serializable { +public final class CloseDataTreeNotificationListenerRegistrationReply implements Serializable { private static final long serialVersionUID = 1L; - private static final CloseDataTreeChangeListenerRegistrationReply INSTANCE = - new CloseDataTreeChangeListenerRegistrationReply(); + private static final CloseDataTreeNotificationListenerRegistrationReply INSTANCE = + new CloseDataTreeNotificationListenerRegistrationReply(); - private CloseDataTreeChangeListenerRegistrationReply() { + private CloseDataTreeNotificationListenerRegistrationReply() { // Use getInstance() instead } - public static CloseDataTreeChangeListenerRegistrationReply getInstance() { + public static CloseDataTreeNotificationListenerRegistrationReply getInstance() { return INSTANCE; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java index 5cea06b4cb..3f016a444f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java @@ -7,10 +7,13 @@ */ package org.opendaylight.controller.cluster.datastore.messages; +import akka.actor.ActorPath; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; public interface ListenerRegistrationMessage { YangInstanceIdentifier getPath(); boolean isRegisterOnAllInstances(); + + ActorPath getListenerActorPath(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java index f5d8698475..163525a8ca 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java @@ -15,15 +15,14 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; public class RegisterChangeListener implements ListenerRegistrationMessage { private final YangInstanceIdentifier path; - private final ActorRef dataChangeListener; + private final ActorRef dataChangeListenerActor; private final AsyncDataBroker.DataChangeScope scope; private final boolean registerOnAllInstances; - public RegisterChangeListener(YangInstanceIdentifier path, - ActorRef dataChangeListener, - AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) { + public RegisterChangeListener(YangInstanceIdentifier path, ActorRef dataChangeListenerActor, + AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) { this.path = path; - this.dataChangeListener = dataChangeListener; + this.dataChangeListenerActor = dataChangeListenerActor; this.scope = scope; this.registerOnAllInstances = registerOnAllInstances; } @@ -37,8 +36,9 @@ public class RegisterChangeListener implements ListenerRegistrationMessage { return scope; } - public ActorPath getDataChangeListenerPath() { - return dataChangeListener.path(); + @Override + public ActorPath getListenerActorPath() { + return dataChangeListenerActor.path(); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java index c9c0c0ceea..abf072b640 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore.messages; +import akka.actor.ActorPath; import akka.actor.ActorRef; import com.google.common.base.Preconditions; import java.io.Externalizable; @@ -42,8 +43,9 @@ public final class RegisterDataTreeChangeListener implements Externalizable, Lis return path; } - public ActorRef getDataTreeChangeListenerPath() { - return dataTreeChangeListenerPath; + @Override + public ActorPath getListenerActorPath() { + return dataTreeChangeListenerPath.path(); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index 1c733cc1ff..17a55b9767 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -30,7 +30,7 @@ import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; @@ -114,7 +114,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { proxy.close(); // The listener registration actor should get a Close message - expectMsgClass(timeout, CloseDataChangeListenerRegistration.class); + expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class); // The DataChangeListener actor should be terminated expectMsgClass(timeout, Terminated.class); @@ -174,7 +174,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { proxy.close(); // The listener registration actor should get a Close message - expectMsgClass(timeout, CloseDataChangeListenerRegistration.class); + expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class); // The DataChangeListener actor should be terminated expectMsgClass(timeout, Terminated.class); @@ -326,7 +326,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME), AsyncDataBroker.DataChangeScope.ONE); - expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.class); + expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class); Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor()); proxy.close(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java deleted file mode 100644 index d23cfea5f4..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore; - -import static org.junit.Assert.assertEquals; - -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import com.google.common.util.concurrent.MoreExecutors; -import org.junit.Test; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply; -import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; - -public class DataChangeListenerRegistrationTest extends AbstractActorTest { - private static final InMemoryDOMDataStore STORE = new InMemoryDOMDataStore("OPER", - MoreExecutors.newDirectExecutorService()); - - static { - STORE.onGlobalContextUpdated(TestModel.createTestContext()); - } - - @Test - public void testOnReceiveCloseListenerRegistration() throws Exception { - new JavaTestKit(getSystem()) { - { - final Props props = DataChangeListenerRegistrationActor.props(STORE.registerChangeListener( - TestModel.TEST_PATH, noOpDataChangeListener(), AsyncDataBroker.DataChangeScope.BASE)); - final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration"); - - new Within(duration("1 seconds")) { - @Override - protected void run() { - - subject.tell(CloseDataChangeListenerRegistration.INSTANCE, getRef()); - - final String out = new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run - // afterwards - @Override - protected String match(final Object in) { - if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.class)) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - - expectNoMsg(); - } - - }; - } - }; - } - - private static AsyncDataChangeListener> noOpDataChangeListener() { - return change -> { - }; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java index e6b20a7206..6f2f99b24e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java @@ -29,7 +29,7 @@ import org.junit.Test; import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; @@ -93,7 +93,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { proxy.close(); // The listener registration actor should get a Close message - expectMsgClass(timeout, CloseDataTreeChangeListenerRegistration.class); + expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class); // The DataChangeListener actor should be terminated expectMsgClass(timeout, Terminated.class); @@ -277,7 +277,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { proxy.init(shardName); - expectMsgClass(duration("5 seconds"), CloseDataTreeChangeListenerRegistration.class); + expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class); Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActorTest.java deleted file mode 100644 index 9259a6190a..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActorTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore; - -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import com.google.common.util.concurrent.MoreExecutors; -import org.junit.Test; -import org.mockito.Mockito; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply; -import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.yangtools.concepts.ListenerRegistration; - -public class DataTreeChangeListenerRegistrationActorTest extends AbstractActorTest { - private static final InMemoryDOMDataStore STORE = new InMemoryDOMDataStore("OPER", - MoreExecutors.newDirectExecutorService()); - - static { - STORE.onGlobalContextUpdated(TestModel.createTestContext()); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testOnReceiveCloseListenerRegistration() throws Exception { - new JavaTestKit(getSystem()) { - { - final ListenerRegistration mockListenerReg = Mockito.mock(ListenerRegistration.class); - final Props props = DataTreeChangeListenerRegistrationActor.props(mockListenerReg); - final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration"); - - subject.tell(CloseDataTreeChangeListenerRegistration.getInstance(), getRef()); - - expectMsgClass(duration("1 second"), CloseDataTreeChangeListenerRegistrationReply.class); - - Mockito.verify(mockListenerReg).close(); - } - }; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java index 00f0f4bdf0..1426e4a243 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java @@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; @@ -92,8 +92,8 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest { listener.reset(1); JavaTestKit kit = new JavaTestKit(getSystem()); - entry.getValue().tell(CloseDataTreeChangeListenerRegistration.getInstance(), kit.getRef()); - kit.expectMsgClass(JavaTestKit.duration("5 seconds"), CloseDataTreeChangeListenerRegistrationReply.class); + entry.getValue().tell(CloseDataTreeNotificationListenerRegistration.getInstance(), kit.getRef()); + kit.expectMsgClass(JavaTestKit.duration("5 seconds"), CloseDataTreeNotificationListenerRegistrationReply.class); writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME)); listener.verifyNoNotifiedData(TEST_PATH); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 31e36cf679..e3aae92f9c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -63,6 +63,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardData import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; +import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; @@ -117,6 +118,8 @@ public class DistributedDataStoreIntegrationTest { @Before public void setUp() throws IOException { + InMemorySnapshotStore.clear(); + InMemoryJournal.clear(); system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"); Cluster.get(system).join(member1Address); @@ -1167,6 +1170,10 @@ public class DistributedDataStoreIntegrationTest { assertNotNull("registerChangeListener returned null", listenerReg); + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getDataChangeListenerActors", 1, + state.getDataChangeListenerActors().size())); + // Wait for the initial notification listener.waitForChangeEvents(TestModel.TEST_PATH); listener.reset(2); @@ -1184,6 +1191,63 @@ public class DistributedDataStoreIntegrationTest { listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); listenerReg.close(); + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getDataChangeListenerActors", 0, + state.getDataChangeListenerActors().size())); + + testWriteTransaction(dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); + + listener.expectNoMoreChanges("Received unexpected change after close"); + } + } + }; + } + + @Test + public void testDataTreeChangeListenerRegistration() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (final AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testDataTreeChangeListenerRegistration", "test-1")) { + + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + + ListenerRegistration listenerReg = dataStore + .registerTreeChangeListener(TestModel.TEST_PATH, listener); + + assertNotNull("registerTreeChangeListener returned null", listenerReg); + + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getTreeChangeListenerActors", 1, + state.getTreeChangeListenerActors().size())); + + // Wait for the initial notification + listener.waitForChangeEvents(TestModel.TEST_PATH); + listener.reset(2); + + // Write 2 updates. + testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + + YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); + testWriteTransaction(dataStore, listPath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + + // Wait for the 2 updates. + listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); + listenerReg.close(); + + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getTreeChangeListenerActors", 0, + state.getTreeChangeListenerActors().size())); + testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java index 42d0337ba9..3ca0f9c7aa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java @@ -174,7 +174,8 @@ public class ShardDataTreeTest extends AbstractTest { immediatePayloadReplication(shardDataTree, mockShard); DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class); - shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener); + shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener, + Optional.absent(), noop -> { }); addCar(shardDataTree, "optima"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 45239ac63f..0bd34aac3e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -59,6 +59,8 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; @@ -2122,10 +2124,10 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testClusteredDataChangeListenerDelayedRegistration() throws Exception { + public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception { new ShardTestKit(getSystem()) { { - final String testName = "testClusteredDataChangeListenerDelayedRegistration"; + final String testName = "testClusteredDataChangeListenerWithDelayedRegistration"; dataStoreContextBuilder.shardElectionTimeoutFactor(1000) .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); @@ -2206,10 +2208,10 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception { + public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception { new ShardTestKit(getSystem()) { { - final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration"; + final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration"; dataStoreContextBuilder.shardElectionTimeoutFactor(1000) .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); @@ -2238,6 +2240,43 @@ public class ShardTest extends AbstractShardTest { }; } + @Test + public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception { + new ShardTestKit(getSystem()) { + { + final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed"; + dataStoreContextBuilder.shardElectionTimeoutFactor(1000) + .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); + + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0); + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, + TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener")); + + setupInMemorySnapshotStore(); + + final TestActorRef shard = actorFactory.createTestActor( + newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(testName + "-shard")); + + waitUntilNoLeader(shard); + + shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); + final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeChangeListenerReply.class); + assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); + + final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath()); + regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef()); + expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class); + + shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()) + .customRaftPolicyImplementation(null).build(), ActorRef.noSender()); + + listener.expectNoMoreChanges("Received unexpected change after close"); + } + }; + } + @Test public void testClusteredDataTreeChangeListenerRegistration() throws Exception { new ShardTestKit(getSystem()) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java new file mode 100644 index 0000000000..12428e9256 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors; + +import static org.mockito.Mockito.timeout; + +import akka.actor.ActorRef; +import akka.testkit.JavaTestKit; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply; +import org.opendaylight.yangtools.concepts.ListenerRegistration; + +public class DataTreeNotificationListenerRegistrationActorTest extends AbstractActorTest { + @Mock + private ListenerRegistration mockListenerReg; + + @Mock + private Runnable mockOnClose; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + DataTreeNotificationListenerRegistrationActor.killDelay = 100; + } + + @Test + public void testOnReceiveCloseListenerRegistrationAfterSetRegistration() throws Exception { + new JavaTestKit(getSystem()) { + { + final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(), + "testOnReceiveCloseListenerRegistrationAfterSetRegistration"); + watch(subject); + + subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg, + mockOnClose), ActorRef.noSender()); + subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef()); + + expectMsgClass(duration("5 second"), CloseDataTreeNotificationListenerRegistrationReply.class); + + Mockito.verify(mockListenerReg, timeout(5000)).close(); + Mockito.verify(mockOnClose, timeout(5000)).run(); + + expectTerminated(duration("5 second"), subject); + } + }; + } + + @Test + public void testOnReceiveCloseListenerRegistrationBeforeSetRegistration() throws Exception { + new JavaTestKit(getSystem()) { + { + final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(), + "testOnReceiveSetRegistrationAfterPriorClose"); + watch(subject); + + subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef()); + expectMsgClass(duration("5 second"), CloseDataTreeNotificationListenerRegistrationReply.class); + + subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg, + mockOnClose), ActorRef.noSender()); + + Mockito.verify(mockListenerReg, timeout(5000)).close(); + Mockito.verify(mockOnClose, timeout(5000)).run(); + + expectTerminated(duration("5 second"), subject); + } + }; + } + + @Test + public void testOnReceiveSetRegistrationAfterPriorClose() throws Exception { + new JavaTestKit(getSystem()) { + { + DataTreeNotificationListenerRegistrationActor.killDelay = 1000; + final ListenerRegistration mockListenerReg2 = Mockito.mock(ListenerRegistration.class); + final Runnable mockOnClose2 = Mockito.mock(Runnable.class); + + final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(), + "testOnReceiveSetRegistrationAfterPriorClose"); + watch(subject); + + subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg, + mockOnClose), ActorRef.noSender()); + subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender()); + subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg2, + mockOnClose2), ActorRef.noSender()); + + Mockito.verify(mockListenerReg, timeout(5000)).close(); + Mockito.verify(mockOnClose, timeout(5000)).run(); + Mockito.verify(mockListenerReg2, timeout(5000)).close(); + Mockito.verify(mockOnClose2, timeout(5000)).run(); + + expectTerminated(duration("5 second"), subject); + } + }; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataTreeChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataTreeChangeListener.java index 4be4315541..b414e0f269 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataTreeChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataTreeChangeListener.java @@ -11,6 +11,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; +import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import java.util.Arrays; @@ -23,6 +24,9 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; public class MockDataTreeChangeListener implements DOMDataTreeChangeListener { @@ -54,12 +58,46 @@ public class MockDataTreeChangeListener implements DOMDataTreeChangeListener { } } - public void waitForChangeEvents() { + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void waitForChangeEvents(YangInstanceIdentifier... expPaths) { boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS); if (!done) { fail(String.format("Missing change notifications. Expected: %d. Actual: %d", expChangeEventCount, expChangeEventCount - changeLatch.getCount())); } + + for (int i = 0; i < expPaths.length; i++) { + final DataTreeCandidate candidate = changeList.get(i); + final Optional> maybeDataAfter = candidate.getRootNode().getDataAfter(); + if (!maybeDataAfter.isPresent()) { + fail(String.format("Change %d does not contain data after. Actual: %s", i + 1, + candidate.getRootNode())); + } + + final NormalizedNode dataAfter = maybeDataAfter.get(); + final Optional relativePath = expPaths[i].relativeTo(candidate.getRootPath()); + if (!relativePath.isPresent()) { + assertEquals(String.format("Change %d does not contain %s. Actual: %s", i + 1, expPaths[i], + dataAfter), expPaths[i].getLastPathArgument(), dataAfter.getIdentifier()); + } else { + NormalizedNode nextChild = dataAfter; + for (PathArgument pathArg: relativePath.get().getPathArguments()) { + boolean found = false; + if (nextChild instanceof NormalizedNodeContainer) { + Optional> maybeChild = ((NormalizedNodeContainer)nextChild) + .getChild(pathArg); + if (maybeChild.isPresent()) { + found = true; + nextChild = maybeChild.get(); + } + } + + if (!found) { + fail(String.format("Change %d does not contain %s. Actual: %s", i + 1, expPaths[i], dataAfter)); + } + } + } + } } public void verifyNotifiedData(YangInstanceIdentifier... paths) { -- 2.36.6