/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
15 import org.eclipse.jdt.annotation.NonNull;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
/**
 * Abstract class handling the mapping between the logical LogEntry index
 * and the physical list index.
 */
23 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
24 private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
26 private final String logContext;
28 // We define this as ArrayList so we can use ensureCapacity.
29 private ArrayList<ReplicatedLogEntry> journal;
31 private long snapshotIndex = -1;
32 private long snapshotTerm = -1;
34 // to be used for rollback during save snapshot failure
35 private ArrayList<ReplicatedLogEntry> snapshottedJournal;
36 private long previousSnapshotIndex = -1;
37 private long previousSnapshotTerm = -1;
38 private int dataSize = 0;
/**
 * Creates a log positioned at the given snapshot and seeded with the supplied entries.
 *
 * @param snapshotIndex the snapshot index to start from (the no-arg constructor passes -1)
 * @param snapshotTerm the snapshot term to start from (the no-arg constructor passes -1)
 * @param unAppliedEntries the entries to place in the journal, in iteration order
 * @param logContext the prefix used in log messages
 */
protected AbstractReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm,
        final List<ReplicatedLogEntry> unAppliedEntries, final String logContext) {
    this.snapshotIndex = snapshotIndex;
    this.snapshotTerm = snapshotTerm;
    this.logContext = logContext;

    this.journal = new ArrayList<>(unAppliedEntries.size());
    for (ReplicatedLogEntry entry : unAppliedEntries) {
        // Route through append() so the index-ordering check and dataSize accounting apply.
        append(entry);
    }
}
/**
 * Creates an empty log with snapshot index and term of -1 and an empty log context.
 */
protected AbstractReplicatedLogImpl() {
    this(-1L, -1L, Collections.emptyList(), "");
}
56 protected int adjustedIndex(final long logEntryIndex) {
57 if (snapshotIndex < 0) {
58 return (int) logEntryIndex;
60 return (int) (logEntryIndex - (snapshotIndex + 1));
64 public ReplicatedLogEntry get(final long logEntryIndex) {
65 int adjustedIndex = adjustedIndex(logEntryIndex);
67 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
68 // physical index should be less than list size and >= 0
72 return journal.get(adjustedIndex);
76 public ReplicatedLogEntry last() {
77 if (journal.isEmpty()) {
80 // get the last entry directly from the physical index
81 return journal.get(journal.size() - 1);
85 public long lastIndex() {
86 if (journal.isEmpty()) {
87 // it can happen that after snapshot, all the entries of the
88 // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
91 return last().getIndex();
95 public long lastTerm() {
96 if (journal.isEmpty()) {
97 // it can happen that after snapshot, all the entries of the
98 // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
101 return last().getTerm();
105 public long removeFrom(final long logEntryIndex) {
106 int adjustedIndex = adjustedIndex(logEntryIndex);
107 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
108 // physical index should be less than list size and >= 0
112 for (int i = adjustedIndex; i < journal.size(); i++) {
113 dataSize -= journal.get(i).size();
116 journal.subList(adjustedIndex , journal.size()).clear();
118 return adjustedIndex;
122 public boolean append(final ReplicatedLogEntry replicatedLogEntry) {
123 if (replicatedLogEntry.getIndex() > lastIndex()) {
124 journal.add(replicatedLogEntry);
125 dataSize += replicatedLogEntry.size();
128 LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}",
129 logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace"));
135 public void increaseJournalLogCapacity(final int amount) {
136 journal.ensureCapacity(journal.size() + amount);
140 public List<ReplicatedLogEntry> getFrom(final long logEntryIndex) {
141 return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
145 public List<ReplicatedLogEntry> getFrom(final long logEntryIndex, final int maxEntries, final long maxDataSize) {
146 int adjustedIndex = adjustedIndex(logEntryIndex);
147 int size = journal.size();
148 if (adjustedIndex >= 0 && adjustedIndex < size) {
149 // physical index should be less than list size and >= 0
150 int maxIndex = adjustedIndex + maxEntries;
151 if (maxIndex > size) {
155 if (maxDataSize == NO_MAX_SIZE) {
156 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
158 return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize);
161 return Collections.emptyList();
165 private @NonNull List<ReplicatedLogEntry> copyJournalEntries(final int fromIndex, final int toIndex,
166 final long maxDataSize) {
167 List<ReplicatedLogEntry> retList = new ArrayList<>(toIndex - fromIndex);
169 for (int i = fromIndex; i < toIndex; i++) {
170 ReplicatedLogEntry entry = journal.get(i);
171 totalSize += entry.size();
172 if (totalSize <= maxDataSize) {
175 if (retList.isEmpty()) {
176 // Edge case - the first entry's size exceeds the threshold. We need to return
177 // at least the first entry so add it here.
190 return journal.size();
194 public int dataSize() {
199 public boolean isPresent(final long logEntryIndex) {
200 if (logEntryIndex > lastIndex()) {
201 // if the request logical index is less than the last present in the list
204 int adjustedIndex = adjustedIndex(logEntryIndex);
205 return adjustedIndex >= 0;
209 public boolean isInSnapshot(final long logEntryIndex) {
210 return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1;
214 public long getSnapshotIndex() {
215 return snapshotIndex;
219 public long getSnapshotTerm() {
224 public void setSnapshotIndex(final long snapshotIndex) {
225 this.snapshotIndex = snapshotIndex;
229 public void setSnapshotTerm(final long snapshotTerm) {
230 this.snapshotTerm = snapshotTerm;
234 public void clear(final int startIndex, final int endIndex) {
235 journal.subList(startIndex, endIndex).clear();
239 public void snapshotPreCommit(final long snapshotCapturedIndex, final long snapshotCapturedTerm) {
240 Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
241 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
243 snapshottedJournal = new ArrayList<>(journal.size());
245 List<ReplicatedLogEntry> snapshotJournalEntries =
246 journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
248 snapshottedJournal.addAll(snapshotJournalEntries);
249 snapshotJournalEntries.clear();
251 previousSnapshotIndex = snapshotIndex;
252 setSnapshotIndex(snapshotCapturedIndex);
254 previousSnapshotTerm = snapshotTerm;
255 setSnapshotTerm(snapshotCapturedTerm);
259 public void snapshotCommit(final boolean updateDataSize) {
260 snapshottedJournal = null;
261 previousSnapshotIndex = -1;
262 previousSnapshotTerm = -1;
264 if (updateDataSize) {
265 // need to recalc the datasize based on the entries left after precommit.
267 for (ReplicatedLogEntry logEntry : journal) {
268 newDataSize += logEntry.size();
270 LOG.trace("{}: Updated dataSize from {} to {}", logContext, dataSize, newDataSize);
271 dataSize = newDataSize;
276 public void snapshotRollback() {
277 snapshottedJournal.addAll(journal);
278 journal = snapshottedJournal;
279 snapshottedJournal = null;
281 snapshotIndex = previousSnapshotIndex;
282 previousSnapshotIndex = -1;
284 snapshotTerm = previousSnapshotTerm;
285 previousSnapshotTerm = -1;
289 ReplicatedLogEntry getAtPhysicalIndex(final int index) {
290 return journal.get(index);