import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Queue;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
- private Cancellable heartbeatSchedule = null;
-
- private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
-
- private int minReplicationCount;
+ /**
+ * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
+ * expect the entries to be modified in sequence, hence we open-code the lookup.
+ *
+ * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
+ * but we already expect those to be far from frequent.
+ */
+ private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
+ private Cancellable heartbeatSchedule = null;
private Optional<SnapshotHolder> snapshot;
+ private int minReplicationCount;
- public AbstractLeader(RaftActorContext context) {
- super(context, RaftState.Leader);
-
- setLeaderPayloadVersion(context.getPayloadVersion());
+ protected AbstractLeader(RaftActorContext context, RaftState state) {
+ super(context, state);
for(PeerInfo peerInfo: context.getPeers()) {
FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
followerToLog.put(peerInfo.getId(), followerLogInformation);
}
- leaderId = context.getId();
-
LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
updateMinReplicaCount();
@Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
- final Iterator<ClientRequestTracker> it = trackerList.iterator();
+ final Iterator<ClientRequestTracker> it = trackers.iterator();
while (it.hasNext()) {
final ClientRequestTracker t = it.next();
if (t.getIndex() == logIndex) {
return null;
}
- @Override
- protected ClientRequestTracker findClientRequestTracker(long logIndex) {
- for (ClientRequestTracker tracker : trackerList) {
- if (tracker.getIndex() == logIndex) {
- return tracker;
- }
- }
- return null;
- }
-
@Override
protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
beforeSendHeartbeat();
sendHeartBeat();
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
- return this;
-
} else if(message instanceof SendInstallSnapshot) {
// received from RaftActor
setSnapshot(((SendInstallSnapshot) message).getSnapshot());
sendInstallSnapshot();
-
} else if (message instanceof Replicate) {
replicate((Replicate) message);
-
- } else if (message instanceof InstallSnapshotReply){
+ } else if (message instanceof InstallSnapshotReply) {
handleInstallSnapshotReply((InstallSnapshotReply) message);
-
+ } else {
+ return super.handleMessage(sender, message);
}
-
- return super.handleMessage(sender, message);
+ return this;
}
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
// Create a tracker entry we will use this later to notify the
// client actor
- trackerList.add(
+ trackers.add(
new ClientRequestTrackerImpl(replicate.getClientActor(),
replicate.getIdentifier(),
logIndex)
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
- ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
+ byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
followerToSnapshot.incrementChunkIndex(),
followerToSnapshot.getTotalChunks(),
Optional.of(followerToSnapshot.getLastChunkHashCode())
- ).toSerializable(),
+ ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
actor()
);
* Acccepts snaphot as ByteString, enters into map for future chunks
* creates and return a ByteString chunk
*/
- private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+ private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
- ByteString nextChunk = followerToSnapshot.getNextChunk();
+ byte[] nextChunk = followerToSnapshot.getNextChunk();
- LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size());
+ LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
return nextChunk;
}
// need to be sent if there are other messages being sent to the remote
// actor.
heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
- interval, context.getActor(), new SendHeartBeat(),
+ interval, context.getActor(), SendHeartBeat.INSTANCE,
context.getActorSystem().dispatcher(), context.getActor());
}
@Override
- public void close() throws Exception {
+ public void close() {
stopHeartBeat();
}
@Override
- public String getLeaderId() {
+ public final String getLeaderId() {
return context.getId();
}
+ @Override
+ public final short getLeaderPayloadVersion() {
+ return context.getPayloadVersion();
+ }
+
protected boolean isLeaderIsolated() {
int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
- if (followerLogInformation.isFollowerActive()) {
+ final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
+ if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
--minPresent;
if (minPresent == 0) {
- break;
+ return false;
}
}
}
}
}
- public ByteString getNextChunk() {
+ public byte[] getNextChunk() {
int snapshotLength = getSnapshotBytes().size();
int start = incrementOffset();
int size = context.getConfigParams().getSnapshotChunkSize();
if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
size = snapshotLength;
- } else {
- if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
- }
+ } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+ size = snapshotLength - start;
}
+ byte[] nextChunk = new byte[size];
+ getSnapshotBytes().copyTo(nextChunk, start, 0, size);
+ nextChunkHashCode = Arrays.hashCode(nextChunk);
- LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
- snapshotLength, start, size);
-
- ByteString substring = getSnapshotBytes().substring(start, start + size);
- nextChunkHashCode = substring.hashCode();
- return substring;
+ LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(),
+ snapshotLength, start, size, nextChunkHashCode);
+ return nextChunk;
}
/**