Add OnDemandShardState to report additional Shard state 83/51583/6
authorTom Pantelis <tpanteli@brocade.com>
Tue, 7 Feb 2017 21:48:03 +0000 (16:48 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 15 Feb 2017 03:55:30 +0000 (03:55 +0000)
Extended the OnDemandRaftState with OnDemandShardState to include
additional Shard state, including DTCL, DCL, and commit cohort actors.
This will enable us to report thus info from the JMX bean as it's useful
for debugging to have visibility into what listeners and cohorts
are registered.

The actors now also store the registered path. Both the instance and path
will be queried for debugging.

Change-Id: Iaa6c27c9aba3b5c0223199e6a3fc21bc54da95ba
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
25 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 08907e3..1c057d7 100644 (file)
@@ -399,7 +399,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-        OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+        OnDemandRaftState.AbstractBuilder<?> builder = newOnDemandRaftStateBuilder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog().dataSize())
@@ -443,6 +443,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     }
 
+    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+        return OnDemandRaftState.builder();
+    }
+
     private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) {
         RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
 
index cf9bb62..9bbee88 100644 (file)
@@ -11,6 +11,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nonnull;
 
 /**
  * The response to a GetOnDemandRaftState message.
@@ -41,7 +42,7 @@ public class OnDemandRaftState {
     private Map<String, String> peerAddresses = Collections.emptyMap();
     private Map<String, Boolean> peerVotingStates = Collections.emptyMap();
 
-    private OnDemandRaftState() {
+    protected OnDemandRaftState() {
     }
 
     public static Builder builder() {
@@ -132,116 +133,131 @@ public class OnDemandRaftState {
         return customRaftPolicyClassName;
     }
 
-    public static class Builder {
-        private final OnDemandRaftState stats = new OnDemandRaftState();
+    public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> {
+        @SuppressWarnings("unchecked")
+        protected T self() {
+            return (T) this;
+        }
+
+        @Nonnull
+        protected abstract OnDemandRaftState state();
 
-        public Builder lastLogIndex(long value) {
-            stats.lastLogIndex = value;
-            return this;
+        public T lastLogIndex(long value) {
+            state().lastLogIndex = value;
+            return self();
         }
 
-        public Builder lastLogTerm(long value) {
-            stats.lastLogTerm = value;
-            return this;
+        public T lastLogTerm(long value) {
+            state().lastLogTerm = value;
+            return self();
         }
 
-        public Builder currentTerm(long value) {
-            stats.currentTerm = value;
-            return this;
+        public T currentTerm(long value) {
+            state().currentTerm = value;
+            return self();
         }
 
-        public Builder commitIndex(long value) {
-            stats.commitIndex = value;
-            return this;
+        public T commitIndex(long value) {
+            state().commitIndex = value;
+            return self();
         }
 
-        public Builder lastApplied(long value) {
-            stats.lastApplied = value;
-            return this;
+        public T lastApplied(long value) {
+            state().lastApplied = value;
+            return self();
         }
 
-        public Builder lastIndex(long value) {
-            stats.lastIndex = value;
-            return this;
+        public T lastIndex(long value) {
+            state().lastIndex = value;
+            return self();
         }
 
-        public Builder lastTerm(long value) {
-            stats.lastTerm = value;
-            return this;
+        public T lastTerm(long value) {
+            state().lastTerm = value;
+            return self();
         }
 
-        public Builder snapshotIndex(long value) {
-            stats.snapshotIndex = value;
-            return this;
+        public T snapshotIndex(long value) {
+            state().snapshotIndex = value;
+            return self();
         }
 
-        public Builder snapshotTerm(long value) {
-            stats.snapshotTerm = value;
-            return this;
+        public T snapshotTerm(long value) {
+            state().snapshotTerm = value;
+            return self();
         }
 
-        public Builder replicatedToAllIndex(long value) {
-            stats.replicatedToAllIndex = value;
-            return this;
+        public T replicatedToAllIndex(long value) {
+            state().replicatedToAllIndex = value;
+            return self();
         }
 
-        public Builder inMemoryJournalDataSize(long value) {
-            stats.inMemoryJournalDataSize = value;
-            return this;
+        public T inMemoryJournalDataSize(long value) {
+            state().inMemoryJournalDataSize = value;
+            return self();
         }
 
-        public Builder inMemoryJournalLogSize(long value) {
-            stats.inMemoryJournalLogSize = value;
-            return this;
+        public T inMemoryJournalLogSize(long value) {
+            state().inMemoryJournalLogSize = value;
+            return self();
         }
 
-        public Builder leader(String value) {
-            stats.leader = value;
-            return this;
+        public T leader(String value) {
+            state().leader = value;
+            return self();
         }
 
-        public Builder raftState(String value) {
-            stats.raftState = value;
-            return this;
+        public T raftState(String value) {
+            state().raftState = value;
+            return self();
         }
 
-        public Builder votedFor(String value) {
-            stats.votedFor = value;
-            return this;
+        public T votedFor(String value) {
+            state().votedFor = value;
+            return self();
         }
 
-        public Builder isVoting(boolean isVoting) {
-            stats.isVoting = isVoting;
-            return this;
+        public T isVoting(boolean isVoting) {
+            state().isVoting = isVoting;
+            return self();
         }
 
-        public Builder followerInfoList(List<FollowerInfo> followerInfoList) {
-            stats.followerInfoList = followerInfoList;
-            return this;
+        public T followerInfoList(List<FollowerInfo> followerInfoList) {
+            state().followerInfoList = followerInfoList;
+            return self();
         }
 
-        public Builder peerAddresses(Map<String, String> peerAddresses) {
-            stats.peerAddresses = peerAddresses;
-            return this;
+        public T peerAddresses(Map<String, String> peerAddresses) {
+            state().peerAddresses = peerAddresses;
+            return self();
         }
 
-        public Builder peerVotingStates(Map<String, Boolean> peerVotingStates) {
-            stats.peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
-            return this;
+        public T peerVotingStates(Map<String, Boolean> peerVotingStates) {
+            state().peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
+            return self();
         }
 
-        public Builder isSnapshotCaptureInitiated(boolean value) {
-            stats.isSnapshotCaptureInitiated = value;
-            return this;
+        public T isSnapshotCaptureInitiated(boolean value) {
+            state().isSnapshotCaptureInitiated = value;
+            return self();
         }
 
-        public Builder customRaftPolicyClassName(String className) {
-            stats.customRaftPolicyClassName = className;
-            return this;
+        public T customRaftPolicyClassName(String className) {
+            state().customRaftPolicyClassName = className;
+            return self();
         }
 
         public OnDemandRaftState build() {
-            return stats;
+            return state();
+        }
+    }
+
+    public static class Builder extends AbstractBuilder<Builder> {
+        private final OnDemandRaftState state = new OnDemandRaftState();
+
+        @Override
+        protected OnDemandRaftState state() {
+            return state;
         }
     }
 }
index f23f956..0821951 100644 (file)
@@ -9,21 +9,18 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EventListener;
-import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class AbstractDataListenerSupport<L extends EventListener, M extends ListenerRegistrationMessage,
         D extends DelayedListenerRegistration<L, M>, R extends ListenerRegistration<L>>
-                extends LeaderLocalDelegateFactory<M, R, Optional<DataTreeCandidate>> {
+                extends LeaderLocalDelegateFactory<M, R> {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final ArrayList<D> delayedListenerRegistrations = new ArrayList<>();
@@ -68,8 +65,7 @@ abstract class AbstractDataListenerSupport<L extends EventListener, M extends Li
 
         final ListenerRegistration<L> registration;
         if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
-            final Entry<R, Optional<DataTreeCandidate>> res = createDelegate(message);
-            registration = res.getKey();
+            registration = createDelegate(message);
         } else {
             log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
index 36c822c..db32b36 100644 (file)
@@ -179,8 +179,8 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
 
         final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
-                new DataTreeChangeListenerProxy<>(actorContext, listener);
-        listenerRegistrationProxy.init(shardName, treeId);
+                new DataTreeChangeListenerProxy<>(actorContext, listener, treeId);
+        listenerRegistrationProxy.init(shardName);
 
         return listenerRegistrationProxy;
     }
index 872feea..dc19aa4 100644 (file)
@@ -9,9 +9,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
-import akka.japi.Creator;
 import com.google.common.base.Preconditions;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
@@ -30,10 +28,13 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 @Deprecated
 public class DataChangeListener extends AbstractUntypedActor {
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+    private final YangInstanceIdentifier registeredPath;
     private boolean notificationsEnabled = false;
 
-    public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
+    public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+            final YangInstanceIdentifier registeredPath) {
         this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
+        this.registeredPath = Preconditions.checkNotNull(registeredPath);
     }
 
     @Override
@@ -78,26 +79,8 @@ public class DataChangeListener extends AbstractUntypedActor {
         }
     }
 
-    public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier,
-                                                            NormalizedNode<?, ?>> listener) {
-        return Props.create(new DataChangeListenerCreator(listener));
-    }
-
-    private static class DataChangeListenerCreator implements Creator<DataChangeListener> {
-        private static final long serialVersionUID = 1L;
-
-        @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't "
-                + "create remote instances of this actor and thus don't need it to be Serializable.")
-        final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-
-        DataChangeListenerCreator(
-                AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
-            this.listener = listener;
-        }
-
-        @Override
-        public DataChangeListener create() throws Exception {
-            return new DataChangeListener(listener);
-        }
+    public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+            final YangInstanceIdentifier registeredPath) {
+        return Props.create(DataChangeListener.class, listener, registeredPath);
     }
 }
index 9eba41a..f0f4b7b 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -41,18 +42,18 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
 
     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
 
-    private volatile ActorSelection listenerRegistrationActor;
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-    private ActorRef dataChangeListenerActor;
     private final String shardName;
     private final ActorContext actorContext;
+    private ActorRef dataChangeListenerActor;
+    private volatile ActorSelection listenerRegistrationActor;
     private boolean closed = false;
 
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
             DataChangeListenerRegistrationProxy(String shardName, ActorContext actorContext, L listener) {
-        this.shardName = shardName;
-        this.actorContext = actorContext;
-        this.listener = listener;
+        this.shardName = Preconditions.checkNotNull(shardName);
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.listener = Preconditions.checkNotNull(listener);
     }
 
     @VisibleForTesting
@@ -92,7 +93,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
     public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
 
         dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-                DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
+                DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
 
         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
         findFuture.onComplete(new OnComplete<ActorRef>() {
index 26d8fa1..9b2becc 100644 (file)
@@ -10,10 +10,15 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map.Entry;
+import java.util.Set;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -26,14 +31,20 @@ final class DataChangeListenerSupport extends AbstractDataListenerSupport<
             DelayedDataChangeListenerRegistration, DataChangeListenerRegistration<
                     AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
 
+    private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+
     DataChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
+    Collection<ActorSelection> getListenerActors() {
+        return Collections.unmodifiableCollection(listenerActors);
+    }
+
     @Override
-    Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-            Optional<DataTreeCandidate>> createDelegate(final RegisterChangeListener message) {
-        ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
+    DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+            createDelegate(final RegisterChangeListener message) {
+        final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
 
         // Notify the listener if notifications should be enabled or not
         // If this shard is the leader then it will enable notifications else
@@ -57,7 +68,32 @@ final class DataChangeListenerSupport extends AbstractDataListenerSupport<
 
         getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue());
 
-        return regEntry;
+        listenerActors.add(dataChangeListenerPath);
+        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+            delegate = regEntry.getKey();
+        return new DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                NormalizedNode<?,?>>>() {
+            @Override
+            public void close() {
+                listenerActors.remove(dataChangeListenerPath);
+                delegate.close();
+            }
+
+            @Override
+            public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+                return delegate.getInstance();
+            }
+
+            @Override
+            public YangInstanceIdentifier getPath() {
+                return delegate.getPath();
+            }
+
+            @Override
+            public DataChangeScope getScope() {
+                return delegate.getScope();
+            }
+        };
     }
 
     @Override
index bccb484..2936a28 100644 (file)
@@ -8,14 +8,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
-import akka.japi.Creator;
 import com.google.common.base.Preconditions;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 /**
  * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating
@@ -23,10 +22,13 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
  */
 final class DataTreeChangeListenerActor extends AbstractUntypedActor {
     private final DOMDataTreeChangeListener listener;
+    private final YangInstanceIdentifier registeredPath;
     private boolean notificationsEnabled = false;
 
-    private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) {
+    private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener,
+            final YangInstanceIdentifier registeredPath) {
         this.listener = Preconditions.checkNotNull(listener);
+        this.registeredPath = Preconditions.checkNotNull(registeredPath);
     }
 
     @Override
@@ -70,24 +72,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor {
                 listener);
     }
 
