X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=3ec8cc5c5817d92825ba882101d21c5b437863cd;hp=c256c822a420e3a22b5a351778d58a88e73a9e8d;hb=2a89ae48921724ef5a4ab42dcff6afc74c5b0a4a;hpb=d13a643d14259975c33793335b73a7ad26c470eb diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index c256c822a4..3ec8cc5c58 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -10,8 +10,6 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; @@ -24,6 +22,7 @@ import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; import java.io.Serializable; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.notifications.RoleChanged; @@ -33,16 +32,15 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * RaftActor encapsulates a state machine that needs to be kept synchronized @@ -85,8 +83,10 @@ import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntries * */ public abstract class RaftActor extends AbstractUntypedPersistentActor { - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + + private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis + + protected final Logger LOG = LoggerFactory.getLogger(getClass()); /** * The current state determines the current behavior of a RaftActor @@ -107,14 +107,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private CaptureSnapshot captureSnapshot = null; - private volatile boolean hasSnapshotCaptureInitiated = false; - private Stopwatch recoveryTimer; private int currentRecoveryBatchCount; - - public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); } @@ -131,8 +127,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void initRecoveryTimer() { if(recoveryTimer == null) { - recoveryTimer = new Stopwatch(); - recoveryTimer.start(); + recoveryTimer = Stopwatch.createStarted(); } } @@ -179,7 +174,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onRecoveredSnapshot(SnapshotOffer offer) { if(LOG.isDebugEnabled()) { - LOG.debug("SnapshotOffer called.."); + LOG.debug("{}: SnapshotOffer called..", persistenceId()); } initRecoveryTimer(); @@ -195,8 +190,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.setLastApplied(snapshot.getLastAppliedIndex()); context.setCommitIndex(snapshot.getLastAppliedIndex()); - Stopwatch timer = new Stopwatch(); - timer.start(); + Stopwatch timer = Stopwatch.createStarted(); // Apply the snapshot to the actors state applyRecoverySnapshot(snapshot.getState()); @@ -209,7 +203,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) { if(LOG.isDebugEnabled()) { - LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex()); + LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex()); } replicatedLog.append(logEntry); @@ -217,8 +211,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onRecoveredApplyLogEntries(ApplyLogEntries ale) { if(LOG.isDebugEnabled()) { - LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}", - context.getLastApplied() + 1, ale.getToIndex()); + LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}", + persistenceId(), context.getLastApplied() + 1, ale.getToIndex()); } for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) { @@ -288,9 +282,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; + long elapsedTime = (System.nanoTime() - applyState.getStartTime()); + if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){ + LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", + TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); + } + if(LOG.isDebugEnabled()) { - LOG.debug("Applying state for log index {} data {}", - applyState.getReplicatedLogEntry().getIndex(), + LOG.debug("{}: Applying state for log index {} data {}", + persistenceId(), applyState.getReplicatedLogEntry().getIndex(), applyState.getReplicatedLogEntry().getData()); } @@ -300,7 +300,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof ApplyLogEntries){ ApplyLogEntries ale = (ApplyLogEntries) message; if(LOG.isDebugEnabled()) { - LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex()); + LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex()); } persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure() { @Override @@ -312,8 +312,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); if(LOG.isDebugEnabled()) { - LOG.debug("ApplySnapshot called on Follower Actor " + - "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(), + LOG.debug("{}: ApplySnapshot called on Follower Actor " + + "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm() ); } @@ -333,7 +333,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof SaveSnapshotSuccess) { SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; - LOG.info("SaveSnapshotSuccess received for snapshot"); + LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId()); long sequenceNumber = success.metadata().sequenceNr(); @@ -342,19 +342,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof SaveSnapshotFailure) { SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; - LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString()); - LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:"); + LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:", + persistenceId(), saveSnapshotFailure.cause()); context.getReplicatedLog().snapshotRollback(); - LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + - "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", + LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), context.getReplicatedLog().size()); } else if (message instanceof CaptureSnapshot) { - LOG.info("CaptureSnapshot received by actor"); + LOG.info("{}: CaptureSnapshot received by actor", persistenceId()); if(captureSnapshot == null) { captureSnapshot = (CaptureSnapshot)message; @@ -365,13 +365,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); } else { - if (!(message instanceof AppendEntriesMessages.AppendEntries) - && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) { - if(LOG.isDebugEnabled()) { - LOG.debug("onReceiveCommand: message: {}", message.getClass()); - } - } - RaftActorBehavior oldBehavior = currentBehavior; currentBehavior = currentBehavior.handleMessage(getSender(), message); @@ -414,7 +407,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getTermInformation().getCurrentTerm(), data); if(LOG.isDebugEnabled()) { - LOG.debug("Persist data {}", replicatedLogEntry); + LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry); } final RaftActorContext raftContext = getRaftActorContext(); @@ -436,12 +429,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self()); // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot - if(!hasSnapshotCaptureInitiated){ + if(!context.isSnapshotCaptureInitiated()){ raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(), raftContext.getTermInformation().getCurrentTerm()); raftContext.getReplicatedLog().snapshotCommit(); } else { - LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId()); + LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress", + persistenceId(), getId()); } } else if (clientActor != null) { // Send message for replication @@ -579,7 +573,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * This method is called during recovery to reconstruct the state of the actor. * - * @param snapshot A snapshot of the state of the actor + * @param snapshotBytes A snapshot of the state of the actor */ protected abstract void applyRecoverySnapshot(byte[] snapshotBytes); @@ -652,15 +646,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } String peerAddress = context.getPeerAddress(leaderId); if(LOG.isDebugEnabled()) { - LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}", - leaderId, peerAddress); + LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", + persistenceId(), leaderId, peerAddress); } return peerAddress; } private void handleCaptureSnapshotReply(byte[] snapshotBytes) { - LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length); + LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length); // create a snapshot object from the state provided and save it // when snapshot is saved async, SaveSnapshotSuccess is raised. @@ -672,17 +666,26 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { persistence().saveSnapshot(sn); - LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); + LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); - //be greedy and remove entries from in-mem journal which are in the snapshot - // and update snapshotIndex and snapshotTerm without waiting for the success, + long dataThreshold = Runtime.getRuntime().totalMemory() * + getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; + if (context.getReplicatedLog().dataSize() > dataThreshold) { + // if memory is less, clear the log based on lastApplied. + // this could/should only happen if one of the followers is down + // as normally we keep removing from the log when its replicated to all. + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); - context.getReplicatedLog().snapshotPreCommit( - captureSnapshot.getLastAppliedIndex(), - captureSnapshot.getLastAppliedTerm()); + } else { + // clear the log based on replicatedToAllIndex + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), + captureSnapshot.getReplicatedToAllTerm()); + } + getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); - LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + - "and term:{}", captureSnapshot.getLastAppliedIndex(), + LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + + "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) { @@ -692,7 +695,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } captureSnapshot = null; - hasSnapshotCaptureInitiated = false; + context.setSnapshotCaptureInitiated(false); } protected boolean hasFollowers(){ @@ -723,13 +726,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // FIXME: Maybe this should be done after the command is saved journal.subList(adjustedIndex , journal.size()).clear(); - persistence().persist(new DeleteEntries(adjustedIndex), new Procedure(){ + persistence().persist(new DeleteEntries(adjustedIndex), new Procedure() { - @Override public void apply(DeleteEntries param) - throws Exception { + @Override + public void apply(DeleteEntries param) + throws Exception { //FIXME : Doing nothing for now dataSize = 0; - for(ReplicatedLogEntry entry : journal){ + for (ReplicatedLogEntry entry : journal) { dataSize += entry.size(); } } @@ -741,17 +745,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { appendAndPersist(replicatedLogEntry, null); } - @Override - public int dataSize() { - return dataSize; - } - public void appendAndPersist( final ReplicatedLogEntry replicatedLogEntry, final Procedure callback) { if(LOG.isDebugEnabled()) { - LOG.debug("Append log entry and persist {} ", replicatedLogEntry); + LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry); } // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs @@ -772,7 +771,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { long dataSizeForCheck = dataSize; dataSizeSinceLastSnapshot += logEntrySize; - long journalSize = lastIndex()+1; + long journalSize = lastIndex() + 1; if(!hasFollowers()) { // When we do not have followers we do not maintain an in-memory log @@ -793,13 +792,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; // when a snaphsot is being taken, captureSnapshot != null - if (hasSnapshotCaptureInitiated == false && + if (!context.isSnapshotCaptureInitiated() && ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 || dataSizeForCheck > dataThreshold)) { dataSizeSinceLastSnapshot = 0; - LOG.info("Initiating Snapshot Capture.."); + LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," + + " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold); + long lastAppliedIndex = -1; long lastAppliedTerm = -1; @@ -813,20 +814,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot Capture logSize: {}", journal.size()); - LOG.debug("Snapshot Capture lastApplied:{} ", - context.getLastApplied()); - LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); - LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); + LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size()); + LOG.debug("{}: Snapshot Capture lastApplied:{} ", + persistenceId(), context.getLastApplied()); + LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(), + lastAppliedIndex); + LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(), + lastAppliedTerm); } // send a CaptureSnapshot to self to make the expensive operation async. - getSelf().tell(new CaptureSnapshot( - lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm), + long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex(); + ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); + getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, + (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), + (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)), null); - hasSnapshotCaptureInitiated = true; + context.setSnapshotCaptureInitiated(true); } - if(callback != null){ + if (callback != null){ callback.apply(replicatedLogEntry); } } @@ -869,7 +875,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void update(long currentTerm, String votedFor) { if(LOG.isDebugEnabled()) { - LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor); + LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor); } this.currentTerm = currentTerm; this.votedFor = votedFor;