2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import java.util.ArrayList;
14 import java.util.List;
15 import org.eclipse.jdt.annotation.NonNull;
16 import org.eclipse.jdt.annotation.NonNullByDefault;
17 import org.eclipse.jdt.annotation.Nullable;
18 import org.opendaylight.controller.cluster.raft.spi.ImmutableRaftEntryMeta;
19 import org.opendaylight.controller.cluster.raft.spi.RaftEntryMeta;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
24 * Abstract class handling the mapping of
25 * logical LogEntry Index and the physical list index.
27 public abstract class AbstractReplicatedLog implements ReplicatedLog {
28 private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLog.class);
30 final @NonNull String memberId;
32 // We define this as ArrayList so we can use ensureCapacity.
33 private ArrayList<ReplicatedLogEntry> journal = new ArrayList<>();
35 private long snapshotIndex = -1;
36 private long snapshotTerm = -1;
37 private long commitIndex = -1;
38 private long lastApplied = -1;
40 // to be used for rollback during save snapshot failure
41 private ArrayList<ReplicatedLogEntry> snapshottedJournal;
42 private long previousSnapshotIndex = -1;
43 private long previousSnapshotTerm = -1;
44 private int dataSize = 0;
46 protected AbstractReplicatedLog(final @NonNull String memberId) {
47 this.memberId = requireNonNull(memberId);
50 protected final int adjustedIndex(final long logEntryIndex) {
51 if (snapshotIndex < 0) {
52 return (int) logEntryIndex;
54 return (int) (logEntryIndex - (snapshotIndex + 1));
58 public final ReplicatedLogEntry get(final long logEntryIndex) {
59 int adjustedIndex = adjustedIndex(logEntryIndex);
61 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
62 // physical index should be less than list size and >= 0
66 return journal.get(adjustedIndex);
70 public final ReplicatedLogEntry last() {
71 if (journal.isEmpty()) {
74 // get the last entry directly from the physical index
75 return journal.get(journal.size() - 1);
79 public final long lastIndex() {
80 final var last = last();
81 // it can happen that after snapshot, all the entries of the journal are trimmed till lastApplied,
82 // so lastIndex = snapshotIndex
83 return last != null ? last.index() : snapshotIndex;
87 public final long lastTerm() {
88 final var last = last();
89 // it can happen that after snapshot, all the entries of the journal are trimmed till lastApplied,
90 // so lastTerm = snapshotTerm
91 return last != null ? last.term() : snapshotTerm;
95 public final long getCommitIndex() {
100 public final void setCommitIndex(final long commitIndex) {
101 this.commitIndex = commitIndex;
105 public final long getLastApplied() {
110 public final void setLastApplied(final long lastApplied) {
111 LOG.debug("{}: Moving last applied index from {} to {}", memberId, this.lastApplied, lastApplied,
112 LOG.isTraceEnabled() ? new Throwable() : null);
113 this.lastApplied = lastApplied;
117 public final long removeFrom(final long logEntryIndex) {
118 int adjustedIndex = adjustedIndex(logEntryIndex);
119 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
120 // physical index should be less than list size and >= 0
124 for (int i = adjustedIndex; i < journal.size(); i++) {
125 dataSize -= journal.get(i).size();
128 journal.subList(adjustedIndex , journal.size()).clear();
130 return adjustedIndex;
134 public boolean append(final ReplicatedLogEntry replicatedLogEntry) {
135 final var entryIndex = replicatedLogEntry.index();
136 final var lastIndex = lastIndex();
137 if (entryIndex > lastIndex) {
138 journal.add(replicatedLogEntry);
139 dataSize += replicatedLogEntry.size();
143 LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}", memberId,
144 entryIndex, lastIndex, new Exception("stack trace"));
149 public final void increaseJournalLogCapacity(final int amount) {
150 journal.ensureCapacity(journal.size() + amount);
154 public final List<ReplicatedLogEntry> getFrom(final long logEntryIndex) {
155 return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
159 public final List<ReplicatedLogEntry> getFrom(final long logEntryIndex, final int maxEntries,
160 final long maxDataSize) {
161 int adjustedIndex = adjustedIndex(logEntryIndex);
162 int size = journal.size();
163 if (adjustedIndex < 0 || adjustedIndex >= size) {
167 // physical index should be less than list size and >= 0
168 int maxIndex = adjustedIndex + maxEntries;
169 if (maxIndex > size) {
173 return maxDataSize == NO_MAX_SIZE ? new ArrayList<>(journal.subList(adjustedIndex, maxIndex))
174 : copyJournalEntries(adjustedIndex, maxIndex, maxDataSize);
177 private @NonNull List<ReplicatedLogEntry> copyJournalEntries(final int fromIndex, final int toIndex,
178 final long maxDataSize) {
179 final var retList = new ArrayList<ReplicatedLogEntry>(toIndex - fromIndex);
181 for (int i = fromIndex; i < toIndex; i++) {
182 final var entry = journal.get(i);
183 totalSize += entry.serializedSize();
184 if (totalSize > maxDataSize) {
185 if (retList.isEmpty()) {
186 // Edge case - the first entry's size exceeds the threshold. We need to return
187 // at least the first entry so add it here.
200 public final long size() {
201 return journal.size();
204 // Non-final for testing
206 public int dataSize() {
211 public final boolean isPresent(final long logEntryIndex) {
212 if (logEntryIndex > lastIndex()) {
213 // if the request logical index is less than the last present in the list
216 int adjustedIndex = adjustedIndex(logEntryIndex);
217 return adjustedIndex >= 0;
221 public final boolean isInSnapshot(final long logEntryIndex) {
222 return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1;
226 public final long getSnapshotIndex() {
227 return snapshotIndex;
231 public final long getSnapshotTerm() {
236 public final void setSnapshotIndex(final long snapshotIndex) {
237 this.snapshotIndex = snapshotIndex;
241 public final void setSnapshotTerm(final long snapshotTerm) {
242 this.snapshotTerm = snapshotTerm;
246 public final void clear(final int startIndex, final int endIndex) {
247 journal.subList(startIndex, endIndex).clear();
251 public final void snapshotPreCommit(final long snapshotCapturedIndex, final long snapshotCapturedTerm) {
252 if (snapshotCapturedIndex < snapshotIndex) {
253 throw new IllegalArgumentException("snapshotCapturedIndex must be greater than or equal to snapshotIndex");
256 snapshottedJournal = new ArrayList<>(journal.size());
258 final var snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
260 snapshottedJournal.addAll(snapshotJournalEntries);
261 snapshotJournalEntries.clear();
263 previousSnapshotIndex = snapshotIndex;
264 setSnapshotIndex(snapshotCapturedIndex);
266 previousSnapshotTerm = snapshotTerm;
267 setSnapshotTerm(snapshotCapturedTerm);
271 public final void snapshotCommit(final boolean updateDataSize) {
272 snapshottedJournal = null;
273 previousSnapshotIndex = -1;
274 previousSnapshotTerm = -1;
276 if (updateDataSize) {
277 // need to recalc the datasize based on the entries left after precommit.
279 for (ReplicatedLogEntry logEntry : journal) {
280 newDataSize += logEntry.size();
282 LOG.trace("{}: Updated dataSize from {} to {}", memberId, dataSize, newDataSize);
283 dataSize = newDataSize;
288 public final void snapshotRollback() {
289 snapshottedJournal.addAll(journal);
290 journal = snapshottedJournal;
291 snapshottedJournal = null;
293 snapshotIndex = previousSnapshotIndex;
294 previousSnapshotIndex = -1;
296 snapshotTerm = previousSnapshotTerm;
297 previousSnapshotTerm = -1;
301 final ReplicatedLogEntry getAtPhysicalIndex(final int index) {
302 return journal.get(index);
306 static final RaftEntryMeta computeLastAppliedEntry(final ReplicatedLog log, final long originalIndex,
307 final @Nullable RaftEntryMeta lastLogEntry, final boolean hasFollowers) {
308 return hasFollowers ? compulateLastAppliedEntry(log, originalIndex)
309 : compulateLastAppliedEntry(log, lastLogEntry);
313 static final RaftEntryMeta compulateLastAppliedEntry(final ReplicatedLog log, final long originalIndex) {
314 final var entry = log.lookupMeta(originalIndex);
319 final var snapshotIndex = log.getSnapshotIndex();
320 return snapshotIndex > -1 ? ImmutableRaftEntryMeta.of(snapshotIndex, log.getSnapshotTerm())
321 : ImmutableRaftEntryMeta.of(-1, -1);
325 static final RaftEntryMeta compulateLastAppliedEntry(final ReplicatedLog log,
326 final @Nullable RaftEntryMeta lastLogEntry) {
327 if (lastLogEntry != null) {
328 // since we have persisted the last-log-entry to persistent journal before the capture, we would want
329 // to snapshot from this entry.
332 return ImmutableRaftEntryMeta.of(-1, -1);