import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
/**
* Implementation of ReplicatedLog used by the RaftActor.
class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
private static final int DATA_SIZE_DIVIDER = 5;
- private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
- @Override
- public void apply(final DeleteEntries notUsed) {
- }
- };
-
private final RaftActorContext context;
private long dataSizeSinceLastSnapshot = 0L;
- private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List<ReplicatedLogEntry> unAppliedEntries,
+ private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm,
+ final List<ReplicatedLogEntry> unAppliedEntries,
final RaftActorContext context) {
super(snapshotIndex, snapshotTerm, unAppliedEntries, context.getId());
this.context = Preconditions.checkNotNull(context);
}
@Override
- public void removeFromAndPersist(final long logEntryIndex) {
+ public boolean removeFromAndPersist(final long logEntryIndex) {
// FIXME: Maybe this should be done after the command is saved
long adjustedIndex = removeFrom(logEntryIndex);
- if(adjustedIndex >= 0) {
- context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+ if (adjustedIndex >= 0) {
+ context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), NoopProcedure.instance());
+ return true;
}
- }
- @Override
- public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
- appendAndPersist(replicatedLogEntry, null);
+ return false;
}
@Override
- public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
+ public boolean shouldCaptureSnapshot(long logIndex) {
final ConfigParams config = context.getConfigParams();
- final long journalSize = replicatedLogEntry.getIndex() + 1;
+ final long journalSize = logIndex + 1;
final long dataThreshold = context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
- if (journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold) {
+ return journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold;
+ }
+
+ @Override
+ public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
+ if (shouldCaptureSnapshot(replicatedLogEntry.getIndex())) {
boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
context.getCurrentBehavior().getReplicatedToAllIndex());
if (started && !context.hasFollowers()) {
}
@Override
- public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
- final Procedure<ReplicatedLogEntry> callback) {
+ public boolean appendAndPersist(@Nonnull final ReplicatedLogEntry replicatedLogEntry,
+ @Nullable final Procedure<ReplicatedLogEntry> callback, boolean doAsync) {
context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
- // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
- if(!append(replicatedLogEntry)) {
- return;
+ if (!append(replicatedLogEntry)) {
+ return false;
}
- // When persisting events with persist it is guaranteed that the
- // persistent actor will not receive further commands between the
- // persist call and the execution(s) of the associated event
- // handler. This also holds for multiple persist calls in context
- // of a single command.
- context.getPersistenceProvider().persist(replicatedLogEntry,
- new Procedure<ReplicatedLogEntry>() {
- @Override
- public void apply(final ReplicatedLogEntry param) throws Exception {
- context.getLogger().debug("{}: persist complete {}", context.getId(), param);
-
- int logEntrySize = param.size();
- dataSizeSinceLastSnapshot += logEntrySize;
-
- if (callback != null) {
- callback.apply(param);
- }
- }
+ Procedure<ReplicatedLogEntry> persistCallback = persistedLogEntry -> {
+ context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
+
+ dataSizeSinceLastSnapshot += persistedLogEntry.size();
+
+ if (callback != null) {
+ callback.apply(persistedLogEntry);
}
- );
+ };
+
+ if (doAsync) {
+ context.getPersistenceProvider().persistAsync(replicatedLogEntry, persistCallback);
+ } else {
+ context.getPersistenceProvider().persist(replicatedLogEntry, persistCallback);
+ }
+
+ return true;
}
-}
\ No newline at end of file
+}