Add OnDemandShardState to report additional Shard state 05/51905/1
authorTom Pantelis <tpanteli@brocade.com>
Tue, 7 Feb 2017 21:48:03 +0000 (16:48 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 15 Feb 2017 13:30:05 +0000 (08:30 -0500)
Extended the OnDemandRaftState with OnDemandShardState to include
additional Shard state, including DTCL, DCL, and commit cohort actors.
This will enable us to report thus info from the JMX bean as it's useful
for debugging to have visibility into what listeners and cohorts
are registered.

The actors now also store the registered path. Both the instance and path
will be queried for debugging.

Change-Id: Iaa6c27c9aba3b5c0223199e6a3fc21bc54da95ba
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
25 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 88e64aa65d4b4d55d7a41475f665c088bca25b43..6bef0b593fb96b0d28b82c62b2504b6d07824fef 100644 (file)
@@ -422,7 +422,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-        OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+        OnDemandRaftState.AbstractBuilder<?> builder = newOnDemandRaftStateBuilder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog().dataSize())
@@ -466,6 +466,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     }
 
+    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+        return OnDemandRaftState.builder();
+    }
+
     private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) {
         RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
 
index 0bd85b1e6d19082eb1f86f0c9f272c3bddafec8a..97cdf723c4f2959b0c4ec69929843129bb46dc51 100644 (file)
@@ -11,6 +11,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nonnull;
 
 /**
  * The response to a GetOnDemandRaftState message,
@@ -41,7 +42,7 @@ public class OnDemandRaftState {
     private Map<String, String> peerAddresses = Collections.emptyMap();
     private Map<String, Boolean> peerVotingStates = Collections.emptyMap();
 
-    private OnDemandRaftState() {
+    protected OnDemandRaftState() {
     }
 
     public static Builder builder() {
@@ -132,116 +133,131 @@ public class OnDemandRaftState {
         return customRaftPolicyClassName;
     }
 
-    public static class Builder {
-        private final OnDemandRaftState stats = new OnDemandRaftState();
+    public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> {
+        @SuppressWarnings("unchecked")
+        protected T self() {
+            return (T) this;
+        }
+
+        @Nonnull
+        protected abstract OnDemandRaftState state();
 
-        public Builder lastLogIndex(long value) {
-            stats.lastLogIndex = value;
-            return this;
+        public T lastLogIndex(long value) {
+            state().lastLogIndex = value;
+            return self();
         }
 
-        public Builder lastLogTerm(long value) {
-            stats.lastLogTerm = value;
-            return this;
+        public T lastLogTerm(long value) {
+            state().lastLogTerm = value;
+            return self();
         }
 
-        public Builder currentTerm(long value) {
-            stats.currentTerm = value;
-            return this;
+        public T currentTerm(long value) {
+            state().currentTerm = value;
+            return self();
         }
 
-        public Builder commitIndex(long value) {
-            stats.commitIndex = value;
-            return this;
+        public T commitIndex(long value) {
+            state().commitIndex = value;
+            return self();
         }
 
-        public Builder lastApplied(long value) {
-            stats.lastApplied = value;
-            return this;
+        public T lastApplied(long value) {
+            state().lastApplied = value;
+            return self();
         }
 
-        public Builder lastIndex(long value) {
-            stats.lastIndex = value;
-            return this;
+        public T lastIndex(long value) {
+            state().lastIndex = value;
+            return self();
         }
 
-        public Builder lastTerm(long value) {
-            stats.lastTerm = value;
-            return this;
+        public T lastTerm(long value) {
+            state().lastTerm = value;
+            return self();
         }
 
-        public Builder snapshotIndex(long value) {
-            stats.snapshotIndex = value;
-            return this;
+        public T snapshotIndex(long value) {
+            state().snapshotIndex = value;
+            return self();
         }
 
-        public Builder snapshotTerm(long value) {
-            stats.snapshotTerm = value;
-            return this;
+        public T snapshotTerm(long value) {
+            state().snapshotTerm = value;
+            return self();
         }
 
-        public Builder replicatedToAllIndex(long value) {
-            stats.replicatedToAllIndex = value;
-            return this;
+        public T replicatedToAllIndex(long value) {
+            state().replicatedToAllIndex = value;
+            return self();
         }
 
-        public Builder inMemoryJournalDataSize(long value) {
-            stats.inMemoryJournalDataSize = value;
-            return this;
+        public T inMemoryJournalDataSize(long value) {
+            state().inMemoryJournalDataSize = value;
+            return self();
         }
 
-        public Builder inMemoryJournalLogSize(long value) {
-            stats.inMemoryJournalLogSize = value;
-            return this;
+        public T inMemoryJournalLogSize(long value) {
+            state().inMemoryJournalLogSize = value;
+            return self();
         }
 
-        public Builder leader(String value) {
-            stats.leader = value;
-            return this;
+        public T leader(String value) {
+            state().leader = value;
+            return self();
         }
 
-        public Builder raftState(String value) {
-            stats.raftState = value;
-            return this;
+        public T raftState(String value) {
+            state().raftState = value;
+            return self();
         }
 
-        public Builder votedFor(String value) {
-            stats.votedFor = value;
-            return this;
+        public T votedFor(String value) {
+            state().votedFor = value;
+            return self();
         }
 
-        public Builder isVoting(boolean isVoting) {
-            stats.isVoting = isVoting;
-            return this;
+        public T isVoting(boolean isVoting) {
+            state().isVoting = isVoting;
+            return self();
         }
 
-        public Builder followerInfoList(List<FollowerInfo> followerInfoList) {
-            stats.followerInfoList = followerInfoList;
-            return this;
+        public T followerInfoList(List<FollowerInfo> followerInfoList) {
+            state().followerInfoList = followerInfoList;
+            return self();
         }
 
-        public Builder peerAddresses(Map<String, String> peerAddresses) {
-            stats.peerAddresses = peerAddresses;
-            return this;
+        public T peerAddresses(Map<String, String> peerAddresses) {
+            state().peerAddresses = peerAddresses;
+            return self();
         }
 
-        public Builder peerVotingStates(Map<String, Boolean> peerVotingStates) {
-            stats.peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
-            return this;
+        public T peerVotingStates(Map<String, Boolean> peerVotingStates) {
+            state().peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
+            return self();
         }
 
-        public Builder isSnapshotCaptureInitiated(boolean value) {
-            stats.isSnapshotCaptureInitiated = value;
-            return this;
+        public T isSnapshotCaptureInitiated(boolean value) {
+            state().isSnapshotCaptureInitiated = value;
+            return self();
         }
 
-        public Builder customRaftPolicyClassName(String className) {
-            stats.customRaftPolicyClassName = className;
-            return this;
+        public T customRaftPolicyClassName(String className) {
+            state().customRaftPolicyClassName = className;
+            return self();
         }
 
         public OnDemandRaftState build() {
-            return stats;
+            return state();
+        }
+    }
+
+    public static class Builder extends AbstractBuilder<Builder> {
+        private final OnDemandRaftState state = new OnDemandRaftState();
+
+        @Override
+        protected OnDemandRaftState state() {
+            return state;
         }
     }
 }
index a253b794db15871f69117f291be14ef4d1193a23..fde9ff3a46304701189b4ef053f88dace6ebe3dd 100644 (file)
@@ -9,21 +9,18 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EventListener;
-import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class AbstractDataListenerSupport<L extends EventListener, R extends ListenerRegistrationMessage,
-        D extends DelayedListenerRegistration<L, R>, LR extends ListenerRegistration<L>>
-                extends LeaderLocalDelegateFactory<R, LR, Optional<DataTreeCandidate>> {
+abstract class AbstractDataListenerSupport<L extends EventListener, M extends ListenerRegistrationMessage,
+        D extends DelayedListenerRegistration<L, M>, R extends ListenerRegistration<L>>
+                extends LeaderLocalDelegateFactory<M, R> {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final ArrayList<D> delayedListenerRegistrations = new ArrayList<>();
@@ -63,13 +60,12 @@ abstract class AbstractDataListenerSupport<L extends EventListener, R extends Li
     }
 
     @Override
-    void onMessage(R message, boolean isLeader, boolean hasLeader) {
+    void onMessage(M message, boolean isLeader, boolean hasLeader) {
         log.debug("{}: {} for {}, leader: {}", persistenceId(), logName(), message.getPath(), isLeader);
 
         final ListenerRegistration<L> registration;
-        if((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
-            final Entry<LR, Optional<DataTreeCandidate>> res = createDelegate(message);
-            registration = res.getKey();
+        if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
+            registration = createDelegate(message);
         } else {
             log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
@@ -99,7 +95,7 @@ abstract class AbstractDataListenerSupport<L extends EventListener, R extends Li
         actors.add(actor);
     }
 
-    protected abstract D newDelayedListenerRegistration(R message);
+    protected abstract D newDelayedListenerRegistration(M message);
 
     protected abstract ActorRef newRegistrationActor(ListenerRegistration<L> registration);
 
index d572beb4d4d668dfc574599ea51cced32a27fd54..f2c2f1dbf5c3ecd5eb5b1ba2eda5c9684fc3ac6a 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
-import akka.japi.Creator;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
@@ -31,10 +30,13 @@ public class DataChangeListener extends AbstractUntypedActor {
     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListener.class);
 
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+    private final YangInstanceIdentifier registeredPath;
     private boolean notificationsEnabled = false;
 
-    public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
+    public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+            final YangInstanceIdentifier registeredPath) {
         this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
+        this.registeredPath = Preconditions.checkNotNull(registeredPath);
     }
 
     @Override
@@ -78,24 +80,8 @@ public class DataChangeListener extends AbstractUntypedActor {
         }
     }
 
-    public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier,
-                                                            NormalizedNode<?, ?>> listener) {
-        return Props.create(new DataChangeListenerCreator(listener));
-    }
-
-    private static class DataChangeListenerCreator implements Creator<DataChangeListener> {
-        private static final long serialVersionUID = 1L;
-
-        final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-
-        DataChangeListenerCreator(
-                AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
-            this.listener = listener;
-        }
-
-        @Override
-        public DataChangeListener create() throws Exception {
-            return new DataChangeListener(listener);
-        }
+    public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+            final YangInstanceIdentifier registeredPath) {
+        return Props.create(DataChangeListener.class, listener, registeredPath);
     }
 }
index 534beead0ed4d256c4626870fbee54582add8d9f..738d256369eaadfb668bf6c08c79ba54b0de39d8 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -41,19 +42,19 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
 
     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
 
-    private volatile ActorSelection listenerRegistrationActor;
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-    private ActorRef dataChangeListenerActor;
     private final String shardName;
     private final ActorContext actorContext;
+    private ActorRef dataChangeListenerActor;
+    private volatile ActorSelection listenerRegistrationActor;
     private boolean closed = false;
 
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
                                                               DataChangeListenerRegistrationProxy (
             String shardName, ActorContext actorContext, L listener) {
-        this.shardName = shardName;
-        this.actorContext = actorContext;
-        this.listener = listener;
+        this.shardName = Preconditions.checkNotNull(shardName);
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.listener = Preconditions.checkNotNull(listener);
     }
 
     @VisibleForTesting
@@ -93,7 +94,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
     public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
 
         dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-                DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
+                DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
 
         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
         findFuture.onComplete(new OnComplete<ActorRef>() {
index f4b6bcc9fd56801a2ea077151a947627e2099595..57e20059e69cfed0229a15ae67f642b3a385d684 100644 (file)
@@ -10,10 +10,15 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map.Entry;
+import java.util.Set;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -26,14 +31,20 @@ final class DataChangeListenerSupport extends AbstractDataListenerSupport<
             DelayedDataChangeListenerRegistration, DataChangeListenerRegistration<
                     AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
 
+    private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+
     DataChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
+    Collection<ActorSelection> getListenerActors() {
+        return Collections.unmodifiableCollection(listenerActors);
+    }
+
     @Override
-    Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-            Optional<DataTreeCandidate>> createDelegate(final RegisterChangeListener message) {
-        ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
+    DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+            createDelegate(final RegisterChangeListener message) {
+        final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
 
         // Notify the listener if notifications should be enabled or not
         // If this shard is the leader then it will enable notifications else
@@ -57,7 +68,32 @@ final class DataChangeListenerSupport extends AbstractDataListenerSupport<
 
         getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue());
 
-        return regEntry;
+        listenerActors.add(dataChangeListenerPath);
+        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+            delegate = regEntry.getKey();
+        return new DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                NormalizedNode<?,?>>>() {
+            @Override
+            public void close() {
+                listenerActors.remove(dataChangeListenerPath);
+                delegate.close();
+            }
+
+            @Override
+            public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+                return delegate.getInstance();
+            }
+
+            @Override
+            public YangInstanceIdentifier getPath() {
+                return delegate.getPath();
+            }
+
+            @Override
+            public DataChangeScope getScope() {
+                return delegate.getScope();
+            }
+        };
     }
 
     @Override
index 03978f2b66387bcd483b05bcc58c4afb9ac7db8b..7e6d87cc02093753a0f2aa57049839bb70015665 100644 (file)
@@ -8,13 +8,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
-import akka.japi.Creator;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,10 +25,13 @@ import org.slf4j.LoggerFactory;
 final class DataTreeChangeListenerActor extends AbstractUntypedActor {
     private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerActor.class);
     private final DOMDataTreeChangeListener listener;
+    private final YangInstanceIdentifier registeredPath;
     private boolean notificationsEnabled = false;
 
-    private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) {
+    private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener,
+            final YangInstanceIdentifier registeredPath) {
         this.listener = Preconditions.checkNotNull(listener);
+        this.registeredPath = Preconditions.checkNotNull(registeredPath);
     }
 
     @Override
@@ -71,21 +74,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor {
                 listener);
     }
 
-    public static Props props(final DOMDataTreeChangeListener listener) {
-        return Props.create(new DataTreeChangeListenerCreator(listener));
-    }
-
-    private static final class DataTreeChangeListenerCreator implements Creator<DataTreeChangeListenerActor> {
-        private static final long serialVersionUID = 1L;
-        private final DOMDataTreeChangeListener listener;
-
-        DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) {
-            this.listener = Preconditions.checkNotNull(listener);
-        }
-
-        @Override
-        public DataTreeChangeListenerActor create() {
-            return new DataTreeChangeListenerActor(listener);
-        }
+    public static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) {
+        return Props.create(DataTreeChangeListenerActor.class, listener, registeredPath);
     }
 }
index a45ae52afd4abde5f7bb8fd10df8063c7d571a38..72d9097191d06e006eaf5ff829799bdae4c5beac 100644 (file)
@@ -37,15 +37,19 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
     private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
     private final ActorRef dataChangeListenerActor;
     private final ActorContext actorContext;
+    private final YangInstanceIdentifier registeredPath;
 
     @GuardedBy("this")
     private ActorSelection listenerRegistrationActor;
 
-    public DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
+    DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener,
+            final YangInstanceIdentifier registeredPath) {
         super(listener);
         this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.registeredPath = Preconditions.checkNotNull(registeredPath);
         this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-            DataTreeChangeListenerActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+                DataTreeChangeListenerActor.props(getInstance(), registeredPath)
+                    .withDispatcher(actorContext.getNotificationDispatcherPath()));
     }
 
     @Override
@@ -58,19 +62,19 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
         dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    void init(final String shardName, final YangInstanceIdentifier treeId) {
+    void init(final String shardName) {
         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
         findFuture.onComplete(new OnComplete<ActorRef>() {
             @Override
             public void onComplete(final Throwable failure, final ActorRef shard) {
                 if (failure instanceof LocalShardNotFoundException) {
-                    LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " +
-                            "cannot be registered", shardName, getInstance(), treeId);
+                    LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
+                            + "cannot be registered", shardName, getInstance(), registeredPath);
                 } else if (failure != null) {
-                    LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " +
-                            "cannot be registered: {}", shardName, getInstance(), treeId, failure);
+                    LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
+                            + "cannot be registered: {}", shardName, getInstance(), registeredPath, failure);
                 } else {
-                    doRegistration(shard, treeId);
+                    doRegistration(shard);
                 }
             }
         }, actorContext.getClientDispatcher());
@@ -93,10 +97,10 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
         actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
     }
 
-    private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
+    private void doRegistration(final ActorRef shard) {
 
         Future<Object> future = actorContext.executeOperationAsync(shard,
-                new RegisterDataTreeChangeListener(path, dataChangeListenerActor,
+                new RegisterDataTreeChangeListener(registeredPath, dataChangeListenerActor,
                         getInstance() instanceof ClusteredDOMDataTreeChangeListener),
                 actorContext.getDatastoreContext().getShardInitializationTimeout());
 
@@ -105,7 +109,7 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
             public void onComplete(final Throwable failure, final Object result) {
                 if (failure != null) {
                     LOG.error("Failed to register DataTreeChangeListener {} at path {}",
-                            getInstance(), path.toString(), failure);
+                            getInstance(), registeredPath, failure);
                 } else {
                     RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
                     setListenerRegistrationActor(actorContext.actorSelection(
index fa55523db0087467d4919075cbc39abc2a46621b..abafae80493bf74ca2ec3580739bf8cab98fd682 100644 (file)
@@ -10,7 +10,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map.Entry;
+import java.util.Set;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
@@ -20,14 +24,21 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DOMDataTreeChangeListener,
         RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration, ListenerRegistration<DOMDataTreeChangeListener>> {
+
+    private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+
     DataTreeChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
+    Collection<ActorSelection> getListenerActors() {
+        return Collections.unmodifiableCollection(listenerActors);
+    }
+
     @Override
-    Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> createDelegate(
+    ListenerRegistration<DOMDataTreeChangeListener> createDelegate(
             final RegisterDataTreeChangeListener message) {
-        ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
+        final ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
 
         // Notify the listener if notifications should be enabled or not
         // If this shard is the leader then it will enable notifications else
@@ -50,7 +61,20 @@ final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DO
         getShard().getDataStore().notifyOfInitialData(message.getPath(),
                 regEntry.getKey().getInstance(), regEntry.getValue());
 
-        return regEntry;
+        listenerActors.add(dataChangeListenerPath);
+        final ListenerRegistration<DOMDataTreeChangeListener> delegate = regEntry.getKey();
+        return new ListenerRegistration<DOMDataTreeChangeListener>() {
+            @Override
+            public DOMDataTreeChangeListener getInstance() {
+                return delegate.getInstance();
+            }
+
+            @Override
+            public void close() {
+                listenerActors.remove(dataChangeListenerPath);
+                delegate.close();
+            }
+        };
     }
 
     @Override
index 10ffe1f7b7dd40793820caa078b58be7855a65e1..1ef0e09654eb92fb6fffbb1b1188e5f3fb82e067 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.mdsal.common.api.PostPreCommitStep;
 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,10 +32,12 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
     private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class);
     private final CohortBehaviour<?> idleState = new Idle();
     private final DOMDataTreeCommitCohort cohort;
+    private final YangInstanceIdentifier registeredPath;
     private CohortBehaviour<?> currentState = idleState;
 
-    private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
+    private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
         this.cohort = Preconditions.checkNotNull(cohort);
+        this.registeredPath = Preconditions.checkNotNull(registeredPath);
     }
 
     @Override
@@ -146,7 +149,8 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
             } else if (message instanceof Abort) {
                 return abort();
             }
-            throw new UnsupportedOperationException();
+            throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s",
+                    message.getClass(), getClass().getSimpleName()));
         }
 
         abstract CohortBehaviour<?> abort();
@@ -268,7 +272,7 @@ final class DataTreeCohortActor extends AbstractUntypedActor {
 
     }
 
-    static Props props(final DOMDataTreeCommitCohort cohort) {
-        return Props.create(DataTreeCohortActor.class, cohort);
+    static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
+        return Props.create(DataTreeCohortActor.class, cohort, registeredPath);
     }
 }
index fb3743d426d1ac4636db0d43b8cef37eb7094b27..e232e4fa849283b1ba0ab710e29f89931f795451 100644 (file)
@@ -15,6 +15,7 @@ import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +49,9 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
 
     private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
 
+    Collection<ActorRef> getCohortActors() {
+        return Collections.unmodifiableCollection(cohortToNode.keySet());
+    }
 
     void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
         takeLock();
index c269312c8dd0aab428ec19e6c3c21e82a6105051..3c5aeaa49f8b9e14731c9bb75d20b3f8ae6677c5 100644 (file)
@@ -42,8 +42,8 @@ public class DataTreeCohortRegistrationProxy<C extends DOMDataTreeCommitCohort>
         super(cohort);
         this.subtree = Preconditions.checkNotNull(subtree);
         this.actorContext = Preconditions.checkNotNull(actorContext);
-        this.actor = actorContext.getActorSystem().actorOf(
-                DataTreeCohortActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+        this.actor = actorContext.getActorSystem().actorOf(DataTreeCohortActor.props(getInstance(),
+                subtree.getRootIdentifier()).withDispatcher(actorContext.getNotificationDispatcherPath()));
     }
 
 
index 8f18cb74dc79c6cd7cb6c08b03640b9df173dcd8..944107039a80450717f503358da6b2a8a4236d7b 100644 (file)
@@ -7,12 +7,9 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Optional;
 import java.util.EventListener;
-import java.util.Map.Entry;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 abstract class DelayedListenerRegistration<L extends EventListener, R> implements ListenerRegistration<L> {
     private final R registrationMessage;
@@ -34,10 +31,9 @@ abstract class DelayedListenerRegistration<L extends EventListener, R> implement
     }
 
     synchronized <LR extends ListenerRegistration<L>> void createDelegate(
-            final LeaderLocalDelegateFactory<R, LR, Optional<DataTreeCandidate>> factory) {
+            final LeaderLocalDelegateFactory<R, LR> factory) {
         if (!closed) {
-            final Entry<LR, Optional<DataTreeCandidate>> res = factory.createDelegate(registrationMessage);
-            this.delegate = res.getKey();
+            this.delegate = factory.createDelegate(registrationMessage);
         }
     }
 
index 3b9b7adc6b951731e8bc829b44032bebf0de7db7..cd1b548b8e4823735ec27baf9846cdee9b38e285 100644 (file)
@@ -7,15 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.Map.Entry;
-
 /**
  * Base class for factories instantiating delegates.
  *
- * <D> delegate type
- * <M> message type
- * <I> initial state type
+ * @param <M> message type
+ * @param <D> delegate type
  */
-abstract class DelegateFactory<M, D, I> {
-    abstract Entry<D, I> createDelegate(M message);
+abstract class DelegateFactory<M, D> {
+    abstract D createDelegate(M message);
 }
index a925e93566e9dcff73fd33f1dab458dc2028cfdf..d21a4d4ccde36ca529966a1463bbebdd4bddbe59 100644 (file)
@@ -174,8 +174,8 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
         LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
 
         final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
-                new DataTreeChangeListenerProxy<L>(actorContext, listener);
-        listenerRegistrationProxy.init(shardName, treeId);
+                new DataTreeChangeListenerProxy<>(actorContext, listener, treeId);
+        listenerRegistrationProxy.init(shardName);
 
         return listenerRegistrationProxy;
     }
@@ -192,7 +192,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
         final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
         LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName);
 
