import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
-import akka.event.LoggingAdapter;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
private final Set<String> followers;
private Cancellable heartbeatSchedule = null;
- private Cancellable appendEntriesSchedule = null;
private Cancellable installSnapshotSchedule = null;
private List<ClientRequestTracker> trackerList = new ArrayList<>();
private final int minReplicationCount;
- private final LoggingAdapter LOG;
-
public Leader(RaftActorContext context) {
super(context);
- LOG = context.getLogger();
-
- if (lastIndex() >= 0) {
- context.setCommitIndex(lastIndex());
- }
-
followers = context.getPeerAddresses().keySet();
for (String followerId : followers) {
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(followerId,
- new AtomicLong(lastIndex()),
+ new AtomicLong(context.getCommitIndex()),
new AtomicLong(-1));
followerToLog.put(followerId, followerLogInformation);
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Election:Leader has following peers:" + followers);
+ LOG.debug("Election:Leader has following peers: {}", followers);
}
if (followers.size() > 0) {
}
- @Override protected RaftState handleAppendEntries(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
LOG.debug(appendEntries.toString());
}
- return state();
+ return this;
}
- @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
if(! appendEntriesReply.isSuccess()) {
if(followerLogInformation == null){
LOG.error("Unknown follower {}", followerId);
- return state();
+ return this;
}
if (appendEntriesReply.isSuccess()) {
applyLogToStateMachine(context.getCommitIndex());
}
- return state();
+ return this;
}
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
return null;
}
- @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
- return state();
+ return this;
}
@Override public RaftState state() {
return RaftState.Leader;
}
- @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Preconditions.checkNotNull(sender, "sender should not be null");
Object message = fromSerializableMessage(originalMessage);
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- return RaftState.Follower;
+
+ return switchBehavior(new Follower(context));
}
}
try {
if (message instanceof SendHeartBeat) {
- return sendHeartBeat();
+ sendHeartBeat();
+ return this;
} else if(message instanceof SendInstallSnapshot) {
installSnapshotIfNeeded();
} else if (message instanceof Replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
if(LOG.isDebugEnabled()) {
- LOG.debug("Replicate message " + logIndex);
+ LOG.debug("Replicate message {}", logIndex);
}
// Create a tracker entry we will use this later to notify the
followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
mapFollowerToSnapshot.get(followerId).getTotalChunks());
} catch (IOException e) {
- LOG.error("InstallSnapshot failed for Leader.", e);
+ LOG.error(e, "InstallSnapshot failed for Leader.");
}
}
return nextChunk;
}
- private RaftState sendHeartBeat() {
+ private void sendHeartBeat() {
if (followers.size() > 0) {
sendAppendEntries();
}
- return state();
}
private void stopHeartBeat() {