Add OnDemandShardState to report additional Shard state 83/51583/6
authorTom Pantelis <tpanteli@brocade.com>
Tue, 7 Feb 2017 21:48:03 +0000 (16:48 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 15 Feb 2017 03:55:30 +0000 (03:55 +0000)
Extended the OnDemandRaftState with OnDemandShardState to include
additional Shard state, including DTCL, DCL, and commit cohort actors.
This will enable us to report thus info from the JMX bean as it's useful
for debugging to have visibility into what listeners and cohorts
are registered.

The actors now also store the registered path. Both the instance and path
will be queried for debugging.

Change-Id: Iaa6c27c9aba3b5c0223199e6a3fc21bc54da95ba
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
25 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 08907e3fd25449d0208d9efcba84acba02624bfb..1c057d755369d50a2147cf7c3e2922783ae56fc8 100644 (file)
@@ -399,7 +399,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-        OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+        OnDemandRaftState.AbstractBuilder<?> builder = newOnDemandRaftStateBuilder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog().dataSize())
@@ -443,6 +443,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     }
 
+    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+        return OnDemandRaftState.builder();
+    }
+
     private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) {
         RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
 
index cf9bb620dd6a4aa98e73a5ba7198ff0e1bdd0b7c..9bbee881ea4afa3b1b99a772f96c228d9b8246c2 100644 (file)
@@ -11,6 +11,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nonnull;
 
 /**
  * The response to a GetOnDemandRaftState message.
@@ -41,7 +42,7 @@ public class OnDemandRaftState {
     private Map<String, String> peerAddresses = Collections.emptyMap();
     private Map<String, Boolean> peerVotingStates = Collections.emptyMap();
 
-    private OnDemandRaftState() {
+    protected OnDemandRaftState() {
     }
 
     public static Builder builder() {
@@ -132,116 +133,131 @@ public class OnDemandRaftState {
         return customRaftPolicyClassName;
     }
 
-    public static class Builder {
-        private final OnDemandRaftState stats = new OnDemandRaftState();
+    public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> {
+        @SuppressWarnings("unchecked")
+        protected T self() {
+            return (T) this;
+        }
+
+        @Nonnull
+        protected abstract OnDemandRaftState state();
 
-        public Builder lastLogIndex(long value) {
-            stats.lastLogIndex = value;
-            return this;
+        public T lastLogIndex(long value) {
+            state().lastLogIndex = value;
+            return self();
         }
 
-        public Builder lastLogTerm(long value) {
-            stats.lastLogTerm = value;
-            return this;
+        public T lastLogTerm(long value) {
+            state().lastLogTerm = value;
+            return self();
         }
 
-        public Builder currentTerm(long value) {
-            stats.currentTerm = value;
-            return this;
+        public T currentTerm(long value) {
+            state().currentTerm = value;
+            return self();
         }
 
-        public Builder commitIndex(long value) {
-            stats.commitIndex = value;
-            return this;
+        public T commitIndex(long value) {
+            state().commitIndex = value;
+            return self();
         }
 
-        public Builder lastApplied(long value) {
-            stats.lastApplied = value;
-            return this;
+        public T lastApplied(long value) {
+            state().lastApplied = value;
+            return self();
         }
 
-        public Builder lastIndex(long value) {
-            stats.lastIndex = value;
-            return this;
+        public T lastIndex(long value) {
+            state().lastIndex = value;
+            return self();
         }
 
-        public Builder lastTerm(long value) {
-            stats.lastTerm = value;
-            return this;
+        public T lastTerm(long value) {
+            state().lastTerm = value;
+            return self();
         }
 
-        public Builder snapshotIndex(long value) {
-            stats.snapshotIndex = value;
-            return this;
+        public T snapshotIndex(long value) {
+            state().snapshotIndex = value;
+            return self();
         }
 
-        public Builder snapshotTerm(long value) {
-            stats.snapshotTerm = value;
-            return this;
+        public T snapshotTerm(long value) {
+            state().snapshotTerm = value;
+            return self();
         }
 
-        public Builder replicatedToAllIndex(long value) {
-            stats.replicatedToAllIndex = value;
-            return this;
+        public T replicatedToAllIndex(long value) {
+            state().replicatedToAllIndex = value;
+            return self();
         }
 
-        public Builder inMemoryJournalDataSize(long value) {
-            stats.inMemoryJournalDataSize = value;
-            return this;
+        public T inMemoryJournalDataSize(long value) {
+            state().inMemoryJournalDataSize = value;
+            return self();
         }
 
-        public Builder inMemoryJournalLogSize(long value) {
-            stats.inMemoryJournalLogSize = value;
-            return this;
+        public T inMemoryJournalLogSize(long value) {
+            state().inMemoryJournalLogSize = value;
+            return self();
         }
 
-        public Builder leader(String value) {
-            stats.leader = value;
-            return this;
+        public T leader(String value) {
+            state().leader = value;
+            return self();
         }
 
-        public Builder raftState(String value) {
-            stats.raftState = value;
-            return this;
+        public T raftState(String value) {
+            state().raftState = value;
+            return self();
         }
 
-        public Builder votedFor(String value) {
-            stats.votedFor = value;
-            return this;
+        public T votedFor(String value) {
+            state().votedFor = value;
+            return self();
         }
 
-        public Builder isVoting(boolean isVoting) {
-            stats.isVoting = isVoting;
-            return this;
+        public T isVoting(boolean isVoting) {
+            state().isVoting = isVoting;
+            return self();
         }
 
-        public Builder followerInfoList(List<FollowerInfo> followerInfoList) {
-            stats.followerInfoList = followerInfoList;
-            return this;
+        public T followerInfoList(List<FollowerInfo> followerInfoList) {
+            state().followerInfoList = followerInfoList;
+            return self();
         }
 
-        public Builder peerAddresses(Map<String, String> peerAddresses) {
-            stats.peerAddresses = peerAddresses;
-            return this;
+        public T peerAddresses(Map<String, String> peerAddresses) {
+            state().peerAddresses = peerAddresses;
+            return self();
         }
 
-        public Builder peerVotingStates(Map<String, Boolean> peerVotingStates) {
-            stats.peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
-            return this;
+        public T peerVotingStates(Map<String, Boolean> peerVotingStates) {
+            state().peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
+            return self();
         }
 
-        public Builder isSnapshotCaptureInitiated(boolean value) {
-            stats.isSnapshotCaptureInitiated = value;
-            return this;
+        public T isSnapshotCaptureInitiated(boolean value) {
+            state().isSnapshotCaptureInitiated = value;
+            return self();
         }
 
-        public Builder customRaftPolicyClassName(String className) {
-            stats.customRaftPolicyClassName = className;
-            return this;
+        public T customRaftPolicyClassName(String className) {
+            state().customRaftPolicyClassName = className;
+            return self();
         }
 
         public OnDemandRaftState build() {
-            return stats;
+            return state();
+        }
+    }
+
+    public static class Builder extends AbstractBuilder<Builder> {
+        private final OnDemandRaftState state = new OnDemandRaftState();
+
+        @Override
+        protected OnDemandRaftState state() {
+            return state;
         }
     }
 }
index f23f9567a7ddace8fd25ecdf8e651e94efcd1ed1..0821951a1a67b7cc30d7c98dae86994119af5cf6 100644 (file)
@@ -9,21 +9,18 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EventListener;
-import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class AbstractDataListenerSupport<L extends EventListener, M extends ListenerRegistrationMessage,
         D extends DelayedListenerRegistration<L, M>, R extends ListenerRegistration<L>>
-                extends LeaderLocalDelegateFactory<M, R, Optional<DataTreeCandidate>> {
+                extends LeaderLocalDelegateFactory<M, R> {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final ArrayList<D> delayedListenerRegistrations = new ArrayList<>();
@@ -68,8 +65,7 @@ abstract class AbstractDataListenerSupport<L extends EventListener, M extends Li
 
         final ListenerRegistration<L> registration;
         if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
-            final Entry<R, Optional<DataTreeCandidate>> res = createDelegate(message);
-            registration = res.getKey();
+            registration = createDelegate(message);
         } else {
             log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
index 36c822cd0b10d6586afde0bcdd5ebc434bbd328c..db32b36d6e71275c121f8b76c607fee8f1c038ff 100644 (file)
@@ -179,8 +179,8 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
 
         final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
-                new DataTreeChangeListenerProxy<>(actorContext, listener);
-        listenerRegistrationProxy.init(shardName, treeId);
+                new DataTreeChangeListenerProxy<>(actorContext, listener, treeId);
+        listenerRegistrationProxy.init(shardName);
 
         return listenerRegistrationProxy;
     }
index 872feeac9da203b3f3bf84e76ef1be61382697cc..dc19aa400f66d47fd59134b062f8b04c9bfb6f78 100644 (file)
@@ -9,9 +9,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
-import akka.japi.Creator;
 import com.google.common.base.Preconditions;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
@@ -30,10 +28,13 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 @Deprecated
 public class DataChangeListener extends AbstractUntypedActor {
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+    private final YangInstanceIdentifier registeredPath;
     private boolean notificationsEnabled = false;
 
-    public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
+    public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+            final YangInstanceIdentifier registeredPath) {
         this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
+        this.registeredPath = Preconditions.checkNotNull(registeredPath);
     }
 
     @Override
@@ -78,26 +79,8 @@ public class DataChangeListener extends AbstractUntypedActor {
         }
     }
 
-    public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier,
-                                                            NormalizedNode<?, ?>> listener) {
-        return Props.create(new DataChangeListenerCreator(listener));
-    }
-
-    private static class DataChangeListenerCreator implements Creator<DataChangeListener> {
-        private static final long serialVersionUID = 1L;
-
-        @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't "
-                + "create remote instances of this actor and thus don't need it to be Serializable.")
-        final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-
-        DataChangeListenerCreator(
-                AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
-            this.listener = listener;
-        }
-
-        @Override
-        public DataChangeListener create() throws Exception {
-            return new DataChangeListener(listener);
-        }
+    public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+            final YangInstanceIdentifier registeredPath) {
+        return Props.create(DataChangeListener.class, listener, registeredPath);
     }
 }
index 9eba41aac73efb900bd33b3fe52e43c33e655eff..f0f4b7b9b6aba47481c6c0ea8bc890f8ed478f28 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -41,18 +42,18 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
 
     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
 
-    private volatile ActorSelection listenerRegistrationActor;
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-    private ActorRef dataChangeListenerActor;
     private final String shardName;
     private final ActorContext actorContext;
+    private ActorRef dataChangeListenerActor;
+    private volatile ActorSelection listenerRegistrationActor;
     private boolean closed = false;
 
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
             DataChangeListenerRegistrationProxy(String shardName, ActorContext actorContext, L listener) {
-        this.shardName = shardName;
-        this.actorContext = actorContext;
-        this.listener = listener;
+        this.shardName = Preconditions.checkNotNull(shardName);
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.listener = Preconditions.checkNotNull(listener);
     }
 
     @VisibleForTesting
@@ -92,7 +93,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
     public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
 
         dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-                DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
+                DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
 
         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
         findFuture.onComplete(new OnComplete<ActorRef>() {
index 26d8fa1b646af75e2775eea23a72d64ae75e28c8..9b2beccdad99740cbcba027e4a893d8cf9555133 100644 (file)
@@ -10,10 +10,15 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map.Entry;
+import java.util.Set;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -26,14 +31,20 @@ final class DataChangeListenerSupport extends AbstractDataListenerSupport<
             DelayedDataChangeListenerRegistration, DataChangeListenerRegistration<
                     AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
 
+    private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+
     DataChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
+    Collection<ActorSelection> getListenerActors() {
+        return Collections.unmodifiableCollection(listenerActors);
+    }
+
     @Override
-    Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-            Optional<DataTreeCandidate>> createDelegate(final RegisterChangeListener message) {
-        ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
+    DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+            createDelegate(final RegisterChangeListener message) {
+        final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
 
         // Notify the listener if notifications should be enabled or not
         // If this shard is the leader then it will enable notifications else
@@ -57,7 +68,32 @@ final class DataChangeListenerSupport extends AbstractDataListenerSupport<
 
         getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue());
 
-        return regEntry;
+        listenerActors.add(dataChangeListenerPath);
+        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+            delegate = regEntry.getKey();
+        return new DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                NormalizedNode<?,?>>>() {
+            @Override
+            public void close() {
+                listenerActors.remove(dataChangeListenerPath);
+                delegate.close();
+            }
+
+            @Override
+            public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+                return delegate.getInstance();
+            }
+
+            @Override
+            public YangInstanceIdentifier getPath() {
+                return delegate.getPath();
+            }
+
+            @Override
+            public DataChangeScope getScope() {
+                return delegate.getScope();
+            }
+        };
     }
 
     @Override
index bccb48477b7f224120b37fa1244680e2873cdafb..2936a28b90a1f77cc640602c977751597fc8008b 100644 (file)
@@ -8,14 +8,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
-import akka.japi.Creator;
 import com.google.common.base.Preconditions;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 /**
  * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating
@@ -23,10 +22,13 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
  */
 final class DataTreeChangeListenerActor extends AbstractUntypedActor {
     private final DOMDataTreeChangeListener listener;
+    private final YangInstanceIdentifier registeredPath;
     private boolean notificationsEnabled = false;
 
-    private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) {
+    private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener,
+            final YangInstanceIdentifier registeredPath) {
         this.listener = Preconditions.checkNotNull(listener);
+        this.registeredPath = Preconditions.checkNotNull(registeredPath);
     }
 
     @Override
@@ -70,24 +72,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor {
                 listener);
     }
 
