<yang-ext.version>2013.09.07.7-SNAPSHOT</yang-ext.version>
<yang-jmx-generator.version>1.1.0-SNAPSHOT</yang-jmx-generator.version>
<yangtools.version>0.7.0-SNAPSHOT</yangtools.version>
- <sshd-core.version>0.12.0</sshd-core.version>
+ <sshd-core.version>0.14.0</sshd-core.version>
<jmh.version>0.9.7</jmh.version>
<lmax.version>3.3.0</lmax.version>
</properties>
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public class FollowerLogInformationImpl implements FollowerLogInformation {
- private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
- private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
-
private final String id;
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final RaftActorContext context;
- private volatile long nextIndex;
+ private long nextIndex;
- private volatile long matchIndex;
+ private long matchIndex;
private long lastReplicatedIndex = -1L;
}
@Override
- public long incrNextIndex(){
- return NEXT_INDEX_UPDATER.incrementAndGet(this);
+ public long incrNextIndex() {
+ return nextIndex++;
}
@Override
public long decrNextIndex() {
- return NEXT_INDEX_UPDATER.decrementAndGet(this);
+ return nextIndex--;
}
@Override
@Override
public long incrMatchIndex(){
- return MATCH_INDEX_UPDATER.incrementAndGet(this);
+ return matchIndex++;
}
@Override
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
public void apply(ApplyJournalEntries param) throws Exception {
}
};
+ private static final String COMMIT_SNAPSHOT = "commit_snapshot";
protected final Logger LOG = LoggerFactory.getLogger(getClass());
*/
private final RaftActorContextImpl context;
+ private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
+
/**
* The in-memory journal
*/
private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
- private CaptureSnapshot captureSnapshot = null;
-
private Stopwatch recoveryTimer;
private int currentRecoveryBatchCount;
LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
persistenceId(), saveSnapshotFailure.cause());
- context.getReplicatedLog().snapshotRollback();
-
- LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
- "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- context.getReplicatedLog().size());
+ context.getSnapshotManager().rollback();
} else if (message instanceof CaptureSnapshot) {
LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
- if(captureSnapshot == null) {
- captureSnapshot = (CaptureSnapshot)message;
- createSnapshot();
- }
+ context.getSnapshotManager().create(createSnapshotProcedure);
- } else if (message instanceof CaptureSnapshotReply){
+ } else if (message instanceof CaptureSnapshotReply) {
handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else if(message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
+ } else if (message.equals(COMMIT_SNAPSHOT)) {
+ commitSnapshot(-1);
} else {
reusableBehaviorStateHolder.init(currentBehavior);
.currentTerm(context.getTermInformation().getCurrentTerm())
.inMemoryJournalDataSize(replicatedLog.dataSize())
.inMemoryJournalLogSize(replicatedLog.size())
- .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated())
+ .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
.lastApplied(context.getLastApplied())
.lastIndex(replicatedLog.lastIndex())
.lastTerm(replicatedLog.lastTerm())
// the state to durable storage
self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
- // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
- if(!context.isSnapshotCaptureInitiated()){
- raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
- raftContext.getTermInformation().getCurrentTerm());
- raftContext.getReplicatedLog().snapshotCommit();
- } else {
- LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
- persistenceId(), getId());
- }
+ context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+
} else if (clientActor != null) {
// Send message for replication
currentBehavior.handleMessage(getSelf(),
}
protected void commitSnapshot(long sequenceNumber) {
- context.getReplicatedLog().snapshotCommit();
-
- // TODO: Not sure if we want to be this aggressive with trimming stuff
- trimPersistentData(sequenceNumber);
+ context.getSnapshotManager().commit(persistence(), sequenceNumber);
}
/**
protected void onLeaderChanged(String oldLeader, String newLeader){};
- private void trimPersistentData(long sequenceNumber) {
- // Trim akka snapshots
- // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
- // For now guessing that it is ANDed.
- persistence().deleteSnapshots(new SnapshotSelectionCriteria(
- sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
-
- // Trim akka journal
- persistence().deleteMessages(sequenceNumber);
- }
-
private String getLeaderAddress(){
if(isLeader()){
return getSelf().path().toString();
private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
- // create a snapshot object from the state provided and save it
- // when snapshot is saved async, SaveSnapshotSuccess is raised.
-
- Snapshot sn = Snapshot.create(snapshotBytes,
- context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
- captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
- captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
-
- persistence().saveSnapshot(sn);
-
- LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
-
- long dataThreshold = getTotalMemory() *
- getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- if (context.getReplicatedLog().dataSize() > dataThreshold) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
- persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
- captureSnapshot.getLastAppliedIndex());
- }
-
- // if memory is less, clear the log based on lastApplied.
- // this could/should only happen if one of the followers is down
- // as normally we keep removing from the log when its replicated to all.
- context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
- captureSnapshot.getLastAppliedTerm());
-
- // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
- // install snapshot to a follower.
- if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
- }
- } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
- // clear the log based on replicatedToAllIndex
- context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
- captureSnapshot.getReplicatedToAllTerm());
-
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
- } else {
- // The replicatedToAllIndex was not found in the log
- // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
- // In this scenario we may need to save the snapshot to the akka persistence
- // snapshot for recovery but we do not need to do the replicated log trimming.
- context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
- replicatedLog.getSnapshotTerm());
- }
-
-
- LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
- "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(),
- replicatedLog.getSnapshotTerm());
-
- if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
- // this would be call straight to the leader and won't initiate in serialization
- currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
- ByteString.copyFrom(snapshotBytes)));
- }
-
- captureSnapshot = null;
- context.setSnapshotCaptureInitiated(false);
+ context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
}
protected long getTotalMemory() {
}
private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-
private static final int DATA_SIZE_DIVIDER = 5;
- private long dataSizeSinceLastSnapshot = 0;
+ private long dataSizeSinceLastSnapshot = 0L;
+
public ReplicatedLogImpl(Snapshot snapshot) {
super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
long dataSizeForCheck = dataSize;
dataSizeSinceLastSnapshot += logEntrySize;
- long journalSize = lastIndex() + 1;
- if(!hasFollowers()) {
+ if (!hasFollowers()) {
// When we do not have followers we do not maintain an in-memory log
// due to this the journalSize will never become anything close to the
// snapshot batch count. In fact will mostly be 1.
// as if we were maintaining a real snapshot
dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
}
-
+ long journalSize = replicatedLogEntry.getIndex() + 1;
long dataThreshold = getTotalMemory() *
- getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
- // when a snaphsot is being taken, captureSnapshot != null
- if (!context.isSnapshotCaptureInitiated() &&
- ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
- dataSizeForCheck > dataThreshold)) {
+ context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- dataSizeSinceLastSnapshot = 0;
+ if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+ || dataSizeForCheck > dataThreshold)) {
- LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
- " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+ boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+ currentBehavior.getReplicatedToAllIndex());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
- if (!hasFollowers()) {
- lastAppliedIndex = replicatedLogEntry.getIndex();
- lastAppliedTerm = replicatedLogEntry.getTerm();
- } else if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
- LOG.debug("{}: Snapshot Capture lastApplied:{} ",
- persistenceId(), context.getLastApplied());
- LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
- lastAppliedIndex);
- LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
- lastAppliedTerm);
+ if(started){
+ dataSizeSinceLastSnapshot = 0;
}
- // send a CaptureSnapshot to self to make the expensive operation async.
- long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
- ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
- getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
- null);
- context.setSnapshotCaptureInitiated(true);
}
+
if (callback != null){
callback.apply(replicatedLogEntry);
}
@Override
public void saveSnapshot(Object o) {
// Make saving Snapshot successful
- commitSnapshot(-1L);
+ // Committing the snapshot here would end up calling commit in the creating state which would
+ // be a state violation. That's why now we send a message to commit the snapshot.
+ self().tell(COMMIT_SNAPSHOT, self());
+ }
+ }
+
+
+ private class CreateSnapshotProcedure implements Procedure<Void> {
+
+ @Override
+ public void apply(Void aVoid) throws Exception {
+ createSnapshot();
}
}
*/
ConfigParams getConfigParams();
- void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
-
- boolean isSnapshotCaptureInitiated();
+ SnapshotManager getSnapshotManager();
}
private boolean snapshotCaptureInitiated;
+ // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
+ // be passed to it in the constructor
+ private SnapshotManager snapshotManager;
+
public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
String id,
ElectionTerm termInformation, long commitIndex,
return configParams;
}
- @Override
- public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
- this.snapshotCaptureInitiated = snapshotCaptureInitiated;
- }
-
- @Override
- public boolean isSnapshotCaptureInitiated() {
- return snapshotCaptureInitiated;
- }
-
@Override public void addToPeers(String name, String address) {
peerAddresses.put(name, address);
}
peerAddresses.put(peerId, peerAddress);
}
+
+ public SnapshotManager getSnapshotManager() {
+ if(snapshotManager == null){
+ snapshotManager = new SnapshotManager(this, LOG);
+ }
+ return snapshotManager;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+public class SnapshotManager implements SnapshotState {
+
+
+ private final SnapshotState IDLE = new Idle();
+ private final SnapshotState CAPTURING = new Capturing();
+ private final SnapshotState PERSISTING = new Persisting();
+ private final SnapshotState CREATING = new Creating();
+
+ private final Logger LOG;
+ private final RaftActorContext context;
+ private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
+ new LastAppliedTermInformationReader();
+ private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader =
+ new ReplicatedToAllTermInformationReader();
+
+
+ private SnapshotState currentState = IDLE;
+ private CaptureSnapshot captureSnapshot;
+
+ public SnapshotManager(RaftActorContext context, Logger logger) {
+ this.context = context;
+ this.LOG = logger;
+ }
+
+ @Override
+ public boolean isCapturing() {
+ return currentState.isCapturing();
+ }
+
+ @Override
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower);
+ }
+
+ @Override
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ return currentState.capture(lastLogEntry, replicatedToAllIndex);
+ }
+
+ @Override
+ public void create(Procedure<Void> callback) {
+ currentState.create(callback);
+ }
+
+ @Override
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory);
+ }
+
+ @Override
+ public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ currentState.commit(persistenceProvider, sequenceNumber);
+ }
+
+ @Override
+ public void rollback() {
+ currentState.rollback();
+ }
+
+ @Override
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ return currentState.trimLog(desiredTrimIndex, currentBehavior);
+ }
+
+ private boolean hasFollowers(){
+ return context.getPeerAddresses().keySet().size() > 0;
+ }
+
+ private String persistenceId(){
+ return context.getId();
+ }
+
+ private class AbstractSnapshotState implements SnapshotState {
+
+ @Override
+ public boolean isCapturing() {
+ return false;
+ }
+
+ @Override
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ LOG.debug("capture should not be called in state {}", this);
+ return false;
+ }
+
+ @Override
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ LOG.debug("captureToInstall should not be called in state {}", this);
+ return false;
+ }
+
+ @Override
+ public void create(Procedure<Void> callback) {
+ LOG.debug("create should not be called in state {}", this);
+ }
+
+ @Override
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ LOG.debug("persist should not be called in state {}", this);
+ }
+
+ @Override
+ public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ LOG.debug("commit should not be called in state {}", this);
+ }
+
+ @Override
+ public void rollback() {
+ LOG.debug("rollback should not be called in state {}", this);
+ }
+
+ @Override
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ LOG.debug("trimLog should not be called in state {}", this);
+ return -1;
+ }
+
+ protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
+ // we would want to keep the lastApplied as its used while capturing snapshots
+ long lastApplied = context.getLastApplied();
+ long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
+
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
+ persistenceId(), desiredTrimIndex, lastApplied, tempMin);
+ }
+
+ if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
+ LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
+ context.getTermInformation().getCurrentTerm());
+
+ //use the term of the temp-min, since we check for isPresent, entry will not be null
+ ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
+ context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
+ context.getReplicatedLog().snapshotCommit();
+ return tempMin;
+ } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+ // It's possible a follower was lagging and an install snapshot advanced its match index past
+ // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
+ // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
+ // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
+ // trim the log to the last applied index even if previous entries weren't replicated to all followers.
+ currentBehavior.setReplicatedToAllIndex(tempMin);
+ }
+ return -1;
+ }
+ }
+
+ private class Idle extends AbstractSnapshotState {
+
+ private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ TermInformationReader lastAppliedTermInfoReader =
+ lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
+ lastLogEntry, hasFollowers());
+
+ long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
+ long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
+
+ TermInformationReader replicatedToAllTermInfoReader =
+ replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
+
+ long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
+ long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
+
+ // send a CaptureSnapshot to self to make the expensive operation async.
+ captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
+ lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
+ newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+
+ SnapshotManager.this.currentState = CAPTURING;
+
+ if(targetFollower != null){
+ LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
+ } else {
+ LOG.info("{}: Initiating snapshot capture {} to install on {}",
+ persistenceId(), captureSnapshot, targetFollower);
+ }
+
+ context.getActor().tell(captureSnapshot, context.getActor());
+
+ return true;
+ }
+
+ @Override
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ return capture(lastLogEntry, replicatedToAllIndex, null);
+ }
+
+ @Override
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
+ }
+
+ @Override
+ public String toString() {
+ return "Idle";
+ }
+
+ @Override
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ return doTrimLog(desiredTrimIndex, currentBehavior);
+ }
+ }
+
+ private class Capturing extends AbstractSnapshotState {
+
+ @Override
+ public boolean isCapturing() {
+ return true;
+ }
+
+ @Override
+ public void create(Procedure<Void> callback) {
+ try {
+ callback.apply(null);
+ SnapshotManager.this.currentState = CREATING;
+ } catch (Exception e) {
+ LOG.error("Unexpected error occurred", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Capturing";
+ }
+
+ }
+
+ private class Creating extends AbstractSnapshotState {
+
+ @Override
+ public boolean isCapturing() {
+ return true;
+ }
+
+ @Override
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ // create a snapshot object from the state provided and save it
+ // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+ Snapshot sn = Snapshot.create(snapshotBytes,
+ context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+ captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+ captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+ persistenceProvider.saveSnapshot(sn);
+
+ LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
+
+ long dataThreshold = totalMemory *
+ context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+ if (context.getReplicatedLog().dataSize() > dataThreshold) {
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
+ persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
+ captureSnapshot.getLastAppliedIndex());
+ }
+
+ // if memory is less, clear the log based on lastApplied.
+ // this could/should only happen if one of the followers is down
+ // as normally we keep removing from the log when its replicated to all.
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
+
+ // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
+ // install snapshot to a follower.
+ if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
+ currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ }
+
+ } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
+ // clear the log based on replicatedToAllIndex
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+ captureSnapshot.getReplicatedToAllTerm());
+
+ currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ } else {
+ // The replicatedToAllIndex was not found in the log
+ // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
+ // In this scenario we may need to save the snapshot to the akka persistence
+ // snapshot for recovery but we do not need to do the replicated log trimming.
+ context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm());
+ }
+
+ LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+ "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
+
+ if (context.getId().equals(currentBehavior.getLeaderId())
+ && captureSnapshot.isInstallSnapshotInitiated()) {
+ // this would be call straight to the leader and won't initiate in serialization
+ currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(
+ ByteString.copyFrom(snapshotBytes)));
+ }
+
+ captureSnapshot = null;
+ SnapshotManager.this.currentState = PERSISTING;
+ }
+
+ @Override
+ public String toString() {
+ return "Creating";
+ }
+
+ }
+
+ private class Persisting extends AbstractSnapshotState {
+
+ @Override
+ public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ context.getReplicatedLog().snapshotCommit();
+ persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
+ sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+
+ persistenceProvider.deleteMessages(sequenceNumber);
+
+ SnapshotManager.this.currentState = IDLE;
+ }
+
+ @Override
+ public void rollback() {
+ context.getReplicatedLog().snapshotRollback();
+
+ LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
+ "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ context.getReplicatedLog().size());
+
+ SnapshotManager.this.currentState = IDLE;
+ }
+
+ @Override
+ public String toString() {
+ return "Persisting";
+ }
+
+ }
+
+ private static interface TermInformationReader {
+ long getIndex();
+ long getTerm();
+ }
+
+ private static class LastAppliedTermInformationReader implements TermInformationReader{
+ private long index;
+ private long term;
+
+ public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex,
+ ReplicatedLogEntry lastLogEntry, boolean hasFollowers){
+ ReplicatedLogEntry entry = log.get(originalIndex);
+ this.index = -1L;
+ this.term = -1L;
+ if (!hasFollowers) {
+ if(lastLogEntry != null) {
+ index = lastLogEntry.getIndex();
+ term = lastLogEntry.getTerm();
+ }
+ } else if (entry != null) {
+ index = entry.getIndex();
+ term = entry.getTerm();
+ } else if(log.getSnapshotIndex() > -1){
+ index = log.getSnapshotIndex();
+ term = log.getSnapshotTerm();
+ }
+ return this;
+ }
+
+ @Override
+ public long getIndex(){
+ return this.index;
+ }
+
+ @Override
+ public long getTerm(){
+ return this.term;
+ }
+ }
+
+ private static class ReplicatedToAllTermInformationReader implements TermInformationReader{
+ private long index;
+ private long term;
+
+ ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){
+ ReplicatedLogEntry entry = log.get(originalIndex);
+ this.index = -1L;
+ this.term = -1L;
+
+ if (entry != null) {
+ index = entry.getIndex();
+ term = entry.getTerm();
+ }
+
+ return this;
+ }
+
+ @Override
+ public long getIndex(){
+ return this.index;
+ }
+
+ @Override
+ public long getTerm(){
+ return this.term;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+public interface SnapshotState {
+ /**
+ * Should return true when a snapshot is being captured
+ * @return
+ */
+ boolean isCapturing();
+
+ /**
+ * Initiate capture snapshot
+ *
+ * @param lastLogEntry the last entry in the replicated log
+ * @param replicatedToAllIndex the current replicatedToAllIndex
+ *
+ * @return true if capture was started
+ */
+ boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+
+ /**
+ * Initiate capture snapshot for the purposing of installing that snapshot
+ *
+ * @param lastLogEntry
+ * @param replicatedToAllIndex
+ * @param targetFollower
+ *
+ * @return true if capture was started
+ */
+ boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
+
+ /**
+ * Create the snapshot
+ *
+ * @param callback a procedure to be called which should create the snapshot
+ */
+ void create(Procedure<Void> callback);
+
+ /**
+ * Persist the snapshot
+ *
+ * @param persistenceProvider
+ * @param snapshotBytes
+ * @param currentBehavior
+ * @param totalMemory
+ */
+ void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior
+ ,long totalMemory);
+
+ /**
+ * Commit the snapshot by trimming the log
+ *
+ * @param persistenceProvider
+ * @param sequenceNumber
+ */
+ void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber);
+
+ /**
+ * Rollback the snapshot
+ */
+ void rollback();
+
+ /**
+ * Trim the log
+ *
+ * @param desiredTrimIndex
+ * @return the actual trim index
+ */
+ long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior);
+}
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
applyLogToStateMachine(context.getCommitIndex());
}
- if (!context.isSnapshotCaptureInitiated()) {
+ if (!context.getSnapshotManager().isCapturing()) {
purgeInMemoryLog();
}
followerToSnapshot.markSendStatus(false);
}
- if (wasLastChunk && !context.isSnapshotCaptureInitiated()) {
+ if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
// Since the follower is now caught up try to purge the log.
purgeInMemoryLog();
} else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
sendAppendEntries = true;
}
} else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
+ leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
// if the followers next index is not present in the leaders log, and
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
sendSnapshotChunk(followerActor, followerId);
- } else if (!context.isSnapshotCaptureInitiated()) {
- ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
- lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
- lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
- }
-
- boolean isInstallSnapshotInitiated = true;
- long replicatedToAllIndex = super.getReplicatedToAllIndex();
- ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-
- CaptureSnapshot captureSnapshot = new CaptureSnapshot(
- lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
- isInstallSnapshotInitiated);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId,
- captureSnapshot);
- }
-
- actor().tell(captureSnapshot, actor());
- context.setSnapshotCaptureInitiated(true);
+ } else {
+ context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
}
}
}
*/
public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
+ protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout();
+
/**
* Information about the RaftActor whose behavior this class represents
*/
// message is sent to itself
electionCancel =
context.getActorSystem().scheduler().scheduleOnce(interval,
- context.getActor(), new ElectionTimeout(),
+ context.getActor(), ELECTION_TIMEOUT,
context.getActorSystem().dispatcher(), context.getActor());
}
* @param snapshotCapturedIndex
*/
protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
- // we would want to keep the lastApplied as its used while capturing snapshots
- long lastApplied = context.getLastApplied();
- long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
-
- if(LOG.isTraceEnabled()) {
- LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}",
- logName, snapshotCapturedIndex, lastApplied, tempMin);
- }
+ long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
- if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
- LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin,
- context.getTermInformation().getCurrentTerm());
-
- //use the term of the temp-min, since we check for isPresent, entry will not be null
- ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
- context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
- context.getReplicatedLog().snapshotCommit();
- setReplicatedToAllIndex(tempMin);
- } else if(tempMin > getReplicatedToAllIndex()) {
- // It's possible a follower was lagging and an install snapshot advanced its match index past
- // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
- // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
- // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
- // trim the log to the last applied index even if previous entries weren't replicated to all followers.
- setReplicatedToAllIndex(tempMin);
+ if(actualIndex != -1){
+ setReplicatedToAllIndex(actualIndex);
}
}
votesRequired = getMajorityVoteCount(peers.size());
startNewTerm();
- scheduleElection(electionDuration());
+
+ if(context.getPeerAddresses().isEmpty()){
+ actor().tell(ELECTION_TIMEOUT, actor());
+ } else {
+ scheduleElection(electionDuration());
+ }
+
+
}
@Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
public Follower(RaftActorContext context) {
super(context, RaftState.Follower);
- scheduleElection(electionDuration());
-
initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
+
+ if(context.getPeerAddresses().isEmpty()){
+ actor().tell(ELECTION_TIMEOUT, actor());
+ } else {
+ scheduleElection(electionDuration());
+ }
+
}
private boolean isLogEntryPresent(long index){
sender.tell(reply, actor());
- if (!context.isSnapshotCaptureInitiated()) {
+ if (!context.getSnapshotManager().isCapturing()) {
super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
}
assertEquals("ReplicatedLogEntry getIndex", expIndex, replicatedLogEntry.getIndex());
assertEquals("ReplicatedLogEntry getData", payload, replicatedLogEntry.getData());
}
+
+ protected String testActorPath(String id){
+ return "akka://test/user" + id;
+ }
}
private Map<String, String> peerAddresses = new HashMap<>();
private ConfigParams configParams;
private boolean snapshotCaptureInitiated;
+ private SnapshotManager snapshotManager;
public MockRaftActorContext(){
electionTerm = new ElectionTerm() {
}
@Override
- public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
- this.snapshotCaptureInitiated = snapshotCaptureInitiated;
- }
-
- @Override
- public boolean isSnapshotCaptureInitiated() {
- return snapshotCaptureInitiated;
+ public SnapshotManager getSnapshotManager() {
+ if(this.snapshotManager == null){
+ this.snapshotManager = new SnapshotManager(this, getLogger());
+ }
+ return this.snapshotManager;
}
public void setConfigParams(ConfigParams configParams) {
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
}
}
+
+ public void waitUntilLeader(){
+ for(int i = 0;i < 10; i++){
+ if(isLeader()){
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
public List<Object> getState() {
return state;
}
return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
}
+ public static Props props(final String id, final Map<String, String> peerAddresses,
+ Optional<ConfigParams> config, ActorRef roleChangeNotifier,
+ DataPersistenceProvider dataPersistenceProvider){
+ return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
+ }
+
+
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
delegate.applyState(clientActor, identifier, data);
LOG.info("{}: applyState called", persistenceId());
mockRaftActor.waitForInitializeBehaviorComplete();
+ mockRaftActor.waitUntilLeader();
+
mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
- verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
+ verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
}
};
}
mockRaftActor.waitForInitializeBehaviorComplete();
+ mockRaftActor.waitUntilLeader();
+
mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
- verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
+ verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
}
new MockRaftActorContext.MockPayload("C"),
new MockRaftActorContext.MockPayload("D")));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
-
RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
+ raftActorContext.getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
+ new MockRaftActorContext.MockPayload("D")), -1);
+
mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+ ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
mockRaftActor.waitForInitializeBehaviorComplete();
+ MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
- mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
+ mockRaftActor.getReplicatedLog().append(lastEntry);
ByteString snapshotBytes = fromObject(Arrays.asList(
new MockRaftActorContext.MockPayload("A"),
mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
long replicatedToAllIndex = 1;
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
+
+ mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
verify(mockRaftActor.delegate).createSnapshot();
mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
+ raftActorContext.getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+ new MockRaftActorContext.MockPayload("D")), 1);
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
}
@Test
- public void testRaftRoleChangeNotifier() throws Exception {
+ public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
new JavaTestKit(getSystem()) {{
TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
Props.create(MessageCollectorActor.class));
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
long heartBeatInterval = 100;
config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
- config.setElectionTimeoutFactor(1);
+ config.setElectionTimeoutFactor(20);
String persistenceId = factory.generateActorId("notifier-");
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
+ Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
+ new NonPersistentProvider()), persistenceId);
List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
+
// check if the notifier got a role change from null to Follower
RoleChanged raftRoleChanged = matches.get(0);
assertEquals(persistenceId, raftRoleChanged.getMemberId());
}};
}
+ @Test
+ public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+ MessageCollectorActor.waitUntilReady(notifierActor);
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ long heartBeatInterval = 100;
+ config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
+ config.setElectionTimeoutFactor(1);
+
+ String persistenceId = factory.generateActorId("notifier-");
+
+ factory.createActor(MockRaftActor.props(persistenceId,
+ ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
+
+ List<RoleChanged> matches = null;
+ for(int i = 0; i < 5000 / heartBeatInterval; i++) {
+ matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+ assertNotNull(matches);
+ if(matches.size() == 3) {
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
+ }
+
+ assertEquals(2, matches.size());
+
+ // check if the notifier got a role change from null to Follower
+ RoleChanged raftRoleChanged = matches.get(0);
+ assertEquals(persistenceId, raftRoleChanged.getMemberId());
+ assertNull(raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+
+ // check if the notifier got a role change from Follower to Candidate
+ raftRoleChanged = matches.get(1);
+ assertEquals(persistenceId, raftRoleChanged.getMemberId());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+
+ }};
+ }
+
@Test
public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
new JavaTestKit(getSystem()) {
assertEquals(8, leaderActor.getReplicatedLog().size());
- leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
+ leaderActor.getRaftActorContext().getSnapshotManager()
+ .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+ new MockRaftActorContext.MockPayload("x")), 4);
- leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
verify(leaderActor.delegate).createSnapshot();
assertEquals(8, leaderActor.getReplicatedLog().size());
new MockRaftActorContext.MockPayload("foo-2"),
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+ leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider()
+ , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
+
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+ // The commit is needed to complete the snapshot creation process
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
assertEquals(6, followerActor.getReplicatedLog().size());
//snapshot on 4
- followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
+ followerActor.getRaftActorContext().getSnapshotManager().capture(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+ new MockRaftActorContext.MockPayload("D")), 4);
- followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
verify(followerActor.delegate).createSnapshot();
assertEquals(6, followerActor.getReplicatedLog().size());
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+ // The commit is needed to complete the snapshot creation process
+ followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
// Trimming log in this scenario is a no-op
assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals(-1, leader.getReplicatedToAllIndex());
}};
// Trimming log in this scenario is a no-op
assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
- assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals(3, leader.getReplicatedToAllIndex());
}};
// Create the leader and 2 follower actors and verify initial syncing of the followers after leader
// persistence recovery.
- follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams());
+ follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
- follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams());
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
peerAddresses = ImmutableMap.<String, String>builder().
put(follower1Id, follower1Actor.path().toString()).
InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
// Create the leader and 2 follower actors.
+ follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
- follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams());
-
- follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams());
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().
put(follower1Id, follower1Actor.path().toString()).
--- /dev/null
+package org.opendaylight.controller.cluster.raft;
+
+import static junit.framework.TestCase.assertFalse;
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.testkit.TestActorRef;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.LoggerFactory;
+
+public class SnapshotManagerTest extends AbstractActorTest {
+
+ @Mock
+ private RaftActorContext mockRaftActorContext;
+
+ @Mock
+ private ConfigParams mockConfigParams;
+
+ @Mock
+ private ReplicatedLog mockReplicatedLog;
+
+ @Mock
+ private DataPersistenceProvider mockDataPersistenceProvider;
+
+ @Mock
+ private RaftActorBehavior mockRaftActorBehavior;
+
+ @Mock
+ private Procedure<Void> mockProcedure;
+
+ private SnapshotManager snapshotManager;
+
+ private TestActorFactory factory;
+
+ private TestActorRef<MessageCollectorActor> actorRef;
+
+ @Before
+ public void setUp(){
+ MockitoAnnotations.initMocks(this);
+
+ doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses();
+ doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
+ doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
+ doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
+ doReturn("123").when(mockRaftActorContext).getId();
+ doReturn("123").when(mockRaftActorBehavior).getLeaderId();
+
+ snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
+ factory = new TestActorFactory(getSystem());
+
+ actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
+ doReturn(actorRef).when(mockRaftActorContext).getActor();
+
+ }
+
+ @After
+ public void tearDown(){
+ factory.close();
+ }
+
+ @Test
+ public void testConstruction(){
+ assertEquals(false, snapshotManager.isCapturing());
+ }
+
+ @Test
+ public void testCaptureToInstall(){
+
+ // Force capturing toInstall = true
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+ new MockRaftActorContext.MockPayload()), 0, "follower-1");
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+
+ // LastIndex and LastTerm are picked up from the lastLogEntry
+ assertEquals(0L, captureSnapshot.getLastIndex());
+ assertEquals(1L, captureSnapshot.getLastTerm());
+
+ // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+ assertEquals(0L, captureSnapshot.getLastAppliedIndex());
+ assertEquals(1L, captureSnapshot.getLastAppliedTerm());
+
+ //
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+ actorRef.underlyingActor().clear();
+ }
+
+ @Test
+ public void testCapture(){
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertTrue(capture);
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+ // LastIndex and LastTerm are picked up from the lastLogEntry
+ assertEquals(9L, captureSnapshot.getLastIndex());
+ assertEquals(1L, captureSnapshot.getLastTerm());
+
+ // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+ assertEquals(9L, captureSnapshot.getLastAppliedIndex());
+ assertEquals(1L, captureSnapshot.getLastAppliedTerm());
+
+ //
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+
+ actorRef.underlyingActor().clear();
+
+ }
+
+ @Test
+ public void testIllegalCapture() throws Exception {
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertTrue(capture);
+
+ List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+
+ assertEquals(1, allMatching.size());
+
+ // This will not cause snapshot capture to start again
+ capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertFalse(capture);
+
+ allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+
+ assertEquals(1, allMatching.size());
+ }
+
+ @Test
+ public void testPersistWhenReplicatedToAllIndexMinusOne(){
+ doReturn("123").when(mockRaftActorContext).getId();
+ doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
+ doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
+
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+ snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
+ verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
+
+ Snapshot snapshot = snapshotArgumentCaptor.getValue();
+
+ assertEquals(6, snapshot.getLastAppliedTerm());
+ assertEquals(9, snapshot.getLastAppliedIndex());
+ assertEquals(9, snapshot.getLastIndex());
+ assertEquals(6, snapshot.getLastTerm());
+ assertEquals(10, snapshot.getState().length);
+ assertTrue(Arrays.equals(bytes, snapshot.getState()));
+ assertEquals(0, snapshot.getUnAppliedEntries().size());
+
+ verify(mockReplicatedLog).snapshotPreCommit(45L, 6L);
+ }
+
+
+ @Test
+ public void testCreate() throws Exception {
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure).apply(null);
+ }
+
+ @Test
+ public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure, times(1)).apply(null);
+ }
+
+ @Test
+ public void testCallingCreateBeforeCapture() throws Exception {
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure, times(0)).apply(null);
+ }
+
+ @Test
+ public void testCallingCreateAfterPersist() throws Exception {
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure, times(1)).apply(null);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ reset(mockProcedure);
+
+ snapshotManager.create(mockProcedure);
+
+ verify(mockProcedure, never()).apply(null);
+ }
+
+ @Test
+ public void testPersistWhenReplicatedToAllIndexNotMinus(){
+ doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
+ doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9);
+ doReturn(6L).when(replicatedLogEntry).getTerm();
+ doReturn(9L).when(replicatedLogEntry).getIndex();
+
+ // when replicatedToAllIndex != -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior).setReplicatedToAllIndex(9);
+ }
+
+
+ @Test
+ public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){
+ doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+ new MockRaftActorContext.MockPayload()), -1);
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+ }
+
+ @Test
+ public void testPersistSendInstallSnapshot(){
+ doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+ // when replicatedToAllIndex = -1
+ boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ assertTrue(capture);
+
+ snapshotManager.create(mockProcedure);
+
+ byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+
+ snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ ArgumentCaptor<SendInstallSnapshot> sendInstallSnapshotArgumentCaptor
+ = ArgumentCaptor.forClass(SendInstallSnapshot.class);
+
+ verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), sendInstallSnapshotArgumentCaptor.capture());
+
+ SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
+
+ assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray()));
+ }
+
+ @Test
+ public void testCallingPersistWithoutCaptureWillDoNothing(){
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
+ }
+ @Test
+ public void testCallingPersistTwiceWillDoNoHarm(){
+ doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
+ }
+
+ @Test
+ public void testCommit(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ verify(mockReplicatedLog).snapshotCommit();
+
+ verify(mockDataPersistenceProvider).deleteMessages(100L);
+
+ ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
+
+ verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
+
+ assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
+ // config snapShotBatchCount = 10
+ // therefore maxSequenceNumber = 90
+ }
+
+ @Test
+ public void testCommitBeforePersist(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ verify(mockDataPersistenceProvider, never()).deleteMessages(100L);
+
+ verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+ }
+
+ @Test
+ public void testCommitBeforeCapture(){
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong());
+
+ verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+ }
+
+ @Test
+ public void testCallingCommitMultipleTimesCausesNoHarm(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+ verify(mockReplicatedLog, times(1)).snapshotCommit();
+
+ verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+
+ verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+ }
+
+ @Test
+ public void testRollback(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.rollback();
+
+ verify(mockReplicatedLog).snapshotRollback();
+ }
+
+
+ @Test
+ public void testRollbackBeforePersist(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.rollback();
+
+ verify(mockReplicatedLog, never()).snapshotRollback();
+ }
+
+ @Test
+ public void testRollbackBeforeCapture(){
+ snapshotManager.rollback();
+
+ verify(mockReplicatedLog, never()).snapshotRollback();
+ }
+
+ @Test
+ public void testCallingRollbackMultipleTimesCausesNoHarm(){
+ // when replicatedToAllIndex = -1
+ snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+ snapshotManager.create(mockProcedure);
+
+ snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+ , Runtime.getRuntime().totalMemory());
+
+ snapshotManager.rollback();
+
+ snapshotManager.rollback();
+
+ verify(mockReplicatedLog, times(1)).snapshotRollback();
+ }
+
+ @Test
+ public void testTrimLog(){
+ ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(20L).when(mockRaftActorContext).getLastApplied();
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+ doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+ doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ snapshotManager.trimLog(10, mockRaftActorBehavior);
+
+ verify(mockReplicatedLog).snapshotPreCommit(10, 5);
+ verify(mockReplicatedLog).snapshotCommit();
+ }
+
+ @Test
+ public void testTrimLogAfterCapture(){
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertTrue(capture);
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(20L).when(mockRaftActorContext).getLastApplied();
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+ doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+ doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ snapshotManager.trimLog(10, mockRaftActorBehavior);
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ }
+
+ @Test
+ public void testTrimLogAfterCaptureToInstall(){
+ boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9, "follower-1");
+
+ assertTrue(capture);
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(20L).when(mockRaftActorContext).getLastApplied();
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+ doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+ doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ snapshotManager.trimLog(10, mockRaftActorBehavior);
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ }
+
+}
\ No newline at end of file
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
assertEquals("getTerm", 1001, reply.getTerm());
}
+ @Test
+ public void testCandidateSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+ MockRaftActorContext context = createActorContext();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ candidate = createBehavior(context);
+
+ MessageCollectorActor.expectFirstMatching(candidateActor, ElectionTimeout.class);
+
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+ }
@Override
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
}
+ @Test
+ public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+ MockRaftActorContext context = createActorContext();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ follower = createBehavior(context);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+ }
+
public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
int snapshotLength = bs.size();
int start = offset;
public class LeaderTest extends AbstractLeaderTest {
static final String FOLLOWER_ID = "follower";
+ public static final String LEADER_ID = "leader";
private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
new MockRaftActorContext.MockPayload("D"));
+ actorContext.getReplicatedLog().append(entry);
+
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
@Override
protected MockRaftActorContext createActorContext(ActorRef actorRef) {
- return createActorContext("leader", actorRef);
+ return createActorContext(LEADER_ID, actorRef);
}
private MockRaftActorContext createActorContextWithFollower() {
MockRaftActorContext leaderActorContext = createActorContext();
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+ Map<String, String> leaderPeerAddresses = new HashMap<>();
+ leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
- leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.setPeerAddresses(leaderPeerAddresses);
leaderActorContext.getReplicatedLog().removeFrom(0);
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
followerActorContext.setConfigParams(configParams);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
public String getNewRole() {
return newRole;
}
+
+ @Override
+ public String toString() {
+ return "RoleChanged{" +
+ "memberId='" + memberId + '\'' +
+ ", oldRole='" + oldRole + '\'' +
+ ", newRole='" + newRole + '\'' +
+ '}';
+ }
}
# failing an operation (eg transaction create and change listener registration).
#shard-initialization-timeout-in-seconds=300
-# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
-#shard-journal-recovery-log-batch-size=5000
+# The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.
+#shard-journal-recovery-log-batch-size=1000
# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
#shard-snapshot-batch-count=20000
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
* Coordinates persistence recovery on startup.
*/
private ShardRecoveryCoordinator recoveryCoordinator;
- private List<Object> currentLogRecoveryBatch;
private final DOMTransactionFactory transactionFactory;
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+ recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
}
private void setTransactionCommitTimeout() {
@Override
protected
void startLogRecoveryBatch(final int maxBatchSize) {
- currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
- }
+ recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
}
@Override
protected void appendRecoveredLogEntry(final Payload data) {
- if(data instanceof ModificationPayload) {
- try {
- currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
- } catch (ClassNotFoundException | IOException e) {
- LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
- }
- } else if (data instanceof CompositeModificationPayload) {
- currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
- } else if (data instanceof CompositeModificationByteStringPayload) {
- currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
- } else {
- LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
- }
+ recoveryCoordinator.appendRecoveredLogPayload(data);
}
@Override
protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
- if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
- LOG, name.toString());
- }
-
- recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: submitted recovery sbapshot", persistenceId());
- }
+ recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
}
@Override
protected void applyCurrentLogRecoveryBatch() {
- if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
- LOG, name.toString());
- }
-
- recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
- currentLogRecoveryBatch.size());
- }
+ recoveryCoordinator.applyCurrentLogRecoveryBatch();
}
@Override
protected void onRecoveryComplete() {
- if(recoveryCoordinator != null) {
- Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
- }
-
- for(DOMStoreWriteTransaction tx: txList) {
- try {
- syncCommitTransaction(tx);
- shardMBean.incrementCommittedTransactionCount();
- } catch (InterruptedException | ExecutionException e) {
- shardMBean.incrementFailedTransactionsCount();
- LOG.error("{}: Failed to commit", persistenceId(), e);
- }
- }
- }
-
recoveryCoordinator = null;
- currentLogRecoveryBatch = null;
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
// A data store could be of type config/operational
private final String type;
+ private final String shardManagerIdentifierString;
+
private final ClusterWrapper cluster;
private final Configuration configuration;
this.datastoreContext = datastoreContext;
this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
this.type = datastoreContext.getDataStoreType();
+ this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
@Override
public void handleCommand(Object message) throws Exception {
- if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
- findPrimary(FindPrimary.fromSerializable(message));
+ if (message instanceof FindPrimary) {
+ findPrimary((FindPrimary)message);
} else if(message instanceof FindLocalShard){
findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
if(shardInformation != null) {
shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+ if (isReadyWithLeaderId()) {
+ LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+ persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+ waitTillReadyCountdownLatch.countDown();
+ }
+
} else {
LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
}
ShardInformation shardInfo = message.getShardInfo();
LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
- shardInfo.getShardId());
+ shardInfo.getShardName());
shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
if(!shardInfo.isShardInitialized()) {
- message.getSender().tell(new ActorNotInitialized(), getSelf());
+ LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
+ message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
} else {
+ LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
}
}
if(shardInformation != null) {
shardInformation.setRole(roleChanged.getNewRole());
- if (isReady()) {
+ if (isReadyWithLeaderId()) {
LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
persistenceId(), type, waitTillReadyCountdownLatch.getCount());
return null;
}
- private boolean isReady() {
+ private boolean isReadyWithLeaderId() {
boolean isReady = true;
for (ShardInformation info : localShards.values()) {
- if(!info.isShardReady()){
+ if(!info.isShardReadyWithLeaderId()){
isReady = false;
break;
}
}
private void markShardAsInitialized(String shardName) {
- LOG.debug("Initializing shard [{}]", shardName);
+ LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
ShardInformation shardInformation = localShards.get(shardName);
if (shardInformation != null) {
shardInformation.addOnShardInitialized(onShardInitialized);
+ LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
+
Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
onShardInitialized.setTimeoutSchedule(timeoutSchedule);
} else if (!shardInformation.isShardInitialized()) {
- getSender().tell(new ActorNotInitialized(), getSelf());
+ LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
+ shardInformation.getShardName());
+ getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
} else {
+ LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
+ shardInformation.getShardName());
getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
}
"recovering and a leader is being elected. Try again later.", shardId));
}
+ private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+ return new NotInitializedException(String.format(
+ "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
+ }
+
private void memberRemoved(ClusterEvent.MemberRemoved message) {
+ String memberName = message.member().roles().head();
+
+ LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+ message.member().address());
+
memberNameToAddress.remove(message.member().roles().head());
}
private void memberUp(ClusterEvent.MemberUp message) {
String memberName = message.member().roles().head();
+ LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+ message.member().address());
+
memberNameToAddress.put(memberName, message.member().address());
for(ShardInformation info : localShards.values()){
}
+ @VisibleForTesting
+ protected ClusterWrapper getCluster() {
+ return cluster;
+ }
+
@VisibleForTesting
protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
return getContext().actorOf(Shard.props(info.getShardId(),
}
private void findPrimary(FindPrimary message) {
+ LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
+
final String shardName = message.getShardName();
// First see if the there is a local replica for the shard
sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
@Override
public Object get() {
- Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable();
+ Object found = new PrimaryFound(info.getSerializedLeaderActor());
if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Found primary for {}: {}", shardName, found);
+ LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
}
return found;
return;
}
- List<String> members = configuration.getMembersFromShardName(shardName);
+ for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
+ if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
+ String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
- if(cluster.getCurrentMemberName() != null) {
- members.remove(cluster.getCurrentMemberName());
- }
+ LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+ shardName, path);
- /**
- * FIXME: Instead of sending remote shard actor path back to sender,
- * forward FindPrimary message to remote shard manager
- */
- // There is no way for us to figure out the primary (for now) so assume
- // that one of the remote nodes is a primary
- for(String memberName : members) {
- Address address = memberNameToAddress.get(memberName);
- if(address != null){
- String path =
- getShardActorPath(shardName, memberName);
- getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+ getContext().actorSelection(path).forward(message, getContext());
return;
}
}
- getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+
+ LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
+
+ getSender().tell(new PrimaryNotFoundException(
+ String.format("No primary shard found for %s.", shardName)), getSelf());
+ }
+
+ private StringBuilder getShardManagerActorPathBuilder(Address address) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
+ return builder;
}
private String getShardActorPath(String shardName, String memberName) {
Address address = memberNameToAddress.get(memberName);
if(address != null) {
- StringBuilder builder = new StringBuilder();
- builder.append(address.toString())
- .append("/user/")
- .append(ShardManagerIdentifier.builder().type(type).build().toString())
- .append("/")
+ StringBuilder builder = getShardManagerActorPathBuilder(address);
+ builder.append("/")
.append(getShardIdentifier(memberName, shardName));
return builder.toString();
}
package org.opendaylight.controller.cluster.datastore;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Collection;
-import java.util.Collections;
+import java.io.IOException;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
/**
*/
class ShardRecoveryCoordinator {
- private static final int TIME_OUT = 10;
-
- private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
- private final SchemaContext schemaContext;
+ private final InMemoryDOMDataStore store;
+ private List<ModificationPayload> currentLogRecoveryBatch;
private final String shardName;
- private final ExecutorService executor;
private final Logger log;
- private final String name;
- ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
- String name) {
- this.schemaContext = schemaContext;
+ ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
+ this.store = store;
this.shardName = shardName;
this.log = log;
- this.name = name;
-
- executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
- new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
}
- /**
- * Submits a batch of journal log entries.
- *
- * @param logEntries the serialized journal log entries
- * @param resultingTx the write Tx to which to apply the entries
- */
- void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
- LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
- resultingTxList.add(resultingTx);
- executor.execute(task);
- }
+ void startLogRecoveryBatch(int maxBatchSize) {
+ currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
- /**
- * Submits a snapshot.
- *
- * @param snapshotBytes the serialized snapshot
- * @param resultingTx the write Tx to which to apply the entries
- */
- void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
- SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
- resultingTxList.add(resultingTx);
- executor.execute(task);
+ log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
}
- Collection<DOMStoreWriteTransaction> getTransactions() {
- // Shutdown the executor and wait for task completion.
- executor.shutdown();
-
+ void appendRecoveredLogPayload(Payload payload) {
try {
- if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
- return resultingTxList;
+ if(payload instanceof ModificationPayload) {
+ currentLogRecoveryBatch.add((ModificationPayload) payload);
+ } else if (payload instanceof CompositeModificationPayload) {
+ currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+ ((CompositeModificationPayload) payload).getModification())));
+ } else if (payload instanceof CompositeModificationByteStringPayload) {
+ currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+ ((CompositeModificationByteStringPayload) payload).getModification())));
} else {
- log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
+ log.error("{}: Unknown payload {} received during recovery", shardName, payload);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ log.error("{}: Error extracting ModificationPayload", shardName, e);
}
- return Collections.emptyList();
}
- private static abstract class ShardRecoveryTask implements Runnable {
-
- final DOMStoreWriteTransaction resultingTx;
-
- ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
- this.resultingTx = resultingTx;
+ private void commitTransaction(DOMStoreWriteTransaction transaction) {
+ DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ try {
+ commitCohort.preCommit().get();
+ commitCohort.commit().get();
+ } catch (Exception e) {
+ log.error("{}: Failed to commit Tx on recovery", shardName, e);
}
}
- private class LogRecoveryTask extends ShardRecoveryTask {
-
- private final List<Object> logEntries;
-
- LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
- super(resultingTx);
- this.logEntries = logEntries;
- }
-
- @Override
- public void run() {
- for(int i = 0; i < logEntries.size(); i++) {
- MutableCompositeModification.fromSerializable(
- logEntries.get(i)).apply(resultingTx);
- // Null out to GC quicker.
- logEntries.set(i, null);
+ /**
+ * Applies the current batched log entries to the data store.
+ */
+ void applyCurrentLogRecoveryBatch() {
+ log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
+
+ DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+ for(ModificationPayload payload: currentLogRecoveryBatch) {
+ try {
+ MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
+ } catch (Exception e) {
+ log.error("{}: Error extracting ModificationPayload", shardName, e);
}
}
- }
- private class SnapshotRecoveryTask extends ShardRecoveryTask {
+ commitTransaction(writeTx);
+
+ currentLogRecoveryBatch = null;
+ }
- private final byte[] snapshotBytes;
+ /**
+ * Applies a recovered snapshot to the data store.
+ *
+ * @param snapshotBytes the serialized snapshot
+ */
+ void applyRecoveredSnapshot(final byte[] snapshotBytes) {
+ log.debug("{}: Applyng recovered sbapshot", shardName);
- SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
- super(resultingTx);
- this.snapshotBytes = snapshotBytes;
- }
+ DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
- @Override
- public void run() {
- NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+ NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
- // delete everything first
- resultingTx.delete(YangInstanceIdentifier.builder().build());
+ writeTx.write(YangInstanceIdentifier.builder().build(), node);
- // Add everything from the remote node back
- resultingTx.write(YangInstanceIdentifier.builder().build(), node);
- }
+ commitTransaction(writeTx);
}
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import java.io.Serializable;
-
-public class ActorNotInitialized implements Serializable {
- private static final long serialVersionUID = 1L;
-}
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
/**
* The FindPrimary message is used to locate the primary of any given shard
*
*/
-public class FindPrimary implements SerializableMessage{
- public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
+public class FindPrimary implements Serializable {
+ private static final long serialVersionUID = 1L;
private final String shardName;
private final boolean waitUntilReady;
return waitUntilReady;
}
- @Override
- public Object toSerializable() {
- return this;
- }
-
- public static FindPrimary fromSerializable(Object message){
- return (FindPrimary) message;
- }
-
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
package org.opendaylight.controller.cluster.datastore.messages;
+import java.io.Serializable;
-public class PrimaryFound implements SerializableMessage {
- public static final Class<PrimaryFound> SERIALIZABLE_CLASS = PrimaryFound.class;
- private final String primaryPath;
+public class PrimaryFound implements Serializable {
+ private static final long serialVersionUID = 1L;
- public PrimaryFound(final String primaryPath) {
- this.primaryPath = primaryPath;
- }
+ private final String primaryPath;
- public String getPrimaryPath() {
- return primaryPath;
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
+ public PrimaryFound(final String primaryPath) {
+ this.primaryPath = primaryPath;
}
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- PrimaryFound that = (PrimaryFound) o;
- if (!primaryPath.equals(that.primaryPath)) {
- return false;
+ public String getPrimaryPath() {
+ return primaryPath;
}
- return true;
- }
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
- @Override
- public int hashCode() {
- return primaryPath.hashCode();
- }
+ PrimaryFound that = (PrimaryFound) o;
- @Override
- public String toString() {
- return "PrimaryFound{" +
- "primaryPath='" + primaryPath + '\'' +
- '}';
- }
+ if (!primaryPath.equals(that.primaryPath)) {
+ return false;
+ }
+ return true;
+ }
- @Override
- public Object toSerializable() {
- return this;
- }
+ @Override
+ public int hashCode() {
+ return primaryPath.hashCode();
+ }
- public static PrimaryFound fromSerializable(final Object message){
- return (PrimaryFound) message;
- }
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]");
+ return builder.toString();
+ }
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import com.google.common.base.Preconditions;
-
-public class PrimaryNotFound implements SerializableMessage {
- public static final Class<PrimaryNotFound> SERIALIZABLE_CLASS = PrimaryNotFound.class;
-
- private final String shardName;
-
- public PrimaryNotFound(final String shardName){
-
- Preconditions.checkNotNull(shardName, "shardName should not be null");
-
- this.shardName = shardName;
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- PrimaryNotFound that = (PrimaryNotFound) o;
-
- if (shardName != null ? !shardName.equals(that.shardName) : that.shardName != null) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return shardName != null ? shardName.hashCode() : 0;
- }
-
- @Override
- public Object toSerializable() {
- return this;
- }
-
- public static PrimaryNotFound fromSerializable(final Object message){
- return (PrimaryNotFound) message;
- }
-}
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
return ret;
}
Future<Object> future = executeOperationAsync(shardManager,
- new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
+ new FindPrimary(shardName, true), shardInitializationTimeout);
return future.transform(new Mapper<Object, ActorSelection>() {
@Override
public ActorSelection checkedApply(Object response) throws Exception {
- if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
- PrimaryFound found = PrimaryFound.fromSerializable(response);
+ if(response instanceof PrimaryFound) {
+ PrimaryFound found = (PrimaryFound)response;
LOG.debug("Primary found {}", found.getPrimaryPath());
ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
return actorSelection;
- } else if(response instanceof ActorNotInitialized) {
- throw new NotInitializedException(
- String.format("Found primary shard %s but it's not initialized yet. " +
- "Please try again later", shardName));
- } else if(response instanceof PrimaryNotFound) {
- throw new PrimaryNotFoundException(
- String.format("No primary shard found for %S.", shardName));
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
+ } else if(response instanceof PrimaryNotFoundException) {
+ throw (PrimaryNotFoundException)response;
} else if(response instanceof NoShardLeaderException) {
throw (NoShardLeaderException)response;
}
LocalShardFound found = (LocalShardFound)response;
LOG.debug("Local shard found {}", found.getPath());
return found.getPath();
- } else if(response instanceof ActorNotInitialized) {
- throw new NotInitializedException(
- String.format("Found local shard for %s but it's not initialized yet.",
- shardName));
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
} else if(response instanceof LocalShardNotFound) {
throw new LocalShardNotFoundException(
String.format("Local shard for %s does not exist.", shardName));
}
leaf shard-journal-recovery-log-batch-size {
- default 5000;
+ default 1000;
type non-zero-uint32-type;
description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
}
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.dispatch.Dispatchers;
import akka.japi.Creator;
import akka.testkit.TestActorRef;
import com.google.common.base.Function;
};
TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)), "testRecovery");
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testRecovery");
assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
*/
package org.opendaylight.controller.cluster.datastore;
-import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
- reply(new ActorNotInitialized());
+ reply(new NotInitializedException("not initialized"));
new Within(duration("1 seconds")) {
@Override
private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
new IntegrationTestKit(getSystem()) {{
String testName = "testTransactionCommitFailureWithNoShardLeader";
- String shardName = "test-1";
+ String shardName = "default";
// We don't want the shard to become the leader so prevent shard election from completing
// by setting the election timeout, which is based on the heartbeat interval, really high.
@Override
public void run() {
try {
- writeTx.write(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ writeTx.write(TestModel.JUNK_PATH,
+ ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
txCohort.set(writeTx.ready());
} catch(Exception e) {
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.AddressFromURIString;
import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.dispatch.Dispatchers;
import akka.japi.Creator;
import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.ConfigFactory;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
+ private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
+ String name = new ShardIdentifier(shardName, memberName,"config").toString();
+ return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
+ }
+
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
private Props newPropsShardMgrWithMockShardActor() {
+ return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
+ new MockConfiguration());
+ }
+
+ private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
+ final ClusterWrapper clusterWrapper, final Configuration config) {
Creator<ShardManager> creator = new Creator<ShardManager>() {
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
- datastoreContextBuilder.build(), ready) {
- @Override
- protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
- return mockShardActor;
- }
- };
+ return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
+ ready, name, shardActor);
}
};
- return Props.create(new DelegatingShardManagerCreator(creator));
+ return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
}
@Test
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("non-existent", false), getRef());
- expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable());
+ expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
}};
}
shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
RaftState.Leader.name())), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
}};
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-2-shard-default"));
}};
new JavaTestKit(getSystem()) {{
final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
}};
}
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
}};
shardManager.tell(new RoleChangeNotification(memberId,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
}};
// We're passing waitUntilInitialized = true to FindPrimary so the response should be
// delayed until we send ActorInitialized and RoleChangeNotification.
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
- PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
- expectMsgClass(duration("2 seconds"), ActorNotInitialized.class);
+ expectMsgClass(duration("2 seconds"), NotInitializedException.class);
shardManager.tell(new ActorInitialized(), mockShardActor);
shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
null, RaftState.Candidate.name()), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
}};
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
}};
}
+ @Test
+ public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
+ String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+ // Create an ActorSystem ShardManager actor for member-1.
+
+ final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+ final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+ newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+ new MockConfiguration()), shardManagerID);
+
+ // Create an ActorSystem ShardManager actor for member-2.
+
+ final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+ Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
+
+ MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+ newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+ mockConfig2), shardManagerID);
+
+ new JavaTestKit(system1) {{
+
+ shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+ String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+ shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
+ shardManager2.tell(new RoleChangeNotification(memberId2,
+ RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+
+ shardManager1.underlyingActor().waitForMemberUp();
+
+ shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+ PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ String path = found.getPrimaryPath();
+ assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
+
+ shardManager2.underlyingActor().verifyFindPrimary();
+
+ Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+ shardManager1.underlyingActor().waitForMemberRemoved();
+
+ shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+ expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
+ }};
+
+ JavaTestKit.shutdownActorSystem(system1);
+ JavaTestKit.shutdownActorSystem(system2);
+ }
+
@Test
public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
- expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+ expectMsgClass(duration("5 seconds"), NotInitializedException.class);
}};
}
}};
}
- @Test
- public void testOnReceiveMemberUp() throws Exception {
- new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
-
- MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
-
- shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
- PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
- PrimaryFound.SERIALIZABLE_CLASS));
- String path = found.getPrimaryPath();
- assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
- }};
- }
-
- @Test
- public void testOnReceiveMemberDown() throws Exception {
-
- new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
-
- MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
-
- shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
- expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
-
- MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
-
- shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
- expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
- }};
- }
-
@Test
public void testOnRecoveryJournalIsCleaned() {
InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
}
@Test
- public void testRoleChangeNotificationReleaseReady() throws Exception {
+ public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception {
new JavaTestKit(getSystem()) {
{
TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
+ verify(ready, never()).countDown();
+
+ shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId));
+
verify(ready, times(1)).countDown();
}};
}
+ @Test
+ public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+ memberId, null, RaftState.Follower.name()));
+
+ verify(ready, never()).countDown();
+
+ shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix));
+
+ verify(ready, times(1)).countDown();
+
+ }};
+ }
+
+
@Test
public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
new JavaTestKit(getSystem()) {
return delegate.create();
}
}
+
+ private static class ForwardingShardManager extends ShardManager {
+ private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
+ private CountDownLatch memberUpReceived = new CountDownLatch(1);
+ private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
+ private final ActorRef shardActor;
+ private final String name;
+
+ protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
+ DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
+ ActorRef shardActor) {
+ super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+ this.shardActor = shardActor;
+ this.name = name;
+ }
+
+ @Override
+ public void handleCommand(Object message) throws Exception {
+ try{
+ super.handleCommand(message);
+ } finally {
+ if(message instanceof FindPrimary) {
+ findPrimaryMessageReceived.countDown();
+ } else if(message instanceof ClusterEvent.MemberUp) {
+ String role = ((ClusterEvent.MemberUp)message).member().roles().head();
+ if(!getCluster().getCurrentMemberName().equals(role)) {
+ memberUpReceived.countDown();
+ }
+ } else if(message instanceof ClusterEvent.MemberRemoved) {
+ String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
+ if(!getCluster().getCurrentMemberName().equals(role)) {
+ memberRemovedReceived.countDown();
+ }
+ }
+ }
+ }
+
+ @Override
+ public String persistenceId() {
+ return name;
+ }
+
+ @Override
+ protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ return shardActor;
+ }
+
+ void waitForMemberUp() {
+ assertEquals("MemberUp received", true,
+ Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
+ memberUpReceived = new CountDownLatch(1);
+ }
+
+ void waitForMemberRemoved() {
+ assertEquals("MemberRemoved received", true,
+ Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
+ memberRemovedReceived = new CountDownLatch(1);
+ }
+
+ void verifyFindPrimary() {
+ assertEquals("FindPrimary received", true,
+ Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
+ findPrimaryMessageReceived = new CountDownLatch(1);
+ }
+ }
}
import org.junit.Test;
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@Override
public Shard create() throws Exception {
+ // Use a non persistent provider because this test actually invokes persist on the journal
+ // this will cause all other messages to not be queued properly after that.
+ // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
+ // it does do a persist)
return new Shard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
+ dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof ElectionTimeout && firstElectionTimeout) {
// Use MBean for verification
// Committed transaction count should increase as usual
- assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+ assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
// Commit index should advance as we do not have an empty modification
assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
dataStoreContextBuilder.persistent(persistent);
+
+
new ShardTestKit(getSystem()) {{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
- Creator<Shard> creator = new Creator<Shard>() {
- @Override
- public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
- DelegatingPersistentDataProvider delegating;
+ class TestShard extends Shard {
- @Override
- protected DataPersistenceProvider persistence() {
- if(delegating == null) {
- delegating = new DelegatingPersistentDataProvider(super.persistence());
- }
+ protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ super(name, peerAddresses, datastoreContext, schemaContext);
+ }
- return delegating;
- }
+ DelegatingPersistentDataProvider delegating;
- @Override
- protected void commitSnapshot(final long sequenceNumber) {
- super.commitSnapshot(sequenceNumber);
- latch.get().countDown();
- }
- };
+ protected DataPersistenceProvider persistence() {
+ if(delegating == null) {
+ delegating = new DelegatingPersistentDataProvider(super.persistence());
+ }
+ return delegating;
+ }
+
+ @Override
+ protected void commitSnapshot(final long sequenceNumber) {
+ super.commitSnapshot(sequenceNumber);
+ latch.get().countDown();
+ }
+
+ @Override
+ public RaftActorContext getRaftActorContext() {
+ return super.getRaftActorContext();
+ }
+ }
+
+ Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new TestShard(shardID, Collections.<String,String>emptyMap(),
+ newDatastoreContext(), SCHEMA_CONTEXT);
}
};
NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
- CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
- shard.tell(capture, getRef());
+ // Trigger creation of a snapshot by ensuring
+ RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
+ raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
latch.set(new CountDownLatch(1));
savedSnapshot.set(null);
- shard.tell(capture, getRef());
+ raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new PrimaryNotFound("foobar"));
+ return Futures.successful((Object) new PrimaryNotFoundException("not found"));
}
};
mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new ActorNotInitialized());
+ return Futures.successful((Object) new NotInitializedException("not iniislized"));
}
};
TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
- shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable());
- shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable());
+ shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
+ shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
Configuration mockConfig = mock(Configuration.class);
import akka.cluster.ClusterEvent;
import akka.cluster.MemberStatus;
import akka.cluster.UniqueAddress;
-import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import scala.collection.JavaConversions;
import java.util.HashSet;
import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import scala.collection.JavaConversions;
public class MockClusterWrapper implements ClusterWrapper{
private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550);
+ private String currentMemberName = "member-1";
+
+ public MockClusterWrapper() {
+ }
+
+ public MockClusterWrapper(String currentMemberName) {
+ this.currentMemberName = currentMemberName;
+ }
@Override
public void subscribeToMemberEvents(ActorRef actorRef) {
@Override
public String getCurrentMemberName() {
- return "member-1";
+ return currentMemberName;
}
@Override
package org.opendaylight.controller.cluster.datastore.utils;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
public class MockConfiguration implements Configuration{
- @Override public List<String> getMemberShardNames(final String memberName) {
- return Arrays.asList("default");
+ private Map<String, List<String>> shardMembers = ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ /*put("astronauts", Arrays.asList("member-2", "member-3")).*/build();
+
+ public MockConfiguration() {
+ }
+
+ public MockConfiguration(Map<String, List<String>> shardMembers) {
+ this.shardMembers = shardMembers;
}
- @Override public Optional<String> getModuleNameFromNameSpace(
+ @Override
+ public List<String> getMemberShardNames(final String memberName) {
+ return new ArrayList<>(shardMembers.keySet());
+ }
+ @Override
+ public Optional<String> getModuleNameFromNameSpace(
final String nameSpace) {
return Optional.absent();
}
return Arrays.asList("member-2", "member-3");
}
- return Collections.emptyList();
+ List<String> members = shardMembers.get(shardName);
+ return members != null ? members : Collections.<String>emptyList();
}
@Override public Set<String> getAllShardNames() {
public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
"test");
+ public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
+ "junk");
+
+
public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
+ public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
node(OUTER_LIST_QNAME).build();
public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
}
+
+Member1 {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
+
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
+
+ akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
+ loglevel = "DEBUG"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2558
+ }
+ }
+
+ cluster {
+ auto-down-unreachable-after = 100s
+
+ roles = [
+ "member-1"
+ ]
+ }
+ }
+}
+
+Member2 {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
+
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
+
+ akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2559
+ }
+ }
+
+ cluster {
+ auto-down-unreachable-after = 100s
+
+ roles = [
+ "member-2"
+ ]
+ }
+ }
+}
@AfterClass
public static void tearDown() throws Exception {
hashedWheelTimer.stop();
- nettyGroup.shutdownGracefully().await();
+ nettyGroup.shutdownGracefully().await(5, TimeUnit.SECONDS);
minaTimerEx.shutdownNow();
nioExec.shutdownNow();
}