X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=462c94ec8a40736cc005c994b74520d9111c3430;hb=ce51608c403fd3ca5989afb4a4667e125f722fb4;hp=5606453e31be9a9303be5a28652c6e3b0b12caf9;hpb=1ccea52a9d18baf3d12c70232ba697941f71c7f6;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 5606453e31..462c94ec8a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -14,14 +14,18 @@ import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.protobuf.ByteString; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; @@ -76,14 +80,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // This would be passed as the hash code of the last chunk when sending the first chunk public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; - protected final Map followerToLog = new HashMap<>(); - protected final Map mapFollowerToSnapshot = new HashMap<>(); - - protected final Set followers; + private final Map followerToLog; + private final Map mapFollowerToSnapshot = new HashMap<>(); private Cancellable heartbeatSchedule = null; - private List trackerList = new ArrayList<>(); + private final Collection trackerList = new LinkedList<>(); protected final int minReplicationCount; @@ -94,24 +96,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public AbstractLeader(RaftActorContext context) { super(context); - followers = context.getPeerAddresses().keySet(); - - for (String followerId : followers) { + final Builder ftlBuilder = ImmutableMap.builder(); + for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, context.getCommitIndex(), -1, context.getConfigParams().getElectionTimeOutInterval()); - followerToLog.put(followerId, followerLogInformation); + ftlBuilder.put(followerId, followerLogInformation); } + followerToLog = ftlBuilder.build(); leaderId = context.getId(); - if(LOG.isDebugEnabled()) { - LOG.debug("Election:Leader has following peers: {}", followers); - } + LOG.debug("Election:Leader has following peers: {}", getFollowerIds()); - minReplicationCount = getMajorityVoteCount(followers.size()); + minReplicationCount = getMajorityVoteCount(getFollowerIds().size()); // the isolated Leader peer count will be 1 less than the majority vote count. // this is because the vote count has the self vote counted in it @@ -130,6 +130,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); } + /** + * Return an immutable collection of follower identifiers. + * + * @return Collection of follower IDs + */ + protected final Collection getFollowerIds() { + return followerToLog.keySet(); + } + private Optional getSnapshot() { return snapshot; } @@ -196,7 +205,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { int replicatedCount = 1; for (FollowerLogInformation info : followerToLog.values()) { - if (info.getMatchIndex().get() >= N) { + if (info.getMatchIndex() >= N) { replicatedCount++; } } @@ -220,16 +229,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return this; } + @Override protected ClientRequestTracker removeClientRequestTracker(long logIndex) { - - ClientRequestTracker toRemove = findClientRequestTracker(logIndex); - if(toRemove != null) { - trackerList.remove(toRemove); + final Iterator it = trackerList.iterator(); + while (it.hasNext()) { + final ClientRequestTracker t = it.next(); + if (t.getIndex() == logIndex) { + it.remove(); + return t; + } } - return toRemove; + return null; } + @Override protected ClientRequestTracker findClientRequestTracker(long logIndex) { for (ClientRequestTracker tracker : trackerList) { if (tracker.getIndex() == logIndex) { @@ -322,8 +336,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { mapFollowerToSnapshot.remove(followerId); if(LOG.isDebugEnabled()) { - LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" + - followerToLog.get(followerId).getNextIndex().get()); + LOG.debug("followerToLog.get(followerId).getNextIndex()=" + + followerToLog.get(followerId).getNextIndex()); } if (mapFollowerToSnapshot.isEmpty()) { @@ -374,7 +388,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { logIndex) ); - if (followers.size() == 0) { + if (followerToLog.isEmpty()) { context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); } else { @@ -384,14 +398,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendAppendEntries() { // Send an AppendEntries to all followers - for (String followerId : followers) { + for (Entry e : followerToLog.entrySet()) { + final String followerId = e.getKey(); ActorSelection followerActor = context.getPeerActorSelection(followerId); if (followerActor != null) { FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - long followerNextIndex = followerLogInformation.getNextIndex().get(); + long followerNextIndex = followerLogInformation.getNextIndex(); boolean isFollowerActive = followerLogInformation.isFollowerActive(); - List entries = null; if (mapFollowerToSnapshot.get(followerId) != null) { // if install snapshot is in process , then sent next chunk if possible @@ -406,6 +420,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else { long leaderLastIndex = context.getReplicatedLog().lastIndex(); long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); + final List entries; if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { @@ -471,23 +486,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * */ private void installSnapshotIfNeeded() { - for (String followerId : followers) { - ActorSelection followerActor = - context.getPeerActorSelection(followerId); - - if(followerActor != null) { - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); + for (Entry e : followerToLog.entrySet()) { + final ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); - long nextIndex = followerLogInformation.getNextIndex().get(); + if (followerActor != null) { + long nextIndex = e.getValue().getNextIndex(); if (!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)) { - LOG.info("{} follower needs a snapshot install", followerId); + LOG.info("{} follower needs a snapshot install", e.getKey()); if (snapshot.isPresent()) { // if a snapshot is present in the memory, most likely another install is in progress // no need to capture snapshot - sendSnapshotChunk(followerActor, followerId); + sendSnapshotChunk(followerActor, e.getKey()); } else { initiateCaptureSnapshot(); @@ -526,16 +537,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendInstallSnapshot() { - for (String followerId : followers) { - ActorSelection followerActor = context.getPeerActorSelection(followerId); + for (Entry e : followerToLog.entrySet()) { + ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); - if(followerActor != null) { - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - long nextIndex = followerLogInformation.getNextIndex().get(); + if (followerActor != null) { + long nextIndex = e.getValue().getNextIndex(); if (!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)) { - sendSnapshotChunk(followerActor, followerId); + sendSnapshotChunk(followerActor, e.getKey()); } } } @@ -586,7 +596,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } private void sendHeartBeat() { - if (followers.size() > 0) { + if (!followerToLog.isEmpty()) { sendAppendEntries(); } } @@ -598,7 +608,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } private void scheduleHeartBeat(FiniteDuration interval) { - if(followers.size() == 0){ + if (followerToLog.isEmpty()) { // Optimization - do not bother scheduling a heartbeat as there are // no followers return; @@ -773,7 +783,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } @VisibleForTesting - void markFollowerActive(String followerId) { - followerToLog.get(followerId).markFollowerActive(); + public FollowerLogInformation getFollower(String followerId) { + return followerToLog.get(followerId); + } + + @VisibleForTesting + protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) { + mapFollowerToSnapshot.put(followerId, snapshot); + } + + @VisibleForTesting + public int followerSnapshotSize() { + return mapFollowerToSnapshot.size(); + } + + @VisibleForTesting + public int followerLogSize() { + return followerToLog.size(); } }