2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
/**
 * Abstract class handling the mapping between the logical LogEntry index
 * and the physical index into the backing list.
 */
20 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
22 // We define this as ArrayList so we can use ensureCapacity.
23 private ArrayList<ReplicatedLogEntry> journal;
25 private long snapshotIndex = -1;
26 private long snapshotTerm = -1;
28 // to be used for rollback during save snapshot failure
29 private ArrayList<ReplicatedLogEntry> snapshottedJournal;
30 private long previousSnapshotIndex = -1;
31 private long previousSnapshotTerm = -1;
32 private int dataSize = 0;
34 public AbstractReplicatedLogImpl(long snapshotIndex,
35 long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
36 this.snapshotIndex = snapshotIndex;
37 this.snapshotTerm = snapshotTerm;
38 this.journal = new ArrayList<>(unAppliedEntries);
40 for(ReplicatedLogEntry entry: journal) {
41 dataSize += entry.size();
/**
 * Creates an empty log with snapshot index and term both set to -1.
 */
public AbstractReplicatedLogImpl() {
    this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
}
49 protected int adjustedIndex(long logEntryIndex) {
50 if (snapshotIndex < 0) {
51 return (int) logEntryIndex;
53 return (int) (logEntryIndex - (snapshotIndex + 1));
57 public ReplicatedLogEntry get(long logEntryIndex) {
58 int adjustedIndex = adjustedIndex(logEntryIndex);
60 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
61 // physical index should be less than list size and >= 0
65 return journal.get(adjustedIndex);
69 public ReplicatedLogEntry last() {
70 if (journal.isEmpty()) {
73 // get the last entry directly from the physical index
74 return journal.get(journal.size() - 1);
78 public long lastIndex() {
79 if (journal.isEmpty()) {
80 // it can happen that after snapshot, all the entries of the
81 // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
84 return last().getIndex();
88 public long lastTerm() {
89 if (journal.isEmpty()) {
90 // it can happen that after snapshot, all the entries of the
91 // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
94 return last().getTerm();
98 public long removeFrom(long logEntryIndex) {
99 int adjustedIndex = adjustedIndex(logEntryIndex);
100 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
101 // physical index should be less than list size and >= 0
105 for(int i = adjustedIndex; i < journal.size(); i++) {
106 dataSize -= journal.get(i).size();
109 journal.subList(adjustedIndex , journal.size()).clear();
111 return adjustedIndex;
115 public void append(ReplicatedLogEntry replicatedLogEntry) {
116 journal.add(replicatedLogEntry);
117 dataSize += replicatedLogEntry.size();
121 public void increaseJournalLogCapacity(int amount) {
122 journal.ensureCapacity(journal.size() + amount);
126 public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
127 return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
131 public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int maxEntries, long maxDataSize) {
132 int adjustedIndex = adjustedIndex(logEntryIndex);
133 int size = journal.size();
134 if (adjustedIndex >= 0 && adjustedIndex < size) {
135 // physical index should be less than list size and >= 0
136 int maxIndex = adjustedIndex + maxEntries;
141 if(maxDataSize == NO_MAX_SIZE) {
142 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
144 List<ReplicatedLogEntry> retList = new ArrayList<>(maxIndex - adjustedIndex);
146 for(int i = adjustedIndex; i < maxIndex; i++) {
147 ReplicatedLogEntry entry = journal.get(i);
148 totalSize += entry.size();
149 if(totalSize <= maxDataSize) {
152 if(retList.isEmpty()) {
153 // Edge case - the first entry's size exceeds the threshold. We need to return
154 // at least the first entry so add it here.
165 return Collections.emptyList();
171 return journal.size();
175 public int dataSize() {
180 public boolean isPresent(long logEntryIndex) {
181 if (logEntryIndex > lastIndex()) {
182 // if the request logical index is less than the last present in the list
185 int adjustedIndex = adjustedIndex(logEntryIndex);
186 return (adjustedIndex >= 0);
190 public boolean isInSnapshot(long logEntryIndex) {
191 return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
195 public long getSnapshotIndex() {
196 return snapshotIndex;
200 public long getSnapshotTerm() {
205 public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
208 public abstract void removeFromAndPersist(long index);
211 public void setSnapshotIndex(long snapshotIndex) {
212 this.snapshotIndex = snapshotIndex;
216 public void setSnapshotTerm(long snapshotTerm) {
217 this.snapshotTerm = snapshotTerm;
221 public void clear(int startIndex, int endIndex) {
222 journal.subList(startIndex, endIndex).clear();
226 public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
227 Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
228 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
230 snapshottedJournal = new ArrayList<>(journal.size());
232 List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
234 snapshottedJournal.addAll(snapshotJournalEntries);
235 snapshotJournalEntries.clear();
237 previousSnapshotIndex = snapshotIndex;
238 setSnapshotIndex(snapshotCapturedIndex);
240 previousSnapshotTerm = snapshotTerm;
241 setSnapshotTerm(snapshotCapturedTerm);
245 public void snapshotCommit() {
246 snapshottedJournal = null;
247 previousSnapshotIndex = -1;
248 previousSnapshotTerm = -1;
250 // need to recalc the datasize based on the entries left after precommit.
251 for(ReplicatedLogEntry logEntry : journal) {
252 dataSize += logEntry.size();
258 public void snapshotRollback() {
259 snapshottedJournal.addAll(journal);
260 journal = snapshottedJournal;
261 snapshottedJournal = null;
263 snapshotIndex = previousSnapshotIndex;
264 previousSnapshotIndex = -1;
266 snapshotTerm = previousSnapshotTerm;
267 previousSnapshotTerm = -1;
271 ReplicatedLogEntry getAtPhysicalIndex(int index) {
272 return journal.get(index);