-    public static Props props(final DOMDataTreeChangeListener listener) {
-        return Props.create(new DataTreeChangeListenerCreator(listener));
-    }
-
-    private static final class DataTreeChangeListenerCreator implements Creator<DataTreeChangeListenerActor> {
-        private static final long serialVersionUID = 1L;
-
-        @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't "
-                + "create remote instances of this actor and thus don't need it to be Serializable.")
-        private final DOMDataTreeChangeListener listener;
-
-        DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) {
-            this.listener = Preconditions.checkNotNull(listener);
-        }
-
-        @Override
-        public DataTreeChangeListenerActor create() {
-            return new DataTreeChangeListenerActor(listener);
-        }
+    public static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) {
+        return Props.create(DataTreeChangeListenerActor.class, listener, registeredPath);
     }
 }
index 8a9b466b6ef6deed0b6db3f01ac34dc933ba58f8..f60e676013ae0d2ed3c9b7ed0aab265da112b4d4 100644 (file)
@@ -37,15 +37,18 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
     private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
     private final ActorRef dataChangeListenerActor;
     private final ActorContext actorContext;
+    private final YangInstanceIdentifier registeredPath;
 
     @GuardedBy("this")
     private ActorSelection listenerRegistrationActor;
 
-    DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
+    DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener,
+            final YangInstanceIdentifier registeredPath) {
         super(listener);
         this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.registeredPath = Preconditions.checkNotNull(registeredPath);
         this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-                DataTreeChangeListenerActor.props(getInstance())
+                DataTreeChangeListenerActor.props(getInstance(), registeredPath)
                     .withDispatcher(actorContext.getNotificationDispatcherPath()));
     }
 
