X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FReplicatedLogImpl.java;h=c22f6f431bd4cf833ab6eb7c275563e9d357d386;hb=refs%2Fchanges%2F40%2F81640%2F6;hp=fe873340aa3b414d4f89f60354dabcede970b52c;hpb=913ae866cd0cc82991e1f66ac80f6a42b0daaa48;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index fe873340aa..c22f6f431b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -7,18 +7,18 @@ */ package org.opendaylight.controller.cluster.raft; -import akka.japi.Procedure; -import com.google.common.base.Preconditions; +import static java.util.Objects.requireNonNull; + import java.util.Collections; import java.util.List; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import java.util.function.Consumer; import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; /** * Implementation of ReplicatedLog used by the RaftActor. */ -class ReplicatedLogImpl extends AbstractReplicatedLogImpl { +final class ReplicatedLogImpl extends AbstractReplicatedLogImpl { private static final int DATA_SIZE_DIVIDER = 5; private final RaftActorContext context; @@ -28,7 +28,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { final List unAppliedEntries, final RaftActorContext context) { super(snapshotIndex, snapshotTerm, unAppliedEntries, context.getId()); - this.context = Preconditions.checkNotNull(context); + this.context = requireNonNull(context); } static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context) { @@ -53,7 +53,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { } @Override - public boolean shouldCaptureSnapshot(long logIndex) { + public boolean shouldCaptureSnapshot(final long logIndex) { final ConfigParams config = context.getConfigParams(); final long journalSize = logIndex + 1; final long dataThreshold = context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100; @@ -92,8 +92,8 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { } @Override - public boolean appendAndPersist(@Nonnull final ReplicatedLogEntry replicatedLogEntry, - @Nullable final Procedure callback, boolean doAsync) { + public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry, + final Consumer callback, final boolean doAsync) { context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry); @@ -101,22 +101,29 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { return false; } - Procedure persistCallback = persistedLogEntry -> { - context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry); - - dataSizeSinceLastSnapshot += persistedLogEntry.size(); - - if (callback != null) { - callback.apply(persistedLogEntry); - } - }; - if (doAsync) { - context.getPersistenceProvider().persistAsync(replicatedLogEntry, persistCallback); + context.getPersistenceProvider().persistAsync(replicatedLogEntry, + entry -> persistCallback(entry, callback)); } else { - context.getPersistenceProvider().persist(replicatedLogEntry, persistCallback); + context.getPersistenceProvider().persist(replicatedLogEntry, entry -> syncPersistCallback(entry, callback)); } return true; } + + private void persistCallback(final ReplicatedLogEntry persistedLogEntry, + final Consumer callback) { + context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback)); + } + + private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry, + final Consumer callback) { + context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry); + + dataSizeSinceLastSnapshot += persistedLogEntry.size(); + + if (callback != null) { + callback.accept(persistedLogEntry); + } + } }