Add direct in-memory journal threshold
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / ReplicatedLogImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static java.util.Objects.requireNonNull;
11
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.function.Consumer;
15 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
16 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
17
18 /**
19  * Implementation of ReplicatedLog used by the RaftActor.
20  */
21 final class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
22     private static final int DATA_SIZE_DIVIDER = 5;
23
24     private final RaftActorContext context;
25     private long dataSizeSinceLastSnapshot = 0L;
26
27     private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm,
28             final List<ReplicatedLogEntry> unAppliedEntries,
29             final RaftActorContext context) {
30         super(snapshotIndex, snapshotTerm, unAppliedEntries, context.getId());
31         this.context = requireNonNull(context);
32     }
33
34     static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context) {
35         return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
36                 snapshot.getUnAppliedEntries(), context);
37     }
38
39     static ReplicatedLog newInstance(final RaftActorContext context) {
40         return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context);
41     }
42
43     @Override
44     public boolean removeFromAndPersist(final long logEntryIndex) {
45         long adjustedIndex = removeFrom(logEntryIndex);
46         if (adjustedIndex >= 0) {
47             context.getPersistenceProvider().persist(new DeleteEntries(logEntryIndex), NoopProcedure.instance());
48             return true;
49         }
50
51         return false;
52     }
53
54     @Override
55     public boolean shouldCaptureSnapshot(final long logIndex) {
56         final ConfigParams config = context.getConfigParams();
57         if ((logIndex + 1) % config.getSnapshotBatchCount() == 0) {
58             return true;
59         }
60
61         final long absoluteThreshold = config.getSnapshotDataThreshold();
62         final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE
63                 : context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
64         return getDataSizeForSnapshotCheck() > dataThreshold;
65     }
66
67     @Override
68     public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
69         if (shouldCaptureSnapshot(replicatedLogEntry.getIndex())) {
70             boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
71                     context.getCurrentBehavior().getReplicatedToAllIndex());
72             if (started && !context.hasFollowers()) {
73                 dataSizeSinceLastSnapshot = 0;
74             }
75         }
76     }
77
78     private long getDataSizeForSnapshotCheck() {
79         if (!context.hasFollowers()) {
80             // When we do not have followers we do not maintain an in-memory log
81             // due to this the journalSize will never become anything close to the
82             // snapshot batch count. In fact will mostly be 1.
83             // Similarly since the journal's dataSize depends on the entries in the
84             // journal the journal's dataSize will never reach a value close to the
85             // memory threshold.
86             // By maintaining the dataSize outside the journal we are tracking essentially
87             // what we have written to the disk however since we no longer are in
88             // need of doing a snapshot just for the sake of freeing up memory we adjust
89             // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
90             // as if we were maintaining a real snapshot
91             return dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
92         } else {
93             return dataSize();
94         }
95     }
96
97     @Override
98     public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
99             final Consumer<ReplicatedLogEntry> callback, final boolean doAsync)  {
100
101         context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
102
103         if (!append(replicatedLogEntry)) {
104             return false;
105         }
106
107         if (doAsync) {
108             context.getPersistenceProvider().persistAsync(replicatedLogEntry,
109                 entry -> persistCallback(entry, callback));
110         } else {
111             context.getPersistenceProvider().persist(replicatedLogEntry, entry -> syncPersistCallback(entry, callback));
112         }
113
114         return true;
115     }
116
117     private void persistCallback(final ReplicatedLogEntry persistedLogEntry,
118             final Consumer<ReplicatedLogEntry> callback) {
119         context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback));
120     }
121
122     private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry,
123             final Consumer<ReplicatedLogEntry> callback) {
124         context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
125
126         dataSizeSinceLastSnapshot += persistedLogEntry.size();
127
128         if (callback != null) {
129             callback.accept(persistedLogEntry);
130         }
131     }
132 }