-        DataTreeCohortRegistrationProxy<C> cohortProxy = new DataTreeCohortRegistrationProxy<C>(actorContext, subtree, cohort);
+        DataTreeCohortRegistrationProxy<C> cohortProxy = new DataTreeCohortRegistrationProxy<>(actorContext, subtree, cohort);
         cohortProxy.init(shardName);
         return cohortProxy;
     }
index 8ce28571ee16b22bca1d1a5a8d76e5c88dd51583..7d6b12babaccf7ecf0126af510bde73b0975f4dc 100644 (file)
@@ -17,11 +17,10 @@ import com.google.common.base.Preconditions;
  * Base class for factories instantiating delegates which are local to the
  * shard leader.
  *
- * <D> delegate type
- * <M> message type
- * <I> initial state type
+ * @param <D> delegate type
+ * @param <M> message type
  */
-abstract class LeaderLocalDelegateFactory<M, D, I> extends DelegateFactory<M, D, I> {
+abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
     private final Shard shard;
 
     protected LeaderLocalDelegateFactory(final Shard shard) {
index a1e506f6ecba7dca921a6df5d7f871237ca25850..c6f05ca70e7e857c1d0b84d0d095cbcf368e71e6 100644 (file)
@@ -44,6 +44,7 @@ import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -59,6 +60,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -647,6 +649,13 @@ public class Shard extends RaftActor {
         store.setRunOnPendingTransactionsComplete(operation);
     }
 
+    @Override
+    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+        return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
+                .dataChangeListenerActors(changeSupport.getListenerActors())
+                .commitCohortActors(store.getCohortActors());
+    }
+
     @Override
     public String persistenceId() {
         return this.name;
index 83cbace1c88bd76f1ea1a55198543fe30f322526..4e9b05ff218d15b52e248e7e67a190bbcc83a669 100644 (file)
@@ -652,6 +652,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
     }
 
+    Collection<ActorRef> getCohortActors() {
+        return cohortRegistry.getCohortActors();
+    }
+
     void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
         cohortRegistry.process(sender, message);
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/OnDemandShardState.java
new file mode 100644 (file)
index 0000000..ba6f229
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+
+/**
+ * Extends OnDemandRaftState to add Shard state.
+ *
+ * @author Thomas Pantelis
+ */
+public class OnDemandShardState extends OnDemandRaftState {
+    private Collection<ActorSelection> treeChangeListenerActors;
+    private Collection<ActorSelection> dataChangeListenerActors;
+    private Collection<ActorRef> commitCohortActors;
+
+    public Collection<ActorSelection> getTreeChangeListenerActors() {
+        return treeChangeListenerActors;
+    }
+
+    public Collection<ActorSelection> getDataChangeListenerActors() {
+        return dataChangeListenerActors;
+    }
+
+    public Collection<ActorRef> getCommitCohortActors() {
+        return commitCohortActors;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder extends AbstractBuilder<Builder> {
+        private final OnDemandShardState state = new OnDemandShardState();
+
+        @Override
+        protected OnDemandRaftState state() {
+            return state;
+        }
+
+        public Builder treeChangeListenerActors(Collection<ActorSelection> actors) {
+            state.treeChangeListenerActors = actors;
+            return self();
+        }
+
+        public Builder dataChangeListenerActors(Collection<ActorSelection> actors) {
+            state.dataChangeListenerActors = actors;
+            return self();
+        }
+
+        public Builder commitCohortActors(Collection<ActorRef> actors) {
+            state.commitCohortActors = actors;
+            return self();
+        }
+    }
+}
index 13b78a01a7a5fb5a1057cec06e646cad304e0b2e..588c08704b1de71a3c57f206a9f50a444f852eea 100644 (file)
@@ -22,6 +22,7 @@ import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.o
 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNodeEntry;
 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.testNodeWithOuter;
+
 import akka.actor.ActorRef;
 import akka.testkit.TestActorRef;
 import org.junit.After;
@@ -154,7 +155,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
     private MockDataChangeListener registerChangeListener(final YangInstanceIdentifier path, final DataChangeScope scope,
             final int expectedEvents, final boolean isLeader) {
         MockDataChangeListener listener = new MockDataChangeListener(expectedEvents);
-        ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener));
+        ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path));
 
         support.onMessage(new RegisterChangeListener(path, dclActor, scope, false), isLeader, true);
         return listener;
