X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FSnapshotManager.java;h=ef39416cf5a5bc7695ba5b10695c488e0b7fb35f;hb=ebb756e983e44035641b0e890ca6baa96e34c6dd;hp=3e2410efdea80b659bfd200b2d9a160a4cffa9fb;hpb=f19f33c725db92a59ead42384fd737030c684377;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 3e2410efde..ef39416cf5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -68,7 +68,7 @@ public class SnapshotManager implements SnapshotState { */ public SnapshotManager(final RaftActorContext context, final Logger logger) { this.context = context; - this.log = logger; + log = logger; } public boolean isApplying() { @@ -265,7 +265,7 @@ public class SnapshotManager implements SnapshotState { //use the term of the temp-min, since we check for isPresent, entry will not be null ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); - context.getReplicatedLog().snapshotCommit(); + context.getReplicatedLog().snapshotCommit(false); return tempMin; } @@ -307,12 +307,12 @@ public class SnapshotManager implements SnapshotState { log.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber); - SnapshotManager.this.currentState = CREATING; + currentState = CREATING; try { createSnapshotProcedure.accept(Optional.ofNullable(installSnapshotStream)); } catch (Exception e) { - SnapshotManager.this.currentState = IDLE; + currentState = IDLE; log.error("Error creating snapshot", e); return false; } @@ -338,7 +338,7 @@ public class SnapshotManager implements SnapshotState { @Override public void apply(final ApplySnapshot toApply) { - SnapshotManager.this.applySnapshot = toApply; + applySnapshot = toApply; lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber(); @@ -346,7 +346,7 @@ public class SnapshotManager implements SnapshotState { context.getPersistenceProvider().saveSnapshot(toApply.getSnapshot()); - SnapshotManager.this.currentState = PERSISTING; + currentState = PERSISTING; } @Override @@ -379,11 +379,14 @@ public class SnapshotManager implements SnapshotState { log.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot); - long dataThreshold = totalMemory * context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; - boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold; + final ConfigParams config = context.getConfigParams(); + final long absoluteThreshold = config.getSnapshotDataThreshold(); + final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE + : totalMemory * config.getSnapshotDataThresholdPercentage() / 100; - boolean logSizeExceededSnapshotBatchCount = - context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount(); + final boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold; + final boolean logSizeExceededSnapshotBatchCount = + context.getReplicatedLog().size() >= config.getSnapshotBatchCount(); final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount || captureSnapshot.isMandatoryTrim()) { @@ -395,8 +398,7 @@ public class SnapshotManager implements SnapshotState { } else if (logSizeExceededSnapshotBatchCount) { log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with " + "index {}", context.getId(), context.getReplicatedLog().size(), - context.getConfigParams().getSnapshotBatchCount(), - captureSnapshot.getLastAppliedIndex()); + config.getSnapshotBatchCount(), captureSnapshot.getLastAppliedIndex()); } else { log.debug("{}: user triggered or root overwrite snapshot encountered, trimming log up to " + "last applied index {}", context.getId(), captureSnapshot.getLastAppliedIndex()); @@ -438,7 +440,8 @@ public class SnapshotManager implements SnapshotState { if (installSnapshotStream.isPresent()) { if (context.getId().equals(currentBehavior.getLeaderId())) { try { - ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.get()).asByteSource(); + ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.orElseThrow()) + .asByteSource(); currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes)); } catch (IOException e) { @@ -446,12 +449,12 @@ public class SnapshotManager implements SnapshotState { context.getId(), e); } } else { - ((FileBackedOutputStream)installSnapshotStream.get()).cleanup(); + ((FileBackedOutputStream)installSnapshotStream.orElseThrow()).cleanup(); } } captureSnapshot = null; - SnapshotManager.this.currentState = PERSISTING; + currentState = PERSISTING; } @Override @@ -523,7 +526,7 @@ public class SnapshotManager implements SnapshotState { private void snapshotComplete() { lastSequenceNumber = -1; applySnapshot = null; - SnapshotManager.this.currentState = IDLE; + currentState = IDLE; context.getActor().tell(SnapshotComplete.INSTANCE, context.getActor()); } @@ -548,8 +551,8 @@ public class SnapshotManager implements SnapshotState { LastAppliedTermInformationReader init(final ReplicatedLog log, final long originalIndex, final ReplicatedLogEntry lastLogEntry, final boolean hasFollowers) { ReplicatedLogEntry entry = log.get(originalIndex); - this.index = -1L; - this.term = -1L; + index = -1L; + term = -1L; if (!hasFollowers) { if (lastLogEntry != null) { // since we have persisted the last-log-entry to persistent journal before the capture, @@ -569,12 +572,12 @@ public class SnapshotManager implements SnapshotState { @Override public long getIndex() { - return this.index; + return index; } @Override public long getTerm() { - return this.term; + return term; } } @@ -584,8 +587,8 @@ public class SnapshotManager implements SnapshotState { ReplicatedToAllTermInformationReader init(final ReplicatedLog log, final long originalIndex) { ReplicatedLogEntry entry = log.get(originalIndex); - this.index = -1L; - this.term = -1L; + index = -1L; + term = -1L; if (entry != null) { index = entry.getIndex(); @@ -597,12 +600,12 @@ public class SnapshotManager implements SnapshotState { @Override public long getIndex() { - return this.index; + return index; } @Override public long getTerm() { - return this.term; + return term; } } }