From: Tom Pantelis Date: Tue, 7 Feb 2017 21:48:03 +0000 (-0500) Subject: Add OnDemandShardState to report additional Shard state X-Git-Tag: release/carbon~267 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=013a6679470bf692753f2e04ab4398c97fd9f5d0 Add OnDemandShardState to report additional Shard state Extended the OnDemandRaftState with OnDemandShardState to include additional Shard state, including DTCL, DCL, and commit cohort actors. This will enable us to report thus info from the JMX bean as it's useful for debugging to have visibility into what listeners and cohorts are registered. The actors now also store the registered path. Both the instance and path will be queried for debugging. Change-Id: Iaa6c27c9aba3b5c0223199e6a3fc21bc54da95ba Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 08907e3fd2..1c057d7553 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -399,7 +399,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - OnDemandRaftState.Builder builder = OnDemandRaftState.builder() + OnDemandRaftState.AbstractBuilder builder = newOnDemandRaftStateBuilder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog().dataSize()) @@ -443,6 +443,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + return OnDemandRaftState.builder(); + } + private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) { RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java index cf9bb620dd..9bbee881ea 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java @@ -11,6 +11,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Collections; import java.util.List; import java.util.Map; +import javax.annotation.Nonnull; /** * The response to a GetOnDemandRaftState message. @@ -41,7 +42,7 @@ public class OnDemandRaftState { private Map peerAddresses = Collections.emptyMap(); private Map peerVotingStates = Collections.emptyMap(); - private OnDemandRaftState() { + protected OnDemandRaftState() { } public static Builder builder() { @@ -132,116 +133,131 @@ public class OnDemandRaftState { return customRaftPolicyClassName; } - public static class Builder { - private final OnDemandRaftState stats = new OnDemandRaftState(); + public abstract static class AbstractBuilder> { + @SuppressWarnings("unchecked") + protected T self() { + return (T) this; + } + + @Nonnull + protected abstract OnDemandRaftState state(); - public Builder lastLogIndex(long value) { - stats.lastLogIndex = value; - return this; + public T lastLogIndex(long value) { + state().lastLogIndex = value; + return self(); } - public Builder lastLogTerm(long value) { - stats.lastLogTerm = value; - return this; + public T lastLogTerm(long value) { + state().lastLogTerm = value; + return self(); } - public Builder currentTerm(long value) { - stats.currentTerm = value; - return this; + public T currentTerm(long value) { + state().currentTerm = value; + return self(); } - public Builder commitIndex(long value) { - stats.commitIndex = value; - return this; + public T commitIndex(long value) { + state().commitIndex = value; + return self(); } - public Builder lastApplied(long value) { - stats.lastApplied = value; - return this; + public T lastApplied(long value) { + state().lastApplied = value; + return self(); } - public Builder lastIndex(long value) { - stats.lastIndex = value; - return this; + public T lastIndex(long value) { + state().lastIndex = value; + return self(); } - public Builder lastTerm(long value) { - stats.lastTerm = value; - return this; + public T lastTerm(long value) { + state().lastTerm = value; + return self(); } - public Builder snapshotIndex(long value) { - stats.snapshotIndex = value; - return this; + public T snapshotIndex(long value) { + state().snapshotIndex = value; + return self(); } - public Builder snapshotTerm(long value) { - stats.snapshotTerm = value; - return this; + public T snapshotTerm(long value) { + state().snapshotTerm = value; + return self(); } - public Builder replicatedToAllIndex(long value) { - stats.replicatedToAllIndex = value; - return this; + public T replicatedToAllIndex(long value) { + state().replicatedToAllIndex = value; + return self(); } - public Builder inMemoryJournalDataSize(long value) { - stats.inMemoryJournalDataSize = value; - return this; + public T inMemoryJournalDataSize(long value) { + state().inMemoryJournalDataSize = value; + return self(); } - public Builder inMemoryJournalLogSize(long value) { - stats.inMemoryJournalLogSize = value; - return this; + public T inMemoryJournalLogSize(long value) { + state().inMemoryJournalLogSize = value; + return self(); } - public Builder leader(String value) { - stats.leader = value; - return this; + public T leader(String value) { + state().leader = value; + return self(); } - public Builder raftState(String value) { - stats.raftState = value; - return this; + public T raftState(String value) { + state().raftState = value; + return self(); } - public Builder votedFor(String value) { - stats.votedFor = value; - return this; + public T votedFor(String value) { + state().votedFor = value; + return self(); } - public Builder isVoting(boolean isVoting) { - stats.isVoting = isVoting; - return this; + public T isVoting(boolean isVoting) { + state().isVoting = isVoting; + return self(); } - public Builder followerInfoList(List followerInfoList) { - stats.followerInfoList = followerInfoList; - return this; + public T followerInfoList(List followerInfoList) { + state().followerInfoList = followerInfoList; + return self(); } - public Builder peerAddresses(Map peerAddresses) { - stats.peerAddresses = peerAddresses; - return this; + public T peerAddresses(Map peerAddresses) { + state().peerAddresses = peerAddresses; + return self(); } - public Builder peerVotingStates(Map peerVotingStates) { - stats.peerVotingStates = ImmutableMap.copyOf(peerVotingStates); - return this; + public T peerVotingStates(Map peerVotingStates) { + state().peerVotingStates = ImmutableMap.copyOf(peerVotingStates); + return self(); } - public Builder isSnapshotCaptureInitiated(boolean value) { - stats.isSnapshotCaptureInitiated = value; - return this; + public T isSnapshotCaptureInitiated(boolean value) { + state().isSnapshotCaptureInitiated = value; + return self(); } - public Builder customRaftPolicyClassName(String className) { - stats.customRaftPolicyClassName = className; - return this; + public T customRaftPolicyClassName(String className) { + state().customRaftPolicyClassName = className; + return self(); } public OnDemandRaftState build() { - return stats; + return state(); + } + } + + public static class Builder extends AbstractBuilder { + private final OnDemandRaftState state = new OnDemandRaftState(); + + @Override + protected OnDemandRaftState state() { + return state; } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java index f23f9567a7..0821951a1a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java @@ -9,21 +9,18 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import com.google.common.base.Optional; import java.util.ArrayList; import java.util.Collection; import java.util.EventListener; -import java.util.Map.Entry; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; abstract class AbstractDataListenerSupport, R extends ListenerRegistration> - extends LeaderLocalDelegateFactory> { + extends LeaderLocalDelegateFactory { private final Logger log = LoggerFactory.getLogger(getClass()); private final ArrayList delayedListenerRegistrations = new ArrayList<>(); @@ -68,8 +65,7 @@ abstract class AbstractDataListenerSupport registration; if (hasLeader && message.isRegisterOnAllInstances() || isLeader) { - final Entry> res = createDelegate(message); - registration = res.getKey(); + registration = createDelegate(message); } else { log.debug("{}: Shard is not the leader - delaying registration", persistenceId()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java index 36c822cd0b..db32b36d6e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java @@ -179,8 +179,8 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName); final DataTreeChangeListenerProxy listenerRegistrationProxy = - new DataTreeChangeListenerProxy<>(actorContext, listener); - listenerRegistrationProxy.init(shardName, treeId); + new DataTreeChangeListenerProxy<>(actorContext, listener, treeId); + listenerRegistrationProxy.init(shardName); return listenerRegistrationProxy; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java index 872feeac9d..dc19aa400f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java @@ -9,9 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.Props; -import akka.japi.Creator; import com.google.common.base.Preconditions; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.datastore.messages.DataChanged; import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply; @@ -30,10 +28,13 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @Deprecated public class DataChangeListener extends AbstractUntypedActor { private final AsyncDataChangeListener> listener; + private final YangInstanceIdentifier registeredPath; private boolean notificationsEnabled = false; - public DataChangeListener(AsyncDataChangeListener> listener) { + public DataChangeListener(AsyncDataChangeListener> listener, + final YangInstanceIdentifier registeredPath) { this.listener = Preconditions.checkNotNull(listener, "listener should not be null"); + this.registeredPath = Preconditions.checkNotNull(registeredPath); } @Override @@ -78,26 +79,8 @@ public class DataChangeListener extends AbstractUntypedActor { } } - public static Props props(final AsyncDataChangeListener> listener) { - return Props.create(new DataChangeListenerCreator(listener)); - } - - private static class DataChangeListenerCreator implements Creator { - private static final long serialVersionUID = 1L; - - @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't " - + "create remote instances of this actor and thus don't need it to be Serializable.") - final AsyncDataChangeListener> listener; - - DataChangeListenerCreator( - AsyncDataChangeListener> listener) { - this.listener = listener; - } - - @Override - public DataChangeListener create() throws Exception { - return new DataChangeListener(listener); - } + public static Props props(final AsyncDataChangeListener> listener, + final YangInstanceIdentifier registeredPath) { + return Props.create(DataChangeListener.class, listener, registeredPath); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java index 9eba41aac7..f0f4b7b9b6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java @@ -13,6 +13,7 @@ import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.dispatch.OnComplete; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -41,18 +42,18 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class); - private volatile ActorSelection listenerRegistrationActor; private final AsyncDataChangeListener> listener; - private ActorRef dataChangeListenerActor; private final String shardName; private final ActorContext actorContext; + private ActorRef dataChangeListenerActor; + private volatile ActorSelection listenerRegistrationActor; private boolean closed = false; public >> DataChangeListenerRegistrationProxy(String shardName, ActorContext actorContext, L listener) { - this.shardName = shardName; - this.actorContext = actorContext; - this.listener = listener; + this.shardName = Preconditions.checkNotNull(shardName); + this.actorContext = Preconditions.checkNotNull(actorContext); + this.listener = Preconditions.checkNotNull(listener); } @VisibleForTesting @@ -92,7 +93,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) { dataChangeListenerActor = actorContext.getActorSystem().actorOf( - DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath())); + DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath())); Future findFuture = actorContext.findLocalShardAsync(shardName); findFuture.onComplete(new OnComplete() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java index 26d8fa1b64..9b2beccdad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java @@ -10,10 +10,15 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.Collections; import java.util.Map.Entry; +import java.util.Set; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; @@ -26,14 +31,20 @@ final class DataChangeListenerSupport extends AbstractDataListenerSupport< DelayedDataChangeListenerRegistration, DataChangeListenerRegistration< AsyncDataChangeListener>>> { + private final Set listenerActors = Sets.newConcurrentHashSet(); + DataChangeListenerSupport(final Shard shard) { super(shard); } + Collection getListenerActors() { + return Collections.unmodifiableCollection(listenerActors); + } + @Override - Entry>>, - Optional> createDelegate(final RegisterChangeListener message) { - ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath()); + DataChangeListenerRegistration>> + createDelegate(final RegisterChangeListener message) { + final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath()); // Notify the listener if notifications should be enabled or not // If this shard is the leader then it will enable notifications else @@ -57,7 +68,32 @@ final class DataChangeListenerSupport extends AbstractDataListenerSupport< getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue()); - return regEntry; + listenerActors.add(dataChangeListenerPath); + final DataChangeListenerRegistration>> + delegate = regEntry.getKey(); + return new DataChangeListenerRegistration>>() { + @Override + public void close() { + listenerActors.remove(dataChangeListenerPath); + delegate.close(); + } + + @Override + public AsyncDataChangeListener> getInstance() { + return delegate.getInstance(); + } + + @Override + public YangInstanceIdentifier getPath() { + return delegate.getPath(); + } + + @Override + public DataChangeScope getScope() { + return delegate.getScope(); + } + }; } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java index bccb48477b..2936a28b90 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java @@ -8,14 +8,13 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.Props; -import akka.japi.Creator; import com.google.common.base.Preconditions; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged; import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; /** * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating @@ -23,10 +22,13 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; */ final class DataTreeChangeListenerActor extends AbstractUntypedActor { private final DOMDataTreeChangeListener listener; + private final YangInstanceIdentifier registeredPath; private boolean notificationsEnabled = false; - private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) { + private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, + final YangInstanceIdentifier registeredPath) { this.listener = Preconditions.checkNotNull(listener); + this.registeredPath = Preconditions.checkNotNull(registeredPath); } @Override @@ -70,24 +72,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor { listener); } - public static Props props(final DOMDataTreeChangeListener listener) { - return Props.create(new DataTreeChangeListenerCreator(listener)); - } - - private static final class DataTreeChangeListenerCreator implements Creator { - private static final long serialVersionUID = 1L; - - @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't " - + "create remote instances of this actor and thus don't need it to be Serializable.") - private final DOMDataTreeChangeListener listener; - - DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) { - this.listener = Preconditions.checkNotNull(listener); - } - - @Override - public DataTreeChangeListenerActor create() { - return new DataTreeChangeListenerActor(listener); - } + public static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) { + return Props.create(DataTreeChangeListenerActor.class, listener, registeredPath); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java index 8a9b466b6e..f60e676013 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java @@ -37,15 +37,18 @@ final class DataTreeChangeListenerProxy ext private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class); private final ActorRef dataChangeListenerActor; private final ActorContext actorContext; + private final YangInstanceIdentifier registeredPath; @GuardedBy("this") private ActorSelection listenerRegistrationActor; - DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) { + DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener, + final YangInstanceIdentifier registeredPath) { super(listener); this.actorContext = Preconditions.checkNotNull(actorContext); + this.registeredPath = Preconditions.checkNotNull(registeredPath); this.dataChangeListenerActor = actorContext.getActorSystem().actorOf( - DataTreeChangeListenerActor.props(getInstance()) + DataTreeChangeListenerActor.props(getInstance(), registeredPath) .withDispatcher(actorContext.getNotificationDispatcherPath())); } @@ -59,19 +62,19 @@ final class DataTreeChangeListenerProxy ext dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - void init(final String shardName, final YangInstanceIdentifier treeId) { + void init(final String shardName) { Future findFuture = actorContext.findLocalShardAsync(shardName); findFuture.onComplete(new OnComplete() { @Override public void onComplete(final Throwable failure, final ActorRef shard) { if (failure instanceof LocalShardNotFoundException) { LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " - + "cannot be registered", shardName, getInstance(), treeId); + + "cannot be registered", shardName, getInstance(), registeredPath); } else if (failure != null) { LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " - + "cannot be registered: {}", shardName, getInstance(), treeId, failure); + + "cannot be registered: {}", shardName, getInstance(), registeredPath, failure); } else { - doRegistration(shard, treeId); + doRegistration(shard); } } }, actorContext.getClientDispatcher()); @@ -94,10 +97,10 @@ final class DataTreeChangeListenerProxy ext actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null); } - private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) { + private void doRegistration(final ActorRef shard) { Future future = actorContext.executeOperationAsync(shard, - new RegisterDataTreeChangeListener(path, dataChangeListenerActor, + new RegisterDataTreeChangeListener(registeredPath, dataChangeListenerActor, getInstance() instanceof ClusteredDOMDataTreeChangeListener), actorContext.getDatastoreContext().getShardInitializationTimeout()); @@ -106,7 +109,7 @@ final class DataTreeChangeListenerProxy ext public void onComplete(final Throwable failure, final Object result) { if (failure != null) { LOG.error("Failed to register DataTreeChangeListener {} at path {}", - getInstance(), path.toString(), failure); + getInstance(), registeredPath, failure); } else { RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result; setListenerRegistrationActor(actorContext.actorSelection( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java index f039ab707b..5b8e5f8abf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java @@ -10,7 +10,11 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.Collections; import java.util.Map.Entry; +import java.util.Set; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; @@ -21,14 +25,21 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport> { + + private final Set listenerActors = Sets.newConcurrentHashSet(); + DataTreeChangeListenerSupport(final Shard shard) { super(shard); } + Collection getListenerActors() { + return Collections.unmodifiableCollection(listenerActors); + } + @Override - Entry, Optional> createDelegate( + ListenerRegistration createDelegate( final RegisterDataTreeChangeListener message) { - ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath()); + final ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath()); // Notify the listener if notifications should be enabled or not // If this shard is the leader then it will enable notifications else @@ -51,7 +62,20 @@ final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport delegate = regEntry.getKey(); + return new ListenerRegistration() { + @Override + public DOMDataTreeChangeListener getInstance() { + return delegate.getInstance(); + } + + @Override + public void close() { + listenerActors.remove(dataChangeListenerPath); + delegate.close(); + } + }; } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java index 0be8f09865..e6ff10d831 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java @@ -19,6 +19,7 @@ import org.opendaylight.mdsal.common.api.PostPreCommitStep; import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep; import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate; import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** @@ -28,10 +29,12 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; final class DataTreeCohortActor extends AbstractUntypedActor { private final CohortBehaviour idleState = new Idle(); private final DOMDataTreeCommitCohort cohort; + private final YangInstanceIdentifier registeredPath; private CohortBehaviour currentState = idleState; - private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) { + private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) { this.cohort = Preconditions.checkNotNull(cohort); + this.registeredPath = Preconditions.checkNotNull(registeredPath); } @Override @@ -143,7 +146,8 @@ final class DataTreeCohortActor extends AbstractUntypedActor { } else if (message instanceof Abort) { return abort(); } - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s", + message.getClass(), getClass().getSimpleName())); } abstract CohortBehaviour abort(); @@ -269,7 +273,7 @@ final class DataTreeCohortActor extends AbstractUntypedActor { } - static Props props(final DOMDataTreeCommitCohort cohort) { - return Props.create(DataTreeCohortActor.class, cohort); + static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) { + return Props.create(DataTreeCohortActor.class, cohort, registeredPath); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java index 4364e22d2b..f79c0475bb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java @@ -15,6 +15,7 @@ import akka.util.Timeout; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,6 +49,10 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { private final Map> cohortToNode = new HashMap<>(); + Collection getCohortActors() { + return Collections.unmodifiableCollection(cohortToNode.keySet()); + } + @SuppressWarnings("checkstyle:IllegalCatch") void registerCohort(final ActorRef sender, final RegisterCohort cohort) { takeLock(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java index 18146b7f1f..cfe12a1bc6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java @@ -42,8 +42,8 @@ public class DataTreeCohortRegistrationProxy super(cohort); this.subtree = Preconditions.checkNotNull(subtree); this.actorContext = Preconditions.checkNotNull(actorContext); - this.actor = actorContext.getActorSystem().actorOf( - DataTreeCohortActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath())); + this.actor = actorContext.getActorSystem().actorOf(DataTreeCohortActor.props(getInstance(), + subtree.getRootIdentifier()).withDispatcher(actorContext.getNotificationDispatcherPath())); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java index 837242dec4..ac132721c5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java @@ -7,12 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Optional; import java.util.EventListener; -import java.util.Map.Entry; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; abstract class DelayedListenerRegistration implements ListenerRegistration { private final M registrationMessage; @@ -34,10 +31,9 @@ abstract class DelayedListenerRegistration implement } synchronized > void createDelegate( - final LeaderLocalDelegateFactory> factory) { + final LeaderLocalDelegateFactory factory) { if (!closed) { - final Entry> res = factory.createDelegate(registrationMessage); - this.delegate = res.getKey(); + this.delegate = factory.createDelegate(registrationMessage); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java index 1542fd6832..cd1b548b8e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java @@ -7,15 +7,12 @@ */ package org.opendaylight.controller.cluster.datastore; -import java.util.Map.Entry; - /** * Base class for factories instantiating delegates. * * @param message type * @param delegate type - * @param initial state type */ -abstract class DelegateFactory { - abstract Entry createDelegate(M message); +abstract class DelegateFactory { + abstract D createDelegate(M message); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java index fbd974a1da..0f76629540 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java @@ -19,9 +19,8 @@ import com.google.common.base.Preconditions; * * @param delegate type * @param message type - * @param initial state type */ -abstract class LeaderLocalDelegateFactory extends DelegateFactory { +abstract class LeaderLocalDelegateFactory extends DelegateFactory { private final Shard shard; protected LeaderLocalDelegateFactory(final Shard shard) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 84c399deb2..d5d129f221 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -65,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; +import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -80,6 +81,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -783,6 +785,13 @@ public class Shard extends RaftActor { store.setRunOnPendingTransactionsComplete(operation); } + @Override + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) + .dataChangeListenerActors(changeSupport.getListenerActors()) + .commitCohortActors(store.getCohortActors()); + } + @Override public String persistenceId() { return this.name; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 282191216d..d398afefa7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -772,6 +772,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextPendingCommit(); } + Collection getCohortActors() { + return cohortRegistry.getCohortActors(); + } + void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) { cohortRegistry.process(sender, message); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java new file mode 100644 index 0000000000..ba6f229de7 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import java.util.Collection; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; + +/** + * Extends OnDemandRaftState to add Shard state. + * + * @author Thomas Pantelis + */ +public class OnDemandShardState extends OnDemandRaftState { + private Collection treeChangeListenerActors; + private Collection dataChangeListenerActors; + private Collection commitCohortActors; + + public Collection getTreeChangeListenerActors() { + return treeChangeListenerActors; + } + + public Collection getDataChangeListenerActors() { + return dataChangeListenerActors; + } + + public Collection getCommitCohortActors() { + return commitCohortActors; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder extends AbstractBuilder { + private final OnDemandShardState state = new OnDemandShardState(); + + @Override + protected OnDemandRaftState state() { + return state; + } + + public Builder treeChangeListenerActors(Collection actors) { + state.treeChangeListenerActors = actors; + return self(); + } + + public Builder dataChangeListenerActors(Collection actors) { + state.dataChangeListenerActors = actors; + return self(); + } + + public Builder commitCohortActors(Collection actors) { + state.commitCohortActors = actors; + return self(); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java index fc70e3308c..9721766d9f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java @@ -30,6 +30,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodes; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; @@ -53,7 +54,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest { final Shard shard = actor.underlyingActor(); final MockDataChangeListener listener = new MockDataChangeListener(0); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, TEST_PATH), "testChangeListenerWithNoInitialData-DataChangeListener"); final DataChangeListenerSupport support = new DataChangeListenerSupport(shard); support.onMessage(new RegisterChangeListener(TEST_PATH, dclActor, DataChangeScope.ONE, false), @@ -79,7 +80,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest { writeToStore(shard.getDataStore(), TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, TEST_PATH), "testInitialChangeListenerEventWithContainerPath-DataChangeListener"); final DataChangeListenerSupport support = new DataChangeListenerSupport(shard); support.onMessage(new RegisterChangeListener(TEST_PATH, dclActor, DataChangeScope.ONE, false), @@ -104,7 +105,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest { mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2)); final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, OUTER_LIST_PATH), "testInitialChangeListenerEventWithListPath-DataChangeListener"); final DataChangeListenerSupport support = new DataChangeListenerSupport(shard); support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH, dclActor, DataChangeScope.ONE, false), @@ -137,11 +138,11 @@ public class DataChangeListenerSupportTest extends AbstractShardTest { ImmutableNodes.containerNode(OUTER_CONTAINER_QNAME)); final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME); + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), "testInitialChangeListenerEventWithWildcardedListPath-DataChangeListener"); final DataChangeListenerSupport support = new DataChangeListenerSupport(shard); - support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH.node(OUTER_LIST_QNAME), dclActor, - DataChangeScope.ONE, false), true, true); + support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), true, true); listener.waitForChangeEvents(); listener.verifyCreatedData(0, outerEntryPath(1)); @@ -169,12 +170,12 @@ public class DataChangeListenerSupportTest extends AbstractShardTest { outerNodeEntry(2, innerNode("three", "four"))))); final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME) + .node(INNER_LIST_QNAME).node(INNER_LIST_QNAME); + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), "testInitialChangeListenerEventWithNestedWildcardedListsPath-DataChangeListener"); final DataChangeListenerSupport support = new DataChangeListenerSupport(shard); - support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH.node(OUTER_LIST_QNAME) - .node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), dclActor, DataChangeScope.ONE, false), - true, true); + support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), true, true); listener.waitForChangeEvents(); @@ -185,12 +186,13 @@ public class DataChangeListenerSupportTest extends AbstractShardTest { // Register for a specific outer list entry final MockDataChangeListener listener2 = new MockDataChangeListener(1); - final ActorRef dclActor2 = actorFactory.createActor(DataChangeListener.props(listener2), + final YangInstanceIdentifier path2 = OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME) + .node(INNER_LIST_QNAME); + final ActorRef dclActor2 = actorFactory.createActor(DataChangeListener.props(listener2, path2), "testInitialChangeListenerEventWithNestedWildcardedListsPath-DataChangeListener2"); final DataChangeListenerSupport support2 = new DataChangeListenerSupport(shard); - support2.onMessage(new RegisterChangeListener( - OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), dclActor2, - DataChangeScope.ONE, false), true, true); + support2.onMessage(new RegisterChangeListener(path2, dclActor2, DataChangeScope.ONE, false), + true, true); listener2.waitForChangeEvents(); listener2.verifyCreatedData(0, innerEntryPath(1, "one")); @@ -219,12 +221,12 @@ public class DataChangeListenerSupportTest extends AbstractShardTest { outerNodeEntry(2, innerNode("three", "four"))))); final MockDataChangeListener listener = new MockDataChangeListener(0); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME) + .node(INNER_LIST_QNAME); + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), "testInitialChangeListenerEventWhenNotInitiallyLeader-DataChangeListener"); final DataChangeListenerSupport support = new DataChangeListenerSupport(shard); - support.onMessage(new RegisterChangeListener( - OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), dclActor, - DataChangeScope.ONE, false), false, true); + support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), false, true); listener.expectNoMoreChanges("Unexpected initial change event"); listener.reset(1); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java index 544a56628c..f3dcaa2b70 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.datastore; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH; + import akka.actor.ActorRef; import akka.actor.DeadLetter; import akka.actor.Props; @@ -30,7 +32,7 @@ public class DataChangeListenerTest extends AbstractActorTest { { final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class); final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class); - final Props props = DataChangeListener.props(mockListener); + final Props props = DataChangeListener.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsEnabled"); // Let the DataChangeListener know that notifications should be @@ -53,7 +55,7 @@ public class DataChangeListenerTest extends AbstractActorTest { { final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class); final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class); - final Props props = DataChangeListener.props(mockListener); + final Props props = DataChangeListener.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsDisabled"); subject.tell(new DataChanged(mockChangeEvent), getRef()); @@ -78,7 +80,7 @@ public class DataChangeListenerTest extends AbstractActorTest { { final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class); final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class); - final Props props = DataChangeListener.props(mockListener); + final Props props = DataChangeListener.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender"); getSystem().eventStream().subscribe(getRef(), DeadLetter.class); @@ -115,7 +117,7 @@ public class DataChangeListenerTest extends AbstractActorTest { AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class); Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataChanged(mockChangeEvent2); - Props props = DataChangeListener.props(mockListener); + Props props = DataChangeListener.props(mockListener, TEST_PATH); ActorRef subject = getSystem().actorOf(props, "testDataChangedWithListenerRuntimeEx"); // Let the DataChangeListener know that notifications should be diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java index 6b8658cc49..62b95c214b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH; + import akka.actor.ActorRef; import akka.actor.DeadLetter; import akka.actor.Props; @@ -31,7 +33,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest { final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class); final ImmutableList mockCandidates = ImmutableList.of(mockTreeCandidate); final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); - final Props props = DataTreeChangeListenerActor.props(mockListener); + final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsEnabled"); // Let the DataChangeListener know that notifications should be @@ -54,7 +56,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest { final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class); final ImmutableList mockCandidates = ImmutableList.of(mockTreeCandidate); final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); - final Props props = DataTreeChangeListenerActor.props(mockListener); + final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsDisabled"); subject.tell(new DataTreeChanged(mockCandidates), getRef()); @@ -79,7 +81,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest { final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class); final ImmutableList mockCandidates = ImmutableList.of(mockTreeCandidate); final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); - final Props props = DataTreeChangeListenerActor.props(mockListener); + final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithNoSender"); getSystem().eventStream().subscribe(getRef(), DeadLetter.class); @@ -119,7 +121,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest { final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataTreeChanged(mockCandidates2); - Props props = DataTreeChangeListenerActor.props(mockListener); + Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithListenerRuntimeEx"); // Let the DataChangeListener know that notifications should be diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java index 8cf0d83a79..e6b20a7206 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java @@ -56,14 +56,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorContext, mockListener); + actorContext, mockListener, path); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); new Thread() { @Override public void run() { - proxy.init("shard-1", path); + proxy.init("shard-1"); } }.start(); @@ -115,14 +115,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { ClusteredDOMDataTreeChangeListener mockClusteredListener = mock( ClusteredDOMDataTreeChangeListener.class); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); final DataTreeChangeListenerProxy proxy = - new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener); + new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); new Thread() { @Override public void run() { - proxy.init("shard-1", path); + proxy.init("shard-1"); } }.start(); @@ -150,14 +150,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorContext, mockListener); + actorContext, mockListener, path); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); new Thread() { @Override public void run() { - proxy.init("shard-1", path); + proxy.init("shard-1"); } }.start(); @@ -182,14 +182,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorContext, mockListener); + actorContext, mockListener, path); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); new Thread() { @Override public void run() { - proxy.init("shard-1", path); + proxy.init("shard-1"); } }.start(); @@ -230,7 +230,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { String shardName = "shard-1"; final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorContext, mockListener); + actorContext, mockListener, path); doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); @@ -238,7 +238,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { .executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class)); doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext(); - proxy.init("shard-1", path); + proxy.init("shard-1"); Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor()); @@ -265,7 +265,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorContext, mockListener); + actorContext, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME)); Answer> answer = invocation -> { proxy.close(); @@ -275,7 +275,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class)); - proxy.init(shardName, YangInstanceIdentifier.of(TestModel.TEST_QNAME)); + proxy.init(shardName); expectMsgClass(duration("5 seconds"), CloseDataTreeChangeListenerRegistration.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java index 6d09d1cfac..00f0f4bdf0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java @@ -38,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChang import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import scala.concurrent.Await; @@ -157,7 +158,7 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest { private Entry registerChangeListener(final YangInstanceIdentifier path, final int expectedEvents) { MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents); - ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener)); + ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, TestModel.TEST_PATH)); try { RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 2edbff2380..45cfd29d25 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -139,8 +139,8 @@ public class ShardTest extends AbstractShardTest { shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender()); final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), - "testRegisterChangeListener-DataChangeListener"); + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, + TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener"); shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); @@ -216,8 +216,9 @@ public class ShardTest extends AbstractShardTest { setupInMemorySnapshotStore(); + final YangInstanceIdentifier path = TestModel.TEST_PATH; final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener"); final TestActorRef shard = actorFactory.createTestActor( @@ -226,8 +227,6 @@ public class ShardTest extends AbstractShardTest { new ShardTestKit(getSystem()) { { - final YangInstanceIdentifier path = TestModel.TEST_PATH; - // Wait until the shard receives the first ElectionTimeout // message. assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); @@ -271,8 +270,8 @@ public class ShardTest extends AbstractShardTest { shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender()); final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), - "testRegisterDataTreeChangeListener-DataTreeChangeListener"); + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, + TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener"); shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef()); @@ -326,16 +325,15 @@ public class ShardTest extends AbstractShardTest { setupInMemorySnapshotStore(); + final YangInstanceIdentifier path = TestModel.TEST_PATH; final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path), "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener"); final TestActorRef shard = actorFactory.createTestActor( Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration"); - final YangInstanceIdentifier path = TestModel.TEST_PATH; - new ShardTestKit(getSystem()) { { assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); @@ -2129,8 +2127,9 @@ public class ShardTest extends AbstractShardTest { dataStoreContextBuilder.shardElectionTimeoutFactor(1000) .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); + final YangInstanceIdentifier path = TestModel.TEST_PATH; final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), actorFactory.generateActorId(testName + "-DataChangeListener")); setupInMemorySnapshotStore(); @@ -2141,8 +2140,6 @@ public class ShardTest extends AbstractShardTest { waitUntilNoLeader(shard); - final YangInstanceIdentifier path = TestModel.TEST_PATH; - shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), @@ -2189,7 +2186,7 @@ public class ShardTest extends AbstractShardTest { final YangInstanceIdentifier path = TestModel.TEST_PATH; final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), actorFactory.generateActorId(testName + "-DataChangeListener")); followerShard.tell( @@ -2215,8 +2212,8 @@ public class ShardTest extends AbstractShardTest { .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), - actorFactory.generateActorId(testName + "-DataTreeChangeListener")); + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, + TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener")); setupInMemorySnapshotStore(); @@ -2271,7 +2268,7 @@ public class ShardTest extends AbstractShardTest { final YangInstanceIdentifier path = TestModel.TEST_PATH; final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path), actorFactory.generateActorId(testName + "-DataTreeChangeListener")); followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());