- private void sendHeartBeat() {
- if (followers.size() > 0) {
- sendAppendEntries();
- }
- }
-
- private void stopHeartBeat() {
- if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
- heartbeatSchedule.cancel();
- }
- }
-
- private void stopInstallSnapshotSchedule() {
- if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
- installSnapshotSchedule.cancel();
- }
- }
-
- private void scheduleHeartBeat(FiniteDuration interval) {
- if(followers.size() == 0){
- // Optimization - do not bother scheduling a heartbeat as there are
- // no followers
- return;
- }
-
- stopHeartBeat();
-
- // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
- // message is sent to itself.
- // Scheduling the heartbeat only once here because heartbeats do not
- // need to be sent if there are other messages being sent to the remote
- // actor.
- heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
- interval, context.getActor(), new SendHeartBeat(),
- context.getActorSystem().dispatcher(), context.getActor());
- }
-
- private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
- if(followers.size() == 0){
- // Optimization - do not bother scheduling a heartbeat as there are
- // no followers
- return;
- }
-
- stopInstallSnapshotSchedule();
-
- // Schedule a message to send append entries to followers that can
- // accept an append entries with some data in it
- installSnapshotSchedule =
- context.getActorSystem().scheduler().scheduleOnce(
- interval,
- context.getActor(), new InitiateInstallSnapshot(),
- context.getActorSystem().dispatcher(), context.getActor());
- }
-
-
-
- @Override public void close() throws Exception {
- stopHeartBeat();
- }
-
- @Override public String getLeaderId() {
- return context.getId();
- }
-
- /**
- * Encapsulates the snapshot bytestring and handles the logic of sending
- * snapshot chunks
- */
- protected class FollowerToSnapshot {
- private ByteString snapshotBytes;
- private int offset = 0;
- // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
- private int replyReceivedForOffset;
- // if replyStatus is false, the previous chunk is attempted
- private boolean replyStatus = false;
- private int chunkIndex;
- private int totalChunks;
-
- public FollowerToSnapshot(ByteString snapshotBytes) {
- this.snapshotBytes = snapshotBytes;
- replyReceivedForOffset = -1;
- chunkIndex = 1;
- int size = snapshotBytes.size();
- totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
- ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot {} bytes, total chunks to send:{}",
- size, totalChunks);
- }
- }
-
- public ByteString getSnapshotBytes() {
- return snapshotBytes;
- }
-
- public int incrementOffset() {
- if(replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- offset = offset + context.getConfigParams().getSnapshotChunkSize();
- }
- return offset;
- }
-
- public int incrementChunkIndex() {
- if (replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- chunkIndex = chunkIndex + 1;
- }
- return chunkIndex;
- }
-
- public int getChunkIndex() {
- return chunkIndex;
- }
-
- public int getTotalChunks() {
- return totalChunks;
- }
-
- public boolean canSendNextChunk() {
- // we only send a false if a chunk is sent but we have not received a reply yet
- return replyReceivedForOffset == offset;
- }
-
- public boolean isLastChunk(int chunkIndex) {
- return totalChunks == chunkIndex;
- }
-
- public void markSendStatus(boolean success) {
- if (success) {
- // if the chunk sent was successful
- replyReceivedForOffset = offset;
- replyStatus = true;
- } else {
- // if the chunk sent was failure
- replyReceivedForOffset = offset;
- replyStatus = false;
- }
- }
-
- public ByteString getNextChunk() {
- int snapshotLength = getSnapshotBytes().size();
- int start = incrementOffset();
- int size = context.getConfigParams().getSnapshotChunkSize();
- if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
- size = snapshotLength;
- } else {
- if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
- }
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("length={}, offset={},size={}",
- snapshotLength, start, size);
- }
- return getSnapshotBytes().substring(start, start + size);
-
- }
- }
-
- // called from example-actor for printing the follower-states
- public String printFollowerStates() {
- StringBuilder sb = new StringBuilder();
- for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
- boolean isFollowerActive = followerLogInformation.isFollowerActive();
- sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
-
- }
- return "[" + sb.toString() + "]";