-    public static Props props(final DOMDataTreeChangeListener listener) {
-        return Props.create(new DataTreeChangeListenerCreator(listener));
-    }
-
-    private static final class DataTreeChangeListenerCreator implements Creator<DataTreeChangeListenerActor> {
-        private static final long serialVersionUID = 1L;
-
-        @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't "
-                + "create remote instances of this actor and thus don't need it to be Serializable.")
-        private final DOMDataTreeChangeListener listener;
-
-        DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) {
-            this.listener = Preconditions.checkNotNull(listener);
-        }
-
-        @Override
-        public DataTreeChangeListenerActor create() {
-            return new DataTreeChangeListenerActor(listener);
-        }
+    public static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) {
+        return Props.create(DataTreeChangeListenerActor.class, listener, registeredPath);
     }
 }
index 8a9b466..f60e676 100644 (file)
@@ -37,15 +37,18 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
     private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
     private final ActorRef dataChangeListenerActor;
     private final ActorContext actorContext;
+    private final YangInstanceIdentifier registeredPath;
 
     @GuardedBy("this")
     private ActorSelection listenerRegistrationActor;
 
-    DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
+    DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener,
+            final YangInstanceIdentifier registeredPath) {
         super(listener);
         this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.registeredPath = Preconditions.checkNotNull(registeredPath);
         this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-                DataTreeChangeListenerActor.props(getInstance())
+                DataTreeChangeListenerActor.props(getInstance(), registeredPath)
                     .withDispatcher(actorContext.getNotificationDispatcherPath()));
     }
 