index 91cac88b846f8c5d81037081696bef8bdfa47106..580a0ab97bcbec82d7399b2b87526c3b5f767ecb 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+
 import akka.actor.ActorRef;
 import akka.actor.DeadLetter;
 import akka.actor.Props;
@@ -31,7 +33,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
             final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
-            final Props props = DataChangeListener.props(mockListener);
+            final Props props = DataChangeListener.props(mockListener, TEST_PATH);
             final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
 
             // Let the DataChangeListener know that notifications should be enabled
@@ -52,7 +54,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
             final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
-            final Props props = DataChangeListener.props(mockListener);
+            final Props props = DataChangeListener.props(mockListener, TEST_PATH);
             final ActorRef subject =
                 getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
 
@@ -77,7 +79,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
             final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
-            final Props props = DataChangeListener.props(mockListener);
+            final Props props = DataChangeListener.props(mockListener, TEST_PATH);
             final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender");
 
             getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
@@ -112,7 +114,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
             Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataChanged(mockChangeEvent2);
 
-            Props props = DataChangeListener.props(mockListener);
+            Props props = DataChangeListener.props(mockListener, TEST_PATH);
             ActorRef subject = getSystem().actorOf(props, "testDataChangedWithListenerRuntimeEx");
 
             // Let the DataChangeListener know that notifications should be enabled
