2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
17 * Abstract class handling the mapping of
18 * logical LogEntry Index and the physical list index.
20 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
22 // We define this as ArrayList so we can use ensureCapacity.
23 private ArrayList<ReplicatedLogEntry> journal;
25 private long snapshotIndex = -1;
26 private long snapshotTerm = -1;
28 // to be used for rollback during save snapshot failure
29 private ArrayList<ReplicatedLogEntry> snapshottedJournal;
30 private long previousSnapshotIndex = -1;
31 private long previousSnapshotTerm = -1;
32 private int dataSize = 0;
34 public AbstractReplicatedLogImpl(long snapshotIndex,
35 long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
36 this.snapshotIndex = snapshotIndex;
37 this.snapshotTerm = snapshotTerm;
38 this.journal = new ArrayList<>(unAppliedEntries);
40 for(ReplicatedLogEntry entry: journal) {
41 dataSize += entry.size();
45 public AbstractReplicatedLogImpl() {
46 this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
49 protected int adjustedIndex(long logEntryIndex) {
50 if (snapshotIndex < 0) {
51 return (int) logEntryIndex;
53 return (int) (logEntryIndex - (snapshotIndex + 1));
57 public ReplicatedLogEntry get(long logEntryIndex) {
58 int adjustedIndex = adjustedIndex(logEntryIndex);
60 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
61 // physical index should be less than list size and >= 0
65 return journal.get(adjustedIndex);
69 public ReplicatedLogEntry last() {
70 if (journal.isEmpty()) {
73 // get the last entry directly from the physical index
74 return journal.get(journal.size() - 1);
78 public long lastIndex() {
79 if (journal.isEmpty()) {
80 // it can happen that after snapshot, all the entries of the
81 // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
84 return last().getIndex();
88 public long lastTerm() {
89 if (journal.isEmpty()) {
90 // it can happen that after snapshot, all the entries of the
91 // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
94 return last().getTerm();
98 public long removeFrom(long logEntryIndex) {
99 int adjustedIndex = adjustedIndex(logEntryIndex);
100 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
101 // physical index should be less than list size and >= 0
105 for(int i = adjustedIndex; i < journal.size(); i++) {
106 dataSize -= journal.get(i).size();
109 journal.subList(adjustedIndex , journal.size()).clear();
111 return adjustedIndex;
115 public void append(ReplicatedLogEntry replicatedLogEntry) {
116 journal.add(replicatedLogEntry);
117 dataSize += replicatedLogEntry.size();
121 public void increaseJournalLogCapacity(int amount) {
122 journal.ensureCapacity(journal.size() + amount);
126 public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
127 return getFrom(logEntryIndex, journal.size());
131 public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
132 int adjustedIndex = adjustedIndex(logEntryIndex);
133 int size = journal.size();
134 if (adjustedIndex >= 0 && adjustedIndex < size) {
135 // physical index should be less than list size and >= 0
136 int maxIndex = adjustedIndex + max;
140 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
142 return Collections.emptyList();
148 return journal.size();
152 public int dataSize() {
157 public boolean isPresent(long logEntryIndex) {
158 if (logEntryIndex > lastIndex()) {
159 // if the request logical index is less than the last present in the list
162 int adjustedIndex = adjustedIndex(logEntryIndex);
163 return (adjustedIndex >= 0);
167 public boolean isInSnapshot(long logEntryIndex) {
168 return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
172 public long getSnapshotIndex() {
173 return snapshotIndex;
177 public long getSnapshotTerm() {
182 public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
185 public abstract void removeFromAndPersist(long index);
188 public void setSnapshotIndex(long snapshotIndex) {
189 this.snapshotIndex = snapshotIndex;
193 public void setSnapshotTerm(long snapshotTerm) {
194 this.snapshotTerm = snapshotTerm;
198 public void clear(int startIndex, int endIndex) {
199 journal.subList(startIndex, endIndex).clear();
203 public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
204 Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
205 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
207 snapshottedJournal = new ArrayList<>(journal.size());
209 List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
211 snapshottedJournal.addAll(snapshotJournalEntries);
212 snapshotJournalEntries.clear();
214 previousSnapshotIndex = snapshotIndex;
215 setSnapshotIndex(snapshotCapturedIndex);
217 previousSnapshotTerm = snapshotTerm;
218 setSnapshotTerm(snapshotCapturedTerm);
222 public void snapshotCommit() {
223 snapshottedJournal = null;
224 previousSnapshotIndex = -1;
225 previousSnapshotTerm = -1;
227 // need to recalc the datasize based on the entries left after precommit.
228 for(ReplicatedLogEntry logEntry : journal) {
229 dataSize += logEntry.size();
235 public void snapshotRollback() {
236 snapshottedJournal.addAll(journal);
237 journal = snapshottedJournal;
238 snapshottedJournal = null;
240 snapshotIndex = previousSnapshotIndex;
241 previousSnapshotIndex = -1;
243 snapshotTerm = previousSnapshotTerm;
244 previousSnapshotTerm = -1;
248 ReplicatedLogEntry getAtPhysicalIndex(int index) {
249 return journal.get(index);