@@ -59,19 +62,19 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
         dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    void init(final String shardName, final YangInstanceIdentifier treeId) {
+    void init(final String shardName) {
         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
         findFuture.onComplete(new OnComplete<ActorRef>() {
             @Override
             public void onComplete(final Throwable failure, final ActorRef shard) {
                 if (failure instanceof LocalShardNotFoundException) {
                     LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
-                            + "cannot be registered", shardName, getInstance(), treeId);
+                            + "cannot be registered", shardName, getInstance(), registeredPath);
                 } else if (failure != null) {
                     LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
-                            + "cannot be registered: {}", shardName, getInstance(), treeId, failure);
+                            + "cannot be registered: {}", shardName, getInstance(), registeredPath, failure);
                 } else {
-                    doRegistration(shard, treeId);
+                    doRegistration(shard);
                 }
             }
         }, actorContext.getClientDispatcher());
@@ -94,10 +97,10 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
         actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
     }
 
-    private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
+    private void doRegistration(final ActorRef shard) {
 
         Future<Object> future = actorContext.executeOperationAsync(shard,
-                new RegisterDataTreeChangeListener(path, dataChangeListenerActor,
+                new RegisterDataTreeChangeListener(registeredPath, dataChangeListenerActor,
                         getInstance() instanceof ClusteredDOMDataTreeChangeListener),
                 actorContext.getDatastoreContext().getShardInitializationTimeout());
 
@@ -106,7 +109,7 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
             public void onComplete(final Throwable failure, final Object result) {
                 if (failure != null) {
                     LOG.error("Failed to register DataTreeChangeListener {} at path {}",
-                            getInstance(), path.toString(), failure);
+                            getInstance(), registeredPath, failure);
                 } else {
                     RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
                     setListenerRegistrationActor(actorContext.actorSelection(
index f039ab7..5b8e5f8 100644 (file)
@@ -10,7 +10,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map.Entry;
+import java.util.Set;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
@@ -21,14 +25,21 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DOMDataTreeChangeListener,
         RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration,
         ListenerRegistration<DOMDataTreeChangeListener>> {
+
+    private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+
     DataTreeChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
+    Collection<ActorSelection> getListenerActors() {
+        return Collections.unmodifiableCollection(listenerActors);
+    }
+
     @Override
-    Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> createDelegate(
+    ListenerRegistration<DOMDataTreeChangeListener> createDelegate(
             final RegisterDataTreeChangeListener message) {
-        ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
+        final ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
 
         // Notify the listener if notifications should be enabled or not
         // If this shard is the leader then it will enable notifications else
@@ -51,7 +62,20 @@ final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DO
         getShard().getDataStore().notifyOfInitialData(message.getPath(),
                 regEntry.getKey().getInstance(), regEntry.getValue());
 
-        return regEntry;
+        listenerActors.add(dataChangeListenerPath);
+        final ListenerRegistration<DOMDataTreeChangeListener> delegate = regEntry.getKey();
+        return new ListenerRegistration<DOMDataTreeChangeListener>() {
+            @Override
+            public DOMDataTreeChangeListener getInstance() {
+                return delegate.getInstance();
+            }
+
+            @Override
+            public void close() {
+                listenerActors.remove(dataChangeListenerPath);
+                delegate.close();
+            }
+        };
     }
 
     @Override
index 0be8f09..e6ff10d 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.mdsal.common.api.PostPreCommitStep;
 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -28,10 +29,12 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 final class DataTreeCohortActor extends AbstractUntypedActor {
     private final CohortBehaviour<?> idleState = new Idle();
     private final DOMDataTreeCommitCohort cohort;
+    private final YangInstanceIdentifier registeredPath;
     private CohortBehaviour<?> currentState = idleState;
 
-    private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
+    private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
         this.cohort = Preconditions.checkNotNull(cohort);
+        this.registeredPath = Preconditions.checkNotNull(registeredPath);
     }
 
     @Override
@@ -143,7 +146,8 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
             } else if (message instanceof Abort) {
                 return abort();
             }
-            throw new UnsupportedOperationException();
+            throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s",
+                    message.getClass(), getClass().getSimpleName()));
         }
 
         abstract CohortBehaviour<?> abort();
@@ -269,7 +273,7 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
 
     }
 
-    static Props props(final DOMDataTreeCommitCohort cohort) {
-        return Props.create(DataTreeCohortActor.class, cohort);
+    static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
+        return Props.create(DataTreeCohortActor.class, cohort, registeredPath);
     }
 }
index 4364e22..f79c047 100644 (file)
@@ -15,6 +15,7 @@ import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +49,10 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
 
     private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
 
+    Collection<ActorRef> getCohortActors() {
+        return Collections.unmodifiableCollection(cohortToNode.keySet());
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
         takeLock();
index 18146b7..cfe12a1 100644 (file)
@@ -42,8 +42,8 @@ public class DataTreeCohortRegistrationProxy<C extends DOMDataTreeCommitCohort>
         super(cohort);
         this.subtree = Preconditions.checkNotNull(subtree);
         this.actorContext = Preconditions.checkNotNull(actorContext);
-        this.actor = actorContext.getActorSystem().actorOf(
-                DataTreeCohortActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+        this.actor = actorContext.getActorSystem().actorOf(DataTreeCohortActor.props(getInstance(),
+                subtree.getRootIdentifier()).withDispatcher(actorContext.getNotificationDispatcherPath()));
     }
 
 
index 837242d..ac13272 100644 (file)
@@ -7,12 +7,9 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Optional;
 import java.util.EventListener;
-import java.util.Map.Entry;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 abstract class DelayedListenerRegistration<L extends EventListener, M> implements ListenerRegistration<L> {
     private final M registrationMessage;
@@ -34,10 +31,9 @@ abstract class DelayedListenerRegistration<L extends EventListener, M> implement
     }
 
     synchronized <R extends ListenerRegistration<L>> void createDelegate(
-            final LeaderLocalDelegateFactory<M, R, Optional<DataTreeCandidate>> factory) {
+            final LeaderLocalDelegateFactory<M, R> factory) {
         if (!closed) {
-            final Entry<R, Optional<DataTreeCandidate>> res = factory.createDelegate(registrationMessage);
-            this.delegate = res.getKey();
+            this.delegate = factory.createDelegate(registrationMessage);
         }
     }
 
index 1542fd6..cd1b548 100644 (file)
@@ -7,15 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.Map.Entry;
-
 /**
  * Base class for factories instantiating delegates.
  *
  * @param <M> message type
  * @param <D> delegate type
- * @param <I> initial state type
  */
-abstract class DelegateFactory<M, D, I> {
-    abstract Entry<D, I> createDelegate(M message);
+abstract class DelegateFactory<M, D> {
+    abstract D createDelegate(M message);
 }
index fbd974a..0f76629 100644 (file)
@@ -19,9 +19,8 @@ import com.google.common.base.Preconditions;
  *
  * @param <D> delegate type
  * @param <M> message type
- * @param <I> initial state type
  */
-abstract class LeaderLocalDelegateFactory<M, D, I> extends DelegateFactory<M, D, I> {
+abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
     private final Shard shard;
 
     protected LeaderLocalDelegateFactory(final Shard shard) {
index 84c399d..d5d129f 100644 (file)
@@ -65,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -80,6 +81,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -783,6 +785,13 @@ public class Shard extends RaftActor {
         store.setRunOnPendingTransactionsComplete(operation);
     }
 
+    @Override
+    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+        return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
+                .dataChangeListenerActors(changeSupport.getListenerActors())
+                .commitCohortActors(store.getCohortActors());
+    }
+
     @Override
     public String persistenceId() {
         return this.name;
index 2821912..d398afe 100644 (file)
@@ -772,6 +772,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingCommit();
     }
 
+    Collection<ActorRef> getCohortActors() {
+        return cohortRegistry.getCohortActors();
+    }
+
     void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
         cohortRegistry.process(sender, message);
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java
new file mode 100644 (file)
index 0000000..ba6f229
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+
+/**
+ * Extends OnDemandRaftState to add Shard state.
+ *
+ * @author Thomas Pantelis
+ */
+public class OnDemandShardState extends OnDemandRaftState {
+    private Collection<ActorSelection> treeChangeListenerActors;
+    private Collection<ActorSelection> dataChangeListenerActors;
+    private Collection<ActorRef> commitCohortActors;
+
+    public Collection<ActorSelection> getTreeChangeListenerActors() {
+        return treeChangeListenerActors;
+    }
+
+    public Collection<ActorSelection> getDataChangeListenerActors() {
+        return dataChangeListenerActors;
+    }
+
+    public Collection<ActorRef> getCommitCohortActors() {
+        return commitCohortActors;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder extends AbstractBuilder<Builder> {
+        private final OnDemandShardState state = new OnDemandShardState();
+
+        @Override
+        protected OnDemandRaftState state() {
+            return state;
+        }
+
+        public Builder treeChangeListenerActors(Collection<ActorSelection> actors) {
+            state.treeChangeListenerActors = actors;
+            return self();
+        }
+
+        public Builder dataChangeListenerActors(Collection<ActorSelection> actors) {
+            state.dataChangeListenerActors = actors;
+            return self();
+        }
+
+        public Builder commitCohortActors(Collection<ActorRef> actors) {
+            state.commitCohortActors = actors;
+            return self();
+        }
+    }
+}
index fc70e33..9721766 100644 (file)
@@ -30,6 +30,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
@@ -53,7 +54,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
 
                 final Shard shard = actor.underlyingActor();
                 final MockDataChangeListener listener = new MockDataChangeListener(0);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, TEST_PATH),
                         "testChangeListenerWithNoInitialData-DataChangeListener");
                 final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
                 support.onMessage(new RegisterChangeListener(TEST_PATH, dclActor, DataChangeScope.ONE, false),
@@ -79,7 +80,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
                 writeToStore(shard.getDataStore(), TestModel.TEST_PATH,
                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, TEST_PATH),
                         "testInitialChangeListenerEventWithContainerPath-DataChangeListener");
                 final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
                 support.onMessage(new RegisterChangeListener(TEST_PATH, dclActor, DataChangeScope.ONE, false),
@@ -104,7 +105,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
                 mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
 
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, OUTER_LIST_PATH),
                         "testInitialChangeListenerEventWithListPath-DataChangeListener");
                 final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
                 support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH, dclActor, DataChangeScope.ONE, false),
@@ -137,11 +138,11 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
                         ImmutableNodes.containerNode(OUTER_CONTAINER_QNAME));
 
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME);
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                         "testInitialChangeListenerEventWithWildcardedListPath-DataChangeListener");
                 final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