index 73d520dfc34da0123bf7f2cc2c5b5570a0725613..1a586203a84c937be27f4d3139c56dd3eeef0755 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+
 import akka.actor.ActorRef;
 import akka.actor.DeadLetter;
 import akka.actor.Props;
@@ -30,7 +32,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest {
             final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
             final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
             final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
-            final Props props = DataTreeChangeListenerActor.props(mockListener);
+            final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
             final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsEnabled");
 
             // Let the DataChangeListener know that notifications should be enabled
@@ -51,7 +53,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest {
             final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
             final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
             final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
-            final Props props = DataTreeChangeListenerActor.props(mockListener);
+            final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
             final ActorRef subject =
                     getSystem().actorOf(props, "testDataTreeChangedNotificationsDisabled");
 
@@ -76,7 +78,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest {
             final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
             final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
             final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
-            final Props props = DataTreeChangeListenerActor.props(mockListener);
+            final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
             final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithNoSender");
 
             getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
@@ -113,7 +115,7 @@ public class DataTreeChangeListenerActorTest extends AbstractActorTest {
             final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
             Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataTreeChanged(mockCandidates2);
 
-            Props props = DataTreeChangeListenerActor.props(mockListener);
+            Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
             ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithListenerRuntimeEx");
 
             // Let the DataChangeListener know that notifications should be enabled
index 5dcc39d68d3bbd7f2432216a25d0fe1c301c43a1..395a0973fe1bcb9bb066e4563cdb862b414b7309 100644 (file)
@@ -12,6 +12,7 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
@@ -25,7 +26,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -55,14 +55,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
             ActorContext actorContext = new ActorContext(getSystem(), getRef(),
                     mock(ClusterWrapper.class), mock(Configuration.class));
 
+            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
             final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
-                    new DataTreeChangeListenerProxy<>(actorContext, mockListener);
+                    new DataTreeChangeListenerProxy<>(actorContext, mockListener, path);
 
-            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
             new Thread() {
                 @Override
                 public void run() {
-                    proxy.init("shard-1", path);
+                    proxy.init("shard-1");
                 }
 
             }.start();
@@ -80,7 +80,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
             reply(new RegisterDataTreeChangeListenerReply(getRef()));
 
 
-            for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
+            for(int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
             }
 
@@ -111,14 +111,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
 
             ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(ClusteredDOMDataTreeChangeListener.class);
 
+            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
             final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
-                    new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener);
+                        new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path);
 
-            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
             new Thread() {
                 @Override
                 public void run() {
-                    proxy.init("shard-1", path);
+                    proxy.init("shard-1");
                 }
 
             }.start();
@@ -141,14 +141,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
             ActorContext actorContext = new ActorContext(getSystem(), getRef(),
                     mock(ClusterWrapper.class), mock(Configuration.class));
 
-            final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
-                    new DataTreeChangeListenerProxy<>(actorContext, mockListener);
-
             final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+            final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
+                        actorContext, mockListener, path);
+
             new Thread() {
                 @Override
                 public void run() {
-                    proxy.init("shard-1", path);
+                    proxy.init("shard-1");
                 }
 
             }.start();
@@ -169,14 +169,14 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
             ActorContext actorContext = new ActorContext(getSystem(), getRef(),
                     mock(ClusterWrapper.class), mock(Configuration.class));
 
-            final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
-                    new DataTreeChangeListenerProxy<>(actorContext, mockListener);
-
             final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+            final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
+                        actorContext, mockListener, path);
+
             new Thread() {
                 @Override
                 public void run() {
-                    proxy.init("shard-1", path);
+                    proxy.init("shard-1");
                 }
 
             }.start();