@@ -59,19 +62,19 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
         dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    void init(final String shardName, final YangInstanceIdentifier treeId) {
+    void init(final String shardName) {
         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
         findFuture.onComplete(new OnComplete<ActorRef>() {
             @Override
             public void onComplete(final Throwable failure, final ActorRef shard) {
                 if (failure instanceof LocalShardNotFoundException) {
                     LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
-                            + "cannot be registered", shardName, getInstance(), treeId);
+                            + "cannot be registered", shardName, getInstance(), registeredPath);
                 } else if (failure != null) {
                     LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
-                            + "cannot be registered: {}", shardName, getInstance(), treeId, failure);
+                            + "cannot be registered: {}", shardName, getInstance(), registeredPath, failure);
                 } else {
-                    doRegistration(shard, treeId);
+                    doRegistration(shard);
                 }
             }
         }, actorContext.getClientDispatcher());
@@ -94,10 +97,10 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
         actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
     }
 
-    private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
+    private void doRegistration(final ActorRef shard) {
 
         Future<Object> future = actorContext.executeOperationAsync(shard,
-                new RegisterDataTreeChangeListener(path, dataChangeListenerActor,
+                new RegisterDataTreeChangeListener(registeredPath, dataChangeListenerActor,
                         getInstance() instanceof ClusteredDOMDataTreeChangeListener),
                 actorContext.getDatastoreContext().getShardInitializationTimeout());
 
@@ -106,7 +109,7 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
             public void onComplete(final Throwable failure, final Object result) {
                 if (failure != null) {
                     LOG.error("Failed to register DataTreeChangeListener {} at path {}",
-                            getInstance(), path.toString(), failure);
+                            getInstance(), registeredPath, failure);
                 } else {
                     RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
                     setListenerRegistrationActor(actorContext.actorSelection(
index f039ab707ba76fa4749c7895469d2c5af40b12ba..5b8e5f8abfb871f4bd2c503ce7de1853b4c5aac5 100644 (file)
@@ -10,7 +10,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map.Entry;
+import java.util.Set;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
@@ -21,14 +25,21 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DOMDataTreeChangeListener,
         RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration,
         ListenerRegistration<DOMDataTreeChangeListener>> {
+
+    private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+
     DataTreeChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
+    Collection<ActorSelection> getListenerActors() {
+        return Collections.unmodifiableCollection(listenerActors);
+    }
+
     @Override
-    Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> createDelegate(
+    ListenerRegistration<DOMDataTreeChangeListener> createDelegate(
             final RegisterDataTreeChangeListener message) {
-        ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
+        final ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
 
         // Notify the listener if notifications should be enabled or not
         // If this shard is the leader then it will enable notifications else
@@ -51,7 +62,20 @@ final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DO
         getShard().getDataStore().notifyOfInitialData(message.getPath(),
                 regEntry.getKey().getInstance(), regEntry.getValue());
 
-        return regEntry;
+        listenerActors.add(dataChangeListenerPath);
+        final ListenerRegistration<DOMDataTreeChangeListener> delegate = regEntry.getKey();
+        return new ListenerRegistration<DOMDataTreeChangeListener>() {
+            @Override
+            public DOMDataTreeChangeListener getInstance() {
+                return delegate.getInstance();
+            }
+
+            @Override
+            public void close() {
+                listenerActors.remove(dataChangeListenerPath);
+                delegate.close();
+            }
+        };
     }
 
     @Override
index 0be8f0986557f75a763b557c3c62f84fb63b682d..e6ff10d8315cd47da347ae4764d20c7b825b51eb 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.mdsal.common.api.PostPreCommitStep;
 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -28,10 +29,12 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 final class DataTreeCohortActor extends AbstractUntypedActor {
     private final CohortBehaviour<?> idleState = new Idle();
     private final DOMDataTreeCommitCohort cohort;
+    private final YangInstanceIdentifier registeredPath;
     private CohortBehaviour<?> currentState = idleState;
 
-    private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
+    private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
         this.cohort = Preconditions.checkNotNull(cohort);
+        this.registeredPath = Preconditions.checkNotNull(registeredPath);
     }
 
     @Override
@@ -143,7 +146,8 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
             } else if (message instanceof Abort) {
                 return abort();
             }
-            throw new UnsupportedOperationException();
+            throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s",
+                    message.getClass(), getClass().getSimpleName()));
         }
 
         abstract CohortBehaviour<?> abort();
@@ -269,7 +273,7 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
 
     }
 
-    static Props props(final DOMDataTreeCommitCohort cohort) {
-        return Props.create(DataTreeCohortActor.class, cohort);
+    static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
+        return Props.create(DataTreeCohortActor.class, cohort, registeredPath);
     }
 }
index 4364e22d2ba85120e2c162d23f1547578caff589..f79c0475bb73afc025545704ac590ab058961723 100644 (file)
@@ -15,6 +15,7 @@ import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +49,10 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
 
     private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
 
+    Collection<ActorRef> getCohortActors() {
+        return Collections.unmodifiableCollection(cohortToNode.keySet());
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
         takeLock();
index 18146b7f1f962c8d183041c8eaee8c481e6327bf..cfe12a1bc65e117832887579b50e32ad2f3ccdef 100644 (file)
@@ -42,8 +42,8 @@ public class DataTreeCohortRegistrationProxy<C extends DOMDataTreeCommitCohort>
         super(cohort);
         this.subtree = Preconditions.checkNotNull(subtree);
         this.actorContext = Preconditions.checkNotNull(actorContext);
-        this.actor = actorContext.getActorSystem().actorOf(
-                DataTreeCohortActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+        this.actor = actorContext.getActorSystem().actorOf(DataTreeCohortActor.props(getInstance(),
+                subtree.getRootIdentifier()).withDispatcher(actorContext.getNotificationDispatcherPath()));
     }
 
 
index 837242dec468dcfcf2b01af816acdfcbce5c8f5b..ac132721c5dcc9954a152f0fcc5d1a94787e0fe5 100644 (file)
@@ -7,12 +7,9 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Optional;
 import java.util.EventListener;
-import java.util.Map.Entry;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 abstract class DelayedListenerRegistration<L extends EventListener, M> implements ListenerRegistration<L> {
     private final M registrationMessage;
@@ -34,10 +31,9 @@ abstract class DelayedListenerRegistration<L extends EventListener, M> implement
     }
 
     synchronized <R extends ListenerRegistration<L>> void createDelegate(
-            final LeaderLocalDelegateFactory<M, R, Optional<DataTreeCandidate>> factory) {
+            final LeaderLocalDelegateFactory<M, R> factory) {
         if (!closed) {
-            final Entry<R, Optional<DataTreeCandidate>> res = factory.createDelegate(registrationMessage);
-            this.delegate = res.getKey();
+            this.delegate = factory.createDelegate(registrationMessage);
         }
     }
 
index 1542fd683252ea3c726a35054249ea4333c4f588..cd1b548b8e4823735ec27baf9846cdee9b38e285 100644 (file)
@@ -7,15 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.Map.Entry;
-
 /**
  * Base class for factories instantiating delegates.
  *
  * @param <M> message type
  * @param <D> delegate type
- * @param <I> initial state type
  */
-abstract class DelegateFactory<M, D, I> {
-    abstract Entry<D, I> createDelegate(M message);
+abstract class DelegateFactory<M, D> {
+    abstract D createDelegate(M message);
 }
index fbd974a1da8bb8777721944914b153230bbe954b..0f766295409c73e0305f6d45dd3fbc2df605afce 100644 (file)
@@ -19,9 +19,8 @@ import com.google.common.base.Preconditions;
  *
  * @param <D> delegate type
  * @param <M> message type
- * @param <I> initial state type
  */
-abstract class LeaderLocalDelegateFactory<M, D, I> extends DelegateFactory<M, D, I> {
+abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
     private final Shard shard;
 
     protected LeaderLocalDelegateFactory(final Shard shard) {
index 84c399deb26be96fe969f76bf66692f7ee4b8438..d5d129f22172cb392657de55360812014dbb7cae 100644 (file)
@@ -65,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -80,6 +81,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -783,6 +785,13 @@ public class Shard extends RaftActor {
         store.setRunOnPendingTransactionsComplete(operation);
     }
 
+    @Override
+    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+        return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
+                .dataChangeListenerActors(changeSupport.getListenerActors())
+                .commitCohortActors(store.getCohortActors());
+    }
+
     @Override
     public String persistenceId() {
         return this.name;
index 282191216dc01ec002087d08b9fcb542510d226d..d398afefa7869cb70cc2eec74cf9153a918b162f 100644 (file)
@@ -772,6 +772,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingCommit();
     }
 
+    Collection<ActorRef> getCohortActors() {
+        return cohortRegistry.getCohortActors();
+    }
+
     void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
         cohortRegistry.process(sender, message);
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java
new file mode 100644 (file)
index 0000000..ba6f229
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+
+/**
+ * Extends OnDemandRaftState to add Shard state.
+ *
+ * @author Thomas Pantelis
+ */
+public class OnDemandShardState extends OnDemandRaftState {
+    private Collection<ActorSelection> treeChangeListenerActors;
+    private Collection<ActorSelection> dataChangeListenerActors;
+    private Collection<ActorRef> commitCohortActors;
+
+    public Collection<ActorSelection> getTreeChangeListenerActors() {
+        return treeChangeListenerActors;
+    }
+
+    public Collection<ActorSelection> getDataChangeListenerActors() {
+        return dataChangeListenerActors;
+    }
+
+    public Collection<ActorRef> getCommitCohortActors() {
+        return commitCohortActors;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder extends AbstractBuilder<Builder> {
+        private final OnDemandShardState state = new OnDemandShardState();
+
+        @Override
+        protected OnDemandRaftState state() {
+            return state;
+        }
+
+        public Builder treeChangeListenerActors(Collection<ActorSelection> actors) {
+            state.treeChangeListenerActors = actors;
+            return self();
+        }
+
+        public Builder dataChangeListenerActors(Collection<ActorSelection> actors) {
+            state.dataChangeListenerActors = actors;
+            return self();
+        }
+
+        public Builder commitCohortActors(Collection<ActorRef> actors) {
+            state.commitCohortActors = actors;
+            return self();
+        }
+    }
+}
index fc70e3308c4f0a965c0e8fc501a735adb4e281a1..9721766d9f7709d5d398c8628b07b72735a210d5 100644 (file)
@@ -30,6 +30,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
@@ -53,7 +54,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
 
                 final Shard shard = actor.underlyingActor();
                 final MockDataChangeListener listener = new MockDataChangeListener(0);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, TEST_PATH),
                         "testChangeListenerWithNoInitialData-DataChangeListener");
                 final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
                 support.onMessage(new RegisterChangeListener(TEST_PATH, dclActor, DataChangeScope.ONE, false),
@@ -79,7 +80,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
                 writeToStore(shard.getDataStore(), TestModel.TEST_PATH,
                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, TEST_PATH),
                         "testInitialChangeListenerEventWithContainerPath-DataChangeListener");
                 final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
                 support.onMessage(new RegisterChangeListener(TEST_PATH, dclActor, DataChangeScope.ONE, false),
@@ -104,7 +105,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
                 mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
 
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, OUTER_LIST_PATH),
                         "testInitialChangeListenerEventWithListPath-DataChangeListener");
                 final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
                 support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH, dclActor, DataChangeScope.ONE, false),
@@ -137,11 +138,11 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
                         ImmutableNodes.containerNode(OUTER_CONTAINER_QNAME));
 
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME);
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                         "testInitialChangeListenerEventWithWildcardedListPath-DataChangeListener");
                 final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