-                support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH.node(OUTER_LIST_QNAME), dclActor,
-                        DataChangeScope.ONE, false), true, true);
+                support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), true, true);
 
                 listener.waitForChangeEvents();
                 listener.verifyCreatedData(0, outerEntryPath(1));
@@ -169,12 +170,12 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
                                 outerNodeEntry(2, innerNode("three", "four")))));
 
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME)
+                        .node(INNER_LIST_QNAME).node(INNER_LIST_QNAME);
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                         "testInitialChangeListenerEventWithNestedWildcardedListsPath-DataChangeListener");
                 final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
-                support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH.node(OUTER_LIST_QNAME)
-                        .node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), dclActor, DataChangeScope.ONE, false),
-                            true, true);
+                support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), true, true);
 
 
                 listener.waitForChangeEvents();
@@ -185,12 +186,13 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
 
                 // Register for a specific outer list entry
                 final MockDataChangeListener listener2 = new MockDataChangeListener(1);
-                final ActorRef dclActor2 = actorFactory.createActor(DataChangeListener.props(listener2),
+                final YangInstanceIdentifier path2 = OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME)
+                        .node(INNER_LIST_QNAME);
+                final ActorRef dclActor2 = actorFactory.createActor(DataChangeListener.props(listener2, path2),
                         "testInitialChangeListenerEventWithNestedWildcardedListsPath-DataChangeListener2");
                 final DataChangeListenerSupport support2 = new DataChangeListenerSupport(shard);
