2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
15 import org.eclipse.jdt.annotation.NonNull;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
20 * Abstract class handling the mapping of
21 * logical LogEntry Index and the physical list index.
23 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
24 private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
26 private final String logContext;
28 // We define this as ArrayList so we can use ensureCapacity.
29 private ArrayList<ReplicatedLogEntry> journal;
31 private long snapshotIndex = -1;
32 private long snapshotTerm = -1;
34 // to be used for rollback during save snapshot failure
35 private ArrayList<ReplicatedLogEntry> snapshottedJournal;
36 private long previousSnapshotIndex = -1;
37 private long previousSnapshotTerm = -1;
38 private int dataSize = 0;
// Creates a log positioned after the given snapshot: stores the snapshot index, snapshot term and
// log context, then allocates the journal with an initial capacity equal to the number of supplied
// unapplied entries and iterates over those entries.
40 protected AbstractReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm,
41 final List<ReplicatedLogEntry> unAppliedEntries, final String logContext) {
42 this.snapshotIndex = snapshotIndex;
43 this.snapshotTerm = snapshotTerm;
44 this.logContext = logContext;
// Pre-size the backing list so that loading the initial entries does not trigger a resize.
46 journal = new ArrayList<>(unAppliedEntries.size());
47 for (ReplicatedLogEntry entry: unAppliedEntries) {
// NOTE(review): the loop body and the closing braces are not visible in this view. Presumably each
// entry is added to the journal (possibly through append) -- confirm against the complete file.
/**
 * Creates an empty log: delegates to the four-argument constructor with a
 * snapshot index and term of -1, no entries and an empty log context.
 */
protected AbstractReplicatedLogImpl() {
    this(-1, -1, Collections.<ReplicatedLogEntry>emptyList(), "");
}
56 protected int adjustedIndex(final long logEntryIndex) {
57 if (snapshotIndex < 0) {
58 return (int) logEntryIndex;
60 return (int) (logEntryIndex - (snapshotIndex + 1));
64 public ReplicatedLogEntry get(final long logEntryIndex) {
65 int adjustedIndex = adjustedIndex(logEntryIndex);
67 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
68 // physical index should be less than list size and >= 0
72 return journal.get(adjustedIndex);
76 public ReplicatedLogEntry last() {
77 if (journal.isEmpty()) {
80 // get the last entry directly from the physical index
81 return journal.get(journal.size() - 1);
// NOTE(review): the body of this method is not visible in this view. Judging by the name and
// return type it presumably exposes the last journal entry as RaftEntryMeta -- confirm against
// the complete file.
85 public RaftEntryMeta lastMeta() {
90 public long lastIndex() {
91 final var last = last();
92 // it can happen that after snapshot, all the entries of the journal are trimmed till lastApplied,
93 // so lastIndex = snapshotIndex
94 return last != null ? last.index() : snapshotIndex;
98 public long lastTerm() {
99 final var last = last();
100 // it can happen that after snapshot, all the entries of the journal are trimmed till lastApplied,
101 // so lastTerm = snapshotTerm
102 return last != null ? last.term() : snapshotTerm;
106 public long removeFrom(final long logEntryIndex) {
107 int adjustedIndex = adjustedIndex(logEntryIndex);
108 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
109 // physical index should be less than list size and >= 0
113 for (int i = adjustedIndex; i < journal.size(); i++) {
114 dataSize -= journal.get(i).size();
117 journal.subList(adjustedIndex , journal.size()).clear();
119 return adjustedIndex;
123 public boolean append(final ReplicatedLogEntry replicatedLogEntry) {
124 final var entryIndex = replicatedLogEntry.index();
125 final var lastIndex = lastIndex();
126 if (entryIndex > lastIndex) {
127 journal.add(replicatedLogEntry);
128 dataSize += replicatedLogEntry.size();
132 LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}", logContext,
133 entryIndex, lastIndex, new Exception("stack trace"));
138 public void increaseJournalLogCapacity(final int amount) {
139 journal.ensureCapacity(journal.size() + amount);
143 public List<ReplicatedLogEntry> getFrom(final long logEntryIndex) {
144 return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
148 public List<ReplicatedLogEntry> getFrom(final long logEntryIndex, final int maxEntries, final long maxDataSize) {
149 int adjustedIndex = adjustedIndex(logEntryIndex);
150 int size = journal.size();
151 if (adjustedIndex >= 0 && adjustedIndex < size) {
152 // physical index should be less than list size and >= 0
153 int maxIndex = adjustedIndex + maxEntries;
154 if (maxIndex > size) {
158 if (maxDataSize == NO_MAX_SIZE) {
159 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
161 return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize);
164 return Collections.emptyList();
168 private @NonNull List<ReplicatedLogEntry> copyJournalEntries(final int fromIndex, final int toIndex,
169 final long maxDataSize) {
170 final var retList = new ArrayList<ReplicatedLogEntry>(toIndex - fromIndex);
172 for (int i = fromIndex; i < toIndex; i++) {
173 final var entry = journal.get(i);
174 totalSize += entry.serializedSize();
175 if (totalSize <= maxDataSize) {
178 if (retList.isEmpty()) {
179 // Edge case - the first entry's size exceeds the threshold. We need to return
180 // at least the first entry so add it here.
// NOTE(review): the header of the method this statement belongs to is not visible in this view.
// The statement itself returns the number of entries currently held in the journal list.
193 return journal.size();
197 public int dataSize() {
202 public boolean isPresent(final long logEntryIndex) {
203 if (logEntryIndex > lastIndex()) {
204 // if the request logical index is less than the last present in the list
207 int adjustedIndex = adjustedIndex(logEntryIndex);
208 return adjustedIndex >= 0;
212 public boolean isInSnapshot(final long logEntryIndex) {
213 return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1;
217 public long getSnapshotIndex() {
218 return snapshotIndex;
222 public long getSnapshotTerm() {
227 public void setSnapshotIndex(final long snapshotIndex) {
228 this.snapshotIndex = snapshotIndex;
232 public void setSnapshotTerm(final long snapshotTerm) {
233 this.snapshotTerm = snapshotTerm;
237 public void clear(final int startIndex, final int endIndex) {
238 journal.subList(startIndex, endIndex).clear();
242 public void snapshotPreCommit(final long snapshotCapturedIndex, final long snapshotCapturedTerm) {
243 Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
244 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
246 snapshottedJournal = new ArrayList<>(journal.size());
248 final var snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
250 snapshottedJournal.addAll(snapshotJournalEntries);
251 snapshotJournalEntries.clear();
253 previousSnapshotIndex = snapshotIndex;
254 setSnapshotIndex(snapshotCapturedIndex);
256 previousSnapshotTerm = snapshotTerm;
257 setSnapshotTerm(snapshotCapturedTerm);
261 public void snapshotCommit(final boolean updateDataSize) {
262 snapshottedJournal = null;
263 previousSnapshotIndex = -1;
264 previousSnapshotTerm = -1;
266 if (updateDataSize) {
267 // need to recalc the datasize based on the entries left after precommit.
269 for (ReplicatedLogEntry logEntry : journal) {
270 newDataSize += logEntry.size();
272 LOG.trace("{}: Updated dataSize from {} to {}", logContext, dataSize, newDataSize);
273 dataSize = newDataSize;
278 public void snapshotRollback() {
279 snapshottedJournal.addAll(journal);
280 journal = snapshottedJournal;
281 snapshottedJournal = null;
283 snapshotIndex = previousSnapshotIndex;
284 previousSnapshotIndex = -1;
286 snapshotTerm = previousSnapshotTerm;
287 previousSnapshotTerm = -1;
291 ReplicatedLogEntry getAtPhysicalIndex(final int index) {
292 return journal.get(index);