import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
+ if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
+ context.getLogger()
+ .info("Follower: Received {}", appendEntries.toString());
+ }
+
// TODO : Refactor this method into a bunch of smaller methods
// to make it easier to read. Before refactoring ensure tests
// cover the code properly
int addEntriesFrom = 0;
if (context.getReplicatedLog().size() > 0) {
- // Find the entry up until which the one that is not in the
- // follower's log
- for (int i = 0;
- i < appendEntries.getEntries()
- .size(); i++, addEntriesFrom++) {
- ReplicatedLogEntry matchEntry =
- appendEntries.getEntries().get(i);
- ReplicatedLogEntry newEntry = context.getReplicatedLog()
- .get(matchEntry.getIndex());
+ // Find the entry up until which the one that is not in the follower's log
+ for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
+ ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
+ ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
if (newEntry == null) {
//newEntry not found in the log
for (int i = addEntriesFrom;
i < appendEntries.getEntries().size(); i++) {
- context.getLogger().debug(
- "Append entry to log " + appendEntries.getEntries().get(i).getData()
+ context.getLogger().info(
+ "Append entry to log " + appendEntries.getEntries().get(
+ i).getData()
.toString()
);
context.getReplicatedLog()
return RaftState.Follower;
}
- @Override public RaftState handleMessage(ActorRef sender, Object message) {
+ @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+
+ Object message = fromSerializableMessage(originalMessage);
+
if (message instanceof RaftRPC) {
RaftRPC rpc = (RaftRPC) message;
// If RPC request or response contains term T > currentTerm:
if (message instanceof ElectionTimeout) {
return RaftState.Candidate;
+
} else if (message instanceof InstallSnapshot) {
- InstallSnapshot snapshot = (InstallSnapshot) message;
- actor().tell(new ApplySnapshot(snapshot), actor());
+ InstallSnapshot installSnapshot = (InstallSnapshot) message;
+ actor().tell(new ApplySnapshot(installSnapshot.getData()), actor());
}
scheduleElection(electionDuration());