X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=e5c5dc752d3a257e9ce2f5852bf3350b006b8116;hp=d85ac8ef67ded21d6140bb42512ba5fa6eb2165d;hb=0230f37066dfd974accaf36bc712d6f1e60637d0;hpb=ae75f63e3b47d0a10f736bdd18fb80e6ddc504ed diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index d85ac8ef67..e5c5dc752d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -67,6 +67,16 @@ import scala.concurrent.duration.FiniteDuration; * set commitIndex = N (§5.3, §5.4). */ public abstract class AbstractLeader extends AbstractRaftActorBehavior { + + // The index of the first chunk that is sent when installing a snapshot + public static final int FIRST_CHUNK_INDEX = 1; + + // The index that the follower should respond with if it needs the install snapshot to be reset + public static final int INVALID_CHUNK_INDEX = -1; + + // This would be passed as the hash code of the last chunk when sending the first chunk + public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; + protected final Map followerToLog = new HashMap<>(); protected final Map mapFollowerToSnapshot = new HashMap<>(); @@ -332,6 +342,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { "sending snapshot chunk failed, Will retry, Chunk:{}", reply.getChunkIndex() ); + followerToSnapshot.markSendStatus(false); } @@ -341,6 +352,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { " or Chunk Index in InstallSnapshotReply not matching {} != {}", followerToSnapshot.getChunkIndex(), reply.getChunkIndex() ); + + if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ + // Since the Follower did not find this index to be valid we should reset the follower snapshot + // so that Installing the snapshot can resume from the beginning + followerToSnapshot.reset(); + } } } @@ -539,7 +556,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getReplicatedLog().getSnapshotTerm(), getNextSnapshotChunk(followerId,snapshot.get()), mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks() + mapFollowerToSnapshot.get(followerId).getTotalChunks(), + Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode()) ).toSerializable(), actor() ); @@ -636,11 +654,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private boolean replyStatus = false; private int chunkIndex; private int totalChunks; + private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; public FollowerToSnapshot(ByteString snapshotBytes) { this.snapshotBytes = snapshotBytes; - replyReceivedForOffset = -1; - chunkIndex = 1; int size = snapshotBytes.size(); totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); @@ -648,6 +666,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("Snapshot {} bytes, total chunks to send:{}", size, totalChunks); } + replyReceivedForOffset = -1; + chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; } public ByteString getSnapshotBytes() { @@ -692,6 +712,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // if the chunk sent was successful replyReceivedForOffset = offset; replyStatus = true; + lastChunkHashCode = nextChunkHashCode; } else { // if the chunk sent was failure replyReceivedForOffset = offset; @@ -715,8 +736,24 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("length={}, offset={},size={}", snapshotLength, start, size); } - return getSnapshotBytes().substring(start, start + size); + ByteString substring = getSnapshotBytes().substring(start, start + size); + nextChunkHashCode = substring.hashCode(); + return substring; + } + + /** + * reset should be called when the Follower needs to be sent the snapshot from the beginning + */ + public void reset(){ + offset = 0; + replyStatus = false; + replyReceivedForOffset = offset; + chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; + lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + } + public int getLastChunkHashCode() { + return lastChunkHashCode; } }