X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=410dcee5e5e811e9b9cb485b30420a4d21311a12;hb=664fc9940569ebfd54dcfc3db87bab66fad9300e;hp=e28e4b066d372ee54594ecaf9fe0e5259ad4cdfd;hpb=fc54ab8853d36fb1d7aebf2a09ef10567e66aa0d;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index e28e4b066d..410dcee5e5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -294,6 +294,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(), + rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); return switchBehavior(new Follower(context)); @@ -330,12 +333,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void handleInstallSnapshotReply(InstallSnapshotReply reply) { String followerId = reply.getFollowerId(); FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + + if (followerToSnapshot == null) { + LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader", + context.getId(), followerId); + return; + } + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); followerLogInformation.markFollowerActive(); - if (followerToSnapshot != null && - followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { - + if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { if (reply.isSuccess()) { if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply @@ -373,12 +381,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(false); } - } else { - LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" + - " or Chunk Index in InstallSnapshotReply not matching {} != {}", - context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex() - ); + LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", + context.getId(), reply.getChunkIndex(), followerId, + followerToSnapshot.getChunkIndex()); if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ // Since the Follower did not find this index to be valid we should reset the follower snapshot @@ -413,6 +419,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendAppendEntries() { // Send an AppendEntries to all followers + for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); ActorSelection followerActor = context.getPeerActorSelection(followerId); @@ -422,14 +429,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long followerNextIndex = followerLogInformation.getNextIndex(); boolean isFollowerActive = followerLogInformation.isFollowerActive(); - if (mapFollowerToSnapshot.get(followerId) != null) { + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + if (followerToSnapshot != null) { // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + if (isFollowerActive && followerToSnapshot.canSendNextChunk()) { sendSnapshotChunk(followerActor, followerId); } else { // we send a heartbeat even if we have not received a reply for the last chunk sendAppendEntriesToFollower(followerActor, followerNextIndex, - Collections.emptyList()); + Collections.emptyList(), followerId); } } else { @@ -437,8 +445,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); final List entries; - if (isFollowerActive && - context.getReplicatedLog().isPresent(followerNextIndex)) { + LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", + context.getId(), leaderLastIndex, leaderSnapShotIndex); + + if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { + LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(), + followerNextIndex, followerId); + // FIXME : Sending one entry at a time entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); @@ -465,23 +478,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { entries = Collections.emptyList(); } - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries); - + sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); } } } } private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, - List entries) { - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, - context.getCommitIndex(), - replicatedToAllIndex).toSerializable(), - actor() - ); + List entries, String followerId) { + AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex(), replicatedToAllIndex); + + if(!entries.isEmpty()) { + LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId, + appendEntries); + } + + followerActor.tell(appendEntries.toSerializable(), actor()); } /** @@ -501,6 +516,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * */ private void installSnapshotIfNeeded() { + if(LOG.isDebugEnabled()) { + LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet()); + } + for (Entry e : followerToLog.entrySet()) { final ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); @@ -508,7 +527,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long nextIndex = e.getValue().getNextIndex(); if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { + context.getReplicatedLog().isInSnapshot(nextIndex)) { LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey()); if (snapshot.isPresent()) { // if a snapshot is present in the memory, most likely another install is in progress @@ -573,21 +592,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { + ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get()); + + // Note: the previous call to getNextSnapshotChunk has the side-effect of adding + // followerId to the followerToSnapshot map. + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), - getNextSnapshotChunk(followerId,snapshot.get()), - mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks(), - Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode()) + nextSnapshotChunk, + followerToSnapshot.incrementChunkIndex(), + followerToSnapshot.getTotalChunks(), + Optional.of(followerToSnapshot.getLastChunkHashCode()) ).toSerializable(), actor() ); LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", context.getId(), followerActor.path(), - mapFollowerToSnapshot.get(followerId).getChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks()); + followerToSnapshot.getChunkIndex(), + followerToSnapshot.getTotalChunks()); } } catch (IOException e) { LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());