X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=7c439f7d5c929910c7dd04f4504deb7dc81c589f;hb=d3e310b940b60f6590f0e94a576aece95a055942;hp=94e08658e8c06d443921e80db090b939911ef6f2;hpb=d86f990976dcc2879b40dec7df1b3b5fba8cba78;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 94e08658e8..7c439f7d5c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -16,7 +16,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -35,7 +34,6 @@ import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; @@ -48,6 +46,7 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import scala.concurrent.duration.FiniteDuration; /** @@ -73,18 +72,8 @@ import scala.concurrent.duration.FiniteDuration; * set commitIndex = N (§5.3, §5.4). */ public abstract class AbstractLeader extends AbstractRaftActorBehavior { - - // The index of the first chunk that is sent when installing a snapshot - public static final int FIRST_CHUNK_INDEX = 1; - - // The index that the follower should respond with if it needs the install snapshot to be reset - public static final int INVALID_CHUNK_INDEX = -1; - - // This would be passed as the hash code of the last chunk when sending the first chunk - public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; - private final Map followerToLog = new HashMap<>(); - private final Map mapFollowerToSnapshot = new HashMap<>(); + private final Map mapFollowerToSnapshot = new HashMap<>(); /** * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really @@ -96,7 +85,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Queue trackers = new LinkedList<>(); private Cancellable heartbeatSchedule = null; - private Optional snapshot; + private Optional snapshot = Optional.absent();; private int minReplicationCount; protected AbstractLeader(RaftActorContext context, RaftState state, @@ -105,6 +94,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(initializeFromLeader != null) { followerToLog.putAll(initializeFromLeader.followerToLog); + mapFollowerToSnapshot.putAll(initializeFromLeader.mapFollowerToSnapshot); + snapshot = initializeFromLeader.snapshot; + trackers.addAll(initializeFromLeader.trackers); } else { for(PeerInfo peerInfo: context.getPeers()) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); @@ -116,8 +108,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { updateMinReplicaCount(); - snapshot = Optional.absent(); - // Immediately schedule a heartbeat // Upon election: send initial empty AppendEntries RPCs // (heartbeat) to each server; repeat during idle periods to @@ -299,7 +289,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.trace("{}: checking Nth index {}", logName(), N); for (FollowerLogInformation info : followerToLog.values()) { final PeerInfo peerInfo = context.getPeerInfo(info.getId()); - if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) { + if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) { replicatedCount++; } else if(LOG.isTraceEnabled()) { LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), @@ -406,11 +396,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected void beforeSendHeartbeat(){} @Override - public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { Preconditions.checkNotNull(sender, "sender should not be null"); - Object message = fromSerializableMessage(originalMessage); - if (message instanceof RaftRPC) { RaftRPC rpc = (RaftRPC) message; // If RPC request or response contains term T > currentTerm: @@ -449,7 +437,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); String followerId = reply.getFollowerId(); - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply", @@ -527,7 +515,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { logName(), reply.getChunkIndex(), followerId, followerToSnapshot.getChunkIndex()); - if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ + if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){ // Since the Follower did not find this index to be valid we should reset the follower snapshot // so that Installing the snapshot can resume from the beginning followerToSnapshot.reset(); @@ -701,9 +689,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // If the follower's nextIndex is -1 then we might as well send it a snapshot // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present // in the snapshot - return (nextIndex == -1 || + return nextIndex == -1 || (!context.getReplicatedLog().isPresent(nextIndex) - && context.getReplicatedLog().isInSnapshot(nextIndex))); + && context.getReplicatedLog().isInSnapshot(nextIndex)); } @@ -736,7 +724,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId); int nextChunkIndex = followerToSnapshot.incrementChunkIndex(); Optional serverConfig = Optional.absent(); @@ -773,9 +761,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * creates and return a ByteString chunk */ private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { - followerToSnapshot = new FollowerToSnapshot(snapshotBytes); + followerToSnapshot = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(), + logName()); mapFollowerToSnapshot.put(followerId, followerToSnapshot); } byte[] nextChunk = followerToSnapshot.getNextChunk(); @@ -843,121 +832,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } } - return (minPresent != 0); - } - - /** - * Encapsulates the snapshot bytestring and handles the logic of sending - * snapshot chunks - */ - protected class FollowerToSnapshot { - private final ByteString snapshotBytes; - private int offset = 0; - // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset - private int replyReceivedForOffset; - // if replyStatus is false, the previous chunk is attempted - private boolean replyStatus = false; - private int chunkIndex; - private final int totalChunks; - private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - - public FollowerToSnapshot(ByteString snapshotBytes) { - this.snapshotBytes = snapshotBytes; - int size = snapshotBytes.size(); - totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + - ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}", - logName(), size, totalChunks); - } - replyReceivedForOffset = -1; - chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; - } - - public ByteString getSnapshotBytes() { - return snapshotBytes; - } - - public int incrementOffset() { - if(replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again - offset = offset + context.getConfigParams().getSnapshotChunkSize(); - } - return offset; - } - - public int incrementChunkIndex() { - if (replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again - chunkIndex = chunkIndex + 1; - } - return chunkIndex; - } - - public int getChunkIndex() { - return chunkIndex; - } - - public int getTotalChunks() { - return totalChunks; - } - - public boolean canSendNextChunk() { - // we only send a false if a chunk is sent but we have not received a reply yet - return replyReceivedForOffset == offset; - } - - public boolean isLastChunk(int chunkIndex) { - return totalChunks == chunkIndex; - } - - public void markSendStatus(boolean success) { - if (success) { - // if the chunk sent was successful - replyReceivedForOffset = offset; - replyStatus = true; - lastChunkHashCode = nextChunkHashCode; - } else { - // if the chunk sent was failure - replyReceivedForOffset = offset; - replyStatus = false; - } - } - - public byte[] getNextChunk() { - int snapshotLength = getSnapshotBytes().size(); - int start = incrementOffset(); - int size = context.getConfigParams().getSnapshotChunkSize(); - if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { - size = snapshotLength; - } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { - size = snapshotLength - start; - } - - byte[] nextChunk = new byte[size]; - getSnapshotBytes().copyTo(nextChunk, start, 0, size); - nextChunkHashCode = Arrays.hashCode(nextChunk); - - LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(), - snapshotLength, start, size, nextChunkHashCode); - return nextChunk; - } - - /** - * reset should be called when the Follower needs to be sent the snapshot from the beginning - */ - public void reset(){ - offset = 0; - replyStatus = false; - replyReceivedForOffset = offset; - chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; - lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - } - - public int getLastChunkHashCode() { - return lastChunkHashCode; - } + return minPresent != 0; } // called from example-actor for printing the follower-states @@ -983,7 +858,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } @VisibleForTesting - protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) { + protected void setFollowerSnapshot(String followerId, LeaderInstallSnapshotState snapshot) { mapFollowerToSnapshot.put(followerId, snapshot); }