<feature name="odl-nsf-service" description="OpenDaylight :: NSF :: Network Service Functions in Controller" version="${project.version}">
<feature version="${sal.version}">odl-adsal-all</feature>
- <feature version="${project.version}">odl-nsf-controller-managers</feature>
- <feature version="${project.version}">odl-adsal-controller-northbound</feature>
+ <feature version="${project.version}">odl-nsf-managers</feature>
+ <feature version="${project.version}">odl-adsal-northbound</feature>
</feature>
<feature name="odl-nsf-managers" description="OpenDaylight :: AD-SAL :: Network Service Functions" version="${project.version}">
<bundle>mvn:org.opendaylight.controller/topologymanager/${topologymanager.version}</bundle>
<bundle>mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/forwarding.staticrouting/${forwarding.staticrouting}</bundle>
-
- <bundle>mvn:org.opendaylight.controller.thirdparty/net.sf.jung2/2.0.1</bundle>
- <bundle>mvn:org.opendaylight.controller/routing.dijkstra_implementation/${routing.dijkstra_implementation.version}</bundle>
- </feature>
-
- <feature name="odl-nsf-controller-managers" description="OpenDaylight :: AD-SAL :: Network Service Functions in Controller" version="${project.version}">
- <feature version="${commons.opendaylight.version}">odl-base-all</feature>
- <feature version="${sal.version}">odl-adsal-all</feature>
- <bundle>mvn:org.opendaylight.controller/usermanager/${usermanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/usermanager.implementation/${usermanager.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/appauth/${appauth.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/connectionmanager/${connectionmanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/connectionmanager.implementation/${connectionmanager.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/containermanager/${containermanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/containermanager.implementation/${containermanager.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/statisticsmanager/${statisticsmanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/statisticsmanager.implementation/${statisticsmanager.implementation.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/switchmanager/${switchmanager.api.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/switchmanager.implementation/${switchmanager.implementation.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/forwardingrulesmanager/${forwardingrulesmanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/forwardingrulesmanager.implementation/${forwardingrulesmanager.implementation.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/topologymanager/${topologymanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version}</bundle>
-
<bundle>mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version}</bundle>
<bundle>mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version}</bundle>
<bundle>mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version}</bundle>
<bundle>mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}</bundle>
<bundle>mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}</bundle>
</feature>
-
- <feature name="odl-adsal-controller-northbound" description="OpenDaylight :: AD-SAL :: Northbound APIs in Controller" version="${project.version}">
- <feature version="${commons.opendaylight.version}">odl-base-all</feature>
- <feature version="${project.version}">odl-nsf-managers</feature>
- <bundle>mvn:org.ow2.asm/asm-all/${asm.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/bundlescanner/${bundlescanner.api.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/bundlescanner.implementation/${bundlescanner.implementation.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/commons.northbound/${northbound.commons.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/connectionmanager.northbound/${connectionmanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/flowprogrammer.northbound/${flowprogrammer.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/hosttracker.northbound/${hosttracker.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/networkconfig.bridgedomain.northbound/${networkconfig.bridgedomain.northbound.version}</bundle>
- <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.antlr/${eclipse.persistence.version}</bundle>
- <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.core/${eclipse.persistence.version}</bundle>
- <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.moxy/${eclipse.persistence.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/forwarding.staticrouting.northbound/${forwarding.staticrouting.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/statistics.northbound/${statistics.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/subnets.northbound/${subnets.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/switchmanager.northbound/${switchmanager.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}</bundle>
- </feature>
</features>
<artifactId>${artifactId}-impl</artifactId>
<version>${symbol_dollar}{project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${symbol_dollar}{project.groupId}</groupId>
+ <artifactId>${artifactId}-impl</artifactId>
+ <version>${symbol_dollar}{project.version}</version>
+ <type>xml</type>
+ <classifier>config</classifier>
+ </dependency>
<dependency>
<groupId>${symbol_dollar}{project.groupId}</groupId>
<artifactId>${artifactId}-api</artifactId>
<yang-ext.version>2013.09.07.7-SNAPSHOT</yang-ext.version>
<yang-jmx-generator.version>1.1.0-SNAPSHOT</yang-jmx-generator.version>
<yangtools.version>0.7.0-SNAPSHOT</yangtools.version>
- <sshd-core.version>0.12.0</sshd-core.version>
+ <sshd-core.version>0.14.0</sshd-core.version>
<jmh.version>0.9.7</jmh.version>
<lmax.version>3.3.0</lmax.version>
</properties>
package org.opendaylight.controller.messagebus.app.impl;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.regex.Pattern;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.regex.Pattern;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
private final EventSourceService eventSourceService;
private final RpcProviderRegistry rpcRegistry;
- private final ExecutorService executorService;
public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
+
this.dataBroker = dataBroker;
- this.executorService = Executors.newCachedThreadPool();
this.rpcRegistry = rpcRegistry;
aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
+
}
private <T extends DataObject> void putData(final LogicalDatastoreType store,
- final InstanceIdentifier<T> path, final T data) {
+ final InstanceIdentifier<T> path,
+ final T data){
final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
tx.put(store, path, data, true);
tx.submit();
+
}
private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
}
private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
- executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic));
+
+ final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
+
+ final CheckedFuture<Optional<Topology>, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH);
+
+ Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
+
+ @Override
+ public void onSuccess(Optional<Topology> data) {
+ if(data.isPresent()) {
+ final List<Node> nodes = data.get().getNode();
+ for (final Node node : nodes) {
+ if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
+ eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
+ }
+ }
+ }
+ tx.close();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Can not notify existing nodes {}", t);
+ tx.close();
+ }
+
+ });
+
}
@Override
// FIXME: Return registration object.
}
- private class NotifyAllNodeExecutor implements Runnable {
-
- private final EventSourceTopic topic;
- private final DataBroker dataBroker;
- private final Pattern nodeIdPatternRegex;
-
- public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) {
- this.topic = topic;
- this.dataBroker = dataBroker;
- this.nodeIdPatternRegex = nodeIdPatternRegex;
- }
-
- @Override
- public void run() {
- //# Code reader note: Context of Node type is NetworkTopology
- final List<Node> nodes = snapshot();
- for (final Node node : nodes) {
- if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
- topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
- }
- }
- }
-
- private List<Node> snapshot() {
- try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) {
-
- final Optional<Topology> data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet();
-
- if(data.isPresent()) {
- final List<Node> nodeList = data.get().getNode();
- if(nodeList != null) {
- return nodeList;
- }
- }
- return Collections.emptyList();
- } catch (final ReadFailedException e) {
- LOG.error("Unable to retrieve node list.", e);
- return Collections.emptyList();
- }
- }
- }
}
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public class FollowerLogInformationImpl implements FollowerLogInformation {
- private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
- private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
-
private final String id;
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final RaftActorContext context;
- private volatile long nextIndex;
+ private long nextIndex;
- private volatile long matchIndex;
+ private long matchIndex;
private long lastReplicatedIndex = -1L;
}
@Override
- public long incrNextIndex(){
- return NEXT_INDEX_UPDATER.incrementAndGet(this);
+ public long incrNextIndex() {
+ return nextIndex++;
}
@Override
public long decrNextIndex() {
- return NEXT_INDEX_UPDATER.decrementAndGet(this);
+ return nextIndex--;
}
@Override
@Override
public long incrMatchIndex(){
- return MATCH_INDEX_UPDATER.incrementAndGet(this);
+ return matchIndex++;
}
@Override
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
public void apply(ApplyJournalEntries param) throws Exception {
}
};
+ private static final String COMMIT_SNAPSHOT = "commit_snapshot";
protected final Logger LOG = LoggerFactory.getLogger(getClass());
*/
private final RaftActorContextImpl context;
+ private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
+
/**
* The in-memory journal
*/
private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
- private CaptureSnapshot captureSnapshot = null;
-
private Stopwatch recoveryTimer;
private int currentRecoveryBatchCount;
+ private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
+
public RaftActor(String id, Map<String, String> peerAddresses) {
this(id, peerAddresses, Optional.<ConfigParams>absent());
}
}
protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
- RaftActorBehavior oldBehavior = currentBehavior;
+ reusableBehaviorStateHolder.init(currentBehavior);
currentBehavior = newBehavior;
- handleBehaviorChange(oldBehavior, currentBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
}
@Override public void handleCommand(Object message) {
LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
persistenceId(), saveSnapshotFailure.cause());
- context.getReplicatedLog().snapshotRollback();
-
- LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
- "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- context.getReplicatedLog().size());
+ context.getSnapshotManager().rollback();
} else if (message instanceof CaptureSnapshot) {
LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
- if(captureSnapshot == null) {
- captureSnapshot = (CaptureSnapshot)message;
- createSnapshot();
- }
+ context.getSnapshotManager().create(createSnapshotProcedure);
- } else if (message instanceof CaptureSnapshotReply){
+ } else if (message instanceof CaptureSnapshotReply) {
handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else if(message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
+ } else if (message.equals(COMMIT_SNAPSHOT)) {
+ commitSnapshot(-1);
} else {
- RaftActorBehavior oldBehavior = currentBehavior;
+ reusableBehaviorStateHolder.init(currentBehavior);
+
currentBehavior = currentBehavior.handleMessage(getSender(), message);
- handleBehaviorChange(oldBehavior, currentBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
}
}
.currentTerm(context.getTermInformation().getCurrentTerm())
.inMemoryJournalDataSize(replicatedLog.dataSize())
.inMemoryJournalLogSize(replicatedLog.size())
- .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated())
+ .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
.lastApplied(context.getLastApplied())
.lastIndex(replicatedLog.lastIndex())
.lastTerm(replicatedLog.lastTerm())
}
- private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+ private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) {
+ RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
+
if (oldBehavior != currentBehavior){
onStateChanged();
}
- String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
- String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
+ String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId();
+ String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
// it can happen that the state has not changed but the leader has changed.
- onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+ Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
+ if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) {
+ if(roleChangeNotifier.isPresent()) {
+ roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
+ }
+
+ onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+ }
- if (getRoleChangeNotifier().isPresent() &&
+ if (roleChangeNotifier.isPresent() &&
(oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
- getRoleChangeNotifier().get().tell(
- new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
- getSelf());
+ roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
+ currentBehavior.state().name()), getSelf());
}
}
// the state to durable storage
self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
- // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
- if(!context.isSnapshotCaptureInitiated()){
- raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
- raftContext.getTermInformation().getCurrentTerm());
- raftContext.getReplicatedLog().snapshotCommit();
- } else {
- LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
- persistenceId(), getId());
- }
+ context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+
} else if (clientActor != null) {
// Send message for replication
currentBehavior.handleMessage(getSelf(),
}
protected void commitSnapshot(long sequenceNumber) {
- context.getReplicatedLog().snapshotCommit();
-
- // TODO: Not sure if we want to be this aggressive with trimming stuff
- trimPersistentData(sequenceNumber);
+ context.getSnapshotManager().commit(persistence(), sequenceNumber);
}
/**
protected void onLeaderChanged(String oldLeader, String newLeader){};
- private void trimPersistentData(long sequenceNumber) {
- // Trim akka snapshots
- // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
- // For now guessing that it is ANDed.
- persistence().deleteSnapshots(new SnapshotSelectionCriteria(
- sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
-
- // Trim akka journal
- persistence().deleteMessages(sequenceNumber);
- }
-
private String getLeaderAddress(){
if(isLeader()){
return getSelf().path().toString();
private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
- // create a snapshot object from the state provided and save it
- // when snapshot is saved async, SaveSnapshotSuccess is raised.
-
- Snapshot sn = Snapshot.create(snapshotBytes,
- context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
- captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
- captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
-
- persistence().saveSnapshot(sn);
-
- LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
-
- long dataThreshold = getTotalMemory() *
- getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- if (context.getReplicatedLog().dataSize() > dataThreshold) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
- persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
- captureSnapshot.getLastAppliedIndex());
- }
-
- // if memory is less, clear the log based on lastApplied.
- // this could/should only happen if one of the followers is down
- // as normally we keep removing from the log when its replicated to all.
- context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
- captureSnapshot.getLastAppliedTerm());
-
- // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
- // install snapshot to a follower.
- if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
- }
- } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
- // clear the log based on replicatedToAllIndex
- context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
- captureSnapshot.getReplicatedToAllTerm());
-
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
- } else {
- // The replicatedToAllIndex was not found in the log
- // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
- // In this scenario we may need to save the snapshot to the akka persistence
- // snapshot for recovery but we do not need to do the replicated log trimming.
- context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
- replicatedLog.getSnapshotTerm());
- }
-
-
- LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
- "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(),
- replicatedLog.getSnapshotTerm());
-
- if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
- // this would be call straight to the leader and won't initiate in serialization
- currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
- ByteString.copyFrom(snapshotBytes)));
- }
-
- captureSnapshot = null;
- context.setSnapshotCaptureInitiated(false);
+ context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
}
protected long getTotalMemory() {
}
private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-
private static final int DATA_SIZE_DIVIDER = 5;
- private long dataSizeSinceLastSnapshot = 0;
+ private long dataSizeSinceLastSnapshot = 0L;
+
public ReplicatedLogImpl(Snapshot snapshot) {
super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
long dataSizeForCheck = dataSize;
dataSizeSinceLastSnapshot += logEntrySize;
- long journalSize = lastIndex() + 1;
- if(!hasFollowers()) {
+ if (!hasFollowers()) {
// When we do not have followers we do not maintain an in-memory log
// due to this the journalSize will never become anything close to the
// snapshot batch count. In fact will mostly be 1.
// as if we were maintaining a real snapshot
dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
}
-
+ long journalSize = replicatedLogEntry.getIndex() + 1;
long dataThreshold = getTotalMemory() *
- getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
- // when a snaphsot is being taken, captureSnapshot != null
- if (!context.isSnapshotCaptureInitiated() &&
- ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
- dataSizeForCheck > dataThreshold)) {
-
- dataSizeSinceLastSnapshot = 0;
+ context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
- " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+ if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+ || dataSizeForCheck > dataThreshold)) {
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
+ boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+ currentBehavior.getReplicatedToAllIndex());
- ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
- if (!hasFollowers()) {
- lastAppliedIndex = replicatedLogEntry.getIndex();
- lastAppliedTerm = replicatedLogEntry.getTerm();
- } else if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
+ if(started){
+ dataSizeSinceLastSnapshot = 0;
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
- LOG.debug("{}: Snapshot Capture lastApplied:{} ",
- persistenceId(), context.getLastApplied());
- LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
- lastAppliedIndex);
- LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
- lastAppliedTerm);
- }
-
- // send a CaptureSnapshot to self to make the expensive operation async.
- long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
- ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
- getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
- null);
- context.setSnapshotCaptureInitiated(true);
}
+
if (callback != null){
callback.apply(replicatedLogEntry);
}
@Override
public void saveSnapshot(Object o) {
// Make saving Snapshot successful
- commitSnapshot(-1L);
+ // Committing the snapshot here would end up calling commit in the creating state which would
+ // be a state violation. That's why now we send a message to commit the snapshot.
+ self().tell(COMMIT_SNAPSHOT, self());
+ }
+ }
+
+
+ private class CreateSnapshotProcedure implements Procedure<Void> {
+
+ @Override
+ public void apply(Void aVoid) throws Exception {
+ createSnapshot();
}
}
return currentBehavior;
}
+ private static class BehaviorStateHolder {
+ private RaftActorBehavior behavior;
+ private String leaderId;
+
+ void init(RaftActorBehavior behavior) {
+ this.behavior = behavior;
+ this.leaderId = behavior != null ? behavior.getLeaderId() : null;
+ }
+
+ RaftActorBehavior getBehavior() {
+ return behavior;
+ }
+
+ String getLeaderId() {
+ return leaderId;
+ }
+ }
}
*/
ConfigParams getConfigParams();
- void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
-
- boolean isSnapshotCaptureInitiated();
+ SnapshotManager getSnapshotManager();
}
private boolean snapshotCaptureInitiated;
+ // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
+ // be passed to it in the constructor
+ private SnapshotManager snapshotManager;
+
public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
String id,
ElectionTerm termInformation, long commitIndex,
return configParams;
}
- @Override
- public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
- this.snapshotCaptureInitiated = snapshotCaptureInitiated;
- }
-
- @Override
- public boolean isSnapshotCaptureInitiated() {
- return snapshotCaptureInitiated;
- }
-
@Override public void addToPeers(String name, String address) {
peerAddresses.put(name, address);
}
peerAddresses.put(peerId, peerAddress);
}
+
+ public SnapshotManager getSnapshotManager() {
+ if(snapshotManager == null){
+ snapshotManager = new SnapshotManager(this, LOG);
+ }
+ return snapshotManager;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+public class SnapshotManager implements SnapshotState {
+
+
+ private final SnapshotState IDLE = new Idle();
+ private final SnapshotState CAPTURING = new Capturing();
+ private final SnapshotState PERSISTING = new Persisting();
+ private final SnapshotState CREATING = new Creating();
+
+ private final Logger LOG;
+ private final RaftActorContext context;
+ private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
+ new LastAppliedTermInformationReader();
+ private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader =
+ new ReplicatedToAllTermInformationReader();
+
+
+ private SnapshotState currentState = IDLE;
+ private CaptureSnapshot captureSnapshot;
+
+ public SnapshotManager(RaftActorContext context, Logger logger) {
+ this.context = context;
+ this.LOG = logger;
+ }
+
+ @Override
+ public boolean isCapturing() {
+ return currentState.isCapturing();
+ }
+
+ @Override
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower);
+ }
+
+ @Override
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ return currentState.capture(lastLogEntry, replicatedToAllIndex);
+ }
+
+ @Override
+ public void create(Procedure<Void> callback) {
+ currentState.create(callback);
+ }
+
+ @Override
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory);
+ }
+
+ @Override
+ public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ currentState.commit(persistenceProvider, sequenceNumber);
+ }
+
+ @Override
+ public void rollback() {
+ currentState.rollback();
+ }
+
+ @Override
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ return currentState.trimLog(desiredTrimIndex, currentBehavior);
+ }
+
+ private boolean hasFollowers(){
+ return context.getPeerAddresses().keySet().size() > 0;
+ }
+
+ private String persistenceId(){
+ return context.getId();
+ }
+
+ private class AbstractSnapshotState implements SnapshotState {
+
+ @Override
+ public boolean isCapturing() {
+ return false;
+ }
+
+ @Override
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ LOG.debug("capture should not be called in state {}", this);
+ return false;
+ }
+
+ @Override
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ LOG.debug("captureToInstall should not be called in state {}", this);
+ return false;
+ }
+
+ @Override
+ public void create(Procedure<Void> callback) {
+ LOG.debug("create should not be called in state {}", this);
+ }
+
+ @Override
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ LOG.debug("persist should not be called in state {}", this);
+ }
+
+ @Override
+ public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ LOG.debug("commit should not be called in state {}", this);
+ }
+
+ @Override
+ public void rollback() {
+ LOG.debug("rollback should not be called in state {}", this);
+ }
+
+ @Override
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ LOG.debug("trimLog should not be called in state {}", this);
+ return -1;
+ }
+
+ protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
+ // we would want to keep the lastApplied as its used while capturing snapshots
+ long lastApplied = context.getLastApplied();
+ long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
+
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
+ persistenceId(), desiredTrimIndex, lastApplied, tempMin);
+ }
+
+ if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
+ LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
+ context.getTermInformation().getCurrentTerm());
+
+ //use the term of the temp-min, since we check for isPresent, entry will not be null
+ ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
+ context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
+ context.getReplicatedLog().snapshotCommit();
+ return tempMin;
+ } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+ // It's possible a follower was lagging and an install snapshot advanced its match index past
+ // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
+ // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
+ // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
+ // trim the log to the last applied index even if previous entries weren't replicated to all followers.
+ currentBehavior.setReplicatedToAllIndex(tempMin);
+ }
+ return -1;
+ }
+ }
+
+ private class Idle extends AbstractSnapshotState {
+
+ private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ TermInformationReader lastAppliedTermInfoReader =
+ lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
+ lastLogEntry, hasFollowers());
+
+ long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
+ long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
+
+ TermInformationReader replicatedToAllTermInfoReader =
+ replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
+
+ long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
+ long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
+
+ // send a CaptureSnapshot to self to make the expensive operation async.
+ captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
+ lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
+ newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+
+ SnapshotManager.this.currentState = CAPTURING;
+
+ if(targetFollower != null){
+ LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
+ } else {
+ LOG.info("{}: Initiating snapshot capture {} to install on {}",
+ persistenceId(), captureSnapshot, targetFollower);
+ }
+
+ context.getActor().tell(captureSnapshot, context.getActor());
+
+ return true;
+ }
+
+ @Override
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ return capture(lastLogEntry, replicatedToAllIndex, null);
+ }
+
+ @Override
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
+ }
+
+ @Override
+ public String toString() {
+ return "Idle";
+ }
+
+ @Override
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ return doTrimLog(desiredTrimIndex, currentBehavior);
+ }
+ }
+
+ private class Capturing extends AbstractSnapshotState {
+
+ @Override
+ public boolean isCapturing() {
+ return true;
+ }
+
+ @Override
+ public void create(Procedure<Void> callback) {
+ try {
+ callback.apply(null);
+ SnapshotManager.this.currentState = CREATING;
+ } catch (Exception e) {
+ LOG.error("Unexpected error occurred", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Capturing";
+ }
+
+ }
+
+ private class Creating extends AbstractSnapshotState {
+
+ @Override
+ public boolean isCapturing() {
+ return true;
+ }
+
+ @Override
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ // create a snapshot object from the state provided and save it
+ // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+ Snapshot sn = Snapshot.create(snapshotBytes,
+ context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+ captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+ captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+ persistenceProvider.saveSnapshot(sn);
+
+ LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
+
+ long dataThreshold = totalMemory *
+ context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+ if (context.getReplicatedLog().dataSize() > dataThreshold) {
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
+ persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
+ captureSnapshot.getLastAppliedIndex());
+ }
+
+ // if memory is less, clear the log based on lastApplied.
+ // this could/should only happen if one of the followers is down
+ // as normally we keep removing from the log when its replicated to all.
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
+
+ // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
+ // install snapshot to a follower.
+ if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
+ currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ }
+
+ } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
+ // clear the log based on replicatedToAllIndex
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+ captureSnapshot.getReplicatedToAllTerm());
+
+ currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ } else {
+ // The replicatedToAllIndex was not found in the log
+ // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
+ // In this scenario we may need to save the snapshot to the akka persistence
+ // snapshot for recovery but we do not need to do the replicated log trimming.
+ context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm());
+ }
+
+ LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+ "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
+
+ if (context.getId().equals(currentBehavior.getLeaderId())
+ && captureSnapshot.isInstallSnapshotInitiated()) {
+ // this would be call straight to the leader and won't initiate in serialization
+ currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(
+ ByteString.copyFrom(snapshotBytes)));
+ }
+
+ captureSnapshot = null;
+ SnapshotManager.this.currentState = PERSISTING;
+ }
+
+ @Override
+ public String toString() {
+ return "Creating";
+ }
+
+ }
+
+ private class Persisting extends AbstractSnapshotState {
+
+ @Override
+ public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ context.getReplicatedLog().snapshotCommit();
+ persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
+ sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+
+ persistenceProvider.deleteMessages(sequenceNumber);
+
+ SnapshotManager.this.currentState = IDLE;
+ }
+
+ @Override
+ public void rollback() {
+ context.getReplicatedLog().snapshotRollback();
+
+ LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
+ "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ context.getReplicatedLog().size());
+
+ SnapshotManager.this.currentState = IDLE;
+ }
+
+ @Override
+ public String toString() {
+ return "Persisting";
+ }
+
+ }
+
+ private static interface TermInformationReader {
+ long getIndex();
+ long getTerm();
+ }
+
+ private static class LastAppliedTermInformationReader implements TermInformationReader{
+ private long index;
+ private long term;
+
+ public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex,
+ ReplicatedLogEntry lastLogEntry, boolean hasFollowers){
+ ReplicatedLogEntry entry = log.get(originalIndex);
+ this.index = -1L;
+ this.term = -1L;
+ if (!hasFollowers) {
+ if(lastLogEntry != null) {
+ index = lastLogEntry.getIndex();
+ term = lastLogEntry.getTerm();
+ }
+ } else if (entry != null) {
+ index = entry.getIndex();
+ term = entry.getTerm();
+ } else if(log.getSnapshotIndex() > -1){
+ index = log.getSnapshotIndex();
+ term = log.getSnapshotTerm();
+ }
+ return this;
+ }
+
+ @Override
+ public long getIndex(){
+ return this.index;
+ }
+
+ @Override
+ public long getTerm(){
+ return this.term;
+ }
+ }
+
+ private static class ReplicatedToAllTermInformationReader implements TermInformationReader{
+ private long index;
+ private long term;
+
+ ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){
+ ReplicatedLogEntry entry = log.get(originalIndex);
+ this.index = -1L;
+ this.term = -1L;
+
+ if (entry != null) {
+ index = entry.getIndex();
+ term = entry.getTerm();
+ }
+
+ return this;
+ }
+
+ @Override
+ public long getIndex(){
+ return this.index;
+ }
+
+ @Override
+ public long getTerm(){
+ return this.term;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+public interface SnapshotState {
+ /**
+ * Should return true when a snapshot is being captured
+ * @return
+ */
+ boolean isCapturing();
+
+ /**
+ * Initiate capture snapshot
+ *
+ * @param lastLogEntry the last entry in the replicated log
+ * @param replicatedToAllIndex the current replicatedToAllIndex
+ *
+ * @return true if capture was started
+ */
+ boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+
+ /**
+ * Initiate capture snapshot for the purposing of installing that snapshot
+ *
+ * @param lastLogEntry
+ * @param replicatedToAllIndex
+ * @param targetFollower
+ *
+ * @return true if capture was started
+ */
+ boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
+
+ /**
+ * Create the snapshot
+ *
+ * @param callback a procedure to be called which should create the snapshot
+ */
+ void create(Procedure<Void> callback);
+
+ /**
+ * Persist the snapshot
+ *
+ * @param persistenceProvider
+ * @param snapshotBytes
+ * @param currentBehavior
+ * @param totalMemory
+ */
+ void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior
+ ,long totalMemory);
+
+ /**
+ * Commit the snapshot by trimming the log
+ *
+ * @param persistenceProvider
+ * @param sequenceNumber
+ */
+ void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber);
+
+ /**
+ * Rollback the snapshot
+ */
+ void rollback();
+
+ /**
+ * Trim the log
+ *
+ * @param desiredTrimIndex
+ * @return the actual trim index
+ */
+ long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior);
+}
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
applyLogToStateMachine(context.getCommitIndex());
}
- if (!context.isSnapshotCaptureInitiated()) {
+ if (!context.getSnapshotManager().isCapturing()) {
purgeInMemoryLog();
}
followerToSnapshot.markSendStatus(false);
}
- if (wasLastChunk && !context.isSnapshotCaptureInitiated()) {
+ if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
// Since the follower is now caught up try to purge the log.
purgeInMemoryLog();
} else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
sendAppendEntries = true;
}
} else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
+ leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
// if the followers next index is not present in the leaders log, and
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
sendSnapshotChunk(followerActor, followerId);
- } else if (!context.isSnapshotCaptureInitiated()) {
- ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
- lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
- lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
- }
-
- boolean isInstallSnapshotInitiated = true;
- long replicatedToAllIndex = super.getReplicatedToAllIndex();
- ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-
- CaptureSnapshot captureSnapshot = new CaptureSnapshot(
- lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
- isInstallSnapshotInitiated);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId,
- captureSnapshot);
- }
-
- actor().tell(captureSnapshot, actor());
- context.setSnapshotCaptureInitiated(true);
+ } else {
+ context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
}
}
}
*/
public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
+ protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout();
+
/**
* Information about the RaftActor whose behavior this class represents
*/
// message is sent to itself
electionCancel =
context.getActorSystem().scheduler().scheduleOnce(interval,
- context.getActor(), new ElectionTimeout(),
+ context.getActor(), ELECTION_TIMEOUT,
context.getActorSystem().dispatcher(), context.getActor());
}
* @param snapshotCapturedIndex
*/
protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
- // we would want to keep the lastApplied as its used while capturing snapshots
- long lastApplied = context.getLastApplied();
- long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
-
- if(LOG.isTraceEnabled()) {
- LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}",
- logName, snapshotCapturedIndex, lastApplied, tempMin);
- }
+ long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
- if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
- LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin,
- context.getTermInformation().getCurrentTerm());
-
- //use the term of the temp-min, since we check for isPresent, entry will not be null
- ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
- context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
- context.getReplicatedLog().snapshotCommit();
- setReplicatedToAllIndex(tempMin);
- } else if(tempMin > getReplicatedToAllIndex()) {
- // It's possible a follower was lagging and an install snapshot advanced its match index past
- // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
- // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
- // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
- // trim the log to the last applied index even if previous entries weren't replicated to all followers.
- setReplicatedToAllIndex(tempMin);
+ if(actualIndex != -1){
+ setReplicatedToAllIndex(actualIndex);
}
}
votesRequired = getMajorityVoteCount(peers.size());
startNewTerm();
- scheduleElection(electionDuration());
+
+ if(context.getPeerAddresses().isEmpty()){
+ actor().tell(ELECTION_TIMEOUT, actor());
+ } else {
+ scheduleElection(electionDuration());
+ }
+
+
}
@Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
public Follower(RaftActorContext context) {
super(context, RaftState.Follower);
- scheduleElection(electionDuration());
-
initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
+
+ if(context.getPeerAddresses().isEmpty()){
+ actor().tell(ELECTION_TIMEOUT, actor());
+ } else {
+ scheduleElection(electionDuration());
+ }
+
}
private boolean isLogEntryPresent(long index){
// to make it easier to read. Before refactoring ensure tests
// cover the code properly
+ if (snapshotTracker != null) {
+ // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
+ AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
+ lastIndex(), lastTerm());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
+ }
+ sender.tell(reply, actor());
+
+ return this;
+ }
+
// 1. Reply false if term < currentTerm (§5.1)
// This is handled in the appendEntries method of the base class
sender.tell(reply, actor());
- if (!context.isSnapshotCaptureInitiated()) {
+ if (!context.getSnapshotManager().isCapturing()) {
super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
}
assertEquals("ReplicatedLogEntry getIndex", expIndex, replicatedLogEntry.getIndex());
assertEquals("ReplicatedLogEntry getData", payload, replicatedLogEntry.getData());
}
+
+ protected String testActorPath(String id){
+ return "akka://test/user" + id;
+ }
}
private Map<String, String> peerAddresses = new HashMap<>();
private ConfigParams configParams;
private boolean snapshotCaptureInitiated;
+ private SnapshotManager snapshotManager;
public MockRaftActorContext(){
electionTerm = new ElectionTerm() {
}
@Override
- public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
- this.snapshotCaptureInitiated = snapshotCaptureInitiated;
- }
-
- @Override
- public boolean isSnapshotCaptureInitiated() {
- return snapshotCaptureInitiated;
+ public SnapshotManager getSnapshotManager() {
+ if(this.snapshotManager == null){
+ this.snapshotManager = new SnapshotManager(this, getLogger());
+ }
+ return this.snapshotManager;
}
public void setConfigParams(ConfigParams configParams) {
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
}
}
+
+ public void waitUntilLeader(){
+ for(int i = 0;i < 10; i++){
+ if(isLeader()){
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
public List<Object> getState() {
return state;
}
return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
}
+ public static Props props(final String id, final Map<String, String> peerAddresses,
+ Optional<ConfigParams> config, ActorRef roleChangeNotifier,
+ DataPersistenceProvider dataPersistenceProvider){
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
+ }
+
+
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
delegate.applyState(clientActor, identifier, data);
LOG.info("{}: applyState called", persistenceId());
mockRaftActor.waitForInitializeBehaviorComplete();
+ mockRaftActor.waitUntilLeader();
+
mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
- verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
+ verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
}
};
}
mockRaftActor.waitForInitializeBehaviorComplete();
+ mockRaftActor.waitUntilLeader();
+
mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
- verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
+ verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
}
new MockRaftActorContext.MockPayload("C"),
new MockRaftActorContext.MockPayload("D")));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
-
RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
+ raftActorContext.getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
+ new MockRaftActorContext.MockPayload("D")), -1);
+
mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+ ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
mockRaftActor.waitForInitializeBehaviorComplete();
+ MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
- mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
+ mockRaftActor.getReplicatedLog().append(lastEntry);
ByteString snapshotBytes = fromObject(Arrays.asList(
new MockRaftActorContext.MockPayload("A"),
mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
long replicatedToAllIndex = 1;
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
+
+ mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
verify(mockRaftActor.delegate).createSnapshot();
mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
+ raftActorContext.getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+ new MockRaftActorContext.MockPayload("D")), 1);
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
}
@Test
- public void testRaftRoleChangeNotifier() throws Exception {
+ public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+ Props.create(MessageCollectorActor.class));
+ MessageCollectorActor.waitUntilReady(notifierActor);
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ long heartBeatInterval = 100;
+ config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
+ config.setElectionTimeoutFactor(20);
+
+ String persistenceId = factory.generateActorId("notifier-");
+
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
+ Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
+ new NonPersistentProvider()), persistenceId);
+
+ List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
+
+
+ // check if the notifier got a role change from null to Follower
+ RoleChanged raftRoleChanged = matches.get(0);
+ assertEquals(persistenceId, raftRoleChanged.getMemberId());
+ assertNull(raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+
+ // check if the notifier got a role change from Follower to Candidate
+ raftRoleChanged = matches.get(1);
+ assertEquals(persistenceId, raftRoleChanged.getMemberId());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+
+ // check if the notifier got a role change from Candidate to Leader
+ raftRoleChanged = matches.get(2);
+ assertEquals(persistenceId, raftRoleChanged.getMemberId());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+
+ LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+ notifierActor, LeaderStateChanged.class);
+
+ assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
+
+ notifierActor.underlyingActor().clear();
+
+ MockRaftActor raftActor = raftActorRef.underlyingActor();
+ final String newLeaderId = "new-leader";
+ Follower follower = new Follower(raftActor.getRaftActorContext()) {
+ @Override
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ leaderId = newLeaderId;
+ return this;
+ }
+ };
+
+ raftActor.changeCurrentBehavior(follower);
+
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals(persistenceId, leaderStateChange.getMemberId());
+ assertEquals(null, leaderStateChange.getLeaderId());
+
+ raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
+ assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+
+ notifierActor.underlyingActor().clear();
+
+ raftActor.handleCommand("any");
+
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals(persistenceId, leaderStateChange.getMemberId());
+ assertEquals(newLeaderId, leaderStateChange.getLeaderId());
+ }};
+ }
+
+ @Test
+ public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
new JavaTestKit(getSystem()) {{
ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
MessageCollectorActor.waitUntilReady(notifierActor);
String persistenceId = factory.generateActorId("notifier-");
- factory.createTestActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
+ factory.createActor(MockRaftActor.props(persistenceId,
+ ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
List<RoleChanged> matches = null;
for(int i = 0; i < 5000 / heartBeatInterval; i++) {
Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
}
- assertEquals(3, matches.size());
+ assertEquals(2, matches.size());
// check if the notifier got a role change from null to Follower
RoleChanged raftRoleChanged = matches.get(0);
assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
- // check if the notifier got a role change from Candidate to Leader
- raftRoleChanged = matches.get(2);
- assertEquals(persistenceId, raftRoleChanged.getMemberId());
- assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
- assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
}};
}
assertEquals(8, leaderActor.getReplicatedLog().size());
- leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
+ leaderActor.getRaftActorContext().getSnapshotManager()
+ .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+ new MockRaftActorContext.MockPayload("x")), 4);
- leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
verify(leaderActor.delegate).createSnapshot();
assertEquals(8, leaderActor.getReplicatedLog().size());
new MockRaftActorContext.MockPayload("foo-2"),
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+ leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider()
+ , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
+
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+ // The commit is needed to complete the snapshot creation process
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
assertEquals(6, followerActor.getReplicatedLog().size());
//snapshot on 4
- followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
+ followerActor.getRaftActorContext().getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+ new MockRaftActorContext.MockPayload("D")), 4);
- followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
verify(followerActor.delegate).createSnapshot();
assertEquals(6, followerActor.getReplicatedLog().size());
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+ // The commit is needed to complete the snapshot creation process
+ followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
// Trimming log in this scenario is a no-op
assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals(-1, leader.getReplicatedToAllIndex());
}};
// Trimming log in this scenario is a no-op
assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals(3, leader.getReplicatedToAllIndex());
}};
// Create the leader and 2 follower actors and verify initial syncing of the followers after leader
// persistence recovery.
- follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams());
+ follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
- follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams());
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
peerAddresses = ImmutableMap.<String, String>builder().
put(follower1Id, follower1Actor.path().toString()).
InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
// Create the leader and 2 follower actors.
+ follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
- follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams());
-
- follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams());
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().
put(follower1Id, follower1Actor.path().toString()).
--- /dev/null
+package org.opendaylight.controller.cluster.raft;
+
+import static junit.framework.TestCase.assertFalse;
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.testkit.TestActorRef;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.LoggerFactory;
+
+public class SnapshotManagerTest extends AbstractActorTest {
+
+ @Mock
+ private RaftActorContext mockRaftActorContext;
+
+ @Mock
+ private ConfigParams mockConfigParams;
+
+ @Mock
+ private ReplicatedLog mockReplicatedLog;
+
+ @Mock
+ private DataPersistenceProvider mockDataPersistenceProvider;
+
+ @Mock
+ private RaftActorBehavior mockRaftActorBehavior;
+
+ @Mock
+ private Procedure<Void> mockProcedure;
+
+ private SnapshotManager snapshotManager;
+
+ private TestActorFactory factory;
+
+ private TestActorRef<MessageCollectorActor> actorRef;
+
+ @Before
+ public void setUp(){
+ MockitoAnnotations.initMocks(this);
+
+ doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses();
+ doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
+ doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
+ doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
+ doReturn("123").when(mockRaftActorContext).getId();
+ doReturn("123").when(mockRaftActorBehavior).getLeaderId();
+
+ snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
+ factory = new TestActorFactory(getSystem());
+
+ actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
+ doReturn(actorRef).when(mockRaftActorContext).getActor();
+
+ }
+
+ @After
+ public void tearDown(){
+ factory.close();
+ }
+
+ @Test
+ public void testConstruction(){
+ assertEquals(false, snapshotManager.isCapturing());
+ }
+
+ @Test
+ public void testCaptureToInstall(){
+
+ // Force capturing toInstall = true
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+ new MockRaftActorContext.MockPayload()), 0, "follower-1");
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+
+ // LastIndex and LastTerm are picked up from the lastLogEntry
+ assertEquals(0L, captureSnapshot.getLastIndex());
+ assertEquals(1L, captureSnapshot.getLastTerm());
+
+ // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+ assertEquals(0L, captureSnapshot.getLastAppliedIndex());
+ assertEquals(1L, captureSnapshot.getLastAppliedTerm());
+
+ //
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+ actorRef.underlyingActor().clear();
+ }
+
+ @Test
+ public void testCapture(){
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertTrue(capture);
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+ // LastIndex and LastTerm are picked up from the lastLogEntry
+ assertEquals(9L, captureSnapshot.getLastIndex());
+ assertEquals(1L, captureSnapshot.getLastTerm());
+
+ // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+ assertEquals(9L, captureSnapshot.getLastAppliedIndex());
+ assertEquals(1L, captureSnapshot.getLastAppliedTerm());
+
+ //
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+
+ actorRef.underlyingActor().clear();
+
+ }
+
+ @Test
+ public void testIllegalCapture() throws Exception {
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertTrue(capture);
+
+ List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+
+ assertEquals(1, allMatching.size());
+
+ // This will not cause snapshot capture to start again
+ capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertFalse(capture);
+
+ allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+
+ assertEquals(1, allMatching.size());
+ }
+
+ @Test
+ public void testPersistWhenReplicatedToAllIndexMinusOne(){
+ doReturn("123").when(mockRaftActorContext).getId();
+ doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
+ doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
+
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+ snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
+ verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
+
+ Snapshot snapshot = snapshotArgumentCaptor.getValue();
+
+ assertEquals(6, snapshot.getLastAppliedTerm());
+ assertEquals(9, snapshot.getLastAppliedIndex());
+ assertEquals(9, snapshot.getLastIndex());
+ assertEquals(6, snapshot.getLastTerm());
+ assertEquals(10, snapshot.getState().length);
+ assertTrue(Arrays.equals(bytes, snapshot.getState()));
+ assertEquals(0, snapshot.getUnAppliedEntries().size());
+
+ verify(mockReplicatedLog).snapshotPreCommit(45L, 6L);
+ }
+
+
+ @Test
+ public void testCreate() throws Exception {
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure).apply(null);
+ }
+
+ @Test
+ public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure, times(1)).apply(null);
+ }
+
+ @Test
+ public void testCallingCreateBeforeCapture() throws Exception {
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure, times(0)).apply(null);
+ }
+
+ @Test
+ public void testCallingCreateAfterPersist() throws Exception {
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure, times(1)).apply(null);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ reset(mockProcedure);
+
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure, never()).apply(null);
+ }
+
+ @Test
+ public void testPersistWhenReplicatedToAllIndexNotMinus(){
+ doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
+ doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9);
+ doReturn(6L).when(replicatedLogEntry).getTerm();
+ doReturn(9L).when(replicatedLogEntry).getIndex();
+
+ // when replicatedToAllIndex != -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior).setReplicatedToAllIndex(9);
+ }
+
+
+ @Test
+ public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){
+ doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+ }
+
+ @Test
+ public void testPersistSendInstallSnapshot(){
+ doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+ // when replicatedToAllIndex = -1
+ boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ assertTrue(capture);
+
+ snapshotManager.create(mockProcedure);
+
+ byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+
+ snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ ArgumentCaptor<SendInstallSnapshot> sendInstallSnapshotArgumentCaptor
+ = ArgumentCaptor.forClass(SendInstallSnapshot.class);
+
+ verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), sendInstallSnapshotArgumentCaptor.capture());
+
+ SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
+
+ assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray()));
+ }
+
+ @Test
+ public void testCallingPersistWithoutCaptureWillDoNothing(){
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
+ }
+ @Test
+ public void testCallingPersistTwiceWillDoNoHarm(){
+ doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
+ }
+
+ @Test
+ public void testCommit(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ verify(mockReplicatedLog).snapshotCommit();
+
+ verify(mockDataPersistenceProvider).deleteMessages(100L);
+
+ ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
+
+ verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
+
+ assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
+ // config snapShotBatchCount = 10
+ // therefore maxSequenceNumber = 90
+ }
+
+ @Test
+ public void testCommitBeforePersist(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ verify(mockDataPersistenceProvider, never()).deleteMessages(100L);
+
+ verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+ }
+
+ @Test
+ public void testCommitBeforeCapture(){
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong());
+
+ verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+ }
+
+ @Test
+ public void testCallingCommitMultipleTimesCausesNoHarm(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ verify(mockReplicatedLog, times(1)).snapshotCommit();
+
+ verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+
+ verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+ }
+
+ @Test
+ public void testRollback(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.rollback();
+
+ verify(mockReplicatedLog).snapshotRollback();
+ }
+
+
+ @Test
+ public void testRollbackBeforePersist(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.rollback();
+
+ verify(mockReplicatedLog, never()).snapshotRollback();
+ }
+
+ @Test
+ public void testRollbackBeforeCapture(){
+ snapshotManager.rollback();
+
+ verify(mockReplicatedLog, never()).snapshotRollback();
+ }
+
+ @Test
+ public void testCallingRollbackMultipleTimesCausesNoHarm(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.rollback();
+
+ snapshotManager.rollback();
+
+ verify(mockReplicatedLog, times(1)).snapshotRollback();
+ }
+
+ @Test
+ public void testTrimLog(){
+ ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(20L).when(mockRaftActorContext).getLastApplied();
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+ doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+ doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ snapshotManager.trimLog(10, mockRaftActorBehavior);
+
+ verify(mockReplicatedLog).snapshotPreCommit(10, 5);
+ verify(mockReplicatedLog).snapshotCommit();
+ }
+
+ @Test
+ public void testTrimLogAfterCapture(){
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertTrue(capture);
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(20L).when(mockRaftActorContext).getLastApplied();
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+ doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+ doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ snapshotManager.trimLog(10, mockRaftActorBehavior);
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ }
+
+ @Test
+ public void testTrimLogAfterCaptureToInstall(){
+ boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9, "follower-1");
+
+ assertTrue(capture);
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(20L).when(mockRaftActorContext).getLastApplied();
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+ doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+ doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ snapshotManager.trimLog(10, mockRaftActorBehavior);
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ }
+
+}
\ No newline at end of file
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
assertEquals("getTerm", 1001, reply.getTerm());
}
+ @Test
+ public void testCandidateSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+ MockRaftActorContext context = createActorContext();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ candidate = createBehavior(context);
+
+ MessageCollectorActor.expectFirstMatching(candidateActor, ElectionTimeout.class);
+
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+ }
@Override
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
follower = createBehavior(context);
- HashMap<String, String> followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- ByteString bsSnapshot = toByteString(followerSnapshot);
+ ByteString bsSnapshot = createSnapshot();
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
}
+
+ /**
+ * Verify that when an AppendEntries is sent to a follower during a snapshot install
+ * the Follower short-circuits the processing of the AppendEntries message.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
+ logStart("testReceivingAppendEntriesDuringInstallSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+
+ follower = createBehavior(context);
+
+ ByteString bsSnapshot = createSnapshot();
+ int snapshotLength = bsSnapshot.size();
+ int chunkSize = 50;
+ int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+ int lastIncludedIndex = 1;
+
+ // Check that snapshot installation is not in progress
+ assertNull(((Follower) follower).getSnapshotTracker());
+
+ // Make sure that we have more than 1 chunk to send
+ assertTrue(totalChunks > 1);
+
+ // Send an install snapshot with the first chunk to start the process of installing a snapshot
+ ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+ follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+ chunkData, 1, totalChunks));
+
+ // Check if snapshot installation is in progress now
+ assertNotNull(((Follower) follower).getSnapshotTracker());
+
+ // Send an append entry
+ AppendEntries appendEntries = mock(AppendEntries.class);
+ doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
+
+ follower.handleMessage(leaderActor, appendEntries);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
+ assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
+ assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
+
+ // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
+ verify(appendEntries, never()).getPrevLogIndex();
+
+ }
+
@Test
public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
logStart("testInitialSyncUpWithHandleInstallSnapshot");
follower = createBehavior(context);
- HashMap<String, String> followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- ByteString bsSnapshot = toByteString(followerSnapshot);
+ ByteString bsSnapshot = createSnapshot();
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
follower = createBehavior(context);
- HashMap<String, String> followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- ByteString bsSnapshot = toByteString(followerSnapshot);
+ ByteString bsSnapshot = createSnapshot();
InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
getNextChunk(bsSnapshot, 10, 50), 3, 3);
assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
}
+ @Test
+ public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+ MockRaftActorContext context = createActorContext();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ follower = createBehavior(context);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+ }
+
public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
int snapshotLength = bs.size();
int start = offset;
new MockRaftActorContext.MockPayload(data));
}
+ private ByteString createSnapshot(){
+ HashMap<String, String> followerSnapshot = new HashMap<>();
+ followerSnapshot.put("1", "A");
+ followerSnapshot.put("2", "B");
+ followerSnapshot.put("3", "C");
+
+ return toByteString(followerSnapshot);
+ }
+
@Override
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {
public class LeaderTest extends AbstractLeaderTest {
static final String FOLLOWER_ID = "follower";
+ public static final String LEADER_ID = "leader";
private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
new MockRaftActorContext.MockPayload("D"));
+ actorContext.getReplicatedLog().append(entry);
+
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
@Override
protected MockRaftActorContext createActorContext(ActorRef actorRef) {
- return createActorContext("leader", actorRef);
+ return createActorContext(LEADER_ID, actorRef);
}
private MockRaftActorContext createActorContextWithFollower() {
MockRaftActorContext leaderActorContext = createActorContext();
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+ Map<String, String> leaderPeerAddresses = new HashMap<>();
+ leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
- leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.setPeerAddresses(leaderPeerAddresses);
leaderActorContext.getReplicatedLog().removeFrom(0);
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
followerActorContext.setConfigParams(configParams);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * A message initiated internally from the RaftActor when some state of a leader has changed
+ *
+ * @author Thomas Pantelis
+ */
+public class LeaderStateChanged implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String memberId;
+ private final String leaderId;
+
+ public LeaderStateChanged(String memberId, String leaderId) {
+ this.memberId = memberId;
+ this.leaderId = leaderId;
+ }
+
+ public String getMemberId() {
+ return memberId;
+ }
+
+ public String getLeaderId() {
+ return leaderId;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("LeaderStateChanged [memberId=").append(memberId).append(", leaderId=").append(leaderId)
+ .append("]");
+ return builder.toString();
+ }
+}
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
/**
- * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying
+ * The RoleChangeNotifier is responsible for receiving Raft role and leader state change messages and notifying
* the listeners (within the same node), which are registered with it.
* <p/>
* The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor.
*/
public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable {
- private String memberId;
- private Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
+ private final String memberId;
+ private final Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
private RoleChangeNotification latestRoleChangeNotification = null;
+ private LeaderStateChanged latestLeaderStateChanged;
public RoleChangeNotifier(String memberId) {
this.memberId = memberId;
getSender().tell(new RegisterRoleChangeListenerReply(), getSelf());
+ if(latestLeaderStateChanged != null) {
+ getSender().tell(latestLeaderStateChanged, getSelf());
+ }
+
if (latestRoleChangeNotification != null) {
getSender().tell(latestRoleChangeNotification, getSelf());
}
for (ActorRef listener: registeredListeners.values()) {
listener.tell(latestRoleChangeNotification, getSelf());
}
+ } else if (message instanceof LeaderStateChanged) {
+ latestLeaderStateChanged = (LeaderStateChanged)message;
+
+ for (ActorRef listener: registeredListeners.values()) {
+ listener.tell(latestLeaderStateChanged, getSelf());
+ }
}
}
public String getNewRole() {
return newRole;
}
+
+ @Override
+ public String toString() {
+ return "RoleChanged{" +
+ "memberId='" + memberId + '\'' +
+ ", oldRole='" + oldRole + '\'' +
+ ", newRole='" + newRole + '\'' +
+ '}';
+ }
}
# failing an operation (eg transaction create and change listener registration).
#shard-initialization-timeout-in-seconds=300
-# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
-#shard-journal-recovery-log-batch-size=5000
+# The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.
+#shard-journal-recovery-log-batch-size=1000
# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
#shard-snapshot-batch-count=20000
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
final long startTime = System.nanoTime();
+ final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
// Not using Futures.allAsList here to avoid its internal overhead.
- final AtomicInteger remaining = new AtomicInteger(cohorts.size());
FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
new TransactionCommitFailedException(
"Can Commit failed, no detailed cause available."));
} else {
- if(remaining.decrementAndGet() == 0) {
+ if(!cohortIterator.hasNext()) {
// All cohorts completed successfully - we can move on to the preCommit phase
doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
+ } else {
+ ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+ Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor);
}
}
}
}
};
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Boolean> canCommitFuture = cohort.canCommit();
- Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+ Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
// Not using Futures.allAsList here to avoid its internal overhead.
- final AtomicInteger remaining = new AtomicInteger(cohorts.size());
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
- if(remaining.decrementAndGet() == 0) {
+ if(!cohortIterator.hasNext()) {
// All cohorts completed successfully - we can move on to the commit phase
doCommit(startTime, clientSubmitFuture, transaction, cohorts);
+ } else {
+ ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+ Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor);
}
}
}
};
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Void> preCommitFuture = cohort.preCommit();
- Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+ Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
// Not using Futures.allAsList here to avoid its internal overhead.
- final AtomicInteger remaining = new AtomicInteger(cohorts.size());
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
- if(remaining.decrementAndGet() == 0) {
+ if(!cohortIterator.hasNext()) {
// All cohorts completed successfully - we're done.
commitStatsTracker.addDuration(System.nanoTime() - startTime);
clientSubmitFuture.set();
+ } else {
+ ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+ Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor);
}
}
}
};
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- ListenableFuture<Void> commitFuture = cohort.commit();
- Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
- }
+ ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+ Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
}
private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
DataChangeScope scope) {
Future<Object> future = actorContext.executeOperationAsync(shard,
- new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ new RegisterChangeListener(path, dataChangeListenerActor, scope),
actorContext.getDatastoreContext().getShardInitializationTimeout());
future.onComplete(new OnComplete<Object>(){
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
* Coordinates persistence recovery on startup.
*/
private ShardRecoveryCoordinator recoveryCoordinator;
- private List<Object> currentLogRecoveryBatch;
private final DOMTransactionFactory transactionFactory;
private final String txnDispatcherPath;
- protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+ protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name.toString(), mapPeerAddresses(peerAddresses),
- Optional.of(datastoreContext.getShardRaftConfig()));
+ super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
this.name = name.toString();
this.datastoreContext = datastoreContext;
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+ recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
}
private void setTransactionCommitTimeout() {
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
}
- private static Map<String, String> mapPeerAddresses(
- final Map<ShardIdentifier, String> peerAddresses) {
- Map<String, String> map = new HashMap<>();
-
- for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
- .entrySet()) {
- map.put(entry.getKey().toString(), entry.getValue());
- }
-
- return map;
- }
-
public static Props props(final ShardIdentifier name,
- final Map<ShardIdentifier, String> peerAddresses,
+ final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
Preconditions.checkNotNull(name, "name should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
persistenceId(), listenerRegistration.path());
- getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
+ getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf());
}
private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
@Override
protected
void startLogRecoveryBatch(final int maxBatchSize) {
- currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
- }
+ recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
}
@Override
protected void appendRecoveredLogEntry(final Payload data) {
- if(data instanceof ModificationPayload) {
- try {
- currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
- } catch (ClassNotFoundException | IOException e) {
- LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
- }
- } else if (data instanceof CompositeModificationPayload) {
- currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
- } else if (data instanceof CompositeModificationByteStringPayload) {
- currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
- } else {
- LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
- }
+ recoveryCoordinator.appendRecoveredLogPayload(data);
}
@Override
protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
- if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
- LOG, name.toString());
- }
-
- recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: submitted recovery sbapshot", persistenceId());
- }
+ recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
}
@Override
protected void applyCurrentLogRecoveryBatch() {
- if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
- LOG, name.toString());
- }
-
- recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
- currentLogRecoveryBatch.size());
- }
+ recoveryCoordinator.applyCurrentLogRecoveryBatch();
}
@Override
protected void onRecoveryComplete() {
- if(recoveryCoordinator != null) {
- Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
- }
-
- for(DOMStoreWriteTransaction tx: txList) {
- try {
- syncCommitTransaction(tx);
- shardMBean.incrementCommittedTransactionCount();
- } catch (InterruptedException | ExecutionException e) {
- shardMBean.incrementFailedTransactionsCount();
- LOG.error("{}: Failed to commit", persistenceId(), e);
- }
- }
- }
-
recoveryCoordinator = null;
- currentLogRecoveryBatch = null;
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
private static final long serialVersionUID = 1L;
final ShardIdentifier name;
- final Map<ShardIdentifier, String> peerAddresses;
+ final Map<String, String> peerAddresses;
final DatastoreContext datastoreContext;
final SchemaContext schemaContext;
- ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+ ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
this.name = name;
this.peerAddresses = peerAddresses;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Address;
+import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
*/
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
- private final Logger LOG = LoggerFactory.getLogger(getClass());
+ private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
// Stores a mapping between a member name and the address of the member
// Member names look like "member-1", "member-2" etc and are as specified
// A data store could be of type config/operational
private final String type;
+ private final String shardManagerIdentifierString;
+
private final ClusterWrapper cluster;
private final Configuration configuration;
this.datastoreContext = datastoreContext;
this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
this.type = datastoreContext.getDataStoreType();
+ this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
@Override
public void handleCommand(Object message) throws Exception {
- if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
- findPrimary(FindPrimary.fromSerializable(message));
+ if (message instanceof FindPrimary) {
+ findPrimary((FindPrimary)message);
} else if(message instanceof FindLocalShard){
findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
onRoleChangeNotification((RoleChangeNotification) message);
} else if(message instanceof FollowerInitialSyncUpStatus){
onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
- } else{
+ } else if(message instanceof ShardNotInitializedTimeout) {
+ onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
+ } else if(message instanceof LeaderStateChanged) {
+ onLeaderStateChanged((LeaderStateChanged)message);
+ } else {
unknownMessage(message);
}
}
+ private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) {
+ LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
+
+ ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
+ if(shardInformation != null) {
+ shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+ if (isReadyWithLeaderId()) {
+ LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+ persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+ waitTillReadyCountdownLatch.countDown();
+ }
+
+ } else {
+ LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
+ }
+ }
+
+ private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
+ ShardInformation shardInfo = message.getShardInfo();
+
+ LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
+ shardInfo.getShardName());
+
+ shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
+
+ if(!shardInfo.isShardInitialized()) {
+ LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
+ message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
+ } else {
+ LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
+ message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+ }
+ }
+
private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
- LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(),
- status.isInitialSyncDone());
+ LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
+ status.getName(), status.isInitialSyncDone());
ShardInformation shardInformation = findShardInformation(status.getName());
}
private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
- LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
+ LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
roleChanged.getOldRole(), roleChanged.getNewRole());
ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
if(shardInformation != null) {
shardInformation.setRole(roleChanged.getNewRole());
- if (isReady()) {
- LOG.info("All Shards are ready - data store {} is ready, available count is {}", type,
- waitTillReadyCountdownLatch.getCount());
+ if (isReadyWithLeaderId()) {
+ LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+ persistenceId(), type, waitTillReadyCountdownLatch.getCount());
waitTillReadyCountdownLatch.countDown();
}
return null;
}
- private boolean isReady() {
+ private boolean isReadyWithLeaderId() {
boolean isReady = true;
for (ShardInformation info : localShards.values()) {
- if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){
+ if(!info.isShardReadyWithLeaderId()){
isReady = false;
break;
}
if (shardId.getShardName() == null) {
return;
}
+
markShardAsInitialized(shardId.getShardName());
}
private void markShardAsInitialized(String shardName) {
- LOG.debug("Initializing shard [{}]", shardName);
+ LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
+
ShardInformation shardInformation = localShards.get(shardName);
if (shardInformation != null) {
shardInformation.setActorInitialized();
+
+ shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
}
}
return;
}
- sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
@Override
public Object get() {
return new LocalShardFound(shardInformation.getActor());
});
}
- private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
- final Supplier<Object> messageSupplier) {
- if (!shardInformation.isShardInitialized()) {
- if(waitUntilInitialized) {
+ private void sendResponse(ShardInformation shardInformation, boolean doWait,
+ boolean wantShardReady, final Supplier<Object> messageSupplier) {
+ if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
+ if(doWait) {
final ActorRef sender = getSender();
final ActorRef self = self();
- shardInformation.addRunnableOnInitialized(new Runnable() {
+
+ Runnable replyRunnable = new Runnable() {
@Override
public void run() {
sender.tell(messageSupplier.get(), self);
}
- });
+ };
+
+ OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
+ new OnShardInitialized(replyRunnable);
+
+ shardInformation.addOnShardInitialized(onShardInitialized);
+
+ LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
+
+ Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
+ datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
+ new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
+ getContext().dispatcher(), getSelf());
+
+ onShardInitialized.setTimeoutSchedule(timeoutSchedule);
+
+ } else if (!shardInformation.isShardInitialized()) {
+ LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
+ shardInformation.getShardName());
+ getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
} else {
- getSender().tell(new ActorNotInitialized(), getSelf());
+ LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
+ shardInformation.getShardName());
+ getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
}
return;
getSender().tell(messageSupplier.get(), getSelf());
}
+ private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+ return new NoShardLeaderException(String.format(
+ "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
+ "recovering and a leader is being elected. Try again later.", shardId));
+ }
+
+ private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+ return new NotInitializedException(String.format(
+ "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
+ }
+
private void memberRemoved(ClusterEvent.MemberRemoved message) {
+ String memberName = message.member().roles().head();
+
+ LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+ message.member().address());
+
memberNameToAddress.remove(message.member().roles().head());
}
private void memberUp(ClusterEvent.MemberUp message) {
String memberName = message.member().roles().head();
+ LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+ message.member().address());
+
memberNameToAddress.put(memberName, message.member().address());
for(ShardInformation info : localShards.values()){
String shardName = info.getShardName();
- info.updatePeerAddress(getShardIdentifier(memberName, shardName),
- getShardActorPath(shardName, memberName));
+ info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
+ getShardActorPath(shardName, memberName), getSelf());
}
}
LOG.debug("Sending new SchemaContext to Shards");
for (ShardInformation info : localShards.values()) {
if (info.getActor() == null) {
- info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
- info.getPeerAddresses(), datastoreContext, schemaContext)
- .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
+ info.setActor(newShardActor(schemaContext, info));
} else {
info.getActor().tell(message, getSelf());
}
- info.getActor().tell(new RegisterRoleChangeListener(), self());
}
}
}
+ @VisibleForTesting
+ protected ClusterWrapper getCluster() {
+ return cluster;
+ }
+
+ @VisibleForTesting
+ protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
+ return getContext().actorOf(Shard.props(info.getShardId(),
+ info.getPeerAddresses(), datastoreContext, schemaContext)
+ .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+ }
+
private void findPrimary(FindPrimary message) {
- String shardName = message.getShardName();
+ LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
+
+ final String shardName = message.getShardName();
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
if (info != null) {
- sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
+ sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
@Override
public Object get() {
- return new PrimaryFound(info.getActorPath().toString()).toSerializable();
+ Object found = new PrimaryFound(info.getSerializedLeaderActor());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+ }
+
+ return found;
}
});
return;
}
- List<String> members = configuration.getMembersFromShardName(shardName);
+ for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
+ if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
+ String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
- if(cluster.getCurrentMemberName() != null) {
- members.remove(cluster.getCurrentMemberName());
- }
+ LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+ shardName, path);
- /**
- * FIXME: Instead of sending remote shard actor path back to sender,
- * forward FindPrimary message to remote shard manager
- */
- // There is no way for us to figure out the primary (for now) so assume
- // that one of the remote nodes is a primary
- for(String memberName : members) {
- Address address = memberNameToAddress.get(memberName);
- if(address != null){
- String path =
- getShardActorPath(shardName, memberName);
- getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+ getContext().actorSelection(path).forward(message, getContext());
return;
}
}
- getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+
+ LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
+
+ getSender().tell(new PrimaryNotFoundException(
+ String.format("No primary shard found for %s.", shardName)), getSelf());
+ }
+
+ private StringBuilder getShardManagerActorPathBuilder(Address address) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
+ return builder;
}
private String getShardActorPath(String shardName, String memberName) {
Address address = memberNameToAddress.get(memberName);
if(address != null) {
- StringBuilder builder = new StringBuilder();
- builder.append(address.toString())
- .append("/user/")
- .append(ShardManagerIdentifier.builder().type(type).build().toString())
- .append("/")
+ StringBuilder builder = getShardManagerActorPathBuilder(address);
+ builder.append("/")
.append(getShardIdentifier(memberName, shardName));
return builder.toString();
}
List<String> localShardActorNames = new ArrayList<>();
for(String shardName : memberShardNames){
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
- Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
+ Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
}
* @param shardName
* @return
*/
- private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
+ private Map<String, String> getPeerAddresses(String shardName){
- Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+ Map<String, String> peerAddresses = new HashMap<>();
- List<String> members =
- this.configuration.getMembersFromShardName(shardName);
+ List<String> members = this.configuration.getMembersFromShardName(shardName);
String currentMemberName = this.cluster.getCurrentMemberName();
for(String memberName : members){
if(!currentMemberName.equals(memberName)){
- ShardIdentifier shardId = getShardIdentifier(memberName,
- shardName);
- String path =
- getShardActorPath(shardName, currentMemberName);
- peerAddresses.put(shardId, path);
+ ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+ String path = getShardActorPath(shardName, currentMemberName);
+ peerAddresses.put(shardId.toString(), path);
}
}
return peerAddresses;
return mBean;
}
- private class ShardInformation {
+ @VisibleForTesting
+ protected static class ShardInformation {
private final ShardIdentifier shardId;
private final String shardName;
private ActorRef actor;
private ActorPath actorPath;
- private final Map<ShardIdentifier, String> peerAddresses;
+ private final Map<String, String> peerAddresses;
// flag that determines if the actor is ready for business
private boolean actorInitialized = false;
private boolean followerSyncStatus = false;
- private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
+ private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
private String role ;
+ private String leaderId;
private ShardInformation(String shardName, ShardIdentifier shardId,
- Map<ShardIdentifier, String> peerAddresses) {
+ Map<String, String> peerAddresses) {
this.shardName = shardName;
this.shardId = shardId;
this.peerAddresses = peerAddresses;
return shardId;
}
- Map<ShardIdentifier, String> getPeerAddresses() {
+ Map<String, String> getPeerAddresses() {
return peerAddresses;
}
- void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+ void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
LOG.info("updatePeerAddress for peer {} with address {}", peerId,
peerAddress);
if(peerAddresses.containsKey(peerId)){
peerId, peerAddress, actor.path());
}
- actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
+ actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender);
}
+
+ notifyOnShardInitializedCallbacks();
}
}
+ boolean isShardReady() {
+ return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
+ }
+
+ boolean isShardReadyWithLeaderId() {
+ return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId));
+ }
+
boolean isShardInitialized() {
return getActor() != null && actorInitialized;
}
+ boolean isLeader() {
+ return Objects.equal(leaderId, shardId.toString());
+ }
+
+ String getSerializedLeaderActor() {
+ if(isLeader()) {
+ return Serialization.serializedActorPath(getActor());
+ } else {
+ return peerAddresses.get(leaderId);
+ }
+ }
+
void setActorInitialized() {
+ LOG.debug("Shard {} is initialized", shardId);
+
this.actorInitialized = true;
- for(Runnable runnable: runnablesOnInitialized) {
- runnable.run();
+ notifyOnShardInitializedCallbacks();
+ }
+
+ private void notifyOnShardInitializedCallbacks() {
+ if(onShardInitializedSet.isEmpty()) {
+ return;
}
- runnablesOnInitialized.clear();
+ boolean ready = isShardReadyWithLeaderId();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
+ ready ? "ready" : "initialized", onShardInitializedSet.size());
+ }
+
+ Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
+ while(iter.hasNext()) {
+ OnShardInitialized onShardInitialized = iter.next();
+ if(!(onShardInitialized instanceof OnShardReady) || ready) {
+ iter.remove();
+ onShardInitialized.getTimeoutSchedule().cancel();
+ onShardInitialized.getReplyRunnable().run();
+ }
+ }
}
- void addRunnableOnInitialized(Runnable runnable) {
- runnablesOnInitialized.add(runnable);
+ void addOnShardInitialized(OnShardInitialized onShardInitialized) {
+ onShardInitializedSet.add(onShardInitialized);
}
- public void setRole(String newRole) {
- this.role = newRole;
+ void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
+ onShardInitializedSet.remove(onShardInitialized);
}
- public String getRole(){
- return this.role;
+ void setRole(String newRole) {
+ this.role = newRole;
+
+ notifyOnShardInitializedCallbacks();
}
- public void setFollowerSyncStatus(boolean syncStatus){
+ void setFollowerSyncStatus(boolean syncStatus){
this.followerSyncStatus = syncStatus;
}
- public boolean isInSync(){
+ boolean isInSync(){
if(RaftState.Follower.name().equals(this.role)){
return followerSyncStatus;
} else if(RaftState.Leader.name().equals(this.role)){
return false;
}
+ void setLeaderId(String leaderId) {
+ this.leaderId = leaderId;
+
+ notifyOnShardInitializedCallbacks();
+ }
}
private static class ShardManagerCreator implements Creator<ShardManager> {
}
}
+ private static class OnShardInitialized {
+ private final Runnable replyRunnable;
+ private Cancellable timeoutSchedule;
+
+ OnShardInitialized(Runnable replyRunnable) {
+ this.replyRunnable = replyRunnable;
+ }
+
+ Runnable getReplyRunnable() {
+ return replyRunnable;
+ }
+
+ Cancellable getTimeoutSchedule() {
+ return timeoutSchedule;
+ }
+
+ void setTimeoutSchedule(Cancellable timeoutSchedule) {
+ this.timeoutSchedule = timeoutSchedule;
+ }
+ }
+
+ private static class OnShardReady extends OnShardInitialized {
+ OnShardReady(Runnable replyRunnable) {
+ super(replyRunnable);
+ }
+ }
+
+ private static class ShardNotInitializedTimeout {
+ private final ActorRef sender;
+ private final ShardInformation shardInfo;
+ private final OnShardInitialized onShardInitialized;
+
+ ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+ this.sender = sender;
+ this.shardInfo = shardInfo;
+ this.onShardInitialized = onShardInitialized;
+ }
+
+ ActorRef getSender() {
+ return sender;
+ }
+
+ ShardInformation getShardInfo() {
+ return shardInfo;
+ }
+
+ OnShardInitialized getOnShardInitialized() {
+ return onShardInitialized;
+ }
+ }
+
static class SchemaContextModules implements Serializable {
private static final long serialVersionUID = -8884620101025936590L;
package org.opendaylight.controller.cluster.datastore;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Collection;
-import java.util.Collections;
+import java.io.IOException;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
/**
*/
class ShardRecoveryCoordinator {
- private static final int TIME_OUT = 10;
-
- private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
- private final SchemaContext schemaContext;
+ private final InMemoryDOMDataStore store;
+ private List<ModificationPayload> currentLogRecoveryBatch;
private final String shardName;
- private final ExecutorService executor;
private final Logger log;
- private final String name;
- ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
- String name) {
- this.schemaContext = schemaContext;
+ ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
+ this.store = store;
this.shardName = shardName;
this.log = log;
- this.name = name;
-
- executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
- new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
}
- /**
- * Submits a batch of journal log entries.
- *
- * @param logEntries the serialized journal log entries
- * @param resultingTx the write Tx to which to apply the entries
- */
- void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
- LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
- resultingTxList.add(resultingTx);
- executor.execute(task);
- }
+ void startLogRecoveryBatch(int maxBatchSize) {
+ currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
- /**
- * Submits a snapshot.
- *
- * @param snapshotBytes the serialized snapshot
- * @param resultingTx the write Tx to which to apply the entries
- */
- void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
- SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
- resultingTxList.add(resultingTx);
- executor.execute(task);
+ log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
}
- Collection<DOMStoreWriteTransaction> getTransactions() {
- // Shutdown the executor and wait for task completion.
- executor.shutdown();
-
+ void appendRecoveredLogPayload(Payload payload) {
try {
- if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
- return resultingTxList;
+ if(payload instanceof ModificationPayload) {
+ currentLogRecoveryBatch.add((ModificationPayload) payload);
+ } else if (payload instanceof CompositeModificationPayload) {
+ currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+ ((CompositeModificationPayload) payload).getModification())));
+ } else if (payload instanceof CompositeModificationByteStringPayload) {
+ currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+ ((CompositeModificationByteStringPayload) payload).getModification())));
} else {
- log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
+ log.error("{}: Unknown payload {} received during recovery", shardName, payload);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ log.error("{}: Error extracting ModificationPayload", shardName, e);
}
- return Collections.emptyList();
}
- private static abstract class ShardRecoveryTask implements Runnable {
-
- final DOMStoreWriteTransaction resultingTx;
-
- ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
- this.resultingTx = resultingTx;
+ private void commitTransaction(DOMStoreWriteTransaction transaction) {
+ DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ try {
+ commitCohort.preCommit().get();
+ commitCohort.commit().get();
+ } catch (Exception e) {
+ log.error("{}: Failed to commit Tx on recovery", shardName, e);
}
}
- private class LogRecoveryTask extends ShardRecoveryTask {
-
- private final List<Object> logEntries;
-
- LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
- super(resultingTx);
- this.logEntries = logEntries;
- }
-
- @Override
- public void run() {
- for(int i = 0; i < logEntries.size(); i++) {
- MutableCompositeModification.fromSerializable(
- logEntries.get(i)).apply(resultingTx);
- // Null out to GC quicker.
- logEntries.set(i, null);
+ /**
+ * Applies the current batched log entries to the data store.
+ */
+ void applyCurrentLogRecoveryBatch() {
+ log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
+
+ DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+ for(ModificationPayload payload: currentLogRecoveryBatch) {
+ try {
+ MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
+ } catch (Exception e) {
+ log.error("{}: Error extracting ModificationPayload", shardName, e);
}
}
- }
- private class SnapshotRecoveryTask extends ShardRecoveryTask {
+ commitTransaction(writeTx);
+
+ currentLogRecoveryBatch = null;
+ }
- private final byte[] snapshotBytes;
+ /**
+ * Applies a recovered snapshot to the data store.
+ *
+ * @param snapshotBytes the serialized snapshot
+ */
+ void applyRecoveredSnapshot(final byte[] snapshotBytes) {
+ log.debug("{}: Applyng recovered sbapshot", shardName);
- SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
- super(resultingTx);
- this.snapshotBytes = snapshotBytes;
- }
+ DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
- @Override
- public void run() {
- NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+ NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
- // delete everything first
- resultingTx.delete(YangInstanceIdentifier.builder().build());
+ writeTx.write(YangInstanceIdentifier.builder().build(), node);
- // Add everything from the remote node back
- resultingTx.write(YangInstanceIdentifier.builder().build(), node);
- }
+ commitTransaction(writeTx);
}
}
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} finishCanCommit", transactionId);
}
- // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
- // their canCommit processing. If any one fails then we'll fail canCommit.
- Future<Iterable<Object>> combinedFuture =
- invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
+ // For empty transactions return immediately
+ if(cohorts.size() == 0){
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
+ }
+ returnFuture.set(Boolean.TRUE);
+ return;
+ }
- combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+ final Object message = new CanCommitTransaction(transactionId).toSerializable();
+
+ final Iterator<ActorSelection> iterator = cohorts.iterator();
+
+ final OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
- if(failure != null) {
- if(LOG.isDebugEnabled()) {
+ public void onComplete(Throwable failure, Object response) throws Throwable {
+ if (failure != null) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
}
returnFuture.setException(failure);
}
boolean result = true;
- for(Object response: responses) {
- if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
- CanCommitTransactionReply reply =
- CanCommitTransactionReply.fromSerializable(response);
- if (!reply.getCanCommit()) {
- result = false;
- break;
- }
- } else {
- LOG.error("Unexpected response type {}", response.getClass());
- returnFuture.setException(new IllegalArgumentException(
- String.format("Unexpected response type %s", response.getClass())));
- return;
+ if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+ CanCommitTransactionReply reply =
+ CanCommitTransactionReply.fromSerializable(response);
+ if (!reply.getCanCommit()) {
+ result = false;
}
+ } else {
+ LOG.error("Unexpected response type {}", response.getClass());
+ returnFuture.setException(new IllegalArgumentException(
+ String.format("Unexpected response type %s", response.getClass())));
+ return;
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
+ if(iterator.hasNext() && result){
+ Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+ actorContext.getTransactionCommitOperationTimeout());
+ future.onComplete(this, actorContext.getClientDispatcher());
+ } else {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+ }
+ returnFuture.set(Boolean.valueOf(result));
}
- returnFuture.set(Boolean.valueOf(result));
+
}
- }, actorContext.getClientDispatcher());
+ };
+
+ Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+ actorContext.getTransactionCommitOperationTimeout());
+ future.onComplete(onComplete, actorContext.getClientDispatcher());
}
private Future<Iterable<Object>> invokeCohorts(Object message) {
@Override
public void onComplete(Throwable failure, ActorSelection primaryShard) {
if(failure != null) {
- newTxFutureCallback.onComplete(failure, null);
+ newTxFutureCallback.createTransactionContext(failure, null);
} else {
newTxFutureCallback.setPrimaryShard(primaryShard);
}
if(transactionType == TransactionType.WRITE_ONLY &&
actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
- LOG.debug("Tx {} Primary shard found - creating WRITE_ONLY transaction context", identifier);
+ LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
+ identifier, primaryShard);
// For write-only Tx's we prepare the transaction modifications directly on the shard actor
// to avoid the overhead of creating a separate transaction actor.
* Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
- LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard);
+ }
Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
TransactionProxy.this.transactionType.ordinal(),
}
}
+ createTransactionContext(failure, response);
+ }
+
+ private void createTransactionContext(Throwable failure, Object response) {
// Mainly checking for state violation here to perform a volatile read of "initialized" to
// ensure updates to operationLimter et al are visible to this thread (ie we're doing
// "piggy-back" synchronization here).
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import java.io.Serializable;
-
-public class ActorNotInitialized implements Serializable {
- private static final long serialVersionUID = 1L;
-}
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
/**
* The FindPrimary message is used to locate the primary of any given shard
*
*/
-public class FindPrimary implements SerializableMessage{
- public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
+public class FindPrimary implements Serializable {
+ private static final long serialVersionUID = 1L;
private final String shardName;
- private final boolean waitUntilInitialized;
+ private final boolean waitUntilReady;
- public FindPrimary(String shardName, boolean waitUntilInitialized){
+ public FindPrimary(String shardName, boolean waitUntilReady){
Preconditions.checkNotNull(shardName, "shardName should not be null");
this.shardName = shardName;
- this.waitUntilInitialized = waitUntilInitialized;
+ this.waitUntilReady = waitUntilReady;
}
public String getShardName() {
return shardName;
}
- public boolean isWaitUntilInitialized() {
- return waitUntilInitialized;
+ public boolean isWaitUntilReady() {
+ return waitUntilReady;
}
@Override
- public Object toSerializable() {
- return this;
- }
-
- public static FindPrimary fromSerializable(Object message){
- return (FindPrimary) message;
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady)
+ .append("]");
+ return builder.toString();
}
}
package org.opendaylight.controller.cluster.datastore.messages;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
public class PeerAddressResolved {
- private final ShardIdentifier peerId;
+ private final String peerId;
private final String peerAddress;
- public PeerAddressResolved(ShardIdentifier peerId, String peerAddress) {
+ public PeerAddressResolved(String peerId, String peerAddress) {
this.peerId = peerId;
this.peerAddress = peerAddress;
}
- public ShardIdentifier getPeerId() {
+ public String getPeerId() {
return peerId;
}
package org.opendaylight.controller.cluster.datastore.messages;
+import java.io.Serializable;
-public class PrimaryFound implements SerializableMessage {
- public static final Class<PrimaryFound> SERIALIZABLE_CLASS = PrimaryFound.class;
- private final String primaryPath;
+public class PrimaryFound implements Serializable {
+ private static final long serialVersionUID = 1L;
- public PrimaryFound(final String primaryPath) {
- this.primaryPath = primaryPath;
- }
+ private final String primaryPath;
- public String getPrimaryPath() {
- return primaryPath;
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
+ public PrimaryFound(final String primaryPath) {
+ this.primaryPath = primaryPath;
}
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- PrimaryFound that = (PrimaryFound) o;
- if (!primaryPath.equals(that.primaryPath)) {
- return false;
+ public String getPrimaryPath() {
+ return primaryPath;
}
- return true;
- }
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
- @Override
- public int hashCode() {
- return primaryPath.hashCode();
- }
+ PrimaryFound that = (PrimaryFound) o;
- @Override
- public String toString() {
- return "PrimaryFound{" +
- "primaryPath='" + primaryPath + '\'' +
- '}';
- }
+ if (!primaryPath.equals(that.primaryPath)) {
+ return false;
+ }
+ return true;
+ }
- @Override
- public Object toSerializable() {
- return this;
- }
+ @Override
+ public int hashCode() {
+ return primaryPath.hashCode();
+ }
- public static PrimaryFound fromSerializable(final Object message){
- return (PrimaryFound) message;
- }
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]");
+ return builder.toString();
+ }
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import com.google.common.base.Preconditions;
-
-public class PrimaryNotFound implements SerializableMessage {
- public static final Class<PrimaryNotFound> SERIALIZABLE_CLASS = PrimaryNotFound.class;
-
- private final String shardName;
-
- public PrimaryNotFound(final String shardName){
-
- Preconditions.checkNotNull(shardName, "shardName should not be null");
-
- this.shardName = shardName;
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- PrimaryNotFound that = (PrimaryNotFound) o;
-
- if (shardName != null ? !shardName.equals(that.shardName) : that.shardName != null) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return shardName != null ? shardName.hashCode() : 0;
- }
-
- @Override
- public Object toSerializable() {
- return this;
- }
-
- public static PrimaryNotFound fromSerializable(final Object message){
- return (PrimaryNotFound) message;
- }
-}
package org.opendaylight.controller.cluster.datastore.messages;
import akka.actor.ActorPath;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.serialization.Serialization;
import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
ListenerRegistrationMessages.RegisterChangeListener.class;
private final YangInstanceIdentifier path;
- private final ActorPath dataChangeListenerPath;
+ private final ActorRef dataChangeListener;
private final AsyncDataBroker.DataChangeScope scope;
public RegisterChangeListener(YangInstanceIdentifier path,
- ActorPath dataChangeListenerPath,
+ ActorRef dataChangeListener,
AsyncDataBroker.DataChangeScope scope) {
this.path = path;
- this.dataChangeListenerPath = dataChangeListenerPath;
+ this.dataChangeListener = dataChangeListener;
this.scope = scope;
}
}
public ActorPath getDataChangeListenerPath() {
- return dataChangeListenerPath;
+ return dataChangeListener.path();
}
public ListenerRegistrationMessages.RegisterChangeListener toSerializable() {
return ListenerRegistrationMessages.RegisterChangeListener.newBuilder()
.setInstanceIdentifierPath(InstanceIdentifierUtils.toSerializable(path))
- .setDataChangeListenerActorPath(dataChangeListenerPath.toString())
+ .setDataChangeListenerActorPath(Serialization.serializedActorPath(dataChangeListener))
.setDataChangeScope(scope.ordinal()).build();
}
- public static RegisterChangeListener fromSerializable(ActorSystem actorSystem,Object serializable){
+ public static RegisterChangeListener fromSerializable(ActorSystem actorSystem, Object serializable){
ListenerRegistrationMessages.RegisterChangeListener o = (ListenerRegistrationMessages.RegisterChangeListener) serializable;
return new RegisterChangeListener(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPath()),
- actorSystem.actorFor(o.getDataChangeListenerActorPath()).path(),
+ actorSystem.provider().resolveActorRef(o.getDataChangeListenerActorPath()),
AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()]);
}
package org.opendaylight.controller.cluster.datastore.messages;
import akka.actor.ActorPath;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.serialization.Serialization;
import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
public class RegisterChangeListenerReply implements SerializableMessage{
public static final Class<ListenerRegistrationMessages.RegisterChangeListenerReply> SERIALIZABLE_CLASS =
ListenerRegistrationMessages.RegisterChangeListenerReply.class;
- private final ActorPath listenerRegistrationPath;
+ private final ActorRef listenerRegistration;
- public RegisterChangeListenerReply(final ActorPath listenerRegistrationPath) {
- this.listenerRegistrationPath = listenerRegistrationPath;
+ public RegisterChangeListenerReply(final ActorRef listenerRegistration) {
+ this.listenerRegistration = listenerRegistration;
}
public ActorPath getListenerRegistrationPath() {
- return listenerRegistrationPath;
+ return listenerRegistration.path();
}
@Override
public ListenerRegistrationMessages.RegisterChangeListenerReply toSerializable() {
return ListenerRegistrationMessages.RegisterChangeListenerReply.newBuilder()
- .setListenerRegistrationPath(listenerRegistrationPath.toString()).build();
+ .setListenerRegistrationPath(Serialization.serializedActorPath(listenerRegistration)).build();
}
public static RegisterChangeListenerReply fromSerializable(final ActorSystem actorSystem,final Object serializable){
ListenerRegistrationMessages.RegisterChangeListenerReply o = (ListenerRegistrationMessages.RegisterChangeListenerReply) serializable;
return new RegisterChangeListenerReply(
- actorSystem.actorFor(o.getListenerRegistrationPath()).path()
+ actorSystem.provider().resolveActorRef(o.getListenerRegistrationPath())
);
}
}
import akka.actor.PoisonPill;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
import com.codahale.metrics.JmxReporter;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
private Timeout transactionCommitOperationTimeout;
+ private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;
- private final Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
+ private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
private volatile SchemaContext schemaContext;
private volatile boolean updated;
this.dispatchers = new Dispatchers(actorSystem.dispatchers());
setCachedProperties();
- primaryShardActorSelectionCache = CacheBuilder.newBuilder()
- .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
- .build();
-
- operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
- operationTimeout = new Timeout(operationDuration);
- transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
- TimeUnit.SECONDS));
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
transactionCommitOperationTimeout = new Timeout(Duration.create(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
+
+ shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
+
+ primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+ .build();
}
public DatastoreContext getDatastoreContext() {
return schemaContext;
}
- /**
- * Finds the primary shard for the given shard name
- *
- * @param shardName
- * @return
- */
- public Optional<ActorSelection> findPrimaryShard(String shardName) {
- String path = findPrimaryPathOrNull(shardName);
- if (path == null){
- return Optional.absent();
- }
- return Optional.of(actorSystem.actorSelection(path));
- }
-
public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
if(ret != null){
return ret;
}
Future<Object> future = executeOperationAsync(shardManager,
- new FindPrimary(shardName, true).toSerializable(),
- datastoreContext.getShardInitializationTimeout());
+ new FindPrimary(shardName, true), shardInitializationTimeout);
return future.transform(new Mapper<Object, ActorSelection>() {
@Override
public ActorSelection checkedApply(Object response) throws Exception {
- if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
- PrimaryFound found = PrimaryFound.fromSerializable(response);
+ if(response instanceof PrimaryFound) {
+ PrimaryFound found = (PrimaryFound)response;
LOG.debug("Primary found {}", found.getPrimaryPath());
ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
return actorSelection;
- } else if(response instanceof ActorNotInitialized) {
- throw new NotInitializedException(
- String.format("Found primary shard %s but it's not initialized yet. " +
- "Please try again later", shardName));
- } else if(response instanceof PrimaryNotFound) {
- throw new PrimaryNotFoundException(
- String.format("No primary shard found for %S.", shardName));
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
+ } else if(response instanceof PrimaryNotFoundException) {
+ throw (PrimaryNotFoundException)response;
+ } else if(response instanceof NoShardLeaderException) {
+ throw (NoShardLeaderException)response;
}
throw new UnknownMessageException(String.format(
*/
public Future<ActorRef> findLocalShardAsync( final String shardName) {
Future<Object> future = executeOperationAsync(shardManager,
- new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
+ new FindLocalShard(shardName, true), shardInitializationTimeout);
return future.map(new Mapper<Object, ActorRef>() {
@Override
LocalShardFound found = (LocalShardFound)response;
LOG.debug("Local shard found {}", found.getPath());
return found.getPath();
- } else if(response instanceof ActorNotInitialized) {
- throw new NotInitializedException(
- String.format("Found local shard for %s but it's not initialized yet.",
- shardName));
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
} else if(response instanceof LocalShardNotFound) {
throw new LocalShardNotFoundException(
String.format("Local shard for %s does not exist.", shardName));
}, getClientDispatcher());
}
- private String findPrimaryPathOrNull(String shardName) {
- Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
-
- if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
- PrimaryFound found = PrimaryFound.fromSerializable(result);
-
- LOG.debug("Primary found {}", found.getPrimaryPath());
- return found.getPrimaryPath();
-
- } else if (result.getClass().equals(ActorNotInitialized.class)){
- throw new NotInitializedException(
- String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
- );
-
- } else {
- return null;
- }
- }
-
-
/**
* Executes an operation on a local actor and wait for it's response
*
*
* @param message
*/
- public void broadcast(Object message){
- for(String shardName : configuration.getAllShardNames()){
-
- Optional<ActorSelection> primary = findPrimaryShard(shardName);
- if (primary.isPresent()) {
- primary.get().tell(message, ActorRef.noSender());
- } else {
- LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
- message.getClass().getSimpleName(), shardName);
- }
+ public void broadcast(final Object message){
+ for(final String shardName : configuration.getAllShardNames()){
+
+ Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
+ primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ @Override
+ public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ if(failure != null) {
+ LOG.warn("broadcast failed to send message {} to shard {}: {}",
+ message.getClass().getSimpleName(), shardName, failure);
+ } else {
+ primaryShard.tell(message, ActorRef.noSender());
+ }
+ }
+ }, getClientDispatcher());
}
}
}
leaf shard-journal-recovery-log-batch-size {
- default 5000;
+ default 1000;
type non-zero-uint32-type;
description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
}
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.dispatch.Dispatchers;
import akka.japi.Creator;
import akka.testkit.TestActorRef;
import com.google.common.base.Function;
}
protected Props newShardProps() {
- return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ return Shard.props(shardID, Collections.<String,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT);
}
Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT) {
@Override
protected void onRecoveryComplete() {
};
TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)), "testRecovery");
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testRecovery");
assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
*
* @author Thomas Pantelis
*/
-public class DOMConcurrentDataCommitCoordinatorTest {
+public class ConcurrentDOMDataBrokerTest {
private final DOMDataWriteTransaction transaction = mock(DOMDataWriteTransaction.class);
private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class);
*/
package org.opendaylight.controller.cluster.datastore;
-import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
Assert.assertEquals("getPath", path, registerMsg.getPath());
Assert.assertEquals("getScope", scope, registerMsg.getScope());
- reply(new RegisterChangeListenerReply(getRef().path()));
+ reply(new RegisterChangeListenerReply(getRef()));
for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
- reply(new ActorNotInitialized());
+ reply(new NotInitializedException("not initialized"));
new Within(duration("1 seconds")) {
@Override
@Override
public Future<Object> answer(InvocationOnMock invocation) {
proxy.close();
- return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path()));
+ return Futures.successful((Object)new RegisterChangeListenerReply(getRef()));
}
};
}};
}
- @Test
- public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
+ private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception {
new IntegrationTestKit(getSystem()) {{
String testName = "testTransactionWritesWithShardNotInitiallyReady";
String shardName = "test-1";
// Create the write Tx
- // TODO - we'll want to test this with write-only as well when FindPrimary returns the leader shard.
- final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
+ final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
+ dataStore.newReadWriteTransaction();
assertNotNull("newReadWriteTransaction returned null", writeTx);
// Do some modification operations and ready the Tx on a separate thread.
}
@Test
- public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
+ public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
+ datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+ testTransactionWritesWithShardNotInitiallyReady(true);
+ }
+
+ @Test
+ public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
+ testTransactionWritesWithShardNotInitiallyReady(false);
+ }
+
+ @Test
+ public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
new IntegrationTestKit(getSystem()) {{
String testName = "testTransactionReadsWithShardNotInitiallyReady";
String shardName = "test-1";
}};
}
- @Test(expected=NoShardLeaderException.class)
- public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
+ private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
new IntegrationTestKit(getSystem()) {{
String testName = "testTransactionCommitFailureWithNoShardLeader";
- String shardName = "test-1";
+ String shardName = "default";
// We don't want the shard to become the leader so prevent shard election from completing
// by setting the election timeout, which is based on the heartbeat interval, really high.
datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
+ datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
// Set the leader election timeout low for the test.
// Create the write Tx.
- final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
+ final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
+ dataStore.newReadWriteTransaction();
assertNotNull("newReadWriteTransaction returned null", writeTx);
// Do some modifications and ready the Tx on a separate thread.
@Override
public void run() {
try {
- writeTx.write(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ writeTx.write(TestModel.JUNK_PATH,
+ ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
txCohort.set(writeTx.ready());
} catch(Exception e) {
}};
}
+ @Test(expected=NoShardLeaderException.class)
+ public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
+ datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+ testTransactionCommitFailureWithNoShardLeader(true);
+ }
+
+ @Test(expected=NoShardLeaderException.class)
+ public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
+ testTransactionCommitFailureWithNoShardLeader(false);
+ }
+
@Test
public void testTransactionAbort() throws Exception{
System.setProperty("shard.persistent", "true");
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.RaftState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
public class RoleChangeNotifierTest extends AbstractActorTest {
TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
- RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor();
-
notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor);
// no notification should be sent as listener has not yet registered
}};
}
+
+ @Test
+ public void testHandleLeaderStateChanged() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String actorId = "testHandleLeaderStateChanged";
+ TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+ getSystem(), RoleChangeNotifier.getProps(actorId), actorId);
+
+ notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1"), ActorRef.noSender());
+
+ // listener registers after the sate has been changed, ensure we sent the latest state change after a reply
+ notifierTestActorRef.tell(new RegisterRoleChangeListener(), getRef());
+
+ expectMsgClass(RegisterRoleChangeListenerReply.class);
+
+ LeaderStateChanged leaderStateChanged = expectMsgClass(LeaderStateChanged.class);
+ assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId());
+ assertEquals("getLeaderId", "leader1", leaderStateChanged.getLeaderId());
+
+ notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2"), ActorRef.noSender());
+
+ leaderStateChanged = expectMsgClass(LeaderStateChanged.class);
+ assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId());
+ assertEquals("getLeaderId", "leader2", leaderStateChanged.getLeaderId());
+ }};
+ }
}
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.AddressFromURIString;
import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.dispatch.Dispatchers;
import akka.japi.Creator;
import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.ConfigFactory;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
public class ShardManagerTest extends AbstractActorTest {
private static int ID_COUNTER = 1;
@Mock
private static CountDownLatch ready;
- private static ActorRef mockShardActor;
+ private static TestActorRef<MessageCollectorActor> mockShardActor;
+
+ private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
+ dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
+
+ private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
+ String name = new ShardIdentifier(shardName, memberName,"config").toString();
+ return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
+ }
@Before
public void setUp() {
InMemoryJournal.clear();
if(mockShardActor == null) {
- String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
- mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
+ String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
+ mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
}
+
+ mockShardActor.underlyingActor().clear();
}
@After
}
private Props newShardMgrProps() {
- DatastoreContext.Builder builder = DatastoreContext.newBuilder();
- builder.dataStoreType(shardMrgIDSuffix);
return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
- builder.build(), ready);
+ datastoreContextBuilder.build(), ready);
+ }
+
+ private Props newPropsShardMgrWithMockShardActor() {
+ return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
+ new MockConfiguration());
+ }
+
+ private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
+ final ClusterWrapper clusterWrapper, final Configuration config) {
+ Creator<ShardManager> creator = new Creator<ShardManager>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public ShardManager create() throws Exception {
+ return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
+ ready, name, shardActor);
+ }
+ };
+
+ return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
}
@Test
public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("non-existent", false), getRef());
- expectMsgEquals(duration("5 seconds"),
- new PrimaryNotFound("non-existent").toSerializable());
+ expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
}};
}
@Test
- public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
+ public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
+
+ MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
+ shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+ RaftState.Leader.name())), mockShardActor);
- expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-1-shard-default"));
}};
}
@Test
- public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
+ public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+ MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId1,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+ shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
- expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-2-shard-default"));
}};
}
@Test
- public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
+ public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
- shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- // We're passing waitUntilInitialized = true to FindPrimary so the response should be
- // delayed until we send ActorInitialized.
- Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
- new Timeout(5, TimeUnit.SECONDS));
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- Object resp = Await.result(future, duration("5 seconds"));
- assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+ expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
}};
}
@Test
- public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
+ public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindLocalShard("non-existent", false), getRef());
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
- LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- assertEquals("getShardName", "non-existent", notFound.getShardName());
+ expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+ shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-1-shard-default"));
}};
}
@Test
- public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
+ public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ // We're passing waitUntilInitialized = true to FindPrimary so the response should be
+ // delayed until we send ActorInitialized and RoleChangeNotification.
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
+
+ expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
- LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId,
+ RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
- assertTrue("Found path contains " + found.getPath().path().toString(),
- found.getPath().path().toString().contains("member-1-shard-default-config"));
+ expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
+ shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+
+ expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
}};
}
@Test
- public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
+ public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
- shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
+
+ expectMsgClass(duration("2 seconds"), NotInitializedException.class);
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
- expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+ expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
}};
}
@Test
- public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
+ public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+ shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+ null, RaftState.Candidate.name()), mockShardActor);
- // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
- // delayed until we send ActorInitialized.
- Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
- new Timeout(5, TimeUnit.SECONDS));
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
+
+ expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- Object resp = Await.result(future, duration("5 seconds"));
- assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
+
+ expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
}};
}
@Test
- public void testOnReceiveMemberUp() throws Exception {
- new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
+ String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
- MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
+ // Create an ActorSystem ShardManager actor for member-1.
+
+ final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+ final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+ newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+ new MockConfiguration()), shardManagerID);
+
+ // Create an ActorSystem ShardManager actor for member-2.
+
+ final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+ Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
+ final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
- PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
- PrimaryFound.SERIALIZABLE_CLASS));
+ MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+ newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+ mockConfig2), shardManagerID);
+
+ new JavaTestKit(system1) {{
+
+ shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+ String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+ shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
+ shardManager2.tell(new RoleChangeNotification(memberId2,
+ RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+
+ shardManager1.underlyingActor().waitForMemberUp();
+
+ shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+ PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
String path = found.getPrimaryPath();
- assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
+ assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
+
+ shardManager2.underlyingActor().verifyFindPrimary();
+
+ Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ shardManager1.underlyingActor().waitForMemberRemoved();
+
+ shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+ expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
}};
+
+ JavaTestKit.shutdownActorSystem(system1);
+ JavaTestKit.shutdownActorSystem(system2);
}
@Test
- public void testOnReceiveMemberDown() throws Exception {
+ public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager.tell(new FindLocalShard("non-existent", false), getRef());
+
+ LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+ assertEquals("getShardName", "non-existent", notFound.getShardName());
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
- MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
- expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
- MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
+ assertTrue("Found path contains " + found.getPath().path().toString(),
+ found.getPath().path().toString().contains("member-1-shard-default-config"));
+ }};
+ }
- shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
+ @Test
+ public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
- expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
+ // delayed until we send ActorInitialized.
+ Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
+ new Timeout(5, TimeUnit.SECONDS));
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ Object resp = Await.result(future, duration("5 seconds"));
+ assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
}};
}
}
@Test
- public void testRoleChangeNotificationReleaseReady() throws Exception {
+ public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception {
new JavaTestKit(getSystem()) {
{
- final Props persistentProps = ShardManager.props(
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready);
- final TestActorRef<ShardManager> shardManager =
- TestActorRef.create(getSystem(), persistentProps);
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+ memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
+
+ verify(ready, never()).countDown();
- shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+ shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId));
verify(ready, times(1)).countDown();
}};
}
+ @Test
+ public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+ memberId, null, RaftState.Follower.name()));
+
+ verify(ready, never()).countDown();
+
+ shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix));
+
+ verify(ready, times(1)).countDown();
+
+ }};
+ }
+
+
@Test
public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
new JavaTestKit(getSystem()) {
{
- final Props persistentProps = ShardManager.props(
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready);
- final TestActorRef<ShardManager> shardManager =
- TestActorRef.create(getSystem(), persistentProps);
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
- shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+ "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
verify(ready, never()).countDown();
return delegate.create();
}
}
+
+ private static class ForwardingShardManager extends ShardManager {
+ private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
+ private CountDownLatch memberUpReceived = new CountDownLatch(1);
+ private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
+ private final ActorRef shardActor;
+ private final String name;
+
+ protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
+ DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
+ ActorRef shardActor) {
+ super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+ this.shardActor = shardActor;
+ this.name = name;
+ }
+
+ @Override
+ public void handleCommand(Object message) throws Exception {
+ try{
+ super.handleCommand(message);
+ } finally {
+ if(message instanceof FindPrimary) {
+ findPrimaryMessageReceived.countDown();
+ } else if(message instanceof ClusterEvent.MemberUp) {
+ String role = ((ClusterEvent.MemberUp)message).member().roles().head();
+ if(!getCluster().getCurrentMemberName().equals(role)) {
+ memberUpReceived.countDown();
+ }
+ } else if(message instanceof ClusterEvent.MemberRemoved) {
+ String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
+ if(!getCluster().getCurrentMemberName().equals(role)) {
+ memberRemovedReceived.countDown();
+ }
+ }
+ }
+ }
+
+ @Override
+ public String persistenceId() {
+ return name;
+ }
+
+ @Override
+ protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ return shardActor;
+ }
+
+ void waitForMemberUp() {
+ assertEquals("MemberUp received", true,
+ Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
+ memberUpReceived = new CountDownLatch(1);
+ }
+
+ void waitForMemberRemoved() {
+ assertEquals("MemberRemoved received", true,
+ Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
+ memberRemovedReceived = new CountDownLatch(1);
+ }
+
+ void verifyFindPrimary() {
+ assertEquals("FindPrimary received", true,
+ Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
+ findPrimaryMessageReceived = new CountDownLatch(1);
+ }
+ }
}
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
"testRegisterChangeListener-DataChangeListener");
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
- dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+ dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterChangeListenerReply.class);
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
+ // Use a non persistent provider because this test actually invokes persist on the journal
+ // this will cause all other messages to not be queued properly after that.
+ // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
+ // it does do a persist)
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof ElectionTimeout && firstElectionTimeout) {
onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
// Now send the RegisterChangeListener and wait for the reply.
- shard.tell(new RegisterChangeListener(path, dclActor.path(),
+ shard.tell(new RegisterChangeListener(path, dclActor,
AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
final CountDownLatch recoveryComplete = new CountDownLatch(1);
class TestShard extends Shard {
TestShard() {
- super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
+ super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
newDatastoreContext(), SCHEMA_CONTEXT);
}
Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
String address = "akka://foobar";
- shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
+ shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
assertEquals("getPeerAddresses", address,
((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT) {
@Override
protected boolean isLeader() {
// Use MBean for verification
// Committed transaction count should increase as usual
- assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+ assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
// Commit index should advance as we do not have an empty modification
assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
dataStoreContextBuilder.persistent(persistent);
+
+
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
- Creator<Shard> creator = new Creator<Shard>() {
- @Override
- public Shard create() throws Exception {
- return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
- DelegatingPersistentDataProvider delegating;
+ class TestShard extends Shard {
- @Override
- protected DataPersistenceProvider persistence() {
- if(delegating == null) {
- delegating = new DelegatingPersistentDataProvider(super.persistence());
- }
+ protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ super(name, peerAddresses, datastoreContext, schemaContext);
+ }
- return delegating;
- }
+ DelegatingPersistentDataProvider delegating;
- @Override
- protected void commitSnapshot(final long sequenceNumber) {
- super.commitSnapshot(sequenceNumber);
- latch.get().countDown();
- }
- };
+ protected DataPersistenceProvider persistence() {
+ if(delegating == null) {
+ delegating = new DelegatingPersistentDataProvider(super.persistence());
+ }
+ return delegating;
+ }
+
+ @Override
+ protected void commitSnapshot(final long sequenceNumber) {
+ super.commitSnapshot(sequenceNumber);
+ latch.get().countDown();
+ }
+
+ @Override
+ public RaftActorContext getRaftActorContext() {
+ return super.getRaftActorContext();
+ }
+ }
+
+ Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new TestShard(shardID, Collections.<String,String>emptyMap(),
+ newDatastoreContext(), SCHEMA_CONTEXT);
}
};
NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
- CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
- shard.tell(capture, getRef());
+ // Trigger creation of a snapshot by ensuring
+ RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
+ raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
latch.set(new CountDownLatch(1));
savedSnapshot.set(null);
- shard.tell(capture, getRef());
+ raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
final DatastoreContext persistentContext = DatastoreContext.newBuilder().
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
- final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+ final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
persistentContext, SCHEMA_CONTEXT);
final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
- final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+ final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
nonPersistentContext, SCHEMA_CONTEXT);
new ShardTestKit(getSystem()) {{
}
private ActorRef createShard(){
- return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<ShardIdentifier, String>emptyMap(), datastoreContext,
+ return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<String, String>emptyMap(), datastoreContext,
TestModel.createTestContext()));
}
private ActorRef createShard(){
return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
+ Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
}
private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
isA(requestType), any(Timeout.class));
+
+ doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
+ .when(actorContext).getTransactionCommitOperationTimeout();
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
+ Boolean actual = future.get(5, TimeUnit.SECONDS);
- verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+ assertEquals("canCommit", false, actual);
+
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
@Test(expected = TestException.class)
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
verifyCohortFutures(proxy, TestException.class);
}
- @Test
- public void testReadyWithInitialCreateTransactionFailure() throws Exception {
-
- doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
- mockActorContext).findPrimaryShardAsync(anyString());
+ private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
+ doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyCohortFutures(proxy, PrimaryNotFoundException.class);
+ verifyCohortFutures(proxy, toThrow.getClass());
+ }
+
+ @Test
+ public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
+ testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
+ }
+
+ @Test
+ public void testWriteOnlyTxWithNotInitializedException() throws Exception {
+ testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
+ }
+
+ @Test
+ public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
+ testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
}
@Test
SchemaContext schemaContext = TestModel.createTestContext();
Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
shardName("inventory").type("config").build(),
- Collections.<ShardIdentifier,String>emptyMap(),
+ Collections.<String,String>emptyMap(),
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
SchemaContext schemaContext = TestModel.createTestContext();
Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
shardName("inventory").type("config").build(),
- Collections.<ShardIdentifier,String>emptyMap(),
+ Collections.<String,String>emptyMap(),
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static junit.framework.TestCase.assertEquals;
+import akka.actor.Actor;
+import akka.serialization.Serialization;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
+
+public class RegisterChangeListenerReplyTest extends AbstractActorTest {
+
+ private TestActorFactory factory;
+
+
+ @Before
+ public void setUp(){
+ factory = new TestActorFactory(getSystem());
+ }
+
+ @After
+ public void shutDown(){
+ factory.close();
+ }
+
+ @Test
+ public void testToSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+
+ RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor);
+
+ ListenerRegistrationMessages.RegisterChangeListenerReply serialized
+ = registerChangeListenerReply.toSerializable();
+
+ assertEquals(Serialization.serializedActorPath(testActor), serialized.getListenerRegistrationPath());
+ }
+
+ @Test
+ public void testFromSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+
+ RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor);
+
+ ListenerRegistrationMessages.RegisterChangeListenerReply serialized
+ = registerChangeListenerReply.toSerializable();
+
+
+ RegisterChangeListenerReply fromSerialized
+ = RegisterChangeListenerReply.fromSerializable(getSystem(), serialized);
+
+ assertEquals(testActor.path().toString(), fromSerialized.getListenerRegistrationPath().toString());
+ }
+
+}
\ No newline at end of file
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static junit.framework.TestCase.assertEquals;
+import akka.actor.Actor;
+import akka.serialization.Serialization;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
+
+public class RegisterChangeListenerTest extends AbstractActorTest {
+
+ private TestActorFactory factory;
+
+ @Before
+ public void setUp(){
+ factory = new TestActorFactory(getSystem());
+ }
+
+ @After
+ public void shutDown(){
+ factory.close();
+ }
+
+ @Test
+ public void testToSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+ RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
+ , AsyncDataBroker.DataChangeScope.BASE);
+
+ ListenerRegistrationMessages.RegisterChangeListener serialized
+ = registerChangeListener.toSerializable();
+
+ NormalizedNodeMessages.InstanceIdentifier path = serialized.getInstanceIdentifierPath();
+
+ assertEquals("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", path.getCode(0));
+ assertEquals(Serialization.serializedActorPath(testActor), serialized.getDataChangeListenerActorPath());
+ assertEquals(AsyncDataBroker.DataChangeScope.BASE.ordinal(), serialized.getDataChangeScope());
+
+ }
+
+ @Test
+ public void testFromSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+ RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
+ , AsyncDataBroker.DataChangeScope.SUBTREE);
+
+ ListenerRegistrationMessages.RegisterChangeListener serialized
+ = registerChangeListener.toSerializable();
+
+
+ RegisterChangeListener fromSerialized = RegisterChangeListener.fromSerializable(getSystem(), serialized);
+
+ assertEquals(TestModel.TEST_PATH, registerChangeListener.getPath());
+ assertEquals(testActor.path().toString(), fromSerialized.getDataChangeListenerPath().toString());
+ assertEquals(AsyncDataBroker.DataChangeScope.SUBTREE, fromSerialized.getScope());
+
+
+ }
+}
\ No newline at end of file
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.time.StopWatch;
import org.junit.Assert;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
public class ActorContextTest extends AbstractActorTest{
+ static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
+
+ private static class TestMessage {
+ }
+
private static class MockShardManager extends UntypedActor {
private final boolean found;
private final ActorRef actorRef;
+ private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
private MockShardManager(boolean found, ActorRef actorRef){
}
@Override public void onReceive(Object message) throws Exception {
+ if(message instanceof FindPrimary) {
+ FindPrimary fp = (FindPrimary)message;
+ Object resp = findPrimaryResponses.get(fp.getShardName());
+ if(resp == null) {
+ log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
+ } else {
+ getSender().tell(resp, getSelf());
+ }
+
+ return;
+ }
+
if(found){
getSender().tell(new LocalShardFound(actorRef), getSelf());
} else {
}
}
+ void addFindPrimaryResp(String shardName, Object resp) {
+ findPrimaryResponses.put(shardName, resp);
+ }
+
private static Props props(final boolean found, final ActorRef actorRef){
return Props.create(new MockShardManagerCreator(found, actorRef) );
}
+ private static Props props(){
+ return Props.create(new MockShardManagerCreator() );
+ }
+
@SuppressWarnings("serial")
private static class MockShardManagerCreator implements Creator<MockShardManager> {
final boolean found;
final ActorRef actorRef;
+ MockShardManagerCreator() {
+ this.found = false;
+ this.actorRef = null;
+ }
+
MockShardManagerCreator(boolean found, ActorRef actorRef) {
this.found = found;
this.actorRef = actorRef;
@Test
public void testRateLimiting(){
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ transactionCreationInitialRateLimit(155L).build();
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext);
+ mock(Configuration.class), dataStoreContext);
// Check that the initial value is being picked up from DataStoreContext
- assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+ assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
actorContext.setTxCreationLimit(1.0);
@Test
public void testClientDispatcherIsGlobalDispatcher(){
-
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext);
+ mock(Configuration.class), DatastoreContext.newBuilder().build());
assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
@Test
public void testClientDispatcherIsNotGlobalDispatcher(){
-
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
ActorContext actorContext =
new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext);
+ mock(Configuration.class), DatastoreContext.newBuilder().build());
assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext) {
+ mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext) {
+ mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new PrimaryNotFound("foobar"));
+ return Futures.successful((Object) new PrimaryNotFoundException("not found"));
}
};
Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
assertNull(cached);
-
}
@Test
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext) {
+ mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new ActorNotInitialized());
+ return Futures.successful((Object) new NotInitializedException("not iniislized"));
}
};
Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
assertNull(cached);
+ }
+
+ @Test
+ public void testBroadcast() {
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
+ MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
+ shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
+ shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
+ shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
+
+ Configuration mockConfig = mock(Configuration.class);
+ doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
+ when(mockConfig).getAllShardNames();
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mockConfig,
+ DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
+
+ actorContext.broadcast(new TestMessage());
+
+ expectFirstMatching(shardActorRef1, TestMessage.class);
+ expectFirstMatching(shardActorRef2, TestMessage.class);
+ }};
}
+ private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
+ int count = 5000 / 50;
+ for(int i = 0; i < count; i++) {
+ try {
+ T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
+ if(message != null) {
+ return message;
+ }
+ } catch (Exception e) {}
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.fail("Did not receive message of type " + clazz);
+ return null;
+ }
}
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
-
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
* </p>
*/
public class MessageCollectorActor extends UntypedActor {
- private List<Object> messages = new ArrayList<>();
+ private final List<Object> messages = new ArrayList<>();
@Override public void onReceive(Object message) throws Exception {
if(message instanceof String){
}
}
+ public void clear() {
+ messages.clear();
+ }
+
public static List<Object> getAllMessages(ActorRef actor) throws Exception {
FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
Timeout operationTimeout = new Timeout(operationDuration);
return output;
}
+ public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
+ int count = 5000 / 50;
+ for(int i = 0; i < count; i++) {
+ try {
+ T message = (T) getFirstMatching(actor, clazz);
+ if(message != null) {
+ return message;
+ }
+ } catch (Exception e) {}
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.fail("Did not receive message of type " + clazz);
+ return null;
+ }
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
-import com.google.common.base.Optional;
public class MockActorContext extends ActorContext {
return executeRemoteOperationResponse;
}
- @Override public Optional<ActorSelection> findPrimaryShard(String shardName) {
- return Optional.absent();
- }
-
public void setExecuteShardOperationResponse(Object response){
executeShardOperationResponse = response;
}
import akka.cluster.ClusterEvent;
import akka.cluster.MemberStatus;
import akka.cluster.UniqueAddress;
-import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import scala.collection.JavaConversions;
import java.util.HashSet;
import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import scala.collection.JavaConversions;
public class MockClusterWrapper implements ClusterWrapper{
private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550);
+ private String currentMemberName = "member-1";
+
+ public MockClusterWrapper() {
+ }
+
+ public MockClusterWrapper(String currentMemberName) {
+ this.currentMemberName = currentMemberName;
+ }
@Override
public void subscribeToMemberEvents(ActorRef actorRef) {
@Override
public String getCurrentMemberName() {
- return "member-1";
+ return currentMemberName;
}
@Override
package org.opendaylight.controller.cluster.datastore.utils;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
public class MockConfiguration implements Configuration{
- @Override public List<String> getMemberShardNames(final String memberName) {
- return Arrays.asList("default");
+ private Map<String, List<String>> shardMembers = ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ /*put("astronauts", Arrays.asList("member-2", "member-3")).*/build();
+
+ public MockConfiguration() {
+ }
+
+ public MockConfiguration(Map<String, List<String>> shardMembers) {
+ this.shardMembers = shardMembers;
}
- @Override public Optional<String> getModuleNameFromNameSpace(
+ @Override
+ public List<String> getMemberShardNames(final String memberName) {
+ return new ArrayList<>(shardMembers.keySet());
+ }
+ @Override
+ public Optional<String> getModuleNameFromNameSpace(
final String nameSpace) {
return Optional.absent();
}
return Arrays.asList("member-2", "member-3");
}
- return Collections.emptyList();
+ List<String> members = shardMembers.get(shardName);
+ return members != null ? members : Collections.<String>emptyList();
}
@Override public Set<String> getAllShardNames() {
public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
"test");
+ public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
+ "junk");
+
+
public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
+ public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
node(OUTER_LIST_QNAME).build();
public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
}
+
+Member1 {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
+
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
+
+ akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
+ loglevel = "DEBUG"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2558
+ }
+ }
+
+ cluster {
+ auto-down-unreachable-after = 100s
+
+ roles = [
+ "member-1"
+ ]
+ }
+ }
+}
+
+Member2 {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
+
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
+
+ akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2559
+ }
+ }
+
+ cluster {
+ auto-down-unreachable-after = 100s
+
+ roles = [
+ "member-2"
+ ]
+ }
+ }
+}
// If no more sources, fail
if(requiredSources.isEmpty()) {
- handleSalInitializationFailure(new IllegalStateException(id + ": No more sources for schema context"), listener);
+ final IllegalStateException cause = new IllegalStateException(id + ": No more sources for schema context");
+ handleSalInitializationFailure(cause, listener);
+ salFacade.onDeviceFailed(cause);
return;
}
@Override
public void onComplete(final Throwable failure, final Object reply) throws Throwable {
if(failure != null) {
- LOG.error("InvokeRpc failed", failure);
+
+ // When we return a failure to the caller they can choose to log it if they like
+ // so here we just do basic warn logging by default and log the stack trace only when debug
+ // is enabled
+
+ LOG.warn("InvokeRpc failed rpc = {}, identifier = {}", rpcMsg.getRpc(), rpcMsg.getIdentifier());
+
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Detailed Error", failure);
+ }
final String message = String.format("Execution of RPC %s failed", rpcMsg.getRpc());
Collection<RpcError> errors = ((RpcErrorsException)failure).getRpcErrors();
}
Date revision = module.getRevision();
- Preconditions.checkState(!revisionsByNamespace.containsKey(revision),
- "Duplicate revision %s for namespace %s", revision, namespace);
IdentityMapping identityMapping = revisionsByNamespace.get(revision);
if(identityMapping == null) {
package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
import org.opendaylight.controller.config.yangjmxgenerator.PackageTranslator;
import org.opendaylight.yangtools.yang.model.api.IdentitySchemaNode;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.parser.builder.impl.ModuleIdentifierImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Override
public Set<Module> getModules() {
- return schemaContext.getModules();
+ final Set<Module> modules = Sets.newHashSet(schemaContext.getModules());
+ for (final Module module : schemaContext.getModules()) {
+ modules.addAll(module.getSubmodules());
+ }
+ return modules;
}
@Override
public String getModuleSource(final org.opendaylight.yangtools.yang.model.api.ModuleIdentifier moduleIdentifier) {
- return schemaContext.getModuleSource(moduleIdentifier).get();
+ final Optional<String> moduleSource = schemaContext.getModuleSource(moduleIdentifier);
+ if(moduleSource.isPresent()) {
+ return moduleSource.get();
+ } else {
+ try {
+ return Iterables.find(getModules(), new Predicate<Module>() {
+ @Override
+ public boolean apply(final Module input) {
+ final ModuleIdentifierImpl id = new ModuleIdentifierImpl(input.getName(), Optional.fromNullable(input.getNamespace()), Optional.fromNullable(input.getRevision()));
+ return id.equals(moduleIdentifier);
+ }
+ }).getSource();
+ } catch (final NoSuchElementException e) {
+ throw new IllegalArgumentException("Source for yang module " + moduleIdentifier + " not found", e);
+ }
+ }
}
@Override
package org.opendaylight.controller.netconf.mdsal.connector;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.Set;
return transformCapabilities(currentSchemaContext.getCurrentContext());
}
- static Set<Capability> transformCapabilities(final SchemaContext currentContext1) {
+ static Set<Capability> transformCapabilities(final SchemaContext currentContext) {
final Set<Capability> capabilities = new HashSet<>();
// [RFC6241] 8.3. Candidate Configuration Capability
capabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:candidate:1.0"));
- final SchemaContext currentContext = currentContext1;
final Set<Module> modules = currentContext.getModules();
for (final Module module : modules) {
- if(currentContext.getModuleSource(module).isPresent()) {
- capabilities.add(new YangModuleCapability(module, currentContext.getModuleSource(module).get()));
- } else {
- LOG.warn("Missing source for module {}. This module will not be available from netconf server",
- module);
+ Optional<YangModuleCapability> cap = moduleToCapability(module);
+ if(cap.isPresent()) {
+ capabilities.add(cap.get());
+ }
+ for (final Module submodule : module.getSubmodules()) {
+ cap = moduleToCapability(submodule);
+ if(cap.isPresent()) {
+ capabilities.add(cap.get());
+ }
}
}
return capabilities;
}
+ private static Optional<YangModuleCapability> moduleToCapability(final Module module) {
+ final String source = module.getSource();
+ if(source !=null) {
+ return Optional.of(new YangModuleCapability(module, source));
+ } else {
+ LOG.warn("Missing source for module {}. This module will not be available from netconf server",
+ module);
+ }
+ return Optional.absent();
+ }
+
@Override
public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
return currentSchemaContext.registerCapabilityListener(listener);
package org.opendaylight.controller.netconf.client;
-import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
private static final XPathExpression sessionIdXPath = XMLNetconfUtil
.compileXPath("/netconf:hello/netconf:session-id");
+ private static final XPathExpression sessionIdXPathNoNamespace = XMLNetconfUtil
+ .compileXPath("/hello/session-id");
+
private static final String EXI_1_0_CAPABILITY_MARKER = "exi:1.0";
protected NetconfClientSessionNegotiator(final NetconfClientSessionPreferences sessionPreferences,
}
private long extractSessionId(final Document doc) {
- final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE);
- Preconditions.checkState(sessionIdNode != null, "");
- String textContent = sessionIdNode.getTextContent();
- if (textContent == null || textContent.equals("")) {
- throw new IllegalStateException("Session id not received from server");
+ String textContent = getSessionIdWithXPath(doc, sessionIdXPath);
+ if (Strings.isNullOrEmpty(textContent)) {
+ textContent = getSessionIdWithXPath(doc, sessionIdXPathNoNamespace);
+ if (Strings.isNullOrEmpty(textContent)) {
+ throw new IllegalStateException("Session id not received from server, hello message: " + XmlUtil.toString(doc));
+ }
}
return Long.valueOf(textContent);
}
+ private String getSessionIdWithXPath(final Document doc, final XPathExpression sessionIdXPath) {
+ final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE);
+ return sessionIdNode != null ? sessionIdNode.getTextContent() : null;
+ }
+
@Override
protected NetconfClientSession getSession(final NetconfClientSessionListener sessionListener, final Channel channel,
final NetconfHelloMessage message) throws NetconfDocumentedException {
@AfterClass
public static void tearDown() throws Exception {
hashedWheelTimer.stop();
- nettyGroup.shutdownGracefully().await();
+ nettyGroup.shutdownGracefully().await(5, TimeUnit.SECONDS);
minaTimerEx.shutdownNow();
nioExec.shutdownNow();
}
private static boolean isHelloMessage(final Document document) {
XmlElement element = XmlElement.fromDomElement(document.getDocumentElement());
try {
+ // accept even if hello has no namespace
return element.getName().equals(HELLO_TAG) &&
- element.hasNamespace() &&
- element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0);
+ (!element.hasNamespace() || element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0));
} catch (MissingNameSpaceException e) {
// Cannot happen, since we check for hasNamespace
throw new IllegalStateException(e);
package org.opendaylight.controller.netconf.util.messages;
import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
import java.util.Collection;
import java.util.List;
public static Collection<String> extractCapabilitiesFromHello(Document doc) throws NetconfDocumentedException {
XmlElement responseElement = XmlElement.fromDomDocument(doc);
- XmlElement capabilitiesElement = responseElement
- .getOnlyChildElementWithSameNamespace(XmlNetconfConstants.CAPABILITIES);
- List<XmlElement> caps = capabilitiesElement.getChildElements(XmlNetconfConstants.CAPABILITY);
+ // Extract child element <capabilities> from <hello> with or without(fallback) the same namespace
+ Optional<XmlElement> capabilitiesElement = responseElement
+ .getOnlyChildElementWithSameNamespaceOptionally(XmlNetconfConstants.CAPABILITIES)
+ .or(responseElement
+ .getOnlyChildElementOptionally(XmlNetconfConstants.CAPABILITIES));
+
+ List<XmlElement> caps = capabilitiesElement.get().getChildElements(XmlNetconfConstants.CAPABILITY);
return Collections2.transform(caps, new Function<XmlElement, String>() {
@Override