X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FReplicatedLogImpl.java;h=c22f6f431bd4cf833ab6eb7c275563e9d357d386;hp=6b4427d5ce39078c772e0ea5f33953cc421712c8;hb=HEAD;hpb=eed2343da0b129030d374169f5364557a4d6144a diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index 6b4427d5ce..6167aac6d2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -7,82 +7,75 @@ */ package org.opendaylight.controller.cluster.raft; -import akka.japi.Procedure; -import com.google.common.base.Preconditions; +import static java.util.Objects.requireNonNull; + import java.util.Collections; import java.util.List; -import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; -import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import java.util.function.Consumer; +import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; /** * Implementation of ReplicatedLog used by the RaftActor. */ -class ReplicatedLogImpl extends AbstractReplicatedLogImpl { +final class ReplicatedLogImpl extends AbstractReplicatedLogImpl { private static final int DATA_SIZE_DIVIDER = 5; - private long dataSizeSinceLastSnapshot = 0L; private final RaftActorContext context; - private final RaftActorBehavior currentBehavior; - - private final Procedure deleteProcedure = new Procedure() { - @Override - public void apply(final DeleteEntries notUsed) { - } - }; + private long dataSizeSinceLastSnapshot = 0L; - static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context, - final RaftActorBehavior currentBehavior) { - return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), - snapshot.getUnAppliedEntries(), context, currentBehavior); + private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, + final List unAppliedEntries, + final RaftActorContext context) { + super(snapshotIndex, snapshotTerm, unAppliedEntries, context.getId()); + this.context = requireNonNull(context); } - static ReplicatedLog newInstance(final RaftActorContext context, final RaftActorBehavior currentBehavior) { - return new ReplicatedLogImpl(-1L, -1L, Collections.emptyList(), context, - currentBehavior); + static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context) { + return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), + snapshot.getUnAppliedEntries(), context); } - private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List unAppliedEntries, - final RaftActorContext context, final RaftActorBehavior currentBehavior) { - super(snapshotIndex, snapshotTerm, unAppliedEntries); - this.context = Preconditions.checkNotNull(context); - this.currentBehavior = Preconditions.checkNotNull(currentBehavior); + static ReplicatedLog newInstance(final RaftActorContext context) { + return new ReplicatedLogImpl(-1L, -1L, Collections.emptyList(), context); } @Override - public void removeFromAndPersist(final long logEntryIndex) { - // FIXME: Maybe this should be done after the command is saved + public boolean removeFromAndPersist(final long logEntryIndex) { long adjustedIndex = removeFrom(logEntryIndex); - if(adjustedIndex >= 0) { - context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure); + if (adjustedIndex >= 0) { + context.getPersistenceProvider().persist(new DeleteEntries(logEntryIndex), NoopProcedure.instance()); + return true; } + + return false; } @Override - public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) { - appendAndPersist(replicatedLogEntry, null); + public boolean shouldCaptureSnapshot(final long logIndex) { + final ConfigParams config = context.getConfigParams(); + if ((logIndex + 1) % config.getSnapshotBatchCount() == 0) { + return true; + } + + final long absoluteThreshold = config.getSnapshotDataThreshold(); + final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE + : context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100; + return getDataSizeForSnapshotCheck() > dataThreshold; } @Override public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) { - long journalSize = replicatedLogEntry.getIndex() + 1; - long dataThreshold = context.getTotalMemory() * - context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; - - if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 - || getDataSizeForSnapshotCheck() > dataThreshold)) { - + if (shouldCaptureSnapshot(replicatedLogEntry.getIndex())) { boolean started = context.getSnapshotManager().capture(replicatedLogEntry, - currentBehavior.getReplicatedToAllIndex()); - if (started) { - if (!context.hasFollowers()) { - dataSizeSinceLastSnapshot = 0; - } + context.getCurrentBehavior().getReplicatedToAllIndex()); + if (started && !context.hasFollowers()) { + dataSizeSinceLastSnapshot = 0; } } } private long getDataSizeForSnapshotCheck() { - long dataSizeForCheck = dataSize(); if (!context.hasFollowers()) { // When we do not have followers we do not maintain an in-memory log // due to this the journalSize will never become anything close to the @@ -95,41 +88,45 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { // need of doing a snapshot just for the sake of freeing up memory we adjust // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often // as if we were maintaining a real snapshot - dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER; + return dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER; + } else { + return dataSize(); } - return dataSizeForCheck; } @Override - public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry, - final Procedure callback) { + public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry, + final Consumer callback, final boolean doAsync) { - if (context.getLogger().isDebugEnabled()) { - context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry); + context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry); + + if (!append(replicatedLogEntry)) { + return false; } - // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs - append(replicatedLogEntry); - - // When persisting events with persist it is guaranteed that the - // persistent actor will not receive further commands between the - // persist call and the execution(s) of the associated event - // handler. This also holds for multiple persist calls in context - // of a single command. - context.getPersistenceProvider().persist(replicatedLogEntry, - new Procedure() { - @Override - public void apply(final ReplicatedLogEntry param) throws Exception { - context.getLogger().debug("{}: persist complete {}", context.getId(), param); - - int logEntrySize = param.size(); - dataSizeSinceLastSnapshot += logEntrySize; - - if (callback != null) { - callback.apply(param); - } - } - } - ); + if (doAsync) { + context.getPersistenceProvider().persistAsync(replicatedLogEntry, + entry -> persistCallback(entry, callback)); + } else { + context.getPersistenceProvider().persist(replicatedLogEntry, entry -> syncPersistCallback(entry, callback)); + } + + return true; + } + + private void persistCallback(final ReplicatedLogEntry persistedLogEntry, + final Consumer callback) { + context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback)); + } + + private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry, + final Consumer callback) { + context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry); + + dataSizeSinceLastSnapshot += persistedLogEntry.size(); + + if (callback != null) { + callback.accept(persistedLogEntry); + } } -} \ No newline at end of file +}