X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=c06ee9bd2b836c784c4adb75e7f318875ebf22c3;hb=fdab53ef9033fc83c812f7d3d6d3327d3d176f0f;hp=cfefd21c568029d1e4ce9eed3e5c139724fb5498;hpb=a0c5aba42aa36337ff1c6760175918b786897c9e;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index cfefd21c56..c06ee9bd2b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -12,17 +12,22 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.raft.ClientRequestTracker; +import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; -import java.util.Collections; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,21 +66,37 @@ public class Leader extends AbstractRaftActorBehavior { private Cancellable heartbeatCancel = null; - public Leader(RaftActorContext context, List followerPaths) { + private List trackerList = new ArrayList<>(); + + private final int minReplicationCount; + + public Leader(RaftActorContext context) { super(context); - for (String followerPath : followerPaths) { + if(lastIndex() >= 0) { + context.setCommitIndex(lastIndex()); + } + + for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerPath, - new AtomicLong(0), - new AtomicLong(0)); + new FollowerLogInformationImpl(followerId, + new AtomicLong(lastIndex()), + new AtomicLong(-1)); - followerToActor.put(followerPath, - context.actorSelection(followerLogInformation.getId())); - followerToLog.put(followerPath, followerLogInformation); + followerToActor.put(followerId, + context.actorSelection(context.getPeerAddress(followerId))); + + followerToLog.put(followerId, followerLogInformation); + + } + if (followerToActor.size() > 0) { + minReplicationCount = (followerToActor.size() + 1) / 2 + 1; + } else { + minReplicationCount = 0; } + // Immediately schedule a heartbeat // Upon election: send initial empty AppendEntries RPCs // (heartbeat) to each server; repeat during idle periods to @@ -87,47 +108,184 @@ public class Leader extends AbstractRaftActorBehavior { @Override protected RaftState handleAppendEntries(ActorRef sender, AppendEntries appendEntries, RaftState suggestedState) { + + context.getLogger() + .error("An unexpected AppendEntries received in state " + state()); + return suggestedState; } @Override protected RaftState handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply, RaftState suggestedState) { + + // Do not take any other action since a behavior change is coming + if (suggestedState != state()) + return suggestedState; + + // Update the FollowerLogInformation + String followerId = appendEntriesReply.getFollowerId(); + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + if (appendEntriesReply.isSuccess()) { + followerLogInformation + .setMatchIndex(appendEntriesReply.getLogLastIndex()); + followerLogInformation + .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); + } else { + followerLogInformation.decrNextIndex(); + } + + // Now figure out if this reply warrants a change in the commitIndex + // If there exists an N such that N > commitIndex, a majority + // of matchIndex[i] ≥ N, and log[N].term == currentTerm: + // set commitIndex = N (§5.3, §5.4). + for (long N = context.getCommitIndex() + 1; ; N++) { + int replicatedCount = 1; + + for (FollowerLogInformation info : followerToLog.values()) { + if (info.getMatchIndex().get() >= N) { + replicatedCount++; + } + } + + if (replicatedCount >= minReplicationCount){ + ReplicatedLogEntry replicatedLogEntry = + context.getReplicatedLog().get(N); + if (replicatedLogEntry != null + && replicatedLogEntry.getTerm() + == currentTerm()) { + context.setCommitIndex(N); + } + } else { + break; + } + } + + if(context.getCommitIndex() > context.getLastApplied()){ + applyLogToStateMachine(context.getCommitIndex()); + } + return suggestedState; } + protected ClientRequestTracker findClientRequestTracker(long logIndex) { + for (ClientRequestTracker tracker : trackerList) { + if (tracker.getIndex() == logIndex) { + return tracker; + } + } + + return null; + } + @Override protected RaftState handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply, RaftState suggestedState) { return suggestedState; } - @Override protected RaftState state() { + @Override public RaftState state() { return RaftState.Leader; } @Override public RaftState handleMessage(ActorRef sender, Object message) { Preconditions.checkNotNull(sender, "sender should not be null"); - scheduleHeartBeat(HEART_BEAT_INTERVAL); - - if (message instanceof SendHeartBeat) { - for (ActorSelection follower : followerToActor.values()) { - follower.tell(new AppendEntries( - context.getTermInformation().getCurrentTerm(), - context.getId(), - context.getReplicatedLog().last().getIndex(), - context.getReplicatedLog().last().getTerm(), - Collections.EMPTY_LIST, context.getCommitIndex()), - context.getActor()); + try { + if (message instanceof SendHeartBeat) { + return sendHeartBeat(); + } else if (message instanceof Replicate) { + + Replicate replicate = (Replicate) message; + long logIndex = replicate.getReplicatedLogEntry().getIndex(); + + context.getLogger().debug("Replicate message " + logIndex); + + if (followerToActor.size() == 0) { + context.setCommitIndex( + replicate.getReplicatedLogEntry().getIndex()); + + context.getActor() + .tell(new ApplyState(replicate.getClientActor(), + replicate.getIdentifier(), + replicate.getReplicatedLogEntry()), + context.getActor() + ); + } else { + + trackerList.add( + new ClientRequestTrackerImpl(replicate.getClientActor(), + replicate.getIdentifier(), + logIndex) + ); + + ReplicatedLogEntry prevEntry = + context.getReplicatedLog().get(lastIndex() - 1); + long prevLogIndex = -1; + long prevLogTerm = -1; + if (prevEntry != null) { + prevLogIndex = prevEntry.getIndex(); + prevLogTerm = prevEntry.getTerm(); + } + // Send an AppendEntries to all followers + for (String followerId : followerToActor.keySet()) { + ActorSelection followerActor = + followerToActor.get(followerId); + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex, prevLogTerm, + context.getReplicatedLog().getFrom( + followerLogInformation.getNextIndex() + .get() + ), context.getCommitIndex() + ), + actor() + ); + } + } } - return state(); + } finally { + scheduleHeartBeat(HEART_BEAT_INTERVAL); } + return super.handleMessage(sender, message); } - private void scheduleHeartBeat(FiniteDuration interval) { + private RaftState sendHeartBeat() { + if (followerToActor.size() > 0) { + for (String follower : followerToActor.keySet()) { + + FollowerLogInformation followerLogInformation = + followerToLog.get(follower); + + AtomicLong nextIndex = + followerLogInformation.getNextIndex(); + + List entries = + context.getReplicatedLog().getFrom(nextIndex.get()); + + followerToActor.get(follower).tell(new AppendEntries( + context.getTermInformation().getCurrentTerm(), + context.getId(), + context.getReplicatedLog().lastIndex(), + context.getReplicatedLog().lastTerm(), + entries, context.getCommitIndex()), + context.getActor() + ); + } + } + return state(); + } + + private void stopHeartBeat() { if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) { heartbeatCancel.cancel(); } + } + + private void scheduleHeartBeat(FiniteDuration interval) { + stopHeartBeat(); // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat // message is sent to itself. @@ -135,9 +293,18 @@ public class Leader extends AbstractRaftActorBehavior { // need to be sent if there are other messages being sent to the remote // actor. heartbeatCancel = - context.getActorSystem().scheduler().scheduleOnce(interval, + context.getActorSystem().scheduler().scheduleOnce( + interval, context.getActor(), new SendHeartBeat(), context.getActorSystem().dispatcher(), context.getActor()); } + @Override public void close() throws Exception { + stopHeartBeat(); + } + + @Override public String getLeaderId() { + return context.getId(); + } + }