import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
* set commitIndex = N (§5.3, §5.4).
*/
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
-
- // The index of the first chunk that is sent when installing a snapshot
- public static final int FIRST_CHUNK_INDEX = 1;
-
- // The index that the follower should respond with if it needs the install snapshot to be reset
- public static final int INVALID_CHUNK_INDEX = -1;
-
- // This would be passed as the hash code of the last chunk when sending the first chunk
- public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
-
private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
- private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
+ private final Map<String, LeaderInstallSnapshotState> mapFollowerToSnapshot = new HashMap<>();
/**
* Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
String followerId = reply.getFollowerId();
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
logName(), reply.getChunkIndex(), followerId,
followerToSnapshot.getChunkIndex());
- if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+ if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){
// Since the Follower did not find this index to be valid we should reset the follower snapshot
// so that Installing the snapshot can resume from the beginning
followerToSnapshot.reset();
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
int nextChunkIndex = followerToSnapshot.incrementChunkIndex();
Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
* creates and return a ByteString chunk
*/
private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
- followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+ followerToSnapshot = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
+ logName());
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
byte[] nextChunk = followerToSnapshot.getNextChunk();
return minPresent != 0;
}
- /**
- * Encapsulates the snapshot bytestring and handles the logic of sending
- * snapshot chunks
- */
- protected class FollowerToSnapshot {
- private final ByteString snapshotBytes;
- private int offset = 0;
- // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
- private int replyReceivedForOffset;
- // if replyStatus is false, the previous chunk is attempted
- private boolean replyStatus = false;
- private int chunkIndex;
- private final int totalChunks;
- private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
-
- public FollowerToSnapshot(ByteString snapshotBytes) {
- this.snapshotBytes = snapshotBytes;
- int size = snapshotBytes.size();
- totalChunks = (size / context.getConfigParams().getSnapshotChunkSize()) +
- (size % context.getConfigParams().getSnapshotChunkSize() > 0 ? 1 : 0);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
- logName(), size, totalChunks);
- }
- replyReceivedForOffset = -1;
- chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
- }
-
- public ByteString getSnapshotBytes() {
- return snapshotBytes;
- }
-
- public int incrementOffset() {
- if(replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- offset = offset + context.getConfigParams().getSnapshotChunkSize();
- }
- return offset;
- }
-
- public int incrementChunkIndex() {
- if (replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- chunkIndex = chunkIndex + 1;
- }
- return chunkIndex;
- }
-
- public int getChunkIndex() {
- return chunkIndex;
- }
-
- public int getTotalChunks() {
- return totalChunks;
- }
-
- public boolean canSendNextChunk() {
- // we only send a false if a chunk is sent but we have not received a reply yet
- return replyReceivedForOffset == offset;
- }
-
- public boolean isLastChunk(int chunkIndex) {
- return totalChunks == chunkIndex;
- }
-
- public void markSendStatus(boolean success) {
- if (success) {
- // if the chunk sent was successful
- replyReceivedForOffset = offset;
- replyStatus = true;
- lastChunkHashCode = nextChunkHashCode;
- } else {
- // if the chunk sent was failure
- replyReceivedForOffset = offset;
- replyStatus = false;
- }
- }
-
- public byte[] getNextChunk() {
- int snapshotLength = getSnapshotBytes().size();
- int start = incrementOffset();
- int size = context.getConfigParams().getSnapshotChunkSize();
- if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
- size = snapshotLength;
- } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
- }
-
- byte[] nextChunk = new byte[size];
- getSnapshotBytes().copyTo(nextChunk, start, 0, size);
- nextChunkHashCode = Arrays.hashCode(nextChunk);
-
- LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(),
- snapshotLength, start, size, nextChunkHashCode);
- return nextChunk;
- }
-
- /**
- * reset should be called when the Follower needs to be sent the snapshot from the beginning
- */
- public void reset(){
- offset = 0;
- replyStatus = false;
- replyReceivedForOffset = offset;
- chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
- lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- }
-
- public int getLastChunkHashCode() {
- return lastChunkHashCode;
- }
- }
-
// called from example-actor for printing the follower-states
public String printFollowerStates() {
final StringBuilder sb = new StringBuilder();
}
@VisibleForTesting
- protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
+ protected void setFollowerSnapshot(String followerId, LeaderInstallSnapshotState snapshot) {
mapFollowerToSnapshot.put(followerId, snapshot);
}
--- /dev/null
+/*
+ * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import com.google.protobuf.ByteString;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
+ */
+class LeaderInstallSnapshotState {
+ private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
+
+ // The index of the first chunk that is sent when installing a snapshot
+ static final int FIRST_CHUNK_INDEX = 1;
+
+ // The index that the follower should respond with if it needs the install snapshot to be reset
+ static final int INVALID_CHUNK_INDEX = -1;
+
+ // This would be passed as the hash code of the last chunk when sending the first chunk
+ static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
+
+ private int snapshotChunkSize;
+ private final ByteString snapshotBytes;
+ private final String logName;
+ private int offset = 0;
+ // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
+ private int replyReceivedForOffset;
+ // if replyStatus is false, the previous chunk is attempted
+ private boolean replyStatus = false;
+ private int chunkIndex;
+ private final int totalChunks;
+ private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
+ private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
+
+ public LeaderInstallSnapshotState(ByteString snapshotBytes, int snapshotChunkSize, String logName) {
+ this.snapshotChunkSize = snapshotChunkSize;
+ this.snapshotBytes = snapshotBytes;
+ this.logName = logName;
+ int size = snapshotBytes.size();
+ totalChunks = (size / snapshotChunkSize) +
+ (size % snapshotChunkSize > 0 ? 1 : 0);
+
+ LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks);
+
+ replyReceivedForOffset = -1;
+ chunkIndex = FIRST_CHUNK_INDEX;
+ }
+
+ ByteString getSnapshotBytes() {
+ return snapshotBytes;
+ }
+
+ int incrementOffset() {
+ if(replyStatus) {
+ // if prev chunk failed, we would want to sent the same chunk again
+ offset = offset + snapshotChunkSize;
+ }
+ return offset;
+ }
+
+ int incrementChunkIndex() {
+ if (replyStatus) {
+ // if prev chunk failed, we would want to sent the same chunk again
+ chunkIndex = chunkIndex + 1;
+ }
+ return chunkIndex;
+ }
+
+ int getChunkIndex() {
+ return chunkIndex;
+ }
+
+ int getTotalChunks() {
+ return totalChunks;
+ }
+
+ boolean canSendNextChunk() {
+ // we only send a false if a chunk is sent but we have not received a reply yet
+ return replyReceivedForOffset == offset;
+ }
+
+ boolean isLastChunk(int index) {
+ return totalChunks == index;
+ }
+
+ void markSendStatus(boolean success) {
+ if (success) {
+ // if the chunk sent was successful
+ replyReceivedForOffset = offset;
+ replyStatus = true;
+ lastChunkHashCode = nextChunkHashCode;
+ } else {
+ // if the chunk sent was failure
+ replyReceivedForOffset = offset;
+ replyStatus = false;
+ }
+ }
+
+ byte[] getNextChunk() {
+ int snapshotLength = getSnapshotBytes().size();
+ int start = incrementOffset();
+ int size = snapshotChunkSize;
+ if (snapshotChunkSize > snapshotLength) {
+ size = snapshotLength;
+ } else if ((start + snapshotChunkSize) > snapshotLength) {
+ size = snapshotLength - start;
+ }
+
+ byte[] nextChunk = new byte[size];
+ getSnapshotBytes().copyTo(nextChunk, start, 0, size);
+ nextChunkHashCode = Arrays.hashCode(nextChunk);
+
+ LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
+ snapshotLength, start, size, nextChunkHashCode);
+ return nextChunk;
+ }
+
+ /**
+ * reset should be called when the Follower needs to be sent the snapshot from the beginning
+ */
+ void reset(){
+ offset = 0;
+ replyStatus = false;
+ replyReceivedForOffset = offset;
+ chunkIndex = FIRST_CHUNK_INDEX;
+ lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
+ }
+
+ int getLastChunkHashCode() {
+ return lastChunkHashCode;
+ }
+}
private final int totalChunks;
private final String leaderId;
private ByteString collectedChunks = ByteString.EMPTY;
- private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
+ private int lastChunkIndex = LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1;
private boolean sealed = false;
- private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+ private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE;
SnapshotTracker(Logger LOG, int totalChunks, String leaderId) {
this.LOG = LOG;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
-import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
ByteString bs = toByteString(leadersSnapshot);
leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
commitIndex, snapshotTerm, commitIndex, snapshotTerm));
- FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+ actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
//send first chunk and no InstallSnapshotReply received yet
ByteString bs = toByteString(leadersSnapshot);
leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
commitIndex, snapshotTerm, commitIndex, snapshotTerm));
- FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+ actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
while(!fts.isLastChunk(fts.getChunkIndex())) {
fts.getNextChunk();
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
+ assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
+ installSnapshot.getLastChunkHashCode().get().intValue());
int hashCode = Arrays.hashCode(installSnapshot.getData());
ByteString bs = toByteString(leadersSnapshot);
byte[] barray = bs.toByteArray();
- FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+ actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
leader.setFollowerSnapshot(FOLLOWER_ID, fts);
assertEquals(bs.size(), barray.length);
SnapshotTracker tracker3 = new SnapshotTracker(logger, 2, "leader");
try {
- tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
+ tracker3.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
Assert.fail();
} catch(SnapshotTracker.InvalidChunkException e){
// Out of sequence chunk indexes won't work
SnapshotTracker tracker4 = new SnapshotTracker(logger, 2, "leader");
- tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+ tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
try {
- tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX+2, chunk2, Optional.<Integer>absent());
+ tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX+2, chunk2, Optional.<Integer>absent());
Assert.fail();
} catch(SnapshotTracker.InvalidChunkException e){
// If the lastChunkHashCode is missing
SnapshotTracker tracker5 = new SnapshotTracker(logger, 2, "leader");
- tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+ tracker5.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
// Look I can add the same chunk again
- tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk1, Optional.<Integer>absent());
+ tracker5.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 1, chunk1, Optional.<Integer>absent());
// An exception will be thrown when an invalid chunk is addedd with the right sequence
// when the lastChunkHashCode is present
SnapshotTracker tracker6 = new SnapshotTracker(logger, 2, "leader");
- tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
+ tracker6.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
try {
// Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777
- tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777));
+ tracker6.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777));
Assert.fail();
}catch(SnapshotTracker.InvalidChunkException e){
SnapshotTracker tracker2 = new SnapshotTracker(logger, 3, "leader");
- tracker2.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE));
+ tracker2.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
tracker2.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
tracker2.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2)));
ByteString chunks = ByteString.copyFrom(chunk1).concat(ByteString.copyFrom(chunk2));
- tracker1.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE));
+ tracker1.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
tracker1.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
assertEquals(chunks, tracker1.getCollectedChunks());