X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=a1ec4d883135f42d88b9d08db82ba49acb1816ba;hb=61e85d54cfcd70053993f910092eba1ab3fcc850;hp=fd78431d16bbf593535174844029e4afc502436d;hpb=252ba03242407ee584c38fafdbfa1c322e66151d;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index fd78431d16..a1ec4d8831 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -25,6 +25,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; @@ -84,26 +85,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Map followerToLog = new HashMap<>(); private final Map mapFollowerToSnapshot = new HashMap<>(); - private Cancellable heartbeatSchedule = null; - - private final Collection trackerList = new LinkedList<>(); - - private int minReplicationCount; + /** + * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really + * expect the entries to be modified in sequence, hence we open-code the lookup. + * + * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly, + * but we already expect those to be far from frequent. + */ + private final Queue trackers = new LinkedList<>(); + private Cancellable heartbeatSchedule = null; private Optional snapshot; + private int minReplicationCount; - public AbstractLeader(RaftActorContext context) { - super(context, RaftState.Leader); - - setLeaderPayloadVersion(context.getPayloadVersion()); + protected AbstractLeader(RaftActorContext context, RaftState state) { + super(context, state); for(PeerInfo peerInfo: context.getPeers()) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); followerToLog.put(peerInfo.getId(), followerLogInformation); } - leaderId = context.getId(); - LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); updateMinReplicaCount(); @@ -253,19 +255,34 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // If there exists an N such that N > commitIndex, a majority // of matchIndex[i] ≥ N, and log[N].term == currentTerm: // set commitIndex = N (§5.3, §5.4). + if(LOG.isTraceEnabled()) { + LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}", + logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm()); + } + for (long N = context.getCommitIndex() + 1; ; N++) { int replicatedCount = 1; + LOG.trace("{}: checking Nth index {}", logName(), N); for (FollowerLogInformation info : followerToLog.values()) { final PeerInfo peerInfo = context.getPeerInfo(info.getId()); if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) { replicatedCount++; + } else if(LOG.isTraceEnabled()) { + LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), + info.getMatchIndex(), peerInfo); } } + if(LOG.isTraceEnabled()) { + LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount); + } + if (replicatedCount >= minReplicationCount) { ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N); if (replicatedLogEntry == null) { + LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", + logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size()); break; } @@ -275,9 +292,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // reach consensus, as per §5.4.1: "once an entry from the current term is committed by // counting replicas, then all prior entries are committed indirectly". if (replicatedLogEntry.getTerm() == currentTerm()) { + LOG.trace("{}: Setting commit index to {}", logName(), N); context.setCommitIndex(N); + } else { + LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}", + logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm()); } } else { + LOG.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount); break; } } @@ -329,7 +351,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { @Override protected ClientRequestTracker removeClientRequestTracker(long logIndex) { - final Iterator it = trackerList.iterator(); + final Iterator it = trackers.iterator(); while (it.hasNext()) { final ClientRequestTracker t = it.next(); if (t.getIndex() == logIndex) { @@ -341,16 +363,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return null; } - @Override - protected ClientRequestTracker findClientRequestTracker(long logIndex) { - for (ClientRequestTracker tracker : trackerList) { - if (tracker.getIndex() == logIndex) { - return tracker; - } - } - return null; - } - @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { @@ -384,23 +396,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { beforeSendHeartbeat(); sendHeartBeat(); scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); - return this; - } else if(message instanceof SendInstallSnapshot) { // received from RaftActor setSnapshot(((SendInstallSnapshot) message).getSnapshot()); sendInstallSnapshot(); - } else if (message instanceof Replicate) { replicate((Replicate) message); - - } else if (message instanceof InstallSnapshotReply){ + } else if (message instanceof InstallSnapshotReply) { handleInstallSnapshotReply((InstallSnapshotReply) message); - + } else { + return super.handleMessage(sender, message); } - - return super.handleMessage(sender, message); + return this; } private void handleInstallSnapshotReply(InstallSnapshotReply reply) { @@ -501,7 +509,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Create a tracker entry we will use this later to notify the // client actor - trackerList.add( + trackers.add( new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(), logIndex) @@ -765,27 +773,33 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // need to be sent if there are other messages being sent to the remote // actor. heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( - interval, context.getActor(), new SendHeartBeat(), + interval, context.getActor(), SendHeartBeat.INSTANCE, context.getActorSystem().dispatcher(), context.getActor()); } @Override - public void close() throws Exception { + public void close() { stopHeartBeat(); } @Override - public String getLeaderId() { + public final String getLeaderId() { return context.getId(); } + @Override + public final short getLeaderPayloadVersion() { + return context.getPayloadVersion(); + } + protected boolean isLeaderIsolated() { int minPresent = getMinIsolatedLeaderPeerCount(); for (FollowerLogInformation followerLogInformation : followerToLog.values()) { - if (followerLogInformation.isFollowerActive()) { + final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId()); + if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) { --minPresent; if (minPresent == 0) { - break; + return false; } } }