2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import com.google.protobuf.ByteString;
12 import java.util.ArrayList;
13 import java.util.List;
16 * Abstract class handling the mapping of
17 * logical LogEntry Index and the physical list index.
19 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
21 // We define this as ArrayList so we can use ensureCapacity.
22 protected ArrayList<ReplicatedLogEntry> journal;
23 protected ByteString snapshot;
24 protected long snapshotIndex = -1;
25 protected long snapshotTerm = -1;
27 // to be used for rollback during save snapshot failure
28 protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
29 protected ByteString previousSnapshot;
30 protected long previousSnapshotIndex = -1;
31 protected long previousSnapshotTerm = -1;
33 public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
34 long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
35 this.snapshot = state;
36 this.snapshotIndex = snapshotIndex;
37 this.snapshotTerm = snapshotTerm;
38 this.journal = new ArrayList<>(unAppliedEntries);
42 public AbstractReplicatedLogImpl() {
44 this.journal = new ArrayList<>();
47 protected int adjustedIndex(long logEntryIndex) {
48 if(snapshotIndex < 0){
49 return (int) logEntryIndex;
51 return (int) (logEntryIndex - (snapshotIndex + 1));
55 public ReplicatedLogEntry get(long logEntryIndex) {
56 int adjustedIndex = adjustedIndex(logEntryIndex);
58 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
59 // physical index should be less than list size and >= 0
63 return journal.get(adjustedIndex);
67 public ReplicatedLogEntry last() {
68 if (journal.isEmpty()) {
71 // get the last entry directly from the physical index
72 return journal.get(journal.size() - 1);
76 public long lastIndex() {
77 if (journal.isEmpty()) {
78 // it can happen that after snapshot, all the entries of the
79 // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
82 return last().getIndex();
86 public long lastTerm() {
87 if (journal.isEmpty()) {
88 // it can happen that after snapshot, all the entries of the
89 // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
92 return last().getTerm();
96 public void removeFrom(long logEntryIndex) {
97 int adjustedIndex = adjustedIndex(logEntryIndex);
98 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
99 // physical index should be less than list size and >= 0
102 journal.subList(adjustedIndex , journal.size()).clear();
106 public void append(ReplicatedLogEntry replicatedLogEntry) {
107 journal.add(replicatedLogEntry);
111 public void increaseJournalLogCapacity(int amount) {
112 journal.ensureCapacity(journal.size() + amount);
116 public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
117 return getFrom(logEntryIndex, journal.size());
121 public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
122 int adjustedIndex = adjustedIndex(logEntryIndex);
123 int size = journal.size();
124 List<ReplicatedLogEntry> entries = new ArrayList<>(100);
125 if (adjustedIndex >= 0 && adjustedIndex < size) {
126 // physical index should be less than list size and >= 0
127 int maxIndex = adjustedIndex + max;
131 entries.addAll(journal.subList(adjustedIndex, maxIndex));
139 return journal.size();
143 public boolean isPresent(long logEntryIndex) {
144 if (logEntryIndex > lastIndex()) {
145 // if the request logical index is less than the last present in the list
148 int adjustedIndex = adjustedIndex(logEntryIndex);
149 return (adjustedIndex >= 0);
153 public boolean isInSnapshot(long logEntryIndex) {
154 return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
158 public ByteString getSnapshot() {
163 public long getSnapshotIndex() {
164 return snapshotIndex;
168 public long getSnapshotTerm() {
173 public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
176 public abstract void removeFromAndPersist(long index);
179 public void setSnapshotIndex(long snapshotIndex) {
180 this.snapshotIndex = snapshotIndex;
184 public void setSnapshotTerm(long snapshotTerm) {
185 this.snapshotTerm = snapshotTerm;
189 public void setSnapshot(ByteString snapshot) {
190 this.snapshot = snapshot;
194 public void clear(int startIndex, int endIndex) {
195 journal.subList(startIndex, endIndex).clear();
199 public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
200 snapshottedJournal = new ArrayList<>(journal.size());
202 snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
203 clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
205 previousSnapshotIndex = snapshotIndex;
206 setSnapshotIndex(snapshotCapturedIndex);
208 previousSnapshotTerm = snapshotTerm;
209 setSnapshotTerm(snapshotCapturedTerm);
211 previousSnapshot = getSnapshot();
212 setSnapshot(snapshot);
216 public void snapshotCommit() {
217 snapshottedJournal = null;
218 previousSnapshotIndex = -1;
219 previousSnapshotTerm = -1;
220 previousSnapshot = null;
224 public void snapshotRollback() {
225 snapshottedJournal.addAll(journal);
226 journal = snapshottedJournal;
227 snapshottedJournal = null;
229 snapshotIndex = previousSnapshotIndex;
230 previousSnapshotIndex = -1;
232 snapshotTerm = previousSnapshotTerm;
233 previousSnapshotTerm = -1;
235 snapshot = previousSnapshot;
236 previousSnapshot = null;