-                support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH.node(OUTER_LIST_QNAME), dclActor,
-                        DataChangeScope.ONE, false), true, true);
+                support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), true, true);
 
                 listener.waitForChangeEvents();
                 listener.verifyCreatedData(0, outerEntryPath(1));
@@ -169,12 +170,12 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
                                 outerNodeEntry(2, innerNode("three", "four")))));
 
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME)
+                        .node(INNER_LIST_QNAME).node(INNER_LIST_QNAME);
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                         "testInitialChangeListenerEventWithNestedWildcardedListsPath-DataChangeListener");
                 final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
-                support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH.node(OUTER_LIST_QNAME)
-                        .node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), dclActor, DataChangeScope.ONE, false),
-                            true, true);
+                support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), true, true);
 
 
                 listener.waitForChangeEvents();
@@ -185,12 +186,13 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
 
                 // Register for a specific outer list entry
                 final MockDataChangeListener listener2 = new MockDataChangeListener(1);
-                final ActorRef dclActor2 = actorFactory.createActor(DataChangeListener.props(listener2),
+                final YangInstanceIdentifier path2 = OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME)
+                        .node(INNER_LIST_QNAME);
+                final ActorRef dclActor2 = actorFactory.createActor(DataChangeListener.props(listener2, path2),
                         "testInitialChangeListenerEventWithNestedWildcardedListsPath-DataChangeListener2");
                 final DataChangeListenerSupport support2 = new DataChangeListenerSupport(shard);