@@ -216,7 +216,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
 
             String shardName = "shard-1";
             final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
-                    new DataTreeChangeListenerProxy<>(actorContext, mockListener);
+                    new DataTreeChangeListenerProxy<>(actorContext, mockListener, path);
 
             doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
             doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
@@ -225,7 +225,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
                     any(Object.class), any(Timeout.class));
             doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
 
-            proxy.init("shard-1", path);
+            proxy.init("shard-1");
 
             Assert.assertEquals("getListenerRegistrationActor", null,
                     proxy.getListenerRegistrationActor());
@@ -248,22 +248,18 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
             doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
             doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
 
-            final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
-                    new DataTreeChangeListenerProxy<>(actorContext, mockListener);
+            final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
+                    actorContext, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
 
-
-            Answer<Future<Object>> answer = new Answer<Future<Object>>() {
-                @Override
-                public Future<Object> answer(InvocationOnMock invocation) {
-                    proxy.close();
-                    return Futures.successful((Object)new RegisterDataTreeChangeListenerReply(getRef()));
-                }
+            Answer<Future<Object>> answer = invocation -> {
+                proxy.close();
+                return Futures.successful((Object)new RegisterDataTreeChangeListenerReply(getRef()));
             };
 
             doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
                     any(Object.class), any(Timeout.class));
 
