}
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
- OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+ OnDemandRaftState.AbstractBuilder<?> builder = newOnDemandRaftStateBuilder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
.inMemoryJournalDataSize(replicatedLog().dataSize())
}
+ protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+ return OnDemandRaftState.builder();
+ }
+
private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) {
RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nonnull;
/**
* The response to a GetOnDemandRaftState message,
private Map<String, String> peerAddresses = Collections.emptyMap();
private Map<String, Boolean> peerVotingStates = Collections.emptyMap();
- private OnDemandRaftState() {
+ protected OnDemandRaftState() {
}
public static Builder builder() {
return customRaftPolicyClassName;
}
- public static class Builder {
- private final OnDemandRaftState stats = new OnDemandRaftState();
+ public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> {
+ @SuppressWarnings("unchecked")
+ protected T self() {
+ return (T) this;
+ }
+
+ @Nonnull
+ protected abstract OnDemandRaftState state();
- public Builder lastLogIndex(long value) {
- stats.lastLogIndex = value;
- return this;
+ public T lastLogIndex(long value) {
+ state().lastLogIndex = value;
+ return self();
}
- public Builder lastLogTerm(long value) {
- stats.lastLogTerm = value;
- return this;
+ public T lastLogTerm(long value) {
+ state().lastLogTerm = value;
+ return self();
}
- public Builder currentTerm(long value) {
- stats.currentTerm = value;
- return this;
+ public T currentTerm(long value) {
+ state().currentTerm = value;
+ return self();
}
- public Builder commitIndex(long value) {
- stats.commitIndex = value;
- return this;
+ public T commitIndex(long value) {
+ state().commitIndex = value;
+ return self();
}
- public Builder lastApplied(long value) {
- stats.lastApplied = value;
- return this;
+ public T lastApplied(long value) {
+ state().lastApplied = value;
+ return self();
}
- public Builder lastIndex(long value) {
- stats.lastIndex = value;
- return this;
+ public T lastIndex(long value) {
+ state().lastIndex = value;
+ return self();
}
- public Builder lastTerm(long value) {
- stats.lastTerm = value;
- return this;
+ public T lastTerm(long value) {
+ state().lastTerm = value;
+ return self();
}
- public Builder snapshotIndex(long value) {
- stats.snapshotIndex = value;
- return this;
+ public T snapshotIndex(long value) {
+ state().snapshotIndex = value;
+ return self();
}
- public Builder snapshotTerm(long value) {
- stats.snapshotTerm = value;
- return this;
+ public T snapshotTerm(long value) {
+ state().snapshotTerm = value;
+ return self();
}
- public Builder replicatedToAllIndex(long value) {
- stats.replicatedToAllIndex = value;
- return this;
+ public T replicatedToAllIndex(long value) {
+ state().replicatedToAllIndex = value;
+ return self();
}
- public Builder inMemoryJournalDataSize(long value) {
- stats.inMemoryJournalDataSize = value;
- return this;
+ public T inMemoryJournalDataSize(long value) {
+ state().inMemoryJournalDataSize = value;
+ return self();
}
- public Builder inMemoryJournalLogSize(long value) {
- stats.inMemoryJournalLogSize = value;
- return this;
+ public T inMemoryJournalLogSize(long value) {
+ state().inMemoryJournalLogSize = value;
+ return self();
}
- public Builder leader(String value) {
- stats.leader = value;
- return this;
+ public T leader(String value) {
+ state().leader = value;
+ return self();
}
- public Builder raftState(String value) {
- stats.raftState = value;
- return this;
+ public T raftState(String value) {
+ state().raftState = value;
+ return self();
}
- public Builder votedFor(String value) {
- stats.votedFor = value;
- return this;
+ public T votedFor(String value) {
+ state().votedFor = value;
+ return self();
}
- public Builder isVoting(boolean isVoting) {
- stats.isVoting = isVoting;
- return this;
+ public T isVoting(boolean isVoting) {
+ state().isVoting = isVoting;
+ return self();
}
- public Builder followerInfoList(List<FollowerInfo> followerInfoList) {
- stats.followerInfoList = followerInfoList;
- return this;
+ public T followerInfoList(List<FollowerInfo> followerInfoList) {
+ state().followerInfoList = followerInfoList;
+ return self();
}
- public Builder peerAddresses(Map<String, String> peerAddresses) {
- stats.peerAddresses = peerAddresses;
- return this;
+ public T peerAddresses(Map<String, String> peerAddresses) {
+ state().peerAddresses = peerAddresses;
+ return self();
}
- public Builder peerVotingStates(Map<String, Boolean> peerVotingStates) {
- stats.peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
- return this;
+ public T peerVotingStates(Map<String, Boolean> peerVotingStates) {
+ state().peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
+ return self();
}
- public Builder isSnapshotCaptureInitiated(boolean value) {
- stats.isSnapshotCaptureInitiated = value;
- return this;
+ public T isSnapshotCaptureInitiated(boolean value) {
+ state().isSnapshotCaptureInitiated = value;
+ return self();
}
- public Builder customRaftPolicyClassName(String className) {
- stats.customRaftPolicyClassName = className;
- return this;
+ public T customRaftPolicyClassName(String className) {
+ state().customRaftPolicyClassName = className;
+ return self();
}
public OnDemandRaftState build() {
- return stats;
+ return state();
+ }
+ }
+
+ public static class Builder extends AbstractBuilder<Builder> {
+ private final OnDemandRaftState state = new OnDemandRaftState();
+
+ @Override
+ protected OnDemandRaftState state() {
+ return state;
}
}
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EventListener;
-import java.util.Map.Entry;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-abstract class AbstractDataListenerSupport<L extends EventListener, R extends ListenerRegistrationMessage,
- D extends DelayedListenerRegistration<L, R>, LR extends ListenerRegistration<L>>
- extends LeaderLocalDelegateFactory<R, LR, Optional<DataTreeCandidate>> {
+abstract class AbstractDataListenerSupport<L extends EventListener, M extends ListenerRegistrationMessage,
+ D extends DelayedListenerRegistration<L, M>, R extends ListenerRegistration<L>>
+ extends LeaderLocalDelegateFactory<M, R> {
private final Logger log = LoggerFactory.getLogger(getClass());
private final ArrayList<D> delayedListenerRegistrations = new ArrayList<>();
}
@Override
- void onMessage(R message, boolean isLeader, boolean hasLeader) {
+ void onMessage(M message, boolean isLeader, boolean hasLeader) {
log.debug("{}: {} for {}, leader: {}", persistenceId(), logName(), message.getPath(), isLeader);
final ListenerRegistration<L> registration;
- if((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
- final Entry<LR, Optional<DataTreeCandidate>> res = createDelegate(message);
- registration = res.getKey();
+ if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
+ registration = createDelegate(message);
} else {
log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
actors.add(actor);
}
- protected abstract D newDelayedListenerRegistration(R message);
+ protected abstract D newDelayedListenerRegistration(M message);
protected abstract ActorRef newRegistrationActor(ListenerRegistration<L> registration);
package org.opendaylight.controller.cluster.datastore;
import akka.actor.Props;
-import akka.japi.Creator;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
private static final Logger LOG = LoggerFactory.getLogger(DataChangeListener.class);
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+ private final YangInstanceIdentifier registeredPath;
private boolean notificationsEnabled = false;
- public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
+ public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+ final YangInstanceIdentifier registeredPath) {
this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
+ this.registeredPath = Preconditions.checkNotNull(registeredPath);
}
@Override
}
}
- public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>> listener) {
- return Props.create(new DataChangeListenerCreator(listener));
- }
-
- private static class DataChangeListenerCreator implements Creator<DataChangeListener> {
- private static final long serialVersionUID = 1L;
-
- final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-
- DataChangeListenerCreator(
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
- this.listener = listener;
- }
-
- @Override
- public DataChangeListener create() throws Exception {
- return new DataChangeListener(listener);
- }
+ public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+ final YangInstanceIdentifier registeredPath) {
+ return Props.create(DataChangeListener.class, listener, registeredPath);
}
}
import akka.actor.PoisonPill;
import akka.dispatch.OnComplete;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
- private volatile ActorSelection listenerRegistrationActor;
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
- private ActorRef dataChangeListenerActor;
private final String shardName;
private final ActorContext actorContext;
+ private ActorRef dataChangeListenerActor;
+ private volatile ActorSelection listenerRegistrationActor;
private boolean closed = false;
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
DataChangeListenerRegistrationProxy (
String shardName, ActorContext actorContext, L listener) {
- this.shardName = shardName;
- this.actorContext = actorContext;
- this.listener = listener;
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.listener = Preconditions.checkNotNull(listener);
}
@VisibleForTesting
public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
dataChangeListenerActor = actorContext.getActorSystem().actorOf(
- DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
+ DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
findFuture.onComplete(new OnComplete<ActorRef>() {
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map.Entry;
+import java.util.Set;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
DelayedDataChangeListenerRegistration, DataChangeListenerRegistration<
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
+ private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+
DataChangeListenerSupport(final Shard shard) {
super(shard);
}
+ Collection<ActorSelection> getListenerActors() {
+ return Collections.unmodifiableCollection(listenerActors);
+ }
+
@Override
- Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
- Optional<DataTreeCandidate>> createDelegate(final RegisterChangeListener message) {
- ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
+ DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ createDelegate(final RegisterChangeListener message) {
+ final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
// Notify the listener if notifications should be enabled or not
// If this shard is the leader then it will enable notifications else
getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue());
- return regEntry;
+ listenerActors.add(dataChangeListenerPath);
+ final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ delegate = regEntry.getKey();
+ return new DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?,?>>>() {
+ @Override
+ public void close() {
+ listenerActors.remove(dataChangeListenerPath);
+ delegate.close();
+ }
+
+ @Override
+ public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+ return delegate.getInstance();
+ }
+
+ @Override
+ public YangInstanceIdentifier getPath() {
+ return delegate.getPath();
+ }
+
+ @Override
+ public DataChangeScope getScope() {
+ return delegate.getScope();
+ }
+ };
}
@Override
package org.opendaylight.controller.cluster.datastore;
import akka.actor.Props;
-import akka.japi.Creator;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class DataTreeChangeListenerActor extends AbstractUntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerActor.class);
private final DOMDataTreeChangeListener listener;
+ private final YangInstanceIdentifier registeredPath;
private boolean notificationsEnabled = false;
- private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) {
+ private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener,
+ final YangInstanceIdentifier registeredPath) {
this.listener = Preconditions.checkNotNull(listener);
+ this.registeredPath = Preconditions.checkNotNull(registeredPath);
}
@Override
listener);
}
- public static Props props(final DOMDataTreeChangeListener listener) {
- return Props.create(new DataTreeChangeListenerCreator(listener));
- }
-
- private static final class DataTreeChangeListenerCreator implements Creator<DataTreeChangeListenerActor> {
- private static final long serialVersionUID = 1L;
- private final DOMDataTreeChangeListener listener;
-
- DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) {
- this.listener = Preconditions.checkNotNull(listener);
- }
-
- @Override
- public DataTreeChangeListenerActor create() {
- return new DataTreeChangeListenerActor(listener);
- }
+ public static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) {
+ return Props.create(DataTreeChangeListenerActor.class, listener, registeredPath);
}
}
private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
private final ActorRef dataChangeListenerActor;
private final ActorContext actorContext;
+ private final YangInstanceIdentifier registeredPath;
@GuardedBy("this")
private ActorSelection listenerRegistrationActor;
- public DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
+ DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener,
+ final YangInstanceIdentifier registeredPath) {
super(listener);
this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.registeredPath = Preconditions.checkNotNull(registeredPath);
this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
- DataTreeChangeListenerActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+ DataTreeChangeListenerActor.props(getInstance(), registeredPath)
+ .withDispatcher(actorContext.getNotificationDispatcherPath()));
}
@Override
dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
- void init(final String shardName, final YangInstanceIdentifier treeId) {
+ void init(final String shardName) {
Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
findFuture.onComplete(new OnComplete<ActorRef>() {
@Override
public void onComplete(final Throwable failure, final ActorRef shard) {
if (failure instanceof LocalShardNotFoundException) {
- LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " +
- "cannot be registered", shardName, getInstance(), treeId);
+ LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
+ + "cannot be registered", shardName, getInstance(), registeredPath);
} else if (failure != null) {
- LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " +
- "cannot be registered: {}", shardName, getInstance(), treeId, failure);
+ LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
+ + "cannot be registered: {}", shardName, getInstance(), registeredPath, failure);
} else {
- doRegistration(shard, treeId);
+ doRegistration(shard);
}
}
}, actorContext.getClientDispatcher());
actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
}
- private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
+ private void doRegistration(final ActorRef shard) {
Future<Object> future = actorContext.executeOperationAsync(shard,
- new RegisterDataTreeChangeListener(path, dataChangeListenerActor,
+ new RegisterDataTreeChangeListener(registeredPath, dataChangeListenerActor,
getInstance() instanceof ClusteredDOMDataTreeChangeListener),
actorContext.getDatastoreContext().getShardInitializationTimeout());
public void onComplete(final Throwable failure, final Object result) {
if (failure != null) {
LOG.error("Failed to register DataTreeChangeListener {} at path {}",
- getInstance(), path.toString(), failure);
+ getInstance(), registeredPath, failure);
} else {
RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
setListenerRegistrationActor(actorContext.actorSelection(
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map.Entry;
+import java.util.Set;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DOMDataTreeChangeListener,
RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration, ListenerRegistration<DOMDataTreeChangeListener>> {
+
+ private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+
DataTreeChangeListenerSupport(final Shard shard) {
super(shard);
}
+ Collection<ActorSelection> getListenerActors() {
+ return Collections.unmodifiableCollection(listenerActors);
+ }
+
@Override
- Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> createDelegate(
+ ListenerRegistration<DOMDataTreeChangeListener> createDelegate(
final RegisterDataTreeChangeListener message) {
- ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
+ final ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
// Notify the listener if notifications should be enabled or not
// If this shard is the leader then it will enable notifications else
getShard().getDataStore().notifyOfInitialData(message.getPath(),
regEntry.getKey().getInstance(), regEntry.getValue());
- return regEntry;
+ listenerActors.add(dataChangeListenerPath);
+ final ListenerRegistration<DOMDataTreeChangeListener> delegate = regEntry.getKey();
+ return new ListenerRegistration<DOMDataTreeChangeListener>() {
+ @Override
+ public DOMDataTreeChangeListener getInstance() {
+ return delegate.getInstance();
+ }
+
+ @Override
+ public void close() {
+ listenerActors.remove(dataChangeListenerPath);
+ delegate.close();
+ }
+ };
}
@Override
import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class);
private final CohortBehaviour<?> idleState = new Idle();
private final DOMDataTreeCommitCohort cohort;
+ private final YangInstanceIdentifier registeredPath;
private CohortBehaviour<?> currentState = idleState;
- private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
+ private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
this.cohort = Preconditions.checkNotNull(cohort);
+ this.registeredPath = Preconditions.checkNotNull(registeredPath);
}
@Override
} else if (message instanceof Abort) {
return abort();
}
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s",
+ message.getClass(), getClass().getSimpleName()));
}
abstract CohortBehaviour<?> abort();
}
- static Props props(final DOMDataTreeCommitCohort cohort) {
- return Props.create(DataTreeCohortActor.class, cohort);
+ static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
+ return Props.create(DataTreeCohortActor.class, cohort, registeredPath);
}
}
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
+ Collection<ActorRef> getCohortActors() {
+ return Collections.unmodifiableCollection(cohortToNode.keySet());
+ }
void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
takeLock();
super(cohort);
this.subtree = Preconditions.checkNotNull(subtree);
this.actorContext = Preconditions.checkNotNull(actorContext);
- this.actor = actorContext.getActorSystem().actorOf(
- DataTreeCohortActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+ this.actor = actorContext.getActorSystem().actorOf(DataTreeCohortActor.props(getInstance(),
+ subtree.getRootIdentifier()).withDispatcher(actorContext.getNotificationDispatcherPath()));
}
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Optional;
import java.util.EventListener;
-import java.util.Map.Entry;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
abstract class DelayedListenerRegistration<L extends EventListener, R> implements ListenerRegistration<L> {
private final R registrationMessage;
}
synchronized <LR extends ListenerRegistration<L>> void createDelegate(
- final LeaderLocalDelegateFactory<R, LR, Optional<DataTreeCandidate>> factory) {
+ final LeaderLocalDelegateFactory<R, LR> factory) {
if (!closed) {
- final Entry<LR, Optional<DataTreeCandidate>> res = factory.createDelegate(registrationMessage);
- this.delegate = res.getKey();
+ this.delegate = factory.createDelegate(registrationMessage);
}
}
*/
package org.opendaylight.controller.cluster.datastore;
-import java.util.Map.Entry;
-
/**
* Base class for factories instantiating delegates.
*
- * <D> delegate type
- * <M> message type
- * <I> initial state type
+ * @param <M> message type
+ * @param <D> delegate type
*/
-abstract class DelegateFactory<M, D, I> {
- abstract Entry<D, I> createDelegate(M message);
+abstract class DelegateFactory<M, D> {
+ abstract D createDelegate(M message);
}
LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
- new DataTreeChangeListenerProxy<L>(actorContext, listener);
- listenerRegistrationProxy.init(shardName, treeId);
+ new DataTreeChangeListenerProxy<>(actorContext, listener, treeId);
+ listenerRegistrationProxy.init(shardName);
return listenerRegistrationProxy;
}
final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName);
- DataTreeCohortRegistrationProxy<C> cohortProxy = new DataTreeCohortRegistrationProxy<C>(actorContext, subtree, cohort);
+ DataTreeCohortRegistrationProxy<C> cohortProxy = new DataTreeCohortRegistrationProxy<>(actorContext, subtree, cohort);
cohortProxy.init(shardName);
return cohortProxy;
}
* Base class for factories instantiating delegates which are local to the
* shard leader.
*
- * <D> delegate type
- * <M> message type
- * <I> initial state type
+ * @param <D> delegate type
+ * @param <M> message type
*/
-abstract class LeaderLocalDelegateFactory<M, D, I> extends DelegateFactory<M, D, I> {
+abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
private final Shard shard;
protected LeaderLocalDelegateFactory(final Shard shard) {
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
store.setRunOnPendingTransactionsComplete(operation);
}
+ @Override
+ protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+ return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
+ .dataChangeListenerActors(changeSupport.getListenerActors())
+ .commitCohortActors(store.getCohortActors());
+ }
+
@Override
public String persistenceId() {
return this.name;
LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
}
+ Collection<ActorRef> getCohortActors() {
+ return cohortRegistry.getCohortActors();
+ }
+
void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
cohortRegistry.process(sender, message);
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+
+/**
+ * Extends OnDemandRaftState to add Shard state.
+ *
+ * @author Thomas Pantelis
+ */
+public class OnDemandShardState extends OnDemandRaftState {
+ private Collection<ActorSelection> treeChangeListenerActors;
+ private Collection<ActorSelection> dataChangeListenerActors;
+ private Collection<ActorRef> commitCohortActors;
+
+ public Collection<ActorSelection> getTreeChangeListenerActors() {
+ return treeChangeListenerActors;
+ }
+
+ public Collection<ActorSelection> getDataChangeListenerActors() {
+ return dataChangeListenerActors;
+ }
+
+ public Collection<ActorRef> getCommitCohortActors() {
+ return commitCohortActors;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder extends AbstractBuilder<Builder> {
+ private final OnDemandShardState state = new OnDemandShardState();
+
+ @Override
+ protected OnDemandRaftState state() {
+ return state;
+ }
+
+ public Builder treeChangeListenerActors(Collection<ActorSelection> actors) {
+ state.treeChangeListenerActors = actors;
+ return self();
+ }
+
+ public Builder dataChangeListenerActors(Collection<ActorSelection> actors) {
+ state.dataChangeListenerActors = actors;
+ return self();
+ }
+
+ public Builder commitCohortActors(Collection<ActorRef> actors) {
+ state.commitCohortActors = actors;
+ return self();
+ }
+ }
+}
import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNodeEntry;
import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.testNodeWithOuter;
+
import akka.actor.ActorRef;
import akka.testkit.TestActorRef;
import org.junit.After;
private MockDataChangeListener registerChangeListener(final YangInstanceIdentifier path, final DataChangeScope scope,
final int expectedEvents, final boolean isLeader) {
MockDataChangeListener listener = new MockDataChangeListener(expectedEvents);
- ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener));
+ ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path));
support.onMessage(new RegisterChangeListener(path, dclActor, scope, false), isLeader, true);
return listener;
package org.opendaylight.controller.cluster.datastore;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+
import akka.actor.ActorRef;
import akka.actor.DeadLetter;
import akka.actor.Props;
new JavaTestKit(getSystem()) {{
final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
- final Props props = DataChangeListener.props(mockListener);
+ final Props props = DataChangeListener.props(mockListener, TEST_PATH);
final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
// Let the DataChangeListener know that notifications should be enabled
new JavaTestKit(getSystem()) {{
final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
- final Props props = DataChangeListener.props(mockListener);
+ final Props props = DataChangeListener.props(mockListener, TEST_PATH);
final ActorRef subject =
getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
new JavaTestKit(getSystem()) {{
final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
- final Props props = DataChangeListener.props(mockListener);
+ final Props props = DataChangeListener.props(mockListener, TEST_PATH);
final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender");
getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataChanged(mockChangeEvent2);
- Props props = DataChangeListener.props(mockListener);
+ Props props = DataChangeListener.props(mockListener, TEST_PATH);
ActorRef subject = getSystem().actorOf(props, "testDataChangedWithListenerRuntimeEx");
// Let the DataChangeListener know that notifications should be enabled
*/
package org.opendaylight.controller.cluster.datastore;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+
import akka.actor.ActorRef;
import akka.actor.DeadLetter;
import akka.actor.Props;
final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
- final Props props = DataTreeChangeListenerActor.props(mockListener);
+ final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsEnabled");
// Let the DataChangeListener know that notifications should be enabled
final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
- final Props props = DataTreeChangeListenerActor.props(mockListener);
+ final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
final ActorRef subject =
getSystem().actorOf(props, "testDataTreeChangedNotificationsDisabled");
final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
- final Props props = DataTreeChangeListenerActor.props(mockListener);
+ final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithNoSender");
getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataTreeChanged(mockCandidates2);
- Props props = DataTreeChangeListenerActor.props(mockListener);
+ Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithListenerRuntimeEx");
// Let the DataChangeListener know that notifications should be enabled
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
ActorContext actorContext = new ActorContext(getSystem(), getRef(),
mock(ClusterWrapper.class), mock(Configuration.class));
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
- new DataTreeChangeListenerProxy<>(actorContext, mockListener);
+ new DataTreeChangeListenerProxy<>(actorContext, mockListener, path);
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
new Thread() {
@Override
public void run() {
- proxy.init("shard-1", path);
+ proxy.init("shard-1");
}
}.start();
reply(new RegisterDataTreeChangeListenerReply(getRef()));
- for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
+ for(int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
}
ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(ClusteredDOMDataTreeChangeListener.class);
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
- new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener);
+ new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path);
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
new Thread() {
@Override
public void run() {
- proxy.init("shard-1", path);
+ proxy.init("shard-1");
}
}.start();
ActorContext actorContext = new ActorContext(getSystem(), getRef(),
mock(ClusterWrapper.class), mock(Configuration.class));
- final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
- new DataTreeChangeListenerProxy<>(actorContext, mockListener);
-
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
+ actorContext, mockListener, path);
+
new Thread() {
@Override
public void run() {
- proxy.init("shard-1", path);
+ proxy.init("shard-1");
}
}.start();
ActorContext actorContext = new ActorContext(getSystem(), getRef(),
mock(ClusterWrapper.class), mock(Configuration.class));
- final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
- new DataTreeChangeListenerProxy<>(actorContext, mockListener);
-
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
+ actorContext, mockListener, path);
+
new Thread() {
@Override
public void run() {
- proxy.init("shard-1", path);
+ proxy.init("shard-1");
}
}.start();
String shardName = "shard-1";
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
- new DataTreeChangeListenerProxy<>(actorContext, mockListener);
+ new DataTreeChangeListenerProxy<>(actorContext, mockListener, path);
doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
any(Object.class), any(Timeout.class));
doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
- proxy.init("shard-1", path);
+ proxy.init("shard-1");
Assert.assertEquals("getListenerRegistrationActor", null,
proxy.getListenerRegistrationActor());
doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
- final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
- new DataTreeChangeListenerProxy<>(actorContext, mockListener);
+ final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
+ actorContext, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
-
- Answer<Future<Object>> answer = new Answer<Future<Object>>() {
- @Override
- public Future<Object> answer(InvocationOnMock invocation) {
- proxy.close();
- return Futures.successful((Object)new RegisterDataTreeChangeListenerReply(getRef()));
- }
+ Answer<Future<Object>> answer = invocation -> {
+ proxy.close();
+ return Futures.successful((Object)new RegisterDataTreeChangeListenerReply(getRef()));
};
doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
any(Object.class), any(Timeout.class));
- proxy.init(shardName, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
+ proxy.init(shardName);
expectMsgClass(duration("5 seconds"), CloseDataTreeChangeListenerRegistration.class);
private MockDataTreeChangeListener registerChangeListener(final YangInstanceIdentifier path,
final int expectedEvents, final boolean isLeader) {
MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
- ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener));
+ ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, TEST_PATH));
support.onMessage(new RegisterDataTreeChangeListener(path, dclActor, false), isLeader, true);
return listener;
}
shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
- "testRegisterChangeListener-DataChangeListener");
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
+ TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
setupInMemorySnapshotStore();
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
"testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testRegisterChangeListenerWhenNotLeaderInitially");
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
// Wait until the shard receives the first ElectionTimeout message.
assertEquals("Got first ElectionTimeout", true,
onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
- "testRegisterDataTreeChangeListener-DataTreeChangeListener");
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+ TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
setupInMemorySnapshotStore();
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
assertEquals("Got first ElectionTimeout", true,
onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
actorFactory.generateActorId(testName + "-DataChangeListener"));
setupInMemorySnapshotStore();
waitUntilNoLeader(shard);
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
RegisterChangeListenerReply.class);
final YangInstanceIdentifier path = TestModel.TEST_PATH;
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
actorFactory.generateActorId(testName + "-DataChangeListener"));
followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
- actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+ TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
setupInMemorySnapshotStore();
final YangInstanceIdentifier path = TestModel.TEST_PATH;
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());