-                support2.onMessage(new RegisterChangeListener(
-                        OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), dclActor2,
-                        DataChangeScope.ONE, false), true, true);
+                support2.onMessage(new RegisterChangeListener(path2, dclActor2, DataChangeScope.ONE, false),
+                        true, true);
 
                 listener2.waitForChangeEvents();
                 listener2.verifyCreatedData(0, innerEntryPath(1, "one"));
@@ -219,12 +221,12 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
                                 outerNodeEntry(2, innerNode("three", "four")))));
 
                 final MockDataChangeListener listener = new MockDataChangeListener(0);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME)
+                        .node(INNER_LIST_QNAME);
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                         "testInitialChangeListenerEventWhenNotInitiallyLeader-DataChangeListener");
                 final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
-                support.onMessage(new RegisterChangeListener(
-                        OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), dclActor,
-                        DataChangeScope.ONE, false), false, true);
+                support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), false, true);
 
                 listener.expectNoMoreChanges("Unexpected initial change event");
                 listener.reset(1);
index 544a56628c385888e958a23df385a35d07a3fb51..f3dcaa2b709810d169ae1ae1cc0b54c716051fe9 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+
 import akka.actor.ActorRef;
 import akka.actor.DeadLetter;
 import akka.actor.Props;
@@ -30,7 +32,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             {
                 final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
                 final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
-                final Props props = DataChangeListener.props(mockListener);
+                final Props props = DataChangeListener.props(mockListener, TEST_PATH);
                 final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
 
                 // Let the DataChangeListener know that notifications should be
@@ -53,7 +55,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             {
                 final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
                 final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
-                final Props props = DataChangeListener.props(mockListener);
+                final Props props = DataChangeListener.props(mockListener, TEST_PATH);
                 final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
 
                 subject.tell(new DataChanged(mockChangeEvent), getRef());
@@ -78,7 +80,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             {
                 final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
                 final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
-                final Props props = DataChangeListener.props(mockListener);
+                final Props props = DataChangeListener.props(mockListener, TEST_PATH);
                 final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender");
 
                 getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
@@ -115,7 +117,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
                 AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
                 Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataChanged(mockChangeEvent2);
 
-                Props props = DataChangeListener.props(mockListener);
+                Props props = DataChangeListener.props(mockListener, TEST_PATH);
                 ActorRef subject = getSystem().actorOf(props, "testDataChangedWithListenerRuntimeEx");
 
                 // Let the DataChangeListener know that notifications should be
index 6b8658cc4989e39be2f42a935c227389a08be07c..62b95c214b1d50068ffec642821adae9bb1913c9 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+
 import akka.actor.ActorRef;
 import akka.actor.DeadLetter;
 import akka.actor.Props;
@@ -31,7 +33,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest {
                 final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
                 final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
                 final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
-                final Props props = DataTreeChangeListenerActor.props(mockListener);
+                final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
                 final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsEnabled");
 
                 // Let the DataChangeListener know that notifications should be
@@ -54,7 +56,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest {
                 final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
                 final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
                 final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
-                final Props props = DataTreeChangeListenerActor.props(mockListener);
+                final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
                 final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsDisabled");
 
                 subject.tell(new DataTreeChanged(mockCandidates), getRef());
@@ -79,7 +81,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest {
                 final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
                 final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
                 final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
-                final Props props = DataTreeChangeListenerActor.props(mockListener);
+                final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
                 final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithNoSender");
 
                 getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
@@ -119,7 +121,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest {
                 final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
                 Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataTreeChanged(mockCandidates2);
 
-                Props props = DataTreeChangeListenerActor.props(mockListener);
+                Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
                 ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithListenerRuntimeEx");
 
                 // Let the DataChangeListener know that notifications should be
index 8cf0d83a79dcfa6239e1b783352173264e924ab2..e6b20a720673e9ed3a7b8d5ebce77b3fe705a610 100644 (file)
@@ -56,14 +56,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
                         mock(Configuration.class));
 
+                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                        actorContext, mockListener);
+                        actorContext, mockListener, path);
 
-                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 new Thread() {
                     @Override
                     public void run() {
-                        proxy.init("shard-1", path);
+                        proxy.init("shard-1");
                     }
 
                 }.start();
@@ -115,14 +115,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(
                         ClusteredDOMDataTreeChangeListener.class);
 
+                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
-                        new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener);
+                        new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path);
 
-                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 new Thread() {
                     @Override
                     public void run() {
-                        proxy.init("shard-1", path);
+                        proxy.init("shard-1");
                     }
 
                 }.start();
@@ -150,14 +150,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
                         mock(Configuration.class));
 
+                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                        actorContext, mockListener);
+                        actorContext, mockListener, path);
 