-                support2.onMessage(new RegisterChangeListener(
-                        OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), dclActor2,
-                        DataChangeScope.ONE, false), true, true);
+                support2.onMessage(new RegisterChangeListener(path2, dclActor2, DataChangeScope.ONE, false),
+                        true, true);
 
                 listener2.waitForChangeEvents();
                 listener2.verifyCreatedData(0, innerEntryPath(1, "one"));
@@ -219,12 +221,12 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
                                 outerNodeEntry(2, innerNode("three", "four")))));
 
                 final MockDataChangeListener listener = new MockDataChangeListener(0);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME)
+                        .node(INNER_LIST_QNAME);
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                         "testInitialChangeListenerEventWhenNotInitiallyLeader-DataChangeListener");
                 final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
-                support.onMessage(new RegisterChangeListener(
-                        OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), dclActor,
-                        DataChangeScope.ONE, false), false, true);
+                support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), false, true);
 
                 listener.expectNoMoreChanges("Unexpected initial change event");
                 listener.reset(1);
index 544a566..f3dcaa2 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+
 import akka.actor.ActorRef;
 import akka.actor.DeadLetter;
 import akka.actor.Props;
@@ -30,7 +32,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             {
                 final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
                 final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
-                final Props props = DataChangeListener.props(mockListener);
+                final Props props = DataChangeListener.props(mockListener, TEST_PATH);
                 final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
 
                 // Let the DataChangeListener know that notifications should be
@@ -53,7 +55,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             {
                 final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
                 final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
-                final Props props = DataChangeListener.props(mockListener);
+                final Props props = DataChangeListener.props(mockListener, TEST_PATH);
                 final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
 
                 subject.tell(new DataChanged(mockChangeEvent), getRef());
@@ -78,7 +80,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             {
                 final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
                 final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
-                final Props props = DataChangeListener.props(mockListener);
+                final Props props = DataChangeListener.props(mockListener, TEST_PATH);
                 final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender");
 
                 getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
@@ -115,7 +117,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
                 AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
                 Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataChanged(mockChangeEvent2);
 
-                Props props = DataChangeListener.props(mockListener);
+                Props props = DataChangeListener.props(mockListener, TEST_PATH);
                 ActorRef subject = getSystem().actorOf(props, "testDataChangedWithListenerRuntimeEx");
 
                 // Let the DataChangeListener know that notifications should be
index 6b8658c..62b95c2 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+
 import akka.actor.ActorRef;
 import akka.actor.DeadLetter;
 import akka.actor.Props;
@@ -31,7 +33,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest {
                 final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
                 final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
                 final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
-                final Props props = DataTreeChangeListenerActor.props(mockListener);
+                final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
                 final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsEnabled");
 
                 // Let the DataChangeListener know that notifications should be
@@ -54,7 +56,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest {
                 final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
                 final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
                 final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
-                final Props props = DataTreeChangeListenerActor.props(mockListener);
+                final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
                 final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsDisabled");
 
                 subject.tell(new DataTreeChanged(mockCandidates), getRef());
@@ -79,7 +81,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest {
                 final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
                 final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
                 final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
-                final Props props = DataTreeChangeListenerActor.props(mockListener);
+                final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
                 final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithNoSender");
 
                 getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
@@ -119,7 +121,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest {
                 final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
                 Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataTreeChanged(mockCandidates2);
 
-                Props props = DataTreeChangeListenerActor.props(mockListener);
+                Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
                 ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithListenerRuntimeEx");
 
                 // Let the DataChangeListener know that notifications should be
index 8cf0d83..e6b20a7 100644 (file)
@@ -56,14 +56,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
                         mock(Configuration.class));
 
+                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                        actorContext, mockListener);
+                        actorContext, mockListener, path);
 
-                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 new Thread() {
                     @Override
                     public void run() {
-                        proxy.init("shard-1", path);
+                        proxy.init("shard-1");
                     }
 
                 }.start();
@@ -115,14 +115,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(
                         ClusteredDOMDataTreeChangeListener.class);
 
+                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
-                        new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener);
+                        new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path);
 
-                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 new Thread() {
                     @Override
                     public void run() {
-                        proxy.init("shard-1", path);
+                        proxy.init("shard-1");
                     }
 
                 }.start();
@@ -150,14 +150,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
                         mock(Configuration.class));
 
+                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                        actorContext, mockListener);
+                        actorContext, mockListener, path);
 