-            proxy.init(shardName, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
+            proxy.init(shardName);
 
             expectMsgClass(duration("5 seconds"), CloseDataTreeChangeListenerRegistration.class);
 
index a11fc6bb1c4ac004120540d098233c915c3ca78f..7cf125bd5a87ef759071cdf7e21960d700f65d82 100644 (file)
@@ -116,7 +116,7 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
     private MockDataTreeChangeListener registerChangeListener(final YangInstanceIdentifier path,
             final int expectedEvents, final boolean isLeader) {
         MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
-        ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener));
+        ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, TEST_PATH));
         support.onMessage(new RegisterDataTreeChangeListener(path, dclActor, false), isLeader, true);
         return listener;
     }
index e4ca64ffe812b2422308287945e4106a749a65a5..5c4e35a3a99d60095d6f0e2a6f08d4b81aed350c 100644 (file)
@@ -138,8 +138,8 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
 
             final MockDataChangeListener listener = new MockDataChangeListener(1);
-            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
-                    "testRegisterChangeListener-DataChangeListener");
+            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
+                    TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
 
             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
@@ -215,16 +215,15 @@ public class ShardTest extends AbstractShardTest {
 
             setupInMemorySnapshotStore();
 
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
             final MockDataChangeListener listener = new MockDataChangeListener(1);
-            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
 
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testRegisterChangeListenerWhenNotLeaderInitially");
 
-            final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
             // Wait until the shard receives the first ElectionTimeout message.
             assertEquals("Got first ElectionTimeout", true,
                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
@@ -264,8 +263,8 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
 
             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
-                    "testRegisterDataTreeChangeListener-DataTreeChangeListener");
+            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                    TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
 
             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
 
@@ -319,16 +318,15 @@ public class ShardTest extends AbstractShardTest {
 
             setupInMemorySnapshotStore();
 
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
 
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
 
-            final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
             assertEquals("Got first ElectionTimeout", true,
                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
@@ -2064,8 +2062,9 @@ public class ShardTest extends AbstractShardTest {
             dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
             final MockDataChangeListener listener = new MockDataChangeListener(1);
-            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                     actorFactory.generateActorId(testName + "-DataChangeListener"));
 
             setupInMemorySnapshotStore();
@@ -2076,8 +2075,6 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilNoLeader(shard);
 
-            final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                 RegisterChangeListenerReply.class);
@@ -2119,7 +2116,7 @@ public class ShardTest extends AbstractShardTest {
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
             final MockDataChangeListener listener = new MockDataChangeListener(1);
-            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
                     actorFactory.generateActorId(testName + "-DataChangeListener"));
 
             followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
@@ -2141,8 +2138,8 @@ public class ShardTest extends AbstractShardTest {
                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
 
             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
-                    actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+                    TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
             setupInMemorySnapshotStore();
 
@@ -2193,7 +2190,7 @@ public class ShardTest extends AbstractShardTest {
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
-            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
             followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());