-                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 new Thread() {
                     @Override
                     public void run() {
-                        proxy.init("shard-1", path);
+                        proxy.init("shard-1");
                     }
 
                 }.start();
@@ -182,14 +182,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
                         mock(Configuration.class));
 
+                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                        actorContext, mockListener);
+                        actorContext, mockListener, path);
 
-                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 new Thread() {
                     @Override
                     public void run() {
-                        proxy.init("shard-1", path);
+                        proxy.init("shard-1");
                     }
 
                 }.start();
@@ -230,7 +230,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
 
                 String shardName = "shard-1";
                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                        actorContext, mockListener);
+                        actorContext, mockListener, path);
 
                 doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
                 doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
@@ -238,7 +238,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                         .executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class));
                 doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
 
-                proxy.init("shard-1", path);
+                proxy.init("shard-1");
 
                 Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
 
@@ -265,7 +265,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
 
                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                        actorContext, mockListener);
+                        actorContext, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
 
                 Answer<Future<Object>> answer = invocation -> {
                     proxy.close();
@@ -275,7 +275,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class), any(Object.class),
                         any(Timeout.class));
 
-                proxy.init(shardName, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
+                proxy.init(shardName);
 
                 expectMsgClass(duration("5 seconds"), CloseDataTreeChangeListenerRegistration.class);
 
index 6d09d1cfac3cd4a36003fc7b4bbb8f3aae53c81d..00f0f4bdf0502095aefad3858ffa18f63625e1a6 100644 (file)
@@ -38,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChang
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import scala.concurrent.Await;
@@ -157,7 +158,7 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
     private Entry<MockDataTreeChangeListener, ActorSelection> registerChangeListener(final YangInstanceIdentifier path,
             final int expectedEvents) {
         MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
-        ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener));
+        ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, TestModel.TEST_PATH));
 
         try {
             RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply)
