X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=de748675a7aae0ce4ab617cc8d0d698725dd3e6c;hb=ca3b362ff7d53c6846573e782070f1db287b9e9b;hp=199d2d61cf5bbbba34ad8cfa62709228331d2b0f;hpb=5a15471e74536f8fe6d62747b7b822655a17dd4e;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 199d2d61cf..de748675a7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; -import akka.event.LoggingAdapter; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; @@ -74,37 +73,28 @@ public class Leader extends AbstractRaftActorBehavior { private final Set followers; private Cancellable heartbeatSchedule = null; - private Cancellable appendEntriesSchedule = null; private Cancellable installSnapshotSchedule = null; private List trackerList = new ArrayList<>(); private final int minReplicationCount; - private final LoggingAdapter LOG; - public Leader(RaftActorContext context) { super(context); - LOG = context.getLogger(); - - if (lastIndex() >= 0) { - context.setCommitIndex(lastIndex()); - } - followers = context.getPeerAddresses().keySet(); for (String followerId : followers) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, - new AtomicLong(lastIndex()), + new AtomicLong(context.getCommitIndex()), new AtomicLong(-1)); followerToLog.put(followerId, followerLogInformation); } if(LOG.isDebugEnabled()) { - LOG.debug("Election:Leader has following peers:" + followers); + LOG.debug("Election:Leader has following peers: {}", followers); } if (followers.size() > 0) { @@ -127,17 +117,17 @@ public class Leader extends AbstractRaftActorBehavior { } - @Override protected RaftState handleAppendEntries(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { if(LOG.isDebugEnabled()) { LOG.debug(appendEntries.toString()); } - return state(); + return this; } - @Override protected RaftState handleAppendEntriesReply(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { if(! appendEntriesReply.isSuccess()) { @@ -153,7 +143,7 @@ public class Leader extends AbstractRaftActorBehavior { if(followerLogInformation == null){ LOG.error("Unknown follower {}", followerId); - return state(); + return this; } if (appendEntriesReply.isSuccess()) { @@ -203,7 +193,7 @@ public class Leader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } - return state(); + return this; } protected ClientRequestTracker removeClientRequestTracker(long logIndex) { @@ -226,16 +216,16 @@ public class Leader extends AbstractRaftActorBehavior { return null; } - @Override protected RaftState handleRequestVoteReply(ActorRef sender, + @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { - return state(); + return this; } @Override public RaftState state() { return RaftState.Leader; } - @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) { + @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { Preconditions.checkNotNull(sender, "sender should not be null"); Object message = fromSerializableMessage(originalMessage); @@ -247,13 +237,15 @@ public class Leader extends AbstractRaftActorBehavior { // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return RaftState.Follower; + + return switchBehavior(new Follower(context)); } } try { if (message instanceof SendHeartBeat) { - return sendHeartBeat(); + sendHeartBeat(); + return this; } else if(message instanceof SendInstallSnapshot) { installSnapshotIfNeeded(); } else if (message instanceof Replicate) { @@ -325,7 +317,7 @@ public class Leader extends AbstractRaftActorBehavior { long logIndex = replicate.getReplicatedLogEntry().getIndex(); if(LOG.isDebugEnabled()) { - LOG.debug("Replicate message " + logIndex); + LOG.debug("Replicate message {}", logIndex); } // Create a tracker entry we will use this later to notify the @@ -449,7 +441,7 @@ public class Leader extends AbstractRaftActorBehavior { followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), mapFollowerToSnapshot.get(followerId).getTotalChunks()); } catch (IOException e) { - LOG.error("InstallSnapshot failed for Leader.", e); + LOG.error(e, "InstallSnapshot failed for Leader."); } } @@ -471,11 +463,10 @@ public class Leader extends AbstractRaftActorBehavior { return nextChunk; } - private RaftState sendHeartBeat() { + private void sendHeartBeat() { if (followers.size() > 0) { sendAppendEntries(); } - return state(); } private void stopHeartBeat() {