X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=353d0b4a241f844f1338435e712202211263e3d7;hp=aa7b4533b7758f7a3f457ea48b1ae72cc6d94162;hb=6bfcbfc5158130c4255b861cb02dfaa68c52aa63;hpb=0fe0f14ff574f1f17d25b3129ea6073d1c1c8ef9 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index aa7b4533b7..353d0b4a24 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -10,8 +10,6 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; @@ -43,6 +41,8 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * RaftActor encapsulates a state machine that needs to be kept synchronized @@ -85,8 +85,7 @@ import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntries * */ public abstract class RaftActor extends AbstractUntypedPersistentActor { - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + protected final Logger LOG = LoggerFactory.getLogger(getClass()); /** * The current state determines the current behavior of a RaftActor @@ -107,14 +106,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private CaptureSnapshot captureSnapshot = null; - private volatile boolean hasSnapshotCaptureInitiated = false; - private Stopwatch recoveryTimer; private int currentRecoveryBatchCount; - - public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); } @@ -131,8 +126,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void initRecoveryTimer() { if(recoveryTimer == null) { - recoveryTimer = new Stopwatch(); - recoveryTimer.start(); + recoveryTimer = Stopwatch.createStarted(); } } @@ -195,8 +189,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.setLastApplied(snapshot.getLastAppliedIndex()); context.setCommitIndex(snapshot.getLastAppliedIndex()); - Stopwatch timer = new Stopwatch(); - timer.start(); + Stopwatch timer = Stopwatch.createStarted(); // Apply the snapshot to the actors state applyRecoverySnapshot(snapshot.getState()); @@ -342,8 +335,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof SaveSnapshotFailure) { SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; - LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:", - persistenceId()); + LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:", + persistenceId(), saveSnapshotFailure.cause()); context.getReplicatedLog().snapshotRollback(); @@ -436,7 +429,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self()); // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot - if(!hasSnapshotCaptureInitiated){ + if(!context.isSnapshotCaptureInitiated()){ raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(), raftContext.getTermInformation().getCurrentTerm()); raftContext.getReplicatedLog().snapshotCommit(); @@ -580,7 +573,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * This method is called during recovery to reconstruct the state of the actor. * - * @param snapshot A snapshot of the state of the actor + * @param snapshotBytes A snapshot of the state of the actor */ protected abstract void applyRecoverySnapshot(byte[] snapshotBytes); @@ -675,12 +668,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); - //be greedy and remove entries from in-mem journal which are in the snapshot - // and update snapshotIndex and snapshotTerm without waiting for the success, + long dataThreshold = Runtime.getRuntime().totalMemory() * + getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; + if (context.getReplicatedLog().dataSize() > dataThreshold) { + // if memory is less, clear the log based on lastApplied. + // this could/should only happen if one of the followers is down + // as normally we keep removing from the log when its replicated to all. + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); - context.getReplicatedLog().snapshotPreCommit( - captureSnapshot.getLastAppliedIndex(), - captureSnapshot.getLastAppliedTerm()); + } else { + // clear the log based on replicatedToAllIndex + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), + captureSnapshot.getReplicatedToAllTerm()); + } + getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), @@ -693,7 +695,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } captureSnapshot = null; - hasSnapshotCaptureInitiated = false; + context.setSnapshotCaptureInitiated(false); } protected boolean hasFollowers(){ @@ -724,13 +726,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // FIXME: Maybe this should be done after the command is saved journal.subList(adjustedIndex , journal.size()).clear(); - persistence().persist(new DeleteEntries(adjustedIndex), new Procedure(){ + persistence().persist(new DeleteEntries(adjustedIndex), new Procedure() { - @Override public void apply(DeleteEntries param) - throws Exception { + @Override + public void apply(DeleteEntries param) + throws Exception { //FIXME : Doing nothing for now dataSize = 0; - for(ReplicatedLogEntry entry : journal){ + for (ReplicatedLogEntry entry : journal) { dataSize += entry.size(); } } @@ -742,11 +745,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { appendAndPersist(replicatedLogEntry, null); } - @Override - public int dataSize() { - return dataSize; - } - public void appendAndPersist( final ReplicatedLogEntry replicatedLogEntry, final Procedure callback) { @@ -773,7 +771,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { long dataSizeForCheck = dataSize; dataSizeSinceLastSnapshot += logEntrySize; - long journalSize = lastIndex()+1; + long journalSize = lastIndex() + 1; if(!hasFollowers()) { // When we do not have followers we do not maintain an in-memory log @@ -794,7 +792,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; // when a snaphsot is being taken, captureSnapshot != null - if (hasSnapshotCaptureInitiated == false && + if (!context.isSnapshotCaptureInitiated() && ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 || dataSizeForCheck > dataThreshold)) { @@ -824,12 +822,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } // send a CaptureSnapshot to self to make the expensive operation async. - getSelf().tell(new CaptureSnapshot( - lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm), + long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex(); + ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); + getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, + (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), + (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)), null); - hasSnapshotCaptureInitiated = true; + context.setSnapshotCaptureInitiated(true); } - if(callback != null){ + if (callback != null){ callback.apply(replicatedLogEntry); } }