index 2edbff2380b5ba96b7622a97a5a453cb30976898..45cfd29d255af37b91df19553261570079c65081 100644 (file)
@@ -139,8 +139,8 @@ public class ShardTest extends AbstractShardTest {
                 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
 
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
-                        "testRegisterChangeListener-DataChangeListener");
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
+                        TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
 
                 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
                         AsyncDataBroker.DataChangeScope.BASE, true), getRef());
@@ -216,8 +216,9 @@ public class ShardTest extends AbstractShardTest {
 
         setupInMemorySnapshotStore();
 
+        final YangInstanceIdentifier path = TestModel.TEST_PATH;
         final MockDataChangeListener listener = new MockDataChangeListener(1);
-        final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+        final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
 
         final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -226,8 +227,6 @@ public class ShardTest extends AbstractShardTest {
 
         new ShardTestKit(getSystem()) {
             {
-                final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
                 // Wait until the shard receives the first ElectionTimeout
                 // message.
                 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
@@ -271,8 +270,8 @@ public class ShardTest extends AbstractShardTest {
                 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
 
                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
-                        "testRegisterDataTreeChangeListener-DataTreeChangeListener");
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                        TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
 
                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
 
@@ -326,16 +325,15 @@ public class ShardTest extends AbstractShardTest {
 
         setupInMemorySnapshotStore();
 
+        final YangInstanceIdentifier path = TestModel.TEST_PATH;
         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-        final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+        final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
 
         final TestActorRef<Shard> shard = actorFactory.createTestActor(
                 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
 
-        final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
         new ShardTestKit(getSystem()) {
             {
                 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
@@ -2129,8 +2127,9 @@ public class ShardTest extends AbstractShardTest {
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
+                final YangInstanceIdentifier path = TestModel.TEST_PATH;
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                         actorFactory.generateActorId(testName + "-DataChangeListener"));
 
                 setupInMemorySnapshotStore();
@@ -2141,8 +2140,6 @@ public class ShardTest extends AbstractShardTest {
 
                 waitUntilNoLeader(shard);
 
-                final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
                 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
                         getRef());
                 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
@@ -2189,7 +2186,7 @@ public class ShardTest extends AbstractShardTest {
 
                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                         actorFactory.generateActorId(testName + "-DataChangeListener"));
 
                 followerShard.tell(
@@ -2215,8 +2212,8 @@ public class ShardTest extends AbstractShardTest {
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
-                        actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                        TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
                 setupInMemorySnapshotStore();
 
@@ -2271,7 +2268,7 @@ public class ShardTest extends AbstractShardTest {
 
                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
                         actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
                 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());