package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
-import akka.event.LoggingAdapter;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
public class Follower extends AbstractRaftActorBehavior {
private ByteString snapshotChunksCollected = ByteString.EMPTY;
- private final LoggingAdapter LOG;
-
public Follower(RaftActorContext context) {
super(context);
- LOG = context.getLogger();
-
scheduleElection(electionDuration());
}
- @Override protected RaftState handleAppendEntries(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
);
- return state();
+ return this;
}
if (appendEntries.getEntries() != null
&& appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
LOG.debug(
- "Number of entries to be appended = " + appendEntries
- .getEntries().size()
+ "Number of entries to be appended = {}", appendEntries.getEntries().size()
);
}
if(LOG.isDebugEnabled()) {
LOG.debug(
- "Removing entries from log starting at "
- + matchEntry.getIndex()
+ "Removing entries from log starting at {}", matchEntry.getIndex()
);
}
}
if(LOG.isDebugEnabled()) {
- context.getLogger().debug(
- "After cleanup entries to be added from = " + (addEntriesFrom
- + lastIndex())
+ LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
);
}
for (int i = addEntriesFrom;
i < appendEntries.getEntries().size(); i++) {
- context.getLogger().info(
- "Append entry to log " + appendEntries.getEntries().get(
- i).getData()
- .toString()
- );
- context.getReplicatedLog()
- .appendAndPersist(appendEntries.getEntries().get(i));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+ }
+ context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Log size is now " + context.getReplicatedLog().size());
+ LOG.debug("Log size is now {}", context.getReplicatedLog().size());
}
}
if (prevCommitIndex != context.getCommitIndex()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Commit index set to " + context.getCommitIndex());
+ LOG.debug("Commit index set to {}", context.getCommitIndex());
}
}
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm()), actor());
- return state();
+ return this;
}
- @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
- return state();
+ return this;
}
- @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
- return state();
+ return this;
}
@Override public RaftState state() {
return RaftState.Follower;
}
- @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Object message = fromSerializableMessage(originalMessage);
}
if (message instanceof ElectionTimeout) {
- return RaftState.Candidate;
+ return switchBehavior(new Candidate(context));
} else if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
// this is the last chunk, create a snapshot object and apply
snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
- context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
- snapshotChunksCollected.size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Last chunk received: snapshotChunksCollected.size:{}",
+ snapshotChunksCollected.size());
+ }
Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
new ArrayList<ReplicatedLogEntry>(),
true), actor());
} catch (Exception e) {
- context.getLogger().error("Exception in InstallSnapshot of follower", e);
+ LOG.error(e, "Exception in InstallSnapshot of follower:");
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
installSnapshot.getChunkIndex(), false), actor());