X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=da1627b98e7e4e8204385914795300ce073a71ad;hb=77d55c2a5a0311aac06707d71e199ba30271b48c;hp=4993d25f202ecee0be55f7ec17b26528924a2e67;hpb=664c122de279729ca1a5e33f1b95606dab861ab7;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 4993d25f20..da1627b98e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -18,10 +18,11 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.protobuf.ByteString; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -84,7 +85,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private Cancellable heartbeatSchedule = null; - private List trackerList = new ArrayList<>(); + private final Collection trackerList = new LinkedList<>(); protected final int minReplicationCount; @@ -108,7 +109,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { leaderId = context.getId(); - LOG.debug("Election:Leader has following peers: {}", getFollowerIds()); + LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds()); minReplicationCount = getMajorityVoteCount(getFollowerIds().size()); @@ -152,7 +153,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { AppendEntries appendEntries) { if(LOG.isDebugEnabled()) { - LOG.debug(appendEntries.toString()); + LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries); } return this; @@ -164,7 +165,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(! appendEntriesReply.isSuccess()) { if(LOG.isDebugEnabled()) { - LOG.debug(appendEntriesReply.toString()); + LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply); } } @@ -174,7 +175,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToLog.get(followerId); if(followerLogInformation == null){ - LOG.error("Unknown follower {}", followerId); + LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId); return this; } @@ -204,7 +205,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { int replicatedCount = 1; for (FollowerLogInformation info : followerToLog.values()) { - if (info.getMatchIndex().get() >= N) { + if (info.getMatchIndex() >= N) { replicatedCount++; } } @@ -228,16 +229,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return this; } + @Override protected ClientRequestTracker removeClientRequestTracker(long logIndex) { - - ClientRequestTracker toRemove = findClientRequestTracker(logIndex); - if(toRemove != null) { - trackerList.remove(toRemove); + final Iterator it = trackerList.iterator(); + while (it.hasNext()) { + final ClientRequestTracker t = it.next(); + if (t.getIndex() == logIndex) { + it.remove(); + return t; + } } - return toRemove; + return null; } + @Override protected ClientRequestTracker findClientRequestTracker(long logIndex) { for (ClientRequestTracker tracker : trackerList) { if (tracker.getIndex() == logIndex) { @@ -316,9 +322,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply if(LOG.isDebugEnabled()) { - LOG.debug("InstallSnapshotReply received, " + + LOG.debug("{}: InstallSnapshotReply received, " + "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", - reply.getChunkIndex(), followerId, + context.getId(), reply.getChunkIndex(), followerId, context.getReplicatedLog().getSnapshotIndex() + 1 ); } @@ -330,8 +336,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { mapFollowerToSnapshot.remove(followerId); if(LOG.isDebugEnabled()) { - LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" + - followerToLog.get(followerId).getNextIndex().get()); + LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" + + context.getId(), followerToLog.get(followerId).getNextIndex()); } if (mapFollowerToSnapshot.isEmpty()) { @@ -344,19 +350,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(true); } } else { - LOG.info("InstallSnapshotReply received, " + - "sending snapshot chunk failed, Will retry, Chunk:{}", - reply.getChunkIndex() - ); + LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", + context.getId(), reply.getChunkIndex()); followerToSnapshot.markSendStatus(false); } } else { - LOG.error("ERROR!!" + - "FollowerId in InstallSnapshotReply not known to Leader" + + LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" + " or Chunk Index in InstallSnapshotReply not matching {} != {}", - followerToSnapshot.getChunkIndex(), reply.getChunkIndex() + context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex() ); if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ @@ -371,7 +374,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long logIndex = replicate.getReplicatedLogEntry().getIndex(); if(LOG.isDebugEnabled()) { - LOG.debug("Replicate message {}", logIndex); + LOG.debug("{}: Replicate message {}", context.getId(), logIndex); } // Create a tracker entry we will use this later to notify the @@ -398,9 +401,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (followerActor != null) { FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - long followerNextIndex = followerLogInformation.getNextIndex().get(); + long followerNextIndex = followerLogInformation.getNextIndex(); boolean isFollowerActive = followerLogInformation.isFollowerActive(); - List entries = null; if (mapFollowerToSnapshot.get(followerId) != null) { // if install snapshot is in process , then sent next chunk if possible @@ -415,6 +417,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else { long leaderLastIndex = context.getReplicatedLog().lastIndex(); long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); + final List entries; if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { @@ -428,11 +431,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // then snapshot should be sent if(LOG.isDebugEnabled()) { - LOG.debug("InitiateInstallSnapshot to follower:{}," + - "follower-nextIndex:{}, leader-snapshot-index:{}, " + - "leader-last-index:{}", followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex - ); + LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + + "follower-nextIndex: %s, leader-snapshot-index: %s, " + + "leader-last-index: %s", context.getId(), followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); } actor().tell(new InitiateInstallSnapshot(), actor()); @@ -484,11 +486,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { final ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); if (followerActor != null) { - long nextIndex = e.getValue().getNextIndex().get(); + long nextIndex = e.getValue().getNextIndex(); if (!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)) { - LOG.info("{} follower needs a snapshot install", e.getKey()); + LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey()); if (snapshot.isPresent()) { // if a snapshot is present in the memory, most likely another install is in progress // no need to capture snapshot @@ -510,7 +512,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // on every install snapshot, we try to capture the snapshot. // Once a capture is going on, another one issued will get ignored by RaftActor. private void initiateCaptureSnapshot() { - LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId()); + LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId()); ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); long lastAppliedIndex = -1; long lastAppliedTerm = -1; @@ -535,7 +537,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); if (followerActor != null) { - long nextIndex = e.getValue().getNextIndex().get(); + long nextIndex = e.getValue().getNextIndex(); if (!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)) { @@ -563,12 +565,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { ).toSerializable(), actor() ); - LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}", - followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks()); + LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", + context.getId(), followerActor.path(), + mapFollowerToSnapshot.get(followerId).getChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks()); } } catch (IOException e) { - LOG.error(e, "InstallSnapshot failed for Leader."); + LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId()); } } @@ -584,7 +587,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } ByteString nextChunk = followerToSnapshot.getNextChunk(); if (LOG.isDebugEnabled()) { - LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); + LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size()); } return nextChunk; } @@ -648,14 +651,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * snapshot chunks */ protected class FollowerToSnapshot { - private ByteString snapshotBytes; + private final ByteString snapshotBytes; private int offset = 0; // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset private int replyReceivedForOffset; // if replyStatus is false, the previous chunk is attempted private boolean replyStatus = false; private int chunkIndex; - private int totalChunks; + private final int totalChunks; private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; @@ -665,8 +668,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot {} bytes, total chunks to send:{}", - size, totalChunks); + LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}", + context.getId(), size, totalChunks); } replyReceivedForOffset = -1; chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; @@ -735,7 +738,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } if(LOG.isDebugEnabled()) { - LOG.debug("length={}, offset={},size={}", + LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(), snapshotLength, start, size); } ByteString substring = getSnapshotBytes().substring(start, start + size);