import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
private final Map<String, FollowerLogInformation> followerToLog =
new HashMap();
- private final Map<String, ActorSelection> followerToActor = new HashMap<>();
+ private final Set<String> followers;
private Cancellable heartbeatSchedule = null;
private Cancellable appendEntriesSchedule = null;
context.setCommitIndex(lastIndex());
}
- for (String followerId : context.getPeerAddresses().keySet()) {
+ followers = context.getPeerAddresses().keySet();
+
+ for (String followerId : followers) {
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(followerId,
new AtomicLong(lastIndex()),
new AtomicLong(-1));
- followerToActor.put(followerId,
- context.actorSelection(context.getPeerAddress(followerId)));
-
followerToLog.put(followerId, followerLogInformation);
}
- context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet());
+ context.getLogger().debug("Election:Leader has following peers:"+ followers);
- if (followerToActor.size() > 0) {
- minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
+ if (followers.size() > 0) {
+ minReplicationCount = (followers.size() + 1) / 2 + 1;
} else {
minReplicationCount = 0;
}
scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
scheduleInstallSnapshotCheck(
- new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
- HEART_BEAT_INTERVAL.unit())
+ new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
+ context.getConfigParams().getHeartBeatInterval().unit())
);
}
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
+ context.getLogger().info("Leader: Received {}", appendEntries.toString());
+
return state();
}
@Override protected RaftState handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
+ if(! appendEntriesReply.isSuccess()) {
+ context.getLogger()
+ .info("Leader: Received {}", appendEntriesReply.toString());
+ }
+
// Update the FollowerLogInformation
String followerId = appendEntriesReply.getFollowerId();
FollowerLogInformation followerLogInformation =
followerToLog.get(followerId);
+
+ if(followerLogInformation == null){
+ context.getLogger().error("Unknown follower {}", followerId);
+ return state();
+ }
+
if (appendEntriesReply.isSuccess()) {
followerLogInformation
.setMatchIndex(appendEntriesReply.getLogLastIndex());
return RaftState.Leader;
}
- @Override public RaftState handleMessage(ActorRef sender, Object message) {
+ @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
Preconditions.checkNotNull(sender, "sender should not be null");
+ Object message = fromSerializableMessage(originalMessage);
+
if (message instanceof RaftRPC) {
RaftRPC rpc = (RaftRPC) message;
// If RPC request or response contains term T > currentTerm:
(InstallSnapshotReply) message);
}
} finally {
- scheduleHeartBeat(HEART_BEAT_INTERVAL);
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
return super.handleMessage(sender, message);
context.getLogger().debug("Replicate message " + logIndex);
- if (followerToActor.size() == 0) {
- context.setCommitIndex(
- replicate.getReplicatedLogEntry().getIndex());
+ // Create a tracker entry we will use this later to notify the
+ // client actor
+ trackerList.add(
+ new ClientRequestTrackerImpl(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ logIndex)
+ );
- context.getActor()
- .tell(new ApplyState(replicate.getClientActor(),
- replicate.getIdentifier(),
- replicate.getReplicatedLogEntry()),
- context.getActor()
- );
+ if (followers.size() == 0) {
+ context.setCommitIndex(logIndex);
+ applyLogToStateMachine(logIndex);
} else {
-
- // Create a tracker entry we will use this later to notify the
- // client actor
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
-
sendAppendEntries();
}
}
private void sendAppendEntries() {
// Send an AppendEntries to all followers
- for (String followerId : followerToActor.keySet()) {
+ for (String followerId : followers) {
ActorSelection followerActor =
- followerToActor.get(followerId);
+ context.getPeerActorSelection(followerId);
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
+ if (followerActor != null) {
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
- long nextIndex = followerLogInformation.getNextIndex().get();
+ long nextIndex = followerLogInformation.getNextIndex().get();
- List<ReplicatedLogEntry> entries = Collections.emptyList();
+ List<ReplicatedLogEntry> entries = Collections.emptyList();
- if(context.getReplicatedLog().isPresent(nextIndex)){
- // TODO: Instead of sending all entries from nextIndex
- // only send a fixed number of entries to each follower
- // This is to avoid the situation where there are a lot of
- // entries to install for a fresh follower or to a follower
- // that has fallen too far behind with the log but yet is not
- // eligible to receive a snapshot
- entries =
- context.getReplicatedLog().getFrom(nextIndex);
- }
+ if (context.getReplicatedLog().isPresent(nextIndex)) {
+ // TODO: Instead of sending all entries from nextIndex
+ // only send a fixed number of entries to each follower
+ // This is to avoid the situation where there are a lot of
+ // entries to install for a fresh follower or to a follower
+ // that has fallen too far behind with the log but yet is not
+ // eligible to receive a snapshot
+ entries =
+ context.getReplicatedLog().getFrom(nextIndex, 1);
+ }
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(nextIndex), prevLogTerm(nextIndex),
- entries, context.getCommitIndex()
- ),
- actor()
- );
+ followerActor.tell(
+ new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(nextIndex),
+ prevLogTerm(nextIndex), entries,
+ context.getCommitIndex()).toSerializable(),
+ actor()
+ );
+ }
}
}
* snapshots at every heartbeat.
*/
private void installSnapshotIfNeeded(){
- for (String followerId : followerToActor.keySet()) {
+ for (String followerId : followers) {
ActorSelection followerActor =
- followerToActor.get(followerId);
-
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- context.getReplicatedLog().getSnapshot()
- ),
- actor()
- );
+ context.getPeerActorSelection(followerId);
+
+ if(followerActor != null) {
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ if (!context.getReplicatedLog().isPresent(nextIndex) && context
+ .getReplicatedLog().isInSnapshot(nextIndex)) {
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ context.getReplicatedLog().getSnapshot()
+ ),
+ actor()
+ );
+ }
}
}
}
private RaftState sendHeartBeat() {
- if (followerToActor.size() > 0) {
+ if (followers.size() > 0) {
sendAppendEntries();
}
return state();
}
private void scheduleHeartBeat(FiniteDuration interval) {
- if(followerToActor.keySet().size() == 0){
+ if(followers.size() == 0){
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
return;
private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
- if(followerToActor.keySet().size() == 0){
+ if(followers.size() == 0){
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
return;