-                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 new Thread() {
                     @Override
                     public void run() {
-                        proxy.init("shard-1", path);
+                        proxy.init("shard-1");
                     }
 
                 }.start();
@@ -182,14 +182,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
                         mock(Configuration.class));
 
+                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                        actorContext, mockListener);
+                        actorContext, mockListener, path);
 
-                final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
                 new Thread() {
                     @Override
                     public void run() {
-                        proxy.init("shard-1", path);
+                        proxy.init("shard-1");
                     }
 
                 }.start();
@@ -230,7 +230,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
 
                 String shardName = "shard-1";
                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                        actorContext, mockListener);
+                        actorContext, mockListener, path);
 
                 doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
                 doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
@@ -238,7 +238,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                         .executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class));
                 doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
 
-                proxy.init("shard-1", path);
+                proxy.init("shard-1");
 
                 Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
 
@@ -265,7 +265,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
 
                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                        actorContext, mockListener);
+                        actorContext, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
 
                 Answer<Future<Object>> answer = invocation -> {
                     proxy.close();
@@ -275,7 +275,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                 doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class), any(Object.class),
                         any(Timeout.class));
 
-                proxy.init(shardName, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
+                proxy.init(shardName);
 
                 expectMsgClass(duration("5 seconds"), CloseDataTreeChangeListenerRegistration.class);
 
index 6d09d1c..00f0f4b 100644 (file)
@@ -38,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChang
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import scala.concurrent.Await;
@@ -157,7 +158,7 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
     private Entry<MockDataTreeChangeListener, ActorSelection> registerChangeListener(final YangInstanceIdentifier path,
             final int expectedEvents) {
         MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
-        ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener));
+        ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, TestModel.TEST_PATH));
 
         try {
             RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply)
