/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
19 * Abstract class handling the mapping of
20 * logical LogEntry Index and the physical list index.
22 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
23 private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
25 private final String logContext;
27 // We define this as ArrayList so we can use ensureCapacity.
28 private ArrayList<ReplicatedLogEntry> journal;
30 private long snapshotIndex = -1;
31 private long snapshotTerm = -1;
33 // to be used for rollback during save snapshot failure
34 private ArrayList<ReplicatedLogEntry> snapshottedJournal;
35 private long previousSnapshotIndex = -1;
36 private long previousSnapshotTerm = -1;
37 private int dataSize = 0;
39 public AbstractReplicatedLogImpl(long snapshotIndex,
40 long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries, String logContext) {
41 this.snapshotIndex = snapshotIndex;
42 this.snapshotTerm = snapshotTerm;
43 this.logContext = logContext;
45 this.journal = new ArrayList<>(unAppliedEntries.size());
46 for(ReplicatedLogEntry entry: unAppliedEntries) {
51 public AbstractReplicatedLogImpl() {
52 this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), "");
55 protected int adjustedIndex(long logEntryIndex) {
56 if (snapshotIndex < 0) {
57 return (int) logEntryIndex;
59 return (int) (logEntryIndex - (snapshotIndex + 1));
63 public ReplicatedLogEntry get(long logEntryIndex) {
64 int adjustedIndex = adjustedIndex(logEntryIndex);
66 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
67 // physical index should be less than list size and >= 0
71 return journal.get(adjustedIndex);
75 public ReplicatedLogEntry last() {
76 if (journal.isEmpty()) {
79 // get the last entry directly from the physical index
80 return journal.get(journal.size() - 1);
84 public long lastIndex() {
85 if (journal.isEmpty()) {
86 // it can happen that after snapshot, all the entries of the
87 // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
90 return last().getIndex();
94 public long lastTerm() {
95 if (journal.isEmpty()) {
96 // it can happen that after snapshot, all the entries of the
97 // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
100 return last().getTerm();
104 public long removeFrom(long logEntryIndex) {
105 int adjustedIndex = adjustedIndex(logEntryIndex);
106 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
107 // physical index should be less than list size and >= 0
111 for(int i = adjustedIndex; i < journal.size(); i++) {
112 dataSize -= journal.get(i).size();
115 journal.subList(adjustedIndex , journal.size()).clear();
117 return adjustedIndex;
121 public boolean append(ReplicatedLogEntry replicatedLogEntry) {
122 if(replicatedLogEntry.getIndex() > lastIndex()) {
123 journal.add(replicatedLogEntry);
124 dataSize += replicatedLogEntry.size();
127 LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}",
128 logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace"));
134 public void increaseJournalLogCapacity(int amount) {
135 journal.ensureCapacity(journal.size() + amount);
139 public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
140 return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
144 public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int maxEntries, long maxDataSize) {
145 int adjustedIndex = adjustedIndex(logEntryIndex);
146 int size = journal.size();
147 if (adjustedIndex >= 0 && adjustedIndex < size) {
148 // physical index should be less than list size and >= 0
149 int maxIndex = adjustedIndex + maxEntries;
154 if(maxDataSize == NO_MAX_SIZE) {
155 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
157 List<ReplicatedLogEntry> retList = new ArrayList<>(maxIndex - adjustedIndex);
159 for(int i = adjustedIndex; i < maxIndex; i++) {
160 ReplicatedLogEntry entry = journal.get(i);
161 totalSize += entry.size();
162 if(totalSize <= maxDataSize) {
165 if(retList.isEmpty()) {
166 // Edge case - the first entry's size exceeds the threshold. We need to return
167 // at least the first entry so add it here.
178 return Collections.emptyList();
184 return journal.size();
188 public int dataSize() {
193 public boolean isPresent(long logEntryIndex) {
194 if (logEntryIndex > lastIndex()) {
195 // if the request logical index is less than the last present in the list
198 int adjustedIndex = adjustedIndex(logEntryIndex);
199 return (adjustedIndex >= 0);
203 public boolean isInSnapshot(long logEntryIndex) {
204 return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
208 public long getSnapshotIndex() {
209 return snapshotIndex;
213 public long getSnapshotTerm() {
218 public void setSnapshotIndex(long snapshotIndex) {
219 this.snapshotIndex = snapshotIndex;
223 public void setSnapshotTerm(long snapshotTerm) {
224 this.snapshotTerm = snapshotTerm;
228 public void clear(int startIndex, int endIndex) {
229 journal.subList(startIndex, endIndex).clear();
233 public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
234 Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
235 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
237 snapshottedJournal = new ArrayList<>(journal.size());
239 List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
241 snapshottedJournal.addAll(snapshotJournalEntries);
242 snapshotJournalEntries.clear();
244 previousSnapshotIndex = snapshotIndex;
245 setSnapshotIndex(snapshotCapturedIndex);
247 previousSnapshotTerm = snapshotTerm;
248 setSnapshotTerm(snapshotCapturedTerm);
252 public void snapshotCommit() {
253 snapshottedJournal = null;
254 previousSnapshotIndex = -1;
255 previousSnapshotTerm = -1;
257 // need to recalc the datasize based on the entries left after precommit.
258 for(ReplicatedLogEntry logEntry : journal) {
259 dataSize += logEntry.size();
265 public void snapshotRollback() {
266 snapshottedJournal.addAll(journal);
267 journal = snapshottedJournal;
268 snapshottedJournal = null;
270 snapshotIndex = previousSnapshotIndex;
271 previousSnapshotIndex = -1;
273 snapshotTerm = previousSnapshotTerm;
274 previousSnapshotTerm = -1;
278 ReplicatedLogEntry getAtPhysicalIndex(int index) {
279 return journal.get(index);