X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FSnapshotManager.java;h=ef39416cf5a5bc7695ba5b10695c488e0b7fb35f;hb=ebb756e983e44035641b0e890ca6baa96e34c6dd;hp=e6dc8bb367d6f0c27c49765e52e6dbdbcaa7de73;hpb=3859df9beca8f13f1ff2b2744ed3470a1715bec3;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index e6dc8bb367..ef39416cf5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -68,7 +68,7 @@ public class SnapshotManager implements SnapshotState { */ public SnapshotManager(final RaftActorContext context, final Logger logger) { this.context = context; - this.log = logger; + log = logger; } public boolean isApplying() { @@ -91,6 +91,11 @@ public class SnapshotManager implements SnapshotState { return currentState.capture(lastLogEntry, replicatedToAllIndex); } + @Override + public boolean captureWithForcedTrim(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { + return currentState.captureWithForcedTrim(lastLogEntry, replicatedToAllIndex); + } + @Override public void apply(final ApplySnapshot snapshot) { currentState.apply(snapshot); @@ -154,7 +159,8 @@ public class SnapshotManager implements SnapshotState { * @param replicatedToAllIndex the index of the last entry replicated to all followers. * @return a new CaptureSnapshot instance. */ - public CaptureSnapshot newCaptureSnapshot(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { + public CaptureSnapshot newCaptureSnapshot(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, + final boolean mandatoryTrim) { TermInformationReader lastAppliedTermInfoReader = lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), lastLogEntry, hasFollowers()); @@ -170,18 +176,23 @@ public class SnapshotManager implements SnapshotState { List unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1); - long lastLogEntryIndex = lastAppliedIndex; - long lastLogEntryTerm = lastAppliedTerm; - if (lastLogEntry != null) { + final long lastLogEntryIndex; + final long lastLogEntryTerm; + if (lastLogEntry == null) { + // When we don't have journal present, for example two captureSnapshots executed right after another with no + // new journal we still want to preserve the index and term in the snapshot. + lastAppliedIndex = lastLogEntryIndex = context.getReplicatedLog().getSnapshotIndex(); + lastAppliedTerm = lastLogEntryTerm = context.getReplicatedLog().getSnapshotTerm(); + + log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using snapshot values lastAppliedIndex {} and " + + "lastAppliedTerm {} instead.", persistenceId(), lastAppliedIndex, lastAppliedTerm); + } else { lastLogEntryIndex = lastLogEntry.getIndex(); lastLogEntryTerm = lastLogEntry.getTerm(); - } else { - log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and " - + "lastAppliedTerm {} instead.", persistenceId(), lastAppliedIndex, lastAppliedTerm); } return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm, - newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries); + newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, mandatoryTrim); } private class AbstractSnapshotState implements SnapshotState { @@ -204,6 +215,12 @@ public class SnapshotManager implements SnapshotState { return false; } + @Override + public boolean captureWithForcedTrim(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { + log.debug("captureWithForcedTrim should not be called in state {}", this); + return false; + } + @Override public void apply(final ApplySnapshot snapshot) { log.debug("apply should not be called in state {}", this); @@ -248,7 +265,7 @@ public class SnapshotManager implements SnapshotState { //use the term of the temp-min, since we check for isPresent, entry will not be null ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); - context.getReplicatedLog().snapshotCommit(); + context.getReplicatedLog().snapshotCommit(false); return tempMin; } @@ -274,8 +291,8 @@ public class SnapshotManager implements SnapshotState { @SuppressWarnings("checkstyle:IllegalCatch") private boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, - final String targetFollower) { - captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex); + final String targetFollower, final boolean mandatoryTrim) { + captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, mandatoryTrim); OutputStream installSnapshotStream = null; if (targetFollower != null) { @@ -290,12 +307,12 @@ public class SnapshotManager implements SnapshotState { log.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber); - SnapshotManager.this.currentState = CREATING; + currentState = CREATING; try { createSnapshotProcedure.accept(Optional.ofNullable(installSnapshotStream)); } catch (Exception e) { - SnapshotManager.this.currentState = IDLE; + currentState = IDLE; log.error("Error creating snapshot", e); return false; } @@ -305,18 +322,23 @@ public class SnapshotManager implements SnapshotState { @Override public boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { - return capture(lastLogEntry, replicatedToAllIndex, null); + return capture(lastLogEntry, replicatedToAllIndex, null, false); } @Override public boolean captureToInstall(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, final String targetFollower) { - return capture(lastLogEntry, replicatedToAllIndex, targetFollower); + return capture(lastLogEntry, replicatedToAllIndex, targetFollower, false); + } + + @Override + public boolean captureWithForcedTrim(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { + return capture(lastLogEntry, replicatedToAllIndex, null, true); } @Override public void apply(final ApplySnapshot toApply) { - SnapshotManager.this.applySnapshot = toApply; + applySnapshot = toApply; lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber(); @@ -324,7 +346,7 @@ public class SnapshotManager implements SnapshotState { context.getPersistenceProvider().saveSnapshot(toApply.getSnapshot()); - SnapshotManager.this.currentState = PERSISTING; + currentState = PERSISTING; } @Override @@ -357,24 +379,29 @@ public class SnapshotManager implements SnapshotState { log.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot); - long dataThreshold = totalMemory * context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; - boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold; + final ConfigParams config = context.getConfigParams(); + final long absoluteThreshold = config.getSnapshotDataThreshold(); + final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE + : totalMemory * config.getSnapshotDataThresholdPercentage() / 100; - boolean logSizeExceededSnapshotBatchCount = - context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount(); + final boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold; + final boolean logSizeExceededSnapshotBatchCount = + context.getReplicatedLog().size() >= config.getSnapshotBatchCount(); final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) { + if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount || captureSnapshot.isMandatoryTrim()) { if (log.isDebugEnabled()) { if (dataSizeThresholdExceeded) { log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit " + "with index {}", context.getId(), context.getReplicatedLog().dataSize(), dataThreshold, captureSnapshot.getLastAppliedIndex()); - } else { + } else if (logSizeExceededSnapshotBatchCount) { log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with " + "index {}", context.getId(), context.getReplicatedLog().size(), - context.getConfigParams().getSnapshotBatchCount(), - captureSnapshot.getLastAppliedIndex()); + config.getSnapshotBatchCount(), captureSnapshot.getLastAppliedIndex()); + } else { + log.debug("{}: user triggered or root overwrite snapshot encountered, trimming log up to " + + "last applied index {}", context.getId(), captureSnapshot.getLastAppliedIndex()); } } @@ -413,7 +440,8 @@ public class SnapshotManager implements SnapshotState { if (installSnapshotStream.isPresent()) { if (context.getId().equals(currentBehavior.getLeaderId())) { try { - ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.get()).asByteSource(); + ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.orElseThrow()) + .asByteSource(); currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes)); } catch (IOException e) { @@ -421,12 +449,12 @@ public class SnapshotManager implements SnapshotState { context.getId(), e); } } else { - ((FileBackedOutputStream)installSnapshotStream.get()).cleanup(); + ((FileBackedOutputStream)installSnapshotStream.orElseThrow()).cleanup(); } } captureSnapshot = null; - SnapshotManager.this.currentState = PERSISTING; + currentState = PERSISTING; } @Override @@ -498,7 +526,7 @@ public class SnapshotManager implements SnapshotState { private void snapshotComplete() { lastSequenceNumber = -1; applySnapshot = null; - SnapshotManager.this.currentState = IDLE; + currentState = IDLE; context.getActor().tell(SnapshotComplete.INSTANCE, context.getActor()); } @@ -523,8 +551,8 @@ public class SnapshotManager implements SnapshotState { LastAppliedTermInformationReader init(final ReplicatedLog log, final long originalIndex, final ReplicatedLogEntry lastLogEntry, final boolean hasFollowers) { ReplicatedLogEntry entry = log.get(originalIndex); - this.index = -1L; - this.term = -1L; + index = -1L; + term = -1L; if (!hasFollowers) { if (lastLogEntry != null) { // since we have persisted the last-log-entry to persistent journal before the capture, @@ -544,12 +572,12 @@ public class SnapshotManager implements SnapshotState { @Override public long getIndex() { - return this.index; + return index; } @Override public long getTerm() { - return this.term; + return term; } } @@ -559,8 +587,8 @@ public class SnapshotManager implements SnapshotState { ReplicatedToAllTermInformationReader init(final ReplicatedLog log, final long originalIndex) { ReplicatedLogEntry entry = log.get(originalIndex); - this.index = -1L; - this.term = -1L; + index = -1L; + term = -1L; if (entry != null) { index = entry.getIndex(); @@ -572,12 +600,12 @@ public class SnapshotManager implements SnapshotState { @Override public long getIndex() { - return this.index; + return index; } @Override public long getTerm() { - return this.term; + return term; } } }