X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FSnapshotManager.java;h=57e6140fc9efc06bd0b343adce8ee852b896cbd4;hb=HEAD;hp=8037fb8d73ce88e6309f6a0f56b6d4fb787e8761;hpb=ff29db5dc6012f77bbe53f57ddce929b0de093b3;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 8037fb8d73..d306e1b5f0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.Optional; import java.util.function.Consumer; import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.io.FileBackedOutputStream; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -68,7 +69,7 @@ public class SnapshotManager implements SnapshotState { */ public SnapshotManager(final RaftActorContext context, final Logger logger) { this.context = context; - this.log = logger; + log = logger; } public boolean isApplying() { @@ -81,18 +82,18 @@ public class SnapshotManager implements SnapshotState { } @Override - public boolean captureToInstall(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, + public boolean captureToInstall(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex, final String targetFollower) { return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower); } @Override - public boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { + public boolean capture(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex) { return currentState.capture(lastLogEntry, replicatedToAllIndex); } @Override - public boolean captureWithForcedTrim(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { + public boolean captureWithForcedTrim(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex) { return currentState.captureWithForcedTrim(lastLogEntry, replicatedToAllIndex); } @@ -159,7 +160,7 @@ public class SnapshotManager implements SnapshotState { * @param replicatedToAllIndex the index of the last entry replicated to all followers. * @return a new CaptureSnapshot instance. */ - public CaptureSnapshot newCaptureSnapshot(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, + public CaptureSnapshot newCaptureSnapshot(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex, final boolean mandatoryTrim) { TermInformationReader lastAppliedTermInfoReader = lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), @@ -187,15 +188,15 @@ public class SnapshotManager implements SnapshotState { log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using snapshot values lastAppliedIndex {} and " + "lastAppliedTerm {} instead.", persistenceId(), lastAppliedIndex, lastAppliedTerm); } else { - lastLogEntryIndex = lastLogEntry.getIndex(); - lastLogEntryTerm = lastLogEntry.getTerm(); + lastLogEntryIndex = lastLogEntry.index(); + lastLogEntryTerm = lastLogEntry.term(); } return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm, newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, mandatoryTrim); } - private class AbstractSnapshotState implements SnapshotState { + private abstract class AbstractSnapshotState implements SnapshotState { @Override public boolean isCapturing() { @@ -203,20 +204,20 @@ public class SnapshotManager implements SnapshotState { } @Override - public boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { + public boolean capture(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex) { log.debug("capture should not be called in state {}", this); return false; } @Override - public boolean captureToInstall(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, + public boolean captureToInstall(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex, final String targetFollower) { log.debug("captureToInstall should not be called in state {}", this); return false; } @Override - public boolean captureWithForcedTrim(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { + public boolean captureWithForcedTrim(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex) { log.debug("captureWithForcedTrim should not be called in state {}", this); return false; } @@ -264,7 +265,7 @@ public class SnapshotManager implements SnapshotState { //use the term of the temp-min, since we check for isPresent, entry will not be null ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); - context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); + context.getReplicatedLog().snapshotPreCommit(tempMin, entry.term()); context.getReplicatedLog().snapshotCommit(false); return tempMin; } @@ -282,15 +283,14 @@ public class SnapshotManager implements SnapshotState { } } - private class Idle extends AbstractSnapshotState { - + private final class Idle extends AbstractSnapshotState { @Override public boolean isCapturing() { return false; } @SuppressWarnings("checkstyle:IllegalCatch") - private boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, + private boolean capture(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex, final String targetFollower, final boolean mandatoryTrim) { captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, mandatoryTrim); @@ -307,12 +307,12 @@ public class SnapshotManager implements SnapshotState { log.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber); - SnapshotManager.this.currentState = CREATING; + currentState = CREATING; try { createSnapshotProcedure.accept(Optional.ofNullable(installSnapshotStream)); } catch (Exception e) { - SnapshotManager.this.currentState = IDLE; + currentState = IDLE; log.error("Error creating snapshot", e); return false; } @@ -321,24 +321,24 @@ public class SnapshotManager implements SnapshotState { } @Override - public boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { + public boolean capture(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex) { return capture(lastLogEntry, replicatedToAllIndex, null, false); } @Override - public boolean captureToInstall(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, + public boolean captureToInstall(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex, final String targetFollower) { return capture(lastLogEntry, replicatedToAllIndex, targetFollower, false); } @Override - public boolean captureWithForcedTrim(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { + public boolean captureWithForcedTrim(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex) { return capture(lastLogEntry, replicatedToAllIndex, null, true); } @Override public void apply(final ApplySnapshot toApply) { - SnapshotManager.this.applySnapshot = toApply; + applySnapshot = toApply; lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber(); @@ -346,7 +346,7 @@ public class SnapshotManager implements SnapshotState { context.getPersistenceProvider().saveSnapshot(toApply.getSnapshot()); - SnapshotManager.this.currentState = PERSISTING; + currentState = PERSISTING; } @Override @@ -360,8 +360,7 @@ public class SnapshotManager implements SnapshotState { } } - private class Creating extends AbstractSnapshotState { - + private final class Creating extends AbstractSnapshotState { @Override public void persist(final Snapshot.State snapshotState, final Optional installSnapshotStream, final long totalMemory) { @@ -440,7 +439,8 @@ public class SnapshotManager implements SnapshotState { if (installSnapshotStream.isPresent()) { if (context.getId().equals(currentBehavior.getLeaderId())) { try { - ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.get()).asByteSource(); + ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.orElseThrow()) + .asByteSource(); currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes)); } catch (IOException e) { @@ -448,12 +448,12 @@ public class SnapshotManager implements SnapshotState { context.getId(), e); } } else { - ((FileBackedOutputStream)installSnapshotStream.get()).cleanup(); + ((FileBackedOutputStream)installSnapshotStream.orElseThrow()).cleanup(); } } captureSnapshot = null; - SnapshotManager.this.currentState = PERSISTING; + currentState = PERSISTING; } @Override @@ -463,8 +463,7 @@ public class SnapshotManager implements SnapshotState { } - private class Persisting extends AbstractSnapshotState { - + private final class Persisting extends AbstractSnapshotState { @Override @SuppressWarnings("checkstyle:IllegalCatch") public void commit(final long sequenceNumber, final long timeStamp) { @@ -525,7 +524,7 @@ public class SnapshotManager implements SnapshotState { private void snapshotComplete() { lastSequenceNumber = -1; applySnapshot = null; - SnapshotManager.this.currentState = IDLE; + currentState = IDLE; context.getActor().tell(SnapshotComplete.INSTANCE, context.getActor()); } @@ -543,25 +542,25 @@ public class SnapshotManager implements SnapshotState { long getTerm(); } - static class LastAppliedTermInformationReader implements TermInformationReader { + static final class LastAppliedTermInformationReader implements TermInformationReader { private long index; private long term; LastAppliedTermInformationReader init(final ReplicatedLog log, final long originalIndex, - final ReplicatedLogEntry lastLogEntry, final boolean hasFollowers) { - ReplicatedLogEntry entry = log.get(originalIndex); - this.index = -1L; - this.term = -1L; + final @Nullable RaftEntryMeta lastLogEntry, final boolean hasFollowers) { + RaftEntryMeta entry = log.lookupMeta(originalIndex); + index = -1L; + term = -1L; if (!hasFollowers) { if (lastLogEntry != null) { // since we have persisted the last-log-entry to persistent journal before the capture, // we would want to snapshot from this entry. - index = lastLogEntry.getIndex(); - term = lastLogEntry.getTerm(); + index = lastLogEntry.index(); + term = lastLogEntry.term(); } } else if (entry != null) { - index = entry.getIndex(); - term = entry.getTerm(); + index = entry.index(); + term = entry.term(); } else if (log.getSnapshotIndex() > -1) { index = log.getSnapshotIndex(); term = log.getSnapshotTerm(); @@ -571,40 +570,39 @@ public class SnapshotManager implements SnapshotState { @Override public long getIndex() { - return this.index; + return index; } @Override public long getTerm() { - return this.term; + return term; } } - private static class ReplicatedToAllTermInformationReader implements TermInformationReader { + private static final class ReplicatedToAllTermInformationReader implements TermInformationReader { private long index; private long term; ReplicatedToAllTermInformationReader init(final ReplicatedLog log, final long originalIndex) { - ReplicatedLogEntry entry = log.get(originalIndex); - this.index = -1L; - this.term = -1L; - + var entry = log.get(originalIndex); if (entry != null) { - index = entry.getIndex(); - term = entry.getTerm(); + index = entry.index(); + term = entry.term(); + } else { + index = -1L; + term = -1L; } - return this; } @Override public long getIndex() { - return this.index; + return index; } @Override public long getTerm() { - return this.term; + return term; } } }