import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Queue;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
- private Cancellable heartbeatSchedule = null;
-
- private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
-
- private int minReplicationCount;
+ /**
+ * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
+ * expect the entries to be modified in sequence, hence we open-code the lookup.
+ *
+ * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
+ * but we already expect those to be far from frequent.
+ */
+ private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
+ private Cancellable heartbeatSchedule = null;
private Optional<SnapshotHolder> snapshot;
+ private int minReplicationCount;
- public AbstractLeader(RaftActorContext context) {
- super(context, RaftState.Leader);
-
- setLeaderPayloadVersion(context.getPayloadVersion());
+ protected AbstractLeader(RaftActorContext context, RaftState state) {
+ super(context, state);
for(PeerInfo peerInfo: context.getPeers()) {
FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
followerToLog.put(peerInfo.getId(), followerLogInformation);
}
- leaderId = context.getId();
-
LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
updateMinReplicaCount();
@Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
- final Iterator<ClientRequestTracker> it = trackerList.iterator();
+ final Iterator<ClientRequestTracker> it = trackers.iterator();
while (it.hasNext()) {
final ClientRequestTracker t = it.next();
if (t.getIndex() == logIndex) {
return null;
}
- @Override
- protected ClientRequestTracker findClientRequestTracker(long logIndex) {
- for (ClientRequestTracker tracker : trackerList) {
- if (tracker.getIndex() == logIndex) {
- return tracker;
- }
- }
- return null;
- }
-
@Override
protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
beforeSendHeartbeat();
sendHeartBeat();
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
- return this;
-
} else if(message instanceof SendInstallSnapshot) {
// received from RaftActor
setSnapshot(((SendInstallSnapshot) message).getSnapshot());
sendInstallSnapshot();
-
} else if (message instanceof Replicate) {
replicate((Replicate) message);
-
- } else if (message instanceof InstallSnapshotReply){
+ } else if (message instanceof InstallSnapshotReply) {
handleInstallSnapshotReply((InstallSnapshotReply) message);
-
+ } else {
+ return super.handleMessage(sender, message);
}
-
- return super.handleMessage(sender, message);
+ return this;
}
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
// Create a tracker entry we will use this later to notify the
// client actor
- trackerList.add(
+ trackers.add(
new ClientRequestTrackerImpl(replicate.getClientActor(),
replicate.getIdentifier(),
logIndex)
// need to be sent if there are other messages being sent to the remote
// actor.
heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
- interval, context.getActor(), new SendHeartBeat(),
+ interval, context.getActor(), SendHeartBeat.INSTANCE,
context.getActorSystem().dispatcher(), context.getActor());
}
@Override
- public void close() throws Exception {
+ public void close() {
stopHeartBeat();
}
@Override
- public String getLeaderId() {
+ public final String getLeaderId() {
return context.getId();
}
+ @Override
+ public final short getLeaderPayloadVersion() {
+ return context.getPayloadVersion();
+ }
+
protected boolean isLeaderIsolated() {
int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
- if (followerLogInformation.isFollowerActive()) {
+ final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
+ if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
--minPresent;
if (minPresent == 0) {
- break;
+ return false;
}
}
}