From: Tom Pantelis Date: Tue, 7 Feb 2017 21:48:03 +0000 (-0500) Subject: Add OnDemandShardState to report additional Shard state X-Git-Tag: release/boron-sr3~8 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;ds=sidebyside;h=8dfdfb5627c0434a4d253945a8f590f9c66f4777;p=controller.git Add OnDemandShardState to report additional Shard state Extended the OnDemandRaftState with OnDemandShardState to include additional Shard state, including DTCL, DCL, and commit cohort actors. This will enable us to report thus info from the JMX bean as it's useful for debugging to have visibility into what listeners and cohorts are registered. The actors now also store the registered path. Both the instance and path will be queried for debugging. Change-Id: Iaa6c27c9aba3b5c0223199e6a3fc21bc54da95ba Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 88e64aa65d..6bef0b593f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -422,7 +422,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - OnDemandRaftState.Builder builder = OnDemandRaftState.builder() + OnDemandRaftState.AbstractBuilder builder = newOnDemandRaftStateBuilder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog().dataSize()) @@ -466,6 +466,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + return OnDemandRaftState.builder(); + } + private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) { RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java index 0bd85b1e6d..97cdf723c4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java @@ -11,6 +11,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Collections; import java.util.List; import java.util.Map; +import javax.annotation.Nonnull; /** * The response to a GetOnDemandRaftState message, @@ -41,7 +42,7 @@ public class OnDemandRaftState { private Map peerAddresses = Collections.emptyMap(); private Map peerVotingStates = Collections.emptyMap(); - private OnDemandRaftState() { + protected OnDemandRaftState() { } public static Builder builder() { @@ -132,116 +133,131 @@ public class OnDemandRaftState { return customRaftPolicyClassName; } - public static class Builder { - private final OnDemandRaftState stats = new OnDemandRaftState(); + public abstract static class AbstractBuilder> { + @SuppressWarnings("unchecked") + protected T self() { + return (T) this; + } + + @Nonnull + protected abstract OnDemandRaftState state(); - public Builder lastLogIndex(long value) { - stats.lastLogIndex = value; - return this; + public T lastLogIndex(long value) { + state().lastLogIndex = value; + return self(); } - public Builder lastLogTerm(long value) { - stats.lastLogTerm = value; - return this; + public T lastLogTerm(long value) { + state().lastLogTerm = value; + return self(); } - public Builder currentTerm(long value) { - stats.currentTerm = value; - return this; + public T currentTerm(long value) { + state().currentTerm = value; + return self(); } - public Builder commitIndex(long value) { - stats.commitIndex = value; - return this; + public T commitIndex(long value) { + state().commitIndex = value; + return self(); } - public Builder lastApplied(long value) { - stats.lastApplied = value; - return this; + public T lastApplied(long value) { + state().lastApplied = value; + return self(); } - public Builder lastIndex(long value) { - stats.lastIndex = value; - return this; + public T lastIndex(long value) { + state().lastIndex = value; + return self(); } - public Builder lastTerm(long value) { - stats.lastTerm = value; - return this; + public T lastTerm(long value) { + state().lastTerm = value; + return self(); } - public Builder snapshotIndex(long value) { - stats.snapshotIndex = value; - return this; + public T snapshotIndex(long value) { + state().snapshotIndex = value; + return self(); } - public Builder snapshotTerm(long value) { - stats.snapshotTerm = value; - return this; + public T snapshotTerm(long value) { + state().snapshotTerm = value; + return self(); } - public Builder replicatedToAllIndex(long value) { - stats.replicatedToAllIndex = value; - return this; + public T replicatedToAllIndex(long value) { + state().replicatedToAllIndex = value; + return self(); } - public Builder inMemoryJournalDataSize(long value) { - stats.inMemoryJournalDataSize = value; - return this; + public T inMemoryJournalDataSize(long value) { + state().inMemoryJournalDataSize = value; + return self(); } - public Builder inMemoryJournalLogSize(long value) { - stats.inMemoryJournalLogSize = value; - return this; + public T inMemoryJournalLogSize(long value) { + state().inMemoryJournalLogSize = value; + return self(); } - public Builder leader(String value) { - stats.leader = value; - return this; + public T leader(String value) { + state().leader = value; + return self(); } - public Builder raftState(String value) { - stats.raftState = value; - return this; + public T raftState(String value) { + state().raftState = value; + return self(); } - public Builder votedFor(String value) { - stats.votedFor = value; - return this; + public T votedFor(String value) { + state().votedFor = value; + return self(); } - public Builder isVoting(boolean isVoting) { - stats.isVoting = isVoting; - return this; + public T isVoting(boolean isVoting) { + state().isVoting = isVoting; + return self(); } - public Builder followerInfoList(List followerInfoList) { - stats.followerInfoList = followerInfoList; - return this; + public T followerInfoList(List followerInfoList) { + state().followerInfoList = followerInfoList; + return self(); } - public Builder peerAddresses(Map peerAddresses) { - stats.peerAddresses = peerAddresses; - return this; + public T peerAddresses(Map peerAddresses) { + state().peerAddresses = peerAddresses; + return self(); } - public Builder peerVotingStates(Map peerVotingStates) { - stats.peerVotingStates = ImmutableMap.copyOf(peerVotingStates); - return this; + public T peerVotingStates(Map peerVotingStates) { + state().peerVotingStates = ImmutableMap.copyOf(peerVotingStates); + return self(); } - public Builder isSnapshotCaptureInitiated(boolean value) { - stats.isSnapshotCaptureInitiated = value; - return this; + public T isSnapshotCaptureInitiated(boolean value) { + state().isSnapshotCaptureInitiated = value; + return self(); } - public Builder customRaftPolicyClassName(String className) { - stats.customRaftPolicyClassName = className; - return this; + public T customRaftPolicyClassName(String className) { + state().customRaftPolicyClassName = className; + return self(); } public OnDemandRaftState build() { - return stats; + return state(); + } + } + + public static class Builder extends AbstractBuilder { + private final OnDemandRaftState state = new OnDemandRaftState(); + + @Override + protected OnDemandRaftState state() { + return state; } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java index a253b794db..fde9ff3a46 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java @@ -9,21 +9,18 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import com.google.common.base.Optional; import java.util.ArrayList; import java.util.Collection; import java.util.EventListener; -import java.util.Map.Entry; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -abstract class AbstractDataListenerSupport, LR extends ListenerRegistration> - extends LeaderLocalDelegateFactory> { +abstract class AbstractDataListenerSupport, R extends ListenerRegistration> + extends LeaderLocalDelegateFactory { private final Logger log = LoggerFactory.getLogger(getClass()); private final ArrayList delayedListenerRegistrations = new ArrayList<>(); @@ -63,13 +60,12 @@ abstract class AbstractDataListenerSupport registration; - if((hasLeader && message.isRegisterOnAllInstances()) || isLeader) { - final Entry> res = createDelegate(message); - registration = res.getKey(); + if (hasLeader && message.isRegisterOnAllInstances() || isLeader) { + registration = createDelegate(message); } else { log.debug("{}: Shard is not the leader - delaying registration", persistenceId()); @@ -99,7 +95,7 @@ abstract class AbstractDataListenerSupport registration); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java index d572beb4d4..f2c2f1dbf5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.Props; -import akka.japi.Creator; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.datastore.messages.DataChanged; @@ -31,10 +30,13 @@ public class DataChangeListener extends AbstractUntypedActor { private static final Logger LOG = LoggerFactory.getLogger(DataChangeListener.class); private final AsyncDataChangeListener> listener; + private final YangInstanceIdentifier registeredPath; private boolean notificationsEnabled = false; - public DataChangeListener(AsyncDataChangeListener> listener) { + public DataChangeListener(AsyncDataChangeListener> listener, + final YangInstanceIdentifier registeredPath) { this.listener = Preconditions.checkNotNull(listener, "listener should not be null"); + this.registeredPath = Preconditions.checkNotNull(registeredPath); } @Override @@ -78,24 +80,8 @@ public class DataChangeListener extends AbstractUntypedActor { } } - public static Props props(final AsyncDataChangeListener> listener) { - return Props.create(new DataChangeListenerCreator(listener)); - } - - private static class DataChangeListenerCreator implements Creator { - private static final long serialVersionUID = 1L; - - final AsyncDataChangeListener> listener; - - DataChangeListenerCreator( - AsyncDataChangeListener> listener) { - this.listener = listener; - } - - @Override - public DataChangeListener create() throws Exception { - return new DataChangeListener(listener); - } + public static Props props(final AsyncDataChangeListener> listener, + final YangInstanceIdentifier registeredPath) { + return Props.create(DataChangeListener.class, listener, registeredPath); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java index 534beead0e..738d256369 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java @@ -13,6 +13,7 @@ import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.dispatch.OnComplete; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -41,19 +42,19 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class); - private volatile ActorSelection listenerRegistrationActor; private final AsyncDataChangeListener> listener; - private ActorRef dataChangeListenerActor; private final String shardName; private final ActorContext actorContext; + private ActorRef dataChangeListenerActor; + private volatile ActorSelection listenerRegistrationActor; private boolean closed = false; public >> DataChangeListenerRegistrationProxy ( String shardName, ActorContext actorContext, L listener) { - this.shardName = shardName; - this.actorContext = actorContext; - this.listener = listener; + this.shardName = Preconditions.checkNotNull(shardName); + this.actorContext = Preconditions.checkNotNull(actorContext); + this.listener = Preconditions.checkNotNull(listener); } @VisibleForTesting @@ -93,7 +94,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) { dataChangeListenerActor = actorContext.getActorSystem().actorOf( - DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath())); + DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath())); Future findFuture = actorContext.findLocalShardAsync(shardName); findFuture.onComplete(new OnComplete() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java index f4b6bcc9fd..57e20059e6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java @@ -10,10 +10,15 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.Collections; import java.util.Map.Entry; +import java.util.Set; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; @@ -26,14 +31,20 @@ final class DataChangeListenerSupport extends AbstractDataListenerSupport< DelayedDataChangeListenerRegistration, DataChangeListenerRegistration< AsyncDataChangeListener>>> { + private final Set listenerActors = Sets.newConcurrentHashSet(); + DataChangeListenerSupport(final Shard shard) { super(shard); } + Collection getListenerActors() { + return Collections.unmodifiableCollection(listenerActors); + } + @Override - Entry>>, - Optional> createDelegate(final RegisterChangeListener message) { - ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath()); + DataChangeListenerRegistration>> + createDelegate(final RegisterChangeListener message) { + final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath()); // Notify the listener if notifications should be enabled or not // If this shard is the leader then it will enable notifications else @@ -57,7 +68,32 @@ final class DataChangeListenerSupport extends AbstractDataListenerSupport< getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue()); - return regEntry; + listenerActors.add(dataChangeListenerPath); + final DataChangeListenerRegistration>> + delegate = regEntry.getKey(); + return new DataChangeListenerRegistration>>() { + @Override + public void close() { + listenerActors.remove(dataChangeListenerPath); + delegate.close(); + } + + @Override + public AsyncDataChangeListener> getInstance() { + return delegate.getInstance(); + } + + @Override + public YangInstanceIdentifier getPath() { + return delegate.getPath(); + } + + @Override + public DataChangeScope getScope() { + return delegate.getScope(); + } + }; } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java index 03978f2b66..7e6d87cc02 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java @@ -8,13 +8,13 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.Props; -import akka.japi.Creator; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged; import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,10 +25,13 @@ import org.slf4j.LoggerFactory; final class DataTreeChangeListenerActor extends AbstractUntypedActor { private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerActor.class); private final DOMDataTreeChangeListener listener; + private final YangInstanceIdentifier registeredPath; private boolean notificationsEnabled = false; - private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) { + private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, + final YangInstanceIdentifier registeredPath) { this.listener = Preconditions.checkNotNull(listener); + this.registeredPath = Preconditions.checkNotNull(registeredPath); } @Override @@ -71,21 +74,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor { listener); } - public static Props props(final DOMDataTreeChangeListener listener) { - return Props.create(new DataTreeChangeListenerCreator(listener)); - } - - private static final class DataTreeChangeListenerCreator implements Creator { - private static final long serialVersionUID = 1L; - private final DOMDataTreeChangeListener listener; - - DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) { - this.listener = Preconditions.checkNotNull(listener); - } - - @Override - public DataTreeChangeListenerActor create() { - return new DataTreeChangeListenerActor(listener); - } + public static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) { + return Props.create(DataTreeChangeListenerActor.class, listener, registeredPath); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java index a45ae52afd..72d9097191 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java @@ -37,15 +37,19 @@ final class DataTreeChangeListenerProxy ext private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class); private final ActorRef dataChangeListenerActor; private final ActorContext actorContext; + private final YangInstanceIdentifier registeredPath; @GuardedBy("this") private ActorSelection listenerRegistrationActor; - public DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) { + DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener, + final YangInstanceIdentifier registeredPath) { super(listener); this.actorContext = Preconditions.checkNotNull(actorContext); + this.registeredPath = Preconditions.checkNotNull(registeredPath); this.dataChangeListenerActor = actorContext.getActorSystem().actorOf( - DataTreeChangeListenerActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath())); + DataTreeChangeListenerActor.props(getInstance(), registeredPath) + .withDispatcher(actorContext.getNotificationDispatcherPath())); } @Override @@ -58,19 +62,19 @@ final class DataTreeChangeListenerProxy ext dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - void init(final String shardName, final YangInstanceIdentifier treeId) { + void init(final String shardName) { Future findFuture = actorContext.findLocalShardAsync(shardName); findFuture.onComplete(new OnComplete() { @Override public void onComplete(final Throwable failure, final ActorRef shard) { if (failure instanceof LocalShardNotFoundException) { - LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " + - "cannot be registered", shardName, getInstance(), treeId); + LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " + + "cannot be registered", shardName, getInstance(), registeredPath); } else if (failure != null) { - LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " + - "cannot be registered: {}", shardName, getInstance(), treeId, failure); + LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " + + "cannot be registered: {}", shardName, getInstance(), registeredPath, failure); } else { - doRegistration(shard, treeId); + doRegistration(shard); } } }, actorContext.getClientDispatcher()); @@ -93,10 +97,10 @@ final class DataTreeChangeListenerProxy ext actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null); } - private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) { + private void doRegistration(final ActorRef shard) { Future future = actorContext.executeOperationAsync(shard, - new RegisterDataTreeChangeListener(path, dataChangeListenerActor, + new RegisterDataTreeChangeListener(registeredPath, dataChangeListenerActor, getInstance() instanceof ClusteredDOMDataTreeChangeListener), actorContext.getDatastoreContext().getShardInitializationTimeout()); @@ -105,7 +109,7 @@ final class DataTreeChangeListenerProxy ext public void onComplete(final Throwable failure, final Object result) { if (failure != null) { LOG.error("Failed to register DataTreeChangeListener {} at path {}", - getInstance(), path.toString(), failure); + getInstance(), registeredPath, failure); } else { RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result; setListenerRegistrationActor(actorContext.actorSelection( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java index fa55523db0..abafae8049 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java @@ -10,7 +10,11 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.Collections; import java.util.Map.Entry; +import java.util.Set; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; @@ -20,14 +24,21 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport> { + + private final Set listenerActors = Sets.newConcurrentHashSet(); + DataTreeChangeListenerSupport(final Shard shard) { super(shard); } + Collection getListenerActors() { + return Collections.unmodifiableCollection(listenerActors); + } + @Override - Entry, Optional> createDelegate( + ListenerRegistration createDelegate( final RegisterDataTreeChangeListener message) { - ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath()); + final ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath()); // Notify the listener if notifications should be enabled or not // If this shard is the leader then it will enable notifications else @@ -50,7 +61,20 @@ final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport delegate = regEntry.getKey(); + return new ListenerRegistration() { + @Override + public DOMDataTreeChangeListener getInstance() { + return delegate.getInstance(); + } + + @Override + public void close() { + listenerActors.remove(dataChangeListenerPath); + delegate.close(); + } + }; } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java index 10ffe1f7b7..1ef0e09654 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java @@ -19,6 +19,7 @@ import org.opendaylight.mdsal.common.api.PostPreCommitStep; import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep; import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate; import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,10 +32,12 @@ final class DataTreeCohortActor extends AbstractUntypedActor { private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class); private final CohortBehaviour idleState = new Idle(); private final DOMDataTreeCommitCohort cohort; + private final YangInstanceIdentifier registeredPath; private CohortBehaviour currentState = idleState; - private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) { + private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) { this.cohort = Preconditions.checkNotNull(cohort); + this.registeredPath = Preconditions.checkNotNull(registeredPath); } @Override @@ -146,7 +149,8 @@ final class DataTreeCohortActor extends AbstractUntypedActor { } else if (message instanceof Abort) { return abort(); } - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s", + message.getClass(), getClass().getSimpleName())); } abstract CohortBehaviour abort(); @@ -268,7 +272,7 @@ final class DataTreeCohortActor extends AbstractUntypedActor { } - static Props props(final DOMDataTreeCommitCohort cohort) { - return Props.create(DataTreeCohortActor.class, cohort); + static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) { + return Props.create(DataTreeCohortActor.class, cohort, registeredPath); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java index fb3743d426..e232e4fa84 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java @@ -15,6 +15,7 @@ import akka.util.Timeout; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,6 +49,9 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { private final Map> cohortToNode = new HashMap<>(); + Collection getCohortActors() { + return Collections.unmodifiableCollection(cohortToNode.keySet()); + } void registerCohort(final ActorRef sender, final RegisterCohort cohort) { takeLock(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java index c269312c8d..3c5aeaa49f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java @@ -42,8 +42,8 @@ public class DataTreeCohortRegistrationProxy super(cohort); this.subtree = Preconditions.checkNotNull(subtree); this.actorContext = Preconditions.checkNotNull(actorContext); - this.actor = actorContext.getActorSystem().actorOf( - DataTreeCohortActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath())); + this.actor = actorContext.getActorSystem().actorOf(DataTreeCohortActor.props(getInstance(), + subtree.getRootIdentifier()).withDispatcher(actorContext.getNotificationDispatcherPath())); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java index 8f18cb74dc..944107039a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java @@ -7,12 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Optional; import java.util.EventListener; -import java.util.Map.Entry; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; abstract class DelayedListenerRegistration implements ListenerRegistration { private final R registrationMessage; @@ -34,10 +31,9 @@ abstract class DelayedListenerRegistration implement } synchronized > void createDelegate( - final LeaderLocalDelegateFactory> factory) { + final LeaderLocalDelegateFactory factory) { if (!closed) { - final Entry> res = factory.createDelegate(registrationMessage); - this.delegate = res.getKey(); + this.delegate = factory.createDelegate(registrationMessage); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java index 3b9b7adc6b..cd1b548b8e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java @@ -7,15 +7,12 @@ */ package org.opendaylight.controller.cluster.datastore; -import java.util.Map.Entry; - /** * Base class for factories instantiating delegates. * - * delegate type - * message type - * initial state type + * @param message type + * @param delegate type */ -abstract class DelegateFactory { - abstract Entry createDelegate(M message); +abstract class DelegateFactory { + abstract D createDelegate(M message); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index a925e93566..d21a4d4ccd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -174,8 +174,8 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName); final DataTreeChangeListenerProxy listenerRegistrationProxy = - new DataTreeChangeListenerProxy(actorContext, listener); - listenerRegistrationProxy.init(shardName, treeId); + new DataTreeChangeListenerProxy<>(actorContext, listener, treeId); + listenerRegistrationProxy.init(shardName); return listenerRegistrationProxy; } @@ -192,7 +192,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId); LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName); - DataTreeCohortRegistrationProxy cohortProxy = new DataTreeCohortRegistrationProxy(actorContext, subtree, cohort); + DataTreeCohortRegistrationProxy cohortProxy = new DataTreeCohortRegistrationProxy<>(actorContext, subtree, cohort); cohortProxy.init(shardName); return cohortProxy; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java index 8ce28571ee..7d6b12baba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java @@ -17,11 +17,10 @@ import com.google.common.base.Preconditions; * Base class for factories instantiating delegates which are local to the * shard leader. * - * delegate type - * message type - * initial state type + * @param delegate type + * @param message type */ -abstract class LeaderLocalDelegateFactory extends DelegateFactory { +abstract class LeaderLocalDelegateFactory extends DelegateFactory { private final Shard shard; protected LeaderLocalDelegateFactory(final Shard shard) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index a1e506f6ec..c6f05ca70e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -44,6 +44,7 @@ import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; +import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -59,6 +60,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -647,6 +649,13 @@ public class Shard extends RaftActor { store.setRunOnPendingTransactionsComplete(operation); } + @Override + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) + .dataChangeListenerActors(changeSupport.getListenerActors()) + .commitCohortActors(store.getCohortActors()); + } + @Override public String persistenceId() { return this.name; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 83cbace1c8..4e9b05ff21 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -652,6 +652,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId); } + Collection getCohortActors() { + return cohortRegistry.getCohortActors(); + } + void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) { cohortRegistry.process(sender, message); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java new file mode 100644 index 0000000000..ba6f229de7 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import java.util.Collection; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; + +/** + * Extends OnDemandRaftState to add Shard state. + * + * @author Thomas Pantelis + */ +public class OnDemandShardState extends OnDemandRaftState { + private Collection treeChangeListenerActors; + private Collection dataChangeListenerActors; + private Collection commitCohortActors; + + public Collection getTreeChangeListenerActors() { + return treeChangeListenerActors; + } + + public Collection getDataChangeListenerActors() { + return dataChangeListenerActors; + } + + public Collection getCommitCohortActors() { + return commitCohortActors; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder extends AbstractBuilder { + private final OnDemandShardState state = new OnDemandShardState(); + + @Override + protected OnDemandRaftState state() { + return state; + } + + public Builder treeChangeListenerActors(Collection actors) { + state.treeChangeListenerActors = actors; + return self(); + } + + public Builder dataChangeListenerActors(Collection actors) { + state.dataChangeListenerActors = actors; + return self(); + } + + public Builder commitCohortActors(Collection actors) { + state.commitCohortActors = actors; + return self(); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java index 13b78a01a7..588c08704b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java @@ -22,6 +22,7 @@ import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.o import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode; import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNodeEntry; import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.testNodeWithOuter; + import akka.actor.ActorRef; import akka.testkit.TestActorRef; import org.junit.After; @@ -154,7 +155,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest { private MockDataChangeListener registerChangeListener(final YangInstanceIdentifier path, final DataChangeScope scope, final int expectedEvents, final boolean isLeader) { MockDataChangeListener listener = new MockDataChangeListener(expectedEvents); - ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener)); + ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path)); support.onMessage(new RegisterChangeListener(path, dclActor, scope, false), isLeader, true); return listener; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java index 91cac88b84..580a0ab97b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.datastore; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH; + import akka.actor.ActorRef; import akka.actor.DeadLetter; import akka.actor.Props; @@ -31,7 +33,7 @@ public class DataChangeListenerTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class); final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class); - final Props props = DataChangeListener.props(mockListener); + final Props props = DataChangeListener.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsEnabled"); // Let the DataChangeListener know that notifications should be enabled @@ -52,7 +54,7 @@ public class DataChangeListenerTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class); final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class); - final Props props = DataChangeListener.props(mockListener); + final Props props = DataChangeListener.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsDisabled"); @@ -77,7 +79,7 @@ public class DataChangeListenerTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class); final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class); - final Props props = DataChangeListener.props(mockListener); + final Props props = DataChangeListener.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender"); getSystem().eventStream().subscribe(getRef(), DeadLetter.class); @@ -112,7 +114,7 @@ public class DataChangeListenerTest extends AbstractActorTest { AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class); Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataChanged(mockChangeEvent2); - Props props = DataChangeListener.props(mockListener); + Props props = DataChangeListener.props(mockListener, TEST_PATH); ActorRef subject = getSystem().actorOf(props, "testDataChangedWithListenerRuntimeEx"); // Let the DataChangeListener know that notifications should be enabled diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java index 73d520dfc3..1a586203a8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH; + import akka.actor.ActorRef; import akka.actor.DeadLetter; import akka.actor.Props; @@ -30,7 +32,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest { final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class); final ImmutableList mockCandidates = ImmutableList.of(mockTreeCandidate); final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); - final Props props = DataTreeChangeListenerActor.props(mockListener); + final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsEnabled"); // Let the DataChangeListener know that notifications should be enabled @@ -51,7 +53,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest { final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class); final ImmutableList mockCandidates = ImmutableList.of(mockTreeCandidate); final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); - final Props props = DataTreeChangeListenerActor.props(mockListener); + final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsDisabled"); @@ -76,7 +78,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest { final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class); final ImmutableList mockCandidates = ImmutableList.of(mockTreeCandidate); final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); - final Props props = DataTreeChangeListenerActor.props(mockListener); + final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithNoSender"); getSystem().eventStream().subscribe(getRef(), DeadLetter.class); @@ -113,7 +115,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest { final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataTreeChanged(mockCandidates2); - Props props = DataTreeChangeListenerActor.props(mockListener); + Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithListenerRuntimeEx"); // Let the DataChangeListener know that notifications should be enabled diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java index 5dcc39d68d..395a0973fe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java @@ -12,6 +12,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; + import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; @@ -25,7 +26,6 @@ import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -55,14 +55,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); final DataTreeChangeListenerProxy proxy = - new DataTreeChangeListenerProxy<>(actorContext, mockListener); + new DataTreeChangeListenerProxy<>(actorContext, mockListener, path); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); new Thread() { @Override public void run() { - proxy.init("shard-1", path); + proxy.init("shard-1"); } }.start(); @@ -80,7 +80,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { reply(new RegisterDataTreeChangeListenerReply(getRef())); - for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) { + for(int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) { Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } @@ -111,14 +111,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(ClusteredDOMDataTreeChangeListener.class); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); final DataTreeChangeListenerProxy proxy = - new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener); + new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); new Thread() { @Override public void run() { - proxy.init("shard-1", path); + proxy.init("shard-1"); } }.start(); @@ -141,14 +141,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); - final DataTreeChangeListenerProxy proxy = - new DataTreeChangeListenerProxy<>(actorContext, mockListener); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( + actorContext, mockListener, path); + new Thread() { @Override public void run() { - proxy.init("shard-1", path); + proxy.init("shard-1"); } }.start(); @@ -169,14 +169,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); - final DataTreeChangeListenerProxy proxy = - new DataTreeChangeListenerProxy<>(actorContext, mockListener); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( + actorContext, mockListener, path); + new Thread() { @Override public void run() { - proxy.init("shard-1", path); + proxy.init("shard-1"); } }.start(); @@ -216,7 +216,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { String shardName = "shard-1"; final DataTreeChangeListenerProxy proxy = - new DataTreeChangeListenerProxy<>(actorContext, mockListener); + new DataTreeChangeListenerProxy<>(actorContext, mockListener, path); doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); @@ -225,7 +225,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { any(Object.class), any(Timeout.class)); doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext(); - proxy.init("shard-1", path); + proxy.init("shard-1"); Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor()); @@ -248,22 +248,18 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); - final DataTreeChangeListenerProxy proxy = - new DataTreeChangeListenerProxy<>(actorContext, mockListener); + final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( + actorContext, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME)); - - Answer> answer = new Answer>() { - @Override - public Future answer(InvocationOnMock invocation) { - proxy.close(); - return Futures.successful((Object)new RegisterDataTreeChangeListenerReply(getRef())); - } + Answer> answer = invocation -> { + proxy.close(); + return Futures.successful((Object)new RegisterDataTreeChangeListenerReply(getRef())); }; doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class)); - proxy.init(shardName, YangInstanceIdentifier.of(TestModel.TEST_QNAME)); + proxy.init(shardName); expectMsgClass(duration("5 seconds"), CloseDataTreeChangeListenerRegistration.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java index a11fc6bb1c..7cf125bd5a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java @@ -116,7 +116,7 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest { private MockDataTreeChangeListener registerChangeListener(final YangInstanceIdentifier path, final int expectedEvents, final boolean isLeader) { MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents); - ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener)); + ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, TEST_PATH)); support.onMessage(new RegisterDataTreeChangeListener(path, dclActor, false), isLeader, true); return listener; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index e4ca64ffe8..5c4e35a3a9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -138,8 +138,8 @@ public class ShardTest extends AbstractShardTest { shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender()); final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), - "testRegisterChangeListener-DataChangeListener"); + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, + TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener"); shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); @@ -215,16 +215,15 @@ public class ShardTest extends AbstractShardTest { setupInMemorySnapshotStore(); + final YangInstanceIdentifier path = TestModel.TEST_PATH; final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener"); final TestActorRef shard = actorFactory.createTestActor( Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testRegisterChangeListenerWhenNotLeaderInitially"); - final YangInstanceIdentifier path = TestModel.TEST_PATH; - // Wait until the shard receives the first ElectionTimeout message. assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); @@ -264,8 +263,8 @@ public class ShardTest extends AbstractShardTest { shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender()); final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), - "testRegisterDataTreeChangeListener-DataTreeChangeListener"); + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, + TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener"); shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef()); @@ -319,16 +318,15 @@ public class ShardTest extends AbstractShardTest { setupInMemorySnapshotStore(); + final YangInstanceIdentifier path = TestModel.TEST_PATH; final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path), "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener"); final TestActorRef shard = actorFactory.createTestActor( Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration"); - final YangInstanceIdentifier path = TestModel.TEST_PATH; - assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); @@ -2064,8 +2062,9 @@ public class ShardTest extends AbstractShardTest { dataStoreContextBuilder.shardElectionTimeoutFactor(1000). customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); + final YangInstanceIdentifier path = TestModel.TEST_PATH; final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), actorFactory.generateActorId(testName + "-DataChangeListener")); setupInMemorySnapshotStore(); @@ -2076,8 +2075,6 @@ public class ShardTest extends AbstractShardTest { waitUntilNoLeader(shard); - final YangInstanceIdentifier path = TestModel.TEST_PATH; - shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), RegisterChangeListenerReply.class); @@ -2119,7 +2116,7 @@ public class ShardTest extends AbstractShardTest { final YangInstanceIdentifier path = TestModel.TEST_PATH; final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path), actorFactory.generateActorId(testName + "-DataChangeListener")); followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); @@ -2141,8 +2138,8 @@ public class ShardTest extends AbstractShardTest { customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), - actorFactory.generateActorId(testName + "-DataTreeChangeListener")); + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, + TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener")); setupInMemorySnapshotStore(); @@ -2193,7 +2190,7 @@ public class ShardTest extends AbstractShardTest { final YangInstanceIdentifier path = TestModel.TEST_PATH; final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path), actorFactory.generateActorId(testName + "-DataTreeChangeListener")); followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());