X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=cfafc2c84d8a12971b172d2fa4594af5a0f36435;hp=622d59e41a062346aa55eaa82e98f4255b592ee6;hb=727170e24bd3c6a3973160b86343c2a534344e6e;hpb=3bc363a69d6d48709f7fd741ef018ecd75b8f99b diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 622d59e41a..cfafc2c84d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -16,6 +16,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -28,12 +29,13 @@ import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; -import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; +import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -43,6 +45,7 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import scala.concurrent.duration.FiniteDuration; /** @@ -85,37 +88,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Collection trackerList = new LinkedList<>(); - protected final int minReplicationCount; - - protected final int minIsolatedLeaderPeerCount; + private int minReplicationCount; private Optional snapshot; - public AbstractLeader(RaftActorContext context) { - super(context, RaftState.Leader); + protected AbstractLeader(RaftActorContext context, RaftState state) { + super(context, state); setLeaderPayloadVersion(context.getPayloadVersion()); - for (String followerId : context.getPeerAddresses().keySet()) { - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, -1, context); - - followerToLog.put(followerId, followerLogInformation); + for(PeerInfo peerInfo: context.getPeers()) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); + followerToLog.put(peerInfo.getId(), followerLogInformation); } leaderId = context.getId(); LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); - minReplicationCount = getMajorityVoteCount(getFollowerIds().size()); - - // the isolated Leader peer count will be 1 less than the majority vote count. - // this is because the vote count has the self vote counted in it - // for e.g - // 0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0 - // 2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1 - // 4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2 - minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0; + updateMinReplicaCount(); snapshot = Optional.absent(); @@ -138,10 +129,41 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.keySet(); } - public void addFollower(String followerId, FollowerState followerState) { - FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context); - followerLogInformation.setFollowerState(followerState); + public void addFollower(String followerId) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl( + context.getPeerInfo(followerId), -1, context); followerToLog.put(followerId, followerLogInformation); + + if(heartbeatSchedule == null) { + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); + } + } + + public void removeFollower(String followerId) { + followerToLog.remove(followerId); + mapFollowerToSnapshot.remove(followerId); + } + + public void updateMinReplicaCount() { + int numVoting = 0; + for(PeerInfo peer: context.getPeers()) { + if(peer.isVoting()) { + numVoting++; + } + } + + minReplicationCount = getMajorityVoteCount(numVoting); + } + + protected int getMinIsolatedLeaderPeerCount(){ + //the isolated Leader peer count will be 1 less than the majority vote count. + //this is because the vote count has the self vote counted in it + //for e.g + //0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0 + //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1 + //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2 + + return minReplicationCount > 0 ? (minReplicationCount - 1) : 0; } @VisibleForTesting @@ -190,6 +212,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.markFollowerActive(); followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion()); + followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion()); boolean updated = false; if (appendEntriesReply.isSuccess()) { @@ -234,15 +257,24 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { int replicatedCount = 1; for (FollowerLogInformation info : followerToLog.values()) { - if (info.getMatchIndex() >= N) { + final PeerInfo peerInfo = context.getPeerInfo(info.getId()); + if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) { replicatedCount++; } } if (replicatedCount >= minReplicationCount) { ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N); - if (replicatedLogEntry != null && - replicatedLogEntry.getTerm() == currentTerm()) { + if (replicatedLogEntry == null) { + break; + } + + // Don't update the commit index if the log entry is from a previous term, as per §5.4.1: + // "Raft never commits log entries from previous terms by counting replicas". + // However we keep looping so we can make progress when new entries in the current term + // reach consensus, as per §5.4.1: "once an entry from the current term is committed by + // counting replicas, then all prior entries are committed indirectly". + if (replicatedLogEntry.getTerm() == currentTerm()) { context.setCommitIndex(N); } } else { @@ -378,12 +410,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { - LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader", + LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply", logName(), followerId); return; } FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + if(followerLogInformation == null) { + // This can happen during AddServer if it times out. + LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", + logName(), followerId); + mapFollowerToSnapshot.remove(followerId); + return; + } + followerLogInformation.markFollowerActive(); if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { @@ -414,7 +454,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { setSnapshot(null); } wasLastChunk = true; - + if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){ + UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess = + new UnInitializedFollowerSnapshotReply(followerId); + context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor()); + LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self"); + } } else { followerToSnapshot.markSendStatus(true); } @@ -475,7 +520,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { + protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { // Send an AppendEntries to all followers for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); @@ -581,7 +626,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { appendEntries); } - followerActor.tell(appendEntries.toSerializable(), actor()); + followerActor.tell(appendEntries, actor()); } /** @@ -597,17 +642,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * then send the existing snapshot in chunks to the follower. * @param followerId */ - public void initiateCaptureSnapshot(String followerId) { + public boolean initiateCaptureSnapshot(String followerId) { if (snapshot.isPresent()) { // if a snapshot is present in the memory, most likely another install is in progress // no need to capture snapshot. // This could happen if another follower needs an install when one is going on. final ActorSelection followerActor = context.getPeerActorSelection(followerId); sendSnapshotChunk(followerActor, followerId); - - + return true; } else { - context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), this.getReplicatedToAllIndex(), followerId); } } @@ -626,14 +670,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendInstallSnapshot() { LOG.debug("{}: sendInstallSnapshot", logName()); for (Entry e : followerToLog.entrySet()) { - ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); + String followerId = e.getKey(); + ActorSelection followerActor = context.getPeerActorSelection(followerId); FollowerLogInformation followerLogInfo = e.getValue(); if (followerActor != null) { - long nextIndex = e.getValue().getNextIndex(); - if (followerLogInfo.getFollowerState() == FollowerState.VOTING_NOT_INITIALIZED || + long nextIndex = followerLogInfo.getNextIndex(); + if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED || canInstallSnapshot(nextIndex)) { - sendSnapshotChunk(followerActor, e.getKey()); + sendSnapshotChunk(followerActor, followerId); } } } @@ -646,7 +691,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { - ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes()); + byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes()); // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. @@ -660,7 +705,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.incrementChunkIndex(), followerToSnapshot.getTotalChunks(), Optional.of(followerToSnapshot.getLastChunkHashCode()) - ).toSerializable(), + ).toSerializable(followerToLog.get(followerId).getRaftVersion()), actor() ); @@ -679,15 +724,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * Acccepts snaphot as ByteString, enters into map for future chunks * creates and return a ByteString chunk */ - private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { + private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { followerToSnapshot = new FollowerToSnapshot(snapshotBytes); mapFollowerToSnapshot.put(followerId, followerToSnapshot); } - ByteString nextChunk = followerToSnapshot.getNextChunk(); + byte[] nextChunk = followerToSnapshot.getNextChunk(); - LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size()); + LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length); return nextChunk; } @@ -720,12 +765,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // need to be sent if there are other messages being sent to the remote // actor. heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( - interval, context.getActor(), new SendHeartBeat(), + interval, context.getActor(), SendHeartBeat.INSTANCE, context.getActorSystem().dispatcher(), context.getActor()); } @Override - public void close() throws Exception { + public void close() { stopHeartBeat(); } @@ -735,7 +780,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } protected boolean isLeaderIsolated() { - int minPresent = minIsolatedLeaderPeerCount; + int minPresent = getMinIsolatedLeaderPeerCount(); for (FollowerLogInformation followerLogInformation : followerToLog.values()) { if (followerLogInformation.isFollowerActive()) { --minPresent; @@ -826,25 +871,23 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - public ByteString getNextChunk() { + public byte[] getNextChunk() { int snapshotLength = getSnapshotBytes().size(); int start = incrementOffset(); int size = context.getConfigParams().getSnapshotChunkSize(); if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { size = snapshotLength; - } else { - if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { - size = snapshotLength - start; - } + } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { + size = snapshotLength - start; } + byte[] nextChunk = new byte[size]; + getSnapshotBytes().copyTo(nextChunk, start, 0, size); + nextChunkHashCode = Arrays.hashCode(nextChunk); - LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(), - snapshotLength, start, size); - - ByteString substring = getSnapshotBytes().substring(start, start + size); - nextChunkHashCode = substring.hashCode(); - return substring; + LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(), + snapshotLength, start, size, nextChunkHashCode); + return nextChunk; } /**