/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
/**
 * Abstract class handling the mapping between the logical LogEntry index
 * and the physical list index.
 */
22 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
23 private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
25 private final String logContext;
27 // We define this as ArrayList so we can use ensureCapacity.
28 private ArrayList<ReplicatedLogEntry> journal;
30 private long snapshotIndex = -1;
31 private long snapshotTerm = -1;
33 // to be used for rollback during save snapshot failure
34 private ArrayList<ReplicatedLogEntry> snapshottedJournal;
35 private long previousSnapshotIndex = -1;
36 private long previousSnapshotTerm = -1;
37 private int dataSize = 0;
/**
 * Creates a log that starts from the given snapshot position, with a journal pre-sized to
 * the number of supplied entries.
 *
 * @param snapshotIndex value stored as this log's snapshot index
 * @param snapshotTerm value stored as this log's snapshot term
 * @param unAppliedEntries entries iterated over once the journal has been allocated
 * @param logContext identifier stored for use in this class's log messages
 */
39 protected AbstractReplicatedLogImpl(long snapshotIndex, long snapshotTerm,
40 List<ReplicatedLogEntry> unAppliedEntries, String logContext) {
41 this.snapshotIndex = snapshotIndex;
42 this.snapshotTerm = snapshotTerm;
43 this.logContext = logContext;
// Allocate the backing list with exactly the capacity needed for the initial entries.
45 this.journal = new ArrayList<>(unAppliedEntries.size());
// NOTE(review): the body of this loop and the closing braces are not visible in this
// extract - presumably each entry is added to the journal; confirm against the full file.
46 for (ReplicatedLogEntry entry: unAppliedEntries) {
/**
 * Creates an empty log: snapshot index and term are both -1, there are no initial
 * entries and the log context is the empty string.
 */
protected AbstractReplicatedLogImpl() {
    this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), "");
}
55 protected int adjustedIndex(long logEntryIndex) {
56 if (snapshotIndex < 0) {
57 return (int) logEntryIndex;
59 return (int) (logEntryIndex - (snapshotIndex + 1));
/**
 * Looks up the journal entry for a logical log index.
 *
 * @param logEntryIndex logical index, translated to a journal position by adjustedIndex
 * @return the journal entry stored at the translated position
 */
63 public ReplicatedLogEntry get(long logEntryIndex) {
64 int adjustedIndex = adjustedIndex(logEntryIndex);
66 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
67 // physical index should be less than list size and >= 0
// NOTE(review): the statement executed for an out-of-range position is missing from this
// extract (presumably an early return); confirm against the full file.
71 return journal.get(adjustedIndex);
/**
 * Returns the entry at the end of the journal.
 *
 * @return the entry at position {@code journal.size() - 1} when the journal is not empty
 */
75 public ReplicatedLogEntry last() {
76 if (journal.isEmpty()) {
// NOTE(review): the statement executed for an empty journal is missing from this extract;
// confirm the empty-journal result against the full file.
79 // get the last entry directly from the physical index
80 return journal.get(journal.size() - 1);
/**
 * Returns the logical index of the last entry in the log.
 *
 * @return the index reported by the last journal entry when the journal is not empty
 */
84 public long lastIndex() {
85 if (journal.isEmpty()) {
86 // it can happen that after snapshot, all the entries of the
87 // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
// NOTE(review): the return statement for this branch is missing from this extract; per
// the comment above it is expected to yield snapshotIndex - confirm against the full file.
90 return last().getIndex();
/**
 * Returns the term of the last entry in the log.
 *
 * @return the term reported by the last journal entry when the journal is not empty
 */
94 public long lastTerm() {
95 if (journal.isEmpty()) {
96 // it can happen that after snapshot, all the entries of the
97 // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
// NOTE(review): the return statement for this branch is missing from this extract; per
// the comment above it is expected to yield snapshotTerm - confirm against the full file.
100 return last().getTerm();
/**
 * Removes the entry at the given logical index and every entry after it.
 *
 * @param logEntryIndex logical index of the first entry to remove
 * @return the journal position (not the logical index) from which entries were removed
 */
104 public long removeFrom(long logEntryIndex) {
105 int adjustedIndex = adjustedIndex(logEntryIndex);
106 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
107 // physical index should be less than list size and >= 0
// NOTE(review): the statement executed for an out-of-range position is missing from this
// extract (presumably an early return of a sentinel value); confirm against the full file.
// Keep the running data size in step with the entries about to be dropped.
111 for (int i = adjustedIndex; i < journal.size(); i++) {
112 dataSize -= journal.get(i).size();
// Drop the tail of the journal in one operation via a sub-list view.
115 journal.subList(adjustedIndex , journal.size()).clear();
117 return adjustedIndex;
/**
 * Adds an entry to the end of the journal if its index is greater than the current last
 * index, adding the entry's size to the running data size. Otherwise a warning is logged
 * and the journal is left unchanged.
 *
 * @param replicatedLogEntry the entry to append
 */
121 public boolean append(ReplicatedLogEntry replicatedLogEntry) {
122 if (replicatedLogEntry.getIndex() > lastIndex()) {
123 journal.add(replicatedLogEntry);
124 dataSize += replicatedLogEntry.size();
// NOTE(review): the return statements of this method are missing from this extract;
// presumably true is returned after a successful add and false after the warning -
// confirm against the full file.
// The trailing Exception argument is presumably there so the logger prints the caller's
// stack trace along with the message.
127 LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}",
128 logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace"));
134 public void increaseJournalLogCapacity(int amount) {
135 journal.ensureCapacity(journal.size() + amount);
139 public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
140 return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
/**
 * Returns a copy of a run of journal entries starting at the given logical index.
 *
 * @param logEntryIndex logical index of the first entry wanted
 * @param maxEntries upper bound on the number of entries considered
 * @param maxDataSize upper bound on the accumulated entry size, or NO_MAX_SIZE for no bound
 * @return a new list with the selected entries, or an empty list when the translated
 *         position falls outside the journal
 */
144 public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int maxEntries, long maxDataSize) {
145 int adjustedIndex = adjustedIndex(logEntryIndex);
146 int size = journal.size();
147 if (adjustedIndex >= 0 && adjustedIndex < size) {
148 // physical index should be less than list size and >= 0
149 int maxIndex = adjustedIndex + maxEntries;
// NOTE(review): the body of the following check is missing from this extract; presumably
// it limits maxIndex to the journal size - confirm against the full file.
150 if (maxIndex > size) {
// With no size limit the whole range is copied in one step; otherwise the entries are
// copied one at a time so the accumulated size can be checked.
154 if (maxDataSize == NO_MAX_SIZE) {
155 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
157 return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize);
160 return Collections.emptyList();
/**
 * Copies journal entries from {@code fromIndex} (inclusive) towards {@code toIndex}
 * (exclusive) while tracking the accumulated entry size against {@code maxDataSize}.
 *
 * NOTE(review): several statements of this method are missing from this extract - the
 * declaration of totalSize, the statements inside both branches, and the final return.
 * The description here covers only what is visible; confirm against the full file.
 *
 * @param fromIndex journal position of the first entry to consider
 * @param toIndex journal position one past the last entry to consider
 * @param maxDataSize limit that the accumulated entry size is compared against
 */
164 private List<ReplicatedLogEntry> copyJournalEntries(int fromIndex, int toIndex, long maxDataSize) {
// Pre-sized for the largest possible result, i.e. every entry in the range.
165 List<ReplicatedLogEntry> retList = new ArrayList<>(toIndex - fromIndex);
167 for (int i = fromIndex; i < toIndex; i++) {
168 ReplicatedLogEntry entry = journal.get(i);
169 totalSize += entry.size();
170 if (totalSize <= maxDataSize) {
173 if (retList.isEmpty()) {
174 // Edge case - the first entry's size exceeds the threshold. We need to return
175 // at least the first entry so add it here.
// NOTE(review): the declaration this statement belongs to is not visible in this extract;
// presumably it is the body of the journal-size accessor - confirm against the full file.
// The statement itself returns the number of entries currently held in the journal.
188 return journal.size();
/**
 * NOTE(review): only the declaration is visible in this extract. Presumably this returns
 * the {@code dataSize} field, which append and removeFrom adjust as entries are added and
 * removed - confirm against the full file.
 */
192 public int dataSize() {
/**
 * Tells whether the given logical index refers to an entry held in the journal.
 *
 * @param logEntryIndex logical index to test
 * @return for an index not greater than lastIndex(), {@code true} exactly when the
 *         translated journal position is non-negative
 */
197 public boolean isPresent(long logEntryIndex) {
198 if (logEntryIndex > lastIndex()) {
199 // if the request logical index is less than the last present in the list
// NOTE(review): the guard above fires when logEntryIndex is GREATER than lastIndex(), so
// the wording of the comment above looks inverted. The statement executed in this branch
// is missing from this extract (presumably an early "not present" return); confirm.
202 int adjustedIndex = adjustedIndex(logEntryIndex);
203 return adjustedIndex >= 0;
207 public boolean isInSnapshot(long logEntryIndex) {
208 return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1;
212 public long getSnapshotIndex() {
213 return snapshotIndex;
/**
 * NOTE(review): only the declaration is visible in this extract. Presumably this returns
 * the {@code snapshotTerm} field, mirroring getSnapshotIndex - confirm against the full file.
 */
217 public long getSnapshotTerm() {
222 public void setSnapshotIndex(long snapshotIndex) {
223 this.snapshotIndex = snapshotIndex;
227 public void setSnapshotTerm(long snapshotTerm) {
228 this.snapshotTerm = snapshotTerm;
232 public void clear(int startIndex, int endIndex) {
233 journal.subList(startIndex, endIndex).clear();
237 public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
238 Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
239 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
241 snapshottedJournal = new ArrayList<>(journal.size());
243 List<ReplicatedLogEntry> snapshotJournalEntries =
244 journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
246 snapshottedJournal.addAll(snapshotJournalEntries);
247 snapshotJournalEntries.clear();
249 previousSnapshotIndex = snapshotIndex;
250 setSnapshotIndex(snapshotCapturedIndex);
252 previousSnapshotTerm = snapshotTerm;
253 setSnapshotTerm(snapshotCapturedTerm);
257 public void snapshotCommit() {
258 snapshottedJournal = null;
259 previousSnapshotIndex = -1;
260 previousSnapshotTerm = -1;
262 // need to recalc the datasize based on the entries left after precommit.
263 for (ReplicatedLogEntry logEntry : journal) {
264 dataSize += logEntry.size();
270 public void snapshotRollback() {
271 snapshottedJournal.addAll(journal);
272 journal = snapshottedJournal;
273 snapshottedJournal = null;
275 snapshotIndex = previousSnapshotIndex;
276 previousSnapshotIndex = -1;
278 snapshotTerm = previousSnapshotTerm;
279 previousSnapshotTerm = -1;
283 ReplicatedLogEntry getAtPhysicalIndex(int index) {
284 return journal.get(index);