2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import com.google.common.base.Preconditions;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
16 * Abstract class handling the mapping of
17 * logical LogEntry Index and the physical list index.
19 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
21 // We define this as ArrayList so we can use ensureCapacity.
22 protected ArrayList<ReplicatedLogEntry> journal;
24 private long snapshotIndex = -1;
25 private long snapshotTerm = -1;
27 // to be used for rollback during save snapshot failure
28 private ArrayList<ReplicatedLogEntry> snapshottedJournal;
29 private long previousSnapshotIndex = -1;
30 private long previousSnapshotTerm = -1;
31 protected int dataSize = 0;
/**
 * Creates a log positioned at the given snapshot.
 *
 * @param snapshotIndex value stored as the current snapshot index
 * @param snapshotTerm value stored as the current snapshot term
 * @param unAppliedEntries entries copied into a newly allocated journal
 */
public AbstractReplicatedLogImpl(long snapshotIndex,
        long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
    // Copy the entries so the journal is an independent ArrayList.
    this.journal = new ArrayList<>(unAppliedEntries);
    this.snapshotTerm = snapshotTerm;
    this.snapshotIndex = snapshotIndex;
}

/**
 * Creates an empty log whose snapshot index and snapshot term are both -1.
 */
public AbstractReplicatedLogImpl() {
    this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
}
44 protected int adjustedIndex(long logEntryIndex) {
45 if (snapshotIndex < 0) {
46 return (int) logEntryIndex;
48 return (int) (logEntryIndex - (snapshotIndex + 1));
52 public ReplicatedLogEntry get(long logEntryIndex) {
53 int adjustedIndex = adjustedIndex(logEntryIndex);
55 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
56 // physical index should be less than list size and >= 0
60 return journal.get(adjustedIndex);
64 public ReplicatedLogEntry last() {
65 if (journal.isEmpty()) {
68 // get the last entry directly from the physical index
69 return journal.get(journal.size() - 1);
73 public long lastIndex() {
74 if (journal.isEmpty()) {
75 // it can happen that after snapshot, all the entries of the
76 // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
79 return last().getIndex();
83 public long lastTerm() {
84 if (journal.isEmpty()) {
85 // it can happen that after snapshot, all the entries of the
86 // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
89 return last().getTerm();
93 public void removeFrom(long logEntryIndex) {
94 int adjustedIndex = adjustedIndex(logEntryIndex);
95 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
96 // physical index should be less than list size and >= 0
99 journal.subList(adjustedIndex , journal.size()).clear();
103 public void append(ReplicatedLogEntry replicatedLogEntry) {
104 journal.add(replicatedLogEntry);
108 public void increaseJournalLogCapacity(int amount) {
109 journal.ensureCapacity(journal.size() + amount);
113 public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
114 return getFrom(logEntryIndex, journal.size());
118 public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
119 int adjustedIndex = adjustedIndex(logEntryIndex);
120 int size = journal.size();
121 if (adjustedIndex >= 0 && adjustedIndex < size) {
122 // physical index should be less than list size and >= 0
123 int maxIndex = adjustedIndex + max;
127 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
129 return Collections.emptyList();
135 return journal.size();
139 public int dataSize() {
144 public boolean isPresent(long logEntryIndex) {
145 if (logEntryIndex > lastIndex()) {
146 // if the request logical index is less than the last present in the list
149 int adjustedIndex = adjustedIndex(logEntryIndex);
150 return (adjustedIndex >= 0);
154 public boolean isInSnapshot(long logEntryIndex) {
155 return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
159 public long getSnapshotIndex() {
160 return snapshotIndex;
164 public long getSnapshotTerm() {
169 public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
172 public abstract void removeFromAndPersist(long index);
175 public void setSnapshotIndex(long snapshotIndex) {
176 this.snapshotIndex = snapshotIndex;
180 public void setSnapshotTerm(long snapshotTerm) {
181 this.snapshotTerm = snapshotTerm;
185 public void clear(int startIndex, int endIndex) {
186 journal.subList(startIndex, endIndex).clear();
190 public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
191 Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
192 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
194 snapshottedJournal = new ArrayList<>(journal.size());
196 List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
198 snapshottedJournal.addAll(snapshotJournalEntries);
199 clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
201 previousSnapshotIndex = snapshotIndex;
202 setSnapshotIndex(snapshotCapturedIndex);
204 previousSnapshotTerm = snapshotTerm;
205 setSnapshotTerm(snapshotCapturedTerm);
209 public void snapshotCommit() {
210 snapshottedJournal = null;
211 previousSnapshotIndex = -1;
212 previousSnapshotTerm = -1;
214 // need to recalc the datasize based on the entries left after precommit.
215 for(ReplicatedLogEntry logEntry : journal) {
216 dataSize += logEntry.size();
222 public void snapshotRollback() {
223 snapshottedJournal.addAll(journal);
224 journal = snapshottedJournal;
225 snapshottedJournal = null;
227 snapshotIndex = previousSnapshotIndex;
228 previousSnapshotIndex = -1;
230 snapshotTerm = previousSnapshotTerm;
231 previousSnapshotTerm = -1;