index 2edbff2..45cfd29 100644 (file)
@@ -139,8 +139,8 @@ public class ShardTest extends AbstractShardTest {
                 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
 
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
-                        "testRegisterChangeListener-DataChangeListener");
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
+                        TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
 
                 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
                         AsyncDataBroker.DataChangeScope.BASE, true), getRef());
@@ -216,8 +216,9 @@ public class ShardTest extends AbstractShardTest {
 
         setupInMemorySnapshotStore();
 
+        final YangInstanceIdentifier path = TestModel.TEST_PATH;
         final MockDataChangeListener listener = new MockDataChangeListener(1);
-        final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+        final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
 
         final TestActorRef<Shard> shard = actorFactory.createTestActor(
@@ -226,8 +227,6 @@ public class ShardTest extends AbstractShardTest {
 
         new ShardTestKit(getSystem()) {
             {
-                final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
                 // Wait until the shard receives the first ElectionTimeout
                 // message.
                 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
@@ -271,8 +270,8 @@ public class ShardTest extends AbstractShardTest {
                 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
 
                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
-                        "testRegisterDataTreeChangeListener-DataTreeChangeListener");
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                        TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
 
                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
 
@@ -326,16 +325,15 @@ public class ShardTest extends AbstractShardTest {
 
         setupInMemorySnapshotStore();
 
+        final YangInstanceIdentifier path = TestModel.TEST_PATH;
         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-        final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+        final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
 
         final TestActorRef<Shard> shard = actorFactory.createTestActor(
                 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
 
-        final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
         new ShardTestKit(getSystem()) {
             {
                 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
@@ -2129,8 +2127,9 @@ public class ShardTest extends AbstractShardTest {
                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
+                final YangInstanceIdentifier path = TestModel.TEST_PATH;
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                         actorFactory.generateActorId(testName + "-DataChangeListener"));
 
                 setupInMemorySnapshotStore();
@@ -2141,8 +2140,6 @@ public class ShardTest extends AbstractShardTest {
 
                 waitUntilNoLeader(shard);
 
-                final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
                 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
                         getRef());
                 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
@@ -2189,7 +2186,7 @@ public class ShardTest extends AbstractShardTest {
 
                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
                 final MockDataChangeListener listener = new MockDataChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                         actorFactory.generateActorId(testName + "-DataChangeListener"));
 
                 followerShard.tell(
@@ -2215,8 +2212,8 @@ public class ShardTest extends AbstractShardTest {
                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
-                        actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                        TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
                 setupInMemorySnapshotStore();
 
@@ -2271,7 +2268,7 @@ public class ShardTest extends AbstractShardTest {
 
                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+                final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
                         actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
                 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.