}
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
- OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+ OnDemandRaftState.AbstractBuilder<?> builder = newOnDemandRaftStateBuilder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
.inMemoryJournalDataSize(replicatedLog().dataSize())
}
+ protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+ return OnDemandRaftState.builder();
+ }
+
private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) {
RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nonnull;
/**
* The response to a GetOnDemandRaftState message.
private Map<String, String> peerAddresses = Collections.emptyMap();
private Map<String, Boolean> peerVotingStates = Collections.emptyMap();
- private OnDemandRaftState() {
+ protected OnDemandRaftState() {
}
public static Builder builder() {
return customRaftPolicyClassName;
}
- public static class Builder {
- private final OnDemandRaftState stats = new OnDemandRaftState();
+ public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> {
+ @SuppressWarnings("unchecked")
+ protected T self() {
+ return (T) this;
+ }
+
+ @Nonnull
+ protected abstract OnDemandRaftState state();
- public Builder lastLogIndex(long value) {
- stats.lastLogIndex = value;
- return this;
+ public T lastLogIndex(long value) {
+ state().lastLogIndex = value;
+ return self();
}
- public Builder lastLogTerm(long value) {
- stats.lastLogTerm = value;
- return this;
+ public T lastLogTerm(long value) {
+ state().lastLogTerm = value;
+ return self();
}
- public Builder currentTerm(long value) {
- stats.currentTerm = value;
- return this;
+ public T currentTerm(long value) {
+ state().currentTerm = value;
+ return self();
}
- public Builder commitIndex(long value) {
- stats.commitIndex = value;
- return this;
+ public T commitIndex(long value) {
+ state().commitIndex = value;
+ return self();
}
- public Builder lastApplied(long value) {
- stats.lastApplied = value;
- return this;
+ public T lastApplied(long value) {
+ state().lastApplied = value;
+ return self();
}
- public Builder lastIndex(long value) {
- stats.lastIndex = value;
- return this;
+ public T lastIndex(long value) {
+ state().lastIndex = value;
+ return self();
}
- public Builder lastTerm(long value) {
- stats.lastTerm = value;
- return this;
+ public T lastTerm(long value) {
+ state().lastTerm = value;
+ return self();
}
- public Builder snapshotIndex(long value) {
- stats.snapshotIndex = value;
- return this;
+ public T snapshotIndex(long value) {
+ state().snapshotIndex = value;
+ return self();
}
- public Builder snapshotTerm(long value) {
- stats.snapshotTerm = value;
- return this;
+ public T snapshotTerm(long value) {
+ state().snapshotTerm = value;
+ return self();
}
- public Builder replicatedToAllIndex(long value) {
- stats.replicatedToAllIndex = value;
- return this;
+ public T replicatedToAllIndex(long value) {
+ state().replicatedToAllIndex = value;
+ return self();
}
- public Builder inMemoryJournalDataSize(long value) {
- stats.inMemoryJournalDataSize = value;
- return this;
+ public T inMemoryJournalDataSize(long value) {
+ state().inMemoryJournalDataSize = value;
+ return self();
}
- public Builder inMemoryJournalLogSize(long value) {
- stats.inMemoryJournalLogSize = value;
- return this;
+ public T inMemoryJournalLogSize(long value) {
+ state().inMemoryJournalLogSize = value;
+ return self();
}
- public Builder leader(String value) {
- stats.leader = value;
- return this;
+ public T leader(String value) {
+ state().leader = value;
+ return self();
}
- public Builder raftState(String value) {
- stats.raftState = value;
- return this;
+ public T raftState(String value) {
+ state().raftState = value;
+ return self();
}
- public Builder votedFor(String value) {
- stats.votedFor = value;
- return this;
+ public T votedFor(String value) {
+ state().votedFor = value;
+ return self();
}
- public Builder isVoting(boolean isVoting) {
- stats.isVoting = isVoting;
- return this;
+ public T isVoting(boolean isVoting) {
+ state().isVoting = isVoting;
+ return self();
}
- public Builder followerInfoList(List<FollowerInfo> followerInfoList) {
- stats.followerInfoList = followerInfoList;
- return this;
+ public T followerInfoList(List<FollowerInfo> followerInfoList) {
+ state().followerInfoList = followerInfoList;
+ return self();
}
- public Builder peerAddresses(Map<String, String> peerAddresses) {
- stats.peerAddresses = peerAddresses;
- return this;
+ public T peerAddresses(Map<String, String> peerAddresses) {
+ state().peerAddresses = peerAddresses;
+ return self();
}
- public Builder peerVotingStates(Map<String, Boolean> peerVotingStates) {
- stats.peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
- return this;
+ public T peerVotingStates(Map<String, Boolean> peerVotingStates) {
+ state().peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
+ return self();
}
- public Builder isSnapshotCaptureInitiated(boolean value) {
- stats.isSnapshotCaptureInitiated = value;
- return this;
+ public T isSnapshotCaptureInitiated(boolean value) {
+ state().isSnapshotCaptureInitiated = value;
+ return self();
}
- public Builder customRaftPolicyClassName(String className) {
- stats.customRaftPolicyClassName = className;
- return this;
+ public T customRaftPolicyClassName(String className) {
+ state().customRaftPolicyClassName = className;
+ return self();
}
public OnDemandRaftState build() {
- return stats;
+ return state();
+ }
+ }
+
+ public static class Builder extends AbstractBuilder<Builder> {
+ private final OnDemandRaftState state = new OnDemandRaftState();
+
+ @Override
+ protected OnDemandRaftState state() {
+ return state;
}
}
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EventListener;
-import java.util.Map.Entry;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class AbstractDataListenerSupport<L extends EventListener, M extends ListenerRegistrationMessage,
D extends DelayedListenerRegistration<L, M>, R extends ListenerRegistration<L>>
- extends LeaderLocalDelegateFactory<M, R, Optional<DataTreeCandidate>> {
+ extends LeaderLocalDelegateFactory<M, R> {
private final Logger log = LoggerFactory.getLogger(getClass());
private final ArrayList<D> delayedListenerRegistrations = new ArrayList<>();
final ListenerRegistration<L> registration;
if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
- final Entry<R, Optional<DataTreeCandidate>> res = createDelegate(message);
- registration = res.getKey();
+ registration = createDelegate(message);
} else {
log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
- new DataTreeChangeListenerProxy<>(actorContext, listener);
- listenerRegistrationProxy.init(shardName, treeId);
+ new DataTreeChangeListenerProxy<>(actorContext, listener, treeId);
+ listenerRegistrationProxy.init(shardName);
return listenerRegistrationProxy;
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.Props;
-import akka.japi.Creator;
import com.google.common.base.Preconditions;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
@Deprecated
public class DataChangeListener extends AbstractUntypedActor {
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+ private final YangInstanceIdentifier registeredPath;
private boolean notificationsEnabled = false;
- public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
+ public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+ final YangInstanceIdentifier registeredPath) {
this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
+ this.registeredPath = Preconditions.checkNotNull(registeredPath);
}
@Override
}
}
- public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>> listener) {
- return Props.create(new DataChangeListenerCreator(listener));
- }
-
- private static class DataChangeListenerCreator implements Creator<DataChangeListener> {
- private static final long serialVersionUID = 1L;
-
- @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't "
- + "create remote instances of this actor and thus don't need it to be Serializable.")
- final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-
- DataChangeListenerCreator(
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
- this.listener = listener;
- }
-
- @Override
- public DataChangeListener create() throws Exception {
- return new DataChangeListener(listener);
- }
+ public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+ final YangInstanceIdentifier registeredPath) {
+ return Props.create(DataChangeListener.class, listener, registeredPath);
}
}
import akka.actor.PoisonPill;
import akka.dispatch.OnComplete;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
- private volatile ActorSelection listenerRegistrationActor;
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
- private ActorRef dataChangeListenerActor;
private final String shardName;
private final ActorContext actorContext;
+ private ActorRef dataChangeListenerActor;
+ private volatile ActorSelection listenerRegistrationActor;
private boolean closed = false;
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
DataChangeListenerRegistrationProxy(String shardName, ActorContext actorContext, L listener) {
- this.shardName = shardName;
- this.actorContext = actorContext;
- this.listener = listener;
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.listener = Preconditions.checkNotNull(listener);
}
@VisibleForTesting
public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
dataChangeListenerActor = actorContext.getActorSystem().actorOf(
- DataChangeListener.props(listener).withDispatcher(actorContext.getNotificationDispatcherPath()));
+ DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
findFuture.onComplete(new OnComplete<ActorRef>() {
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map.Entry;
+import java.util.Set;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
DelayedDataChangeListenerRegistration, DataChangeListenerRegistration<
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
+ private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+
DataChangeListenerSupport(final Shard shard) {
super(shard);
}
+ Collection<ActorSelection> getListenerActors() {
+ return Collections.unmodifiableCollection(listenerActors);
+ }
+
@Override
- Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
- Optional<DataTreeCandidate>> createDelegate(final RegisterChangeListener message) {
- ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
+ DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ createDelegate(final RegisterChangeListener message) {
+ final ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
// Notify the listener if notifications should be enabled or not
// If this shard is the leader then it will enable notifications else
getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue());
- return regEntry;
+ listenerActors.add(dataChangeListenerPath);
+ final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ delegate = regEntry.getKey();
+ return new DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?,?>>>() {
+ @Override
+ public void close() {
+ listenerActors.remove(dataChangeListenerPath);
+ delegate.close();
+ }
+
+ @Override
+ public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+ return delegate.getInstance();
+ }
+
+ @Override
+ public YangInstanceIdentifier getPath() {
+ return delegate.getPath();
+ }
+
+ @Override
+ public DataChangeScope getScope() {
+ return delegate.getScope();
+ }
+ };
}
@Override
package org.opendaylight.controller.cluster.datastore;
import akka.actor.Props;
-import akka.japi.Creator;
import com.google.common.base.Preconditions;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
/**
* Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating
*/
final class DataTreeChangeListenerActor extends AbstractUntypedActor {
private final DOMDataTreeChangeListener listener;
+ private final YangInstanceIdentifier registeredPath;
private boolean notificationsEnabled = false;
- private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) {
+ private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener,
+ final YangInstanceIdentifier registeredPath) {
this.listener = Preconditions.checkNotNull(listener);
+ this.registeredPath = Preconditions.checkNotNull(registeredPath);
}
@Override
listener);
}
- public static Props props(final DOMDataTreeChangeListener listener) {
- return Props.create(new DataTreeChangeListenerCreator(listener));
- }
-
- private static final class DataTreeChangeListenerCreator implements Creator<DataTreeChangeListenerActor> {
- private static final long serialVersionUID = 1L;
-
- @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but we don't "
- + "create remote instances of this actor and thus don't need it to be Serializable.")
- private final DOMDataTreeChangeListener listener;
-
- DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) {
- this.listener = Preconditions.checkNotNull(listener);
- }
-
- @Override
- public DataTreeChangeListenerActor create() {
- return new DataTreeChangeListenerActor(listener);
- }
+ public static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) {
+ return Props.create(DataTreeChangeListenerActor.class, listener, registeredPath);
}
}
private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
private final ActorRef dataChangeListenerActor;
private final ActorContext actorContext;
+ private final YangInstanceIdentifier registeredPath;
@GuardedBy("this")
private ActorSelection listenerRegistrationActor;
- DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
+ DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener,
+ final YangInstanceIdentifier registeredPath) {
super(listener);
this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.registeredPath = Preconditions.checkNotNull(registeredPath);
this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
- DataTreeChangeListenerActor.props(getInstance())
+ DataTreeChangeListenerActor.props(getInstance(), registeredPath)
.withDispatcher(actorContext.getNotificationDispatcherPath()));
}
dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
- void init(final String shardName, final YangInstanceIdentifier treeId) {
+ void init(final String shardName) {
Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
findFuture.onComplete(new OnComplete<ActorRef>() {
@Override
public void onComplete(final Throwable failure, final ActorRef shard) {
if (failure instanceof LocalShardNotFoundException) {
LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
- + "cannot be registered", shardName, getInstance(), treeId);
+ + "cannot be registered", shardName, getInstance(), registeredPath);
} else if (failure != null) {
LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
- + "cannot be registered: {}", shardName, getInstance(), treeId, failure);
+ + "cannot be registered: {}", shardName, getInstance(), registeredPath, failure);
} else {
- doRegistration(shard, treeId);
+ doRegistration(shard);
}
}
}, actorContext.getClientDispatcher());
actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
}
- private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
+ private void doRegistration(final ActorRef shard) {
Future<Object> future = actorContext.executeOperationAsync(shard,
- new RegisterDataTreeChangeListener(path, dataChangeListenerActor,
+ new RegisterDataTreeChangeListener(registeredPath, dataChangeListenerActor,
getInstance() instanceof ClusteredDOMDataTreeChangeListener),
actorContext.getDatastoreContext().getShardInitializationTimeout());
public void onComplete(final Throwable failure, final Object result) {
if (failure != null) {
LOG.error("Failed to register DataTreeChangeListener {} at path {}",
- getInstance(), path.toString(), failure);
+ getInstance(), registeredPath, failure);
} else {
RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
setListenerRegistrationActor(actorContext.actorSelection(
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map.Entry;
+import java.util.Set;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DOMDataTreeChangeListener,
RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration,
ListenerRegistration<DOMDataTreeChangeListener>> {
+
+ private final Set<ActorSelection> listenerActors = Sets.newConcurrentHashSet();
+
DataTreeChangeListenerSupport(final Shard shard) {
super(shard);
}
+ Collection<ActorSelection> getListenerActors() {
+ return Collections.unmodifiableCollection(listenerActors);
+ }
+
@Override
- Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> createDelegate(
+ ListenerRegistration<DOMDataTreeChangeListener> createDelegate(
final RegisterDataTreeChangeListener message) {
- ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
+ final ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
// Notify the listener if notifications should be enabled or not
// If this shard is the leader then it will enable notifications else
getShard().getDataStore().notifyOfInitialData(message.getPath(),
regEntry.getKey().getInstance(), regEntry.getValue());
- return regEntry;
+ listenerActors.add(dataChangeListenerPath);
+ final ListenerRegistration<DOMDataTreeChangeListener> delegate = regEntry.getKey();
+ return new ListenerRegistration<DOMDataTreeChangeListener>() {
+ @Override
+ public DOMDataTreeChangeListener getInstance() {
+ return delegate.getInstance();
+ }
+
+ @Override
+ public void close() {
+ listenerActors.remove(dataChangeListenerPath);
+ delegate.close();
+ }
+ };
}
@Override
import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
final class DataTreeCohortActor extends AbstractUntypedActor {
private final CohortBehaviour<?> idleState = new Idle();
private final DOMDataTreeCommitCohort cohort;
+ private final YangInstanceIdentifier registeredPath;
private CohortBehaviour<?> currentState = idleState;
- private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
+ private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
this.cohort = Preconditions.checkNotNull(cohort);
+ this.registeredPath = Preconditions.checkNotNull(registeredPath);
}
@Override
} else if (message instanceof Abort) {
return abort();
}
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s",
+ message.getClass(), getClass().getSimpleName()));
}
abstract CohortBehaviour<?> abort();
}
- static Props props(final DOMDataTreeCommitCohort cohort) {
- return Props.create(DataTreeCohortActor.class, cohort);
+ static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
+ return Props.create(DataTreeCohortActor.class, cohort, registeredPath);
}
}
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
+ Collection<ActorRef> getCohortActors() {
+ return Collections.unmodifiableCollection(cohortToNode.keySet());
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
takeLock();
super(cohort);
this.subtree = Preconditions.checkNotNull(subtree);
this.actorContext = Preconditions.checkNotNull(actorContext);
- this.actor = actorContext.getActorSystem().actorOf(
- DataTreeCohortActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+ this.actor = actorContext.getActorSystem().actorOf(DataTreeCohortActor.props(getInstance(),
+ subtree.getRootIdentifier()).withDispatcher(actorContext.getNotificationDispatcherPath()));
}
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Optional;
import java.util.EventListener;
-import java.util.Map.Entry;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
abstract class DelayedListenerRegistration<L extends EventListener, M> implements ListenerRegistration<L> {
private final M registrationMessage;
}
synchronized <R extends ListenerRegistration<L>> void createDelegate(
- final LeaderLocalDelegateFactory<M, R, Optional<DataTreeCandidate>> factory) {
+ final LeaderLocalDelegateFactory<M, R> factory) {
if (!closed) {
- final Entry<R, Optional<DataTreeCandidate>> res = factory.createDelegate(registrationMessage);
- this.delegate = res.getKey();
+ this.delegate = factory.createDelegate(registrationMessage);
}
}
*/
package org.opendaylight.controller.cluster.datastore;
-import java.util.Map.Entry;
-
/**
* Base class for factories instantiating delegates.
*
* @param <M> message type
* @param <D> delegate type
- * @param <I> initial state type
*/
-abstract class DelegateFactory<M, D, I> {
- abstract Entry<D, I> createDelegate(M message);
+abstract class DelegateFactory<M, D> {
+ abstract D createDelegate(M message);
}
*
* @param <D> delegate type
* @param <M> message type
- * @param <I> initial state type
*/
-abstract class LeaderLocalDelegateFactory<M, D, I> extends DelegateFactory<M, D, I> {
+abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
private final Shard shard;
protected LeaderLocalDelegateFactory(final Shard shard) {
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
store.setRunOnPendingTransactionsComplete(operation);
}
+ @Override
+ protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+ return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
+ .dataChangeListenerActors(changeSupport.getListenerActors())
+ .commitCohortActors(store.getCohortActors());
+ }
+
@Override
public String persistenceId() {
return this.name;
processNextPendingCommit();
}
+ Collection<ActorRef> getCohortActors() {
+ return cohortRegistry.getCohortActors();
+ }
+
void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
cohortRegistry.process(sender, message);
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+
+/**
+ * Extends OnDemandRaftState to add Shard state.
+ *
+ * @author Thomas Pantelis
+ */
+public class OnDemandShardState extends OnDemandRaftState {
+ private Collection<ActorSelection> treeChangeListenerActors;
+ private Collection<ActorSelection> dataChangeListenerActors;
+ private Collection<ActorRef> commitCohortActors;
+
+ public Collection<ActorSelection> getTreeChangeListenerActors() {
+ return treeChangeListenerActors;
+ }
+
+ public Collection<ActorSelection> getDataChangeListenerActors() {
+ return dataChangeListenerActors;
+ }
+
+ public Collection<ActorRef> getCommitCohortActors() {
+ return commitCohortActors;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder extends AbstractBuilder<Builder> {
+ private final OnDemandShardState state = new OnDemandShardState();
+
+ @Override
+ protected OnDemandRaftState state() {
+ return state;
+ }
+
+ public Builder treeChangeListenerActors(Collection<ActorSelection> actors) {
+ state.treeChangeListenerActors = actors;
+ return self();
+ }
+
+ public Builder dataChangeListenerActors(Collection<ActorSelection> actors) {
+ state.dataChangeListenerActors = actors;
+ return self();
+ }
+
+ public Builder commitCohortActors(Collection<ActorRef> actors) {
+ state.commitCohortActors = actors;
+ return self();
+ }
+ }
+}
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
final Shard shard = actor.underlyingActor();
final MockDataChangeListener listener = new MockDataChangeListener(0);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, TEST_PATH),
"testChangeListenerWithNoInitialData-DataChangeListener");
final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
support.onMessage(new RegisterChangeListener(TEST_PATH, dclActor, DataChangeScope.ONE, false),
writeToStore(shard.getDataStore(), TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME));
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, TEST_PATH),
"testInitialChangeListenerEventWithContainerPath-DataChangeListener");
final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
support.onMessage(new RegisterChangeListener(TEST_PATH, dclActor, DataChangeScope.ONE, false),
mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, OUTER_LIST_PATH),
"testInitialChangeListenerEventWithListPath-DataChangeListener");
final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH, dclActor, DataChangeScope.ONE, false),
ImmutableNodes.containerNode(OUTER_CONTAINER_QNAME));
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME);
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
"testInitialChangeListenerEventWithWildcardedListPath-DataChangeListener");
final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
- support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH.node(OUTER_LIST_QNAME), dclActor,
- DataChangeScope.ONE, false), true, true);
+ support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), true, true);
listener.waitForChangeEvents();
listener.verifyCreatedData(0, outerEntryPath(1));
outerNodeEntry(2, innerNode("three", "four")))));
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME)
+ .node(INNER_LIST_QNAME).node(INNER_LIST_QNAME);
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
"testInitialChangeListenerEventWithNestedWildcardedListsPath-DataChangeListener");
final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
- support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH.node(OUTER_LIST_QNAME)
- .node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), dclActor, DataChangeScope.ONE, false),
- true, true);
+ support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), true, true);
listener.waitForChangeEvents();
// Register for a specific outer list entry
final MockDataChangeListener listener2 = new MockDataChangeListener(1);
- final ActorRef dclActor2 = actorFactory.createActor(DataChangeListener.props(listener2),
+ final YangInstanceIdentifier path2 = OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME)
+ .node(INNER_LIST_QNAME);
+ final ActorRef dclActor2 = actorFactory.createActor(DataChangeListener.props(listener2, path2),
"testInitialChangeListenerEventWithNestedWildcardedListsPath-DataChangeListener2");
final DataChangeListenerSupport support2 = new DataChangeListenerSupport(shard);
- support2.onMessage(new RegisterChangeListener(
- OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), dclActor2,
- DataChangeScope.ONE, false), true, true);
+ support2.onMessage(new RegisterChangeListener(path2, dclActor2, DataChangeScope.ONE, false),
+ true, true);
listener2.waitForChangeEvents();
listener2.verifyCreatedData(0, innerEntryPath(1, "one"));
outerNodeEntry(2, innerNode("three", "four")))));
final MockDataChangeListener listener = new MockDataChangeListener(0);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME)
+ .node(INNER_LIST_QNAME);
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
"testInitialChangeListenerEventWhenNotInitiallyLeader-DataChangeListener");
final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
- support.onMessage(new RegisterChangeListener(
- OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), dclActor,
- DataChangeScope.ONE, false), false, true);
+ support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), false, true);
listener.expectNoMoreChanges("Unexpected initial change event");
listener.reset(1);
package org.opendaylight.controller.cluster.datastore;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+
import akka.actor.ActorRef;
import akka.actor.DeadLetter;
import akka.actor.Props;
{
final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
- final Props props = DataChangeListener.props(mockListener);
+ final Props props = DataChangeListener.props(mockListener, TEST_PATH);
final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
// Let the DataChangeListener know that notifications should be
{
final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
- final Props props = DataChangeListener.props(mockListener);
+ final Props props = DataChangeListener.props(mockListener, TEST_PATH);
final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
subject.tell(new DataChanged(mockChangeEvent), getRef());
{
final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
- final Props props = DataChangeListener.props(mockListener);
+ final Props props = DataChangeListener.props(mockListener, TEST_PATH);
final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender");
getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataChanged(mockChangeEvent2);
- Props props = DataChangeListener.props(mockListener);
+ Props props = DataChangeListener.props(mockListener, TEST_PATH);
ActorRef subject = getSystem().actorOf(props, "testDataChangedWithListenerRuntimeEx");
// Let the DataChangeListener know that notifications should be
*/
package org.opendaylight.controller.cluster.datastore;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+
import akka.actor.ActorRef;
import akka.actor.DeadLetter;
import akka.actor.Props;
final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
- final Props props = DataTreeChangeListenerActor.props(mockListener);
+ final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsEnabled");
// Let the DataChangeListener know that notifications should be
final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
- final Props props = DataTreeChangeListenerActor.props(mockListener);
+ final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsDisabled");
subject.tell(new DataTreeChanged(mockCandidates), getRef());
final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class);
final ImmutableList<DataTreeCandidate> mockCandidates = ImmutableList.of(mockTreeCandidate);
final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
- final Props props = DataTreeChangeListenerActor.props(mockListener);
+ final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithNoSender");
getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class);
Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataTreeChanged(mockCandidates2);
- Props props = DataTreeChangeListenerActor.props(mockListener);
+ Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH);
ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithListenerRuntimeEx");
// Let the DataChangeListener know that notifications should be
ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
mock(Configuration.class));
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
- actorContext, mockListener);
+ actorContext, mockListener, path);
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
new Thread() {
@Override
public void run() {
- proxy.init("shard-1", path);
+ proxy.init("shard-1");
}
}.start();
ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(
ClusteredDOMDataTreeChangeListener.class);
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
- new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener);
+ new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path);
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
new Thread() {
@Override
public void run() {
- proxy.init("shard-1", path);
+ proxy.init("shard-1");
}
}.start();
ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
mock(Configuration.class));
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
- actorContext, mockListener);
+ actorContext, mockListener, path);
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
new Thread() {
@Override
public void run() {
- proxy.init("shard-1", path);
+ proxy.init("shard-1");
}
}.start();
ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
mock(Configuration.class));
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
- actorContext, mockListener);
+ actorContext, mockListener, path);
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
new Thread() {
@Override
public void run() {
- proxy.init("shard-1", path);
+ proxy.init("shard-1");
}
}.start();
String shardName = "shard-1";
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
- actorContext, mockListener);
+ actorContext, mockListener, path);
doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
.executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class));
doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
- proxy.init("shard-1", path);
+ proxy.init("shard-1");
Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
- actorContext, mockListener);
+ actorContext, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
Answer<Future<Object>> answer = invocation -> {
proxy.close();
doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class), any(Object.class),
any(Timeout.class));
- proxy.init(shardName, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
+ proxy.init(shardName);
expectMsgClass(duration("5 seconds"), CloseDataTreeChangeListenerRegistration.class);
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import scala.concurrent.Await;
private Entry<MockDataTreeChangeListener, ActorSelection> registerChangeListener(final YangInstanceIdentifier path,
final int expectedEvents) {
MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
- ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener));
+ ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, TestModel.TEST_PATH));
try {
RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply)
shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
- "testRegisterChangeListener-DataChangeListener");
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
+ TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
AsyncDataBroker.DataChangeScope.BASE, true), getRef());
setupInMemorySnapshotStore();
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
"testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
final TestActorRef<Shard> shard = actorFactory.createTestActor(
new ShardTestKit(getSystem()) {
{
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
// Wait until the shard receives the first ElectionTimeout
// message.
assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
- "testRegisterDataTreeChangeListener-DataTreeChangeListener");
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+ TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
setupInMemorySnapshotStore();
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
final TestActorRef<Shard> shard = actorFactory.createTestActor(
Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
"testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
new ShardTestKit(getSystem()) {
{
assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
actorFactory.generateActorId(testName + "-DataChangeListener"));
setupInMemorySnapshotStore();
waitUntilNoLeader(shard);
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
getRef());
final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
final YangInstanceIdentifier path = TestModel.TEST_PATH;
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
actorFactory.generateActorId(testName + "-DataChangeListener"));
followerShard.tell(
.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
- actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
+ TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
setupInMemorySnapshotStore();
final YangInstanceIdentifier path = TestModel.TEST_PATH;
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());