Log replicated log dataSize changes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / AbstractReplicatedLogImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
15 import org.eclipse.jdt.annotation.NonNull;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 /**
20  * Abstract class handling the mapping of
21  * logical LogEntry Index and the physical list index.
22  */
23 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
24     private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
25
26     private final String logContext;
27
28     // We define this as ArrayList so we can use ensureCapacity.
29     private ArrayList<ReplicatedLogEntry> journal;
30
31     private long snapshotIndex = -1;
32     private long snapshotTerm = -1;
33
34     // to be used for rollback during save snapshot failure
35     private ArrayList<ReplicatedLogEntry> snapshottedJournal;
36     private long previousSnapshotIndex = -1;
37     private long previousSnapshotTerm = -1;
38     private int dataSize = 0;
39
40     protected AbstractReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm,
41             final List<ReplicatedLogEntry> unAppliedEntries, final String logContext) {
42         this.snapshotIndex = snapshotIndex;
43         this.snapshotTerm = snapshotTerm;
44         this.logContext = logContext;
45
46         this.journal = new ArrayList<>(unAppliedEntries.size());
47         for (ReplicatedLogEntry entry: unAppliedEntries) {
48             append(entry);
49         }
50     }
51
52     protected AbstractReplicatedLogImpl() {
53         this(-1L, -1L, Collections.emptyList(), "");
54     }
55
56     protected int adjustedIndex(final long logEntryIndex) {
57         if (snapshotIndex < 0) {
58             return (int) logEntryIndex;
59         }
60         return (int) (logEntryIndex - (snapshotIndex + 1));
61     }
62
63     @Override
64     public ReplicatedLogEntry get(final long logEntryIndex) {
65         int adjustedIndex = adjustedIndex(logEntryIndex);
66
67         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
68             // physical index should be less than list size and >= 0
69             return null;
70         }
71
72         return journal.get(adjustedIndex);
73     }
74
75     @Override
76     public ReplicatedLogEntry last() {
77         if (journal.isEmpty()) {
78             return null;
79         }
80         // get the last entry directly from the physical index
81         return journal.get(journal.size() - 1);
82     }
83
84     @Override
85     public long lastIndex() {
86         if (journal.isEmpty()) {
87             // it can happen that after snapshot, all the entries of the
88             // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
89             return snapshotIndex;
90         }
91         return last().getIndex();
92     }
93
94     @Override
95     public long lastTerm() {
96         if (journal.isEmpty()) {
97             // it can happen that after snapshot, all the entries of the
98             // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
99             return snapshotTerm;
100         }
101         return last().getTerm();
102     }
103
104     @Override
105     public long removeFrom(final long logEntryIndex) {
106         int adjustedIndex = adjustedIndex(logEntryIndex);
107         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
108             // physical index should be less than list size and >= 0
109             return -1;
110         }
111
112         for (int i = adjustedIndex; i < journal.size(); i++) {
113             dataSize -= journal.get(i).size();
114         }
115
116         journal.subList(adjustedIndex , journal.size()).clear();
117
118         return adjustedIndex;
119     }
120
121     @Override
122     public boolean append(final ReplicatedLogEntry replicatedLogEntry) {
123         if (replicatedLogEntry.getIndex() > lastIndex()) {
124             journal.add(replicatedLogEntry);
125             dataSize += replicatedLogEntry.size();
126             return true;
127         } else {
128             LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}",
129                     logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace"));
130             return false;
131         }
132     }
133
134     @Override
135     public void increaseJournalLogCapacity(final int amount) {
136         journal.ensureCapacity(journal.size() + amount);
137     }
138
139     @Override
140     public List<ReplicatedLogEntry> getFrom(final long logEntryIndex) {
141         return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
142     }
143
144     @Override
145     public List<ReplicatedLogEntry> getFrom(final long logEntryIndex, final int maxEntries, final long maxDataSize) {
146         int adjustedIndex = adjustedIndex(logEntryIndex);
147         int size = journal.size();
148         if (adjustedIndex >= 0 && adjustedIndex < size) {
149             // physical index should be less than list size and >= 0
150             int maxIndex = adjustedIndex + maxEntries;
151             if (maxIndex > size) {
152                 maxIndex = size;
153             }
154
155             if (maxDataSize == NO_MAX_SIZE) {
156                 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
157             } else {
158                 return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize);
159             }
160         } else {
161             return Collections.emptyList();
162         }
163     }
164
165     private @NonNull List<ReplicatedLogEntry> copyJournalEntries(final int fromIndex, final int toIndex,
166             final long maxDataSize) {
167         List<ReplicatedLogEntry> retList = new ArrayList<>(toIndex - fromIndex);
168         long totalSize = 0;
169         for (int i = fromIndex; i < toIndex; i++) {
170             ReplicatedLogEntry entry = journal.get(i);
171             totalSize += entry.size();
172             if (totalSize <= maxDataSize) {
173                 retList.add(entry);
174             } else {
175                 if (retList.isEmpty()) {
176                     // Edge case - the first entry's size exceeds the threshold. We need to return
177                     // at least the first entry so add it here.
178                     retList.add(entry);
179                 }
180
181                 break;
182             }
183         }
184
185         return retList;
186     }
187
188     @Override
189     public long size() {
190         return journal.size();
191     }
192
193     @Override
194     public int dataSize() {
195         return dataSize;
196     }
197
198     @Override
199     public boolean isPresent(final long logEntryIndex) {
200         if (logEntryIndex > lastIndex()) {
201             // if the request logical index is less than the last present in the list
202             return false;
203         }
204         int adjustedIndex = adjustedIndex(logEntryIndex);
205         return adjustedIndex >= 0;
206     }
207
208     @Override
209     public boolean isInSnapshot(final long logEntryIndex) {
210         return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1;
211     }
212
213     @Override
214     public long getSnapshotIndex() {
215         return snapshotIndex;
216     }
217
218     @Override
219     public long getSnapshotTerm() {
220         return snapshotTerm;
221     }
222
223     @Override
224     public void setSnapshotIndex(final long snapshotIndex) {
225         this.snapshotIndex = snapshotIndex;
226     }
227
228     @Override
229     public void setSnapshotTerm(final long snapshotTerm) {
230         this.snapshotTerm = snapshotTerm;
231     }
232
233     @Override
234     public void clear(final int startIndex, final int endIndex) {
235         journal.subList(startIndex, endIndex).clear();
236     }
237
238     @Override
239     public void snapshotPreCommit(final long snapshotCapturedIndex, final long snapshotCapturedTerm) {
240         Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
241                 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
242
243         snapshottedJournal = new ArrayList<>(journal.size());
244
245         List<ReplicatedLogEntry> snapshotJournalEntries =
246                 journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
247
248         snapshottedJournal.addAll(snapshotJournalEntries);
249         snapshotJournalEntries.clear();
250
251         previousSnapshotIndex = snapshotIndex;
252         setSnapshotIndex(snapshotCapturedIndex);
253
254         previousSnapshotTerm = snapshotTerm;
255         setSnapshotTerm(snapshotCapturedTerm);
256     }
257
258     @Override
259     public void snapshotCommit() {
260         snapshottedJournal = null;
261         previousSnapshotIndex = -1;
262         previousSnapshotTerm = -1;
263
264         // need to recalc the datasize based on the entries left after precommit.
265         int newDataSize = 0;
266         for (ReplicatedLogEntry logEntry : journal) {
267             newDataSize += logEntry.size();
268         }
269         LOG.trace("{}: Updated dataSize from {} to {}", logContext, dataSize, newDataSize);
270         dataSize = newDataSize;
271     }
272
273     @Override
274     public void snapshotRollback() {
275         snapshottedJournal.addAll(journal);
276         journal = snapshottedJournal;
277         snapshottedJournal = null;
278
279         snapshotIndex = previousSnapshotIndex;
280         previousSnapshotIndex = -1;
281
282         snapshotTerm = previousSnapshotTerm;
283         previousSnapshotTerm = -1;
284     }
285
286     @VisibleForTesting
287     ReplicatedLogEntry getAtPhysicalIndex(final int index) {
288         return journal.get(index);
289     }
290 }