c9a4ced4434e58e752679f00ef8e27292a639563
[controller.git] /
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import java.util.ArrayList;
14 import java.util.List;
15 import org.eclipse.jdt.annotation.NonNull;
16 import org.eclipse.jdt.annotation.NonNullByDefault;
17 import org.eclipse.jdt.annotation.Nullable;
18 import org.opendaylight.controller.cluster.raft.spi.ImmutableRaftEntryMeta;
19 import org.opendaylight.controller.cluster.raft.spi.RaftEntryMeta;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 /**
24  * Abstract class handling the mapping of
25  * logical LogEntry Index and the physical list index.
26  */
27 public abstract class AbstractReplicatedLog implements ReplicatedLog {
28     private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLog.class);
29
30     final @NonNull String memberId;
31
32     // We define this as ArrayList so we can use ensureCapacity.
33     private ArrayList<ReplicatedLogEntry> journal = new ArrayList<>();
34
35     private long snapshotIndex = -1;
36     private long snapshotTerm = -1;
37     private long commitIndex = -1;
38     private long lastApplied = -1;
39
40     // to be used for rollback during save snapshot failure
41     private ArrayList<ReplicatedLogEntry> snapshottedJournal;
42     private long previousSnapshotIndex = -1;
43     private long previousSnapshotTerm = -1;
44     private int dataSize = 0;
45
46     protected AbstractReplicatedLog(final @NonNull String memberId) {
47         this.memberId = requireNonNull(memberId);
48     }
49
50     protected final int adjustedIndex(final long logEntryIndex) {
51         if (snapshotIndex < 0) {
52             return (int) logEntryIndex;
53         }
54         return (int) (logEntryIndex - (snapshotIndex + 1));
55     }
56
57     @Override
58     public final ReplicatedLogEntry get(final long logEntryIndex) {
59         int adjustedIndex = adjustedIndex(logEntryIndex);
60
61         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
62             // physical index should be less than list size and >= 0
63             return null;
64         }
65
66         return journal.get(adjustedIndex);
67     }
68
69     @Override
70     public final ReplicatedLogEntry last() {
71         if (journal.isEmpty()) {
72             return null;
73         }
74         // get the last entry directly from the physical index
75         return journal.get(journal.size() - 1);
76     }
77
78     @Override
79     public final long lastIndex() {
80         final var last = last();
81         // it can happen that after snapshot, all the entries of the journal are trimmed till lastApplied,
82         // so lastIndex = snapshotIndex
83         return last != null ? last.index() : snapshotIndex;
84     }
85
86     @Override
87     public final long lastTerm() {
88         final var last = last();
89         // it can happen that after snapshot, all the entries of the journal are trimmed till lastApplied,
90         // so lastTerm = snapshotTerm
91         return last != null ? last.term() : snapshotTerm;
92     }
93
94     @Override
95     public final long getCommitIndex() {
96         return commitIndex;
97     }
98
99     @Override
100     public final void setCommitIndex(final long commitIndex) {
101         this.commitIndex = commitIndex;
102     }
103
104     @Override
105     public final long getLastApplied() {
106         return lastApplied;
107     }
108
109     @Override
110     public final void setLastApplied(final long lastApplied) {
111         LOG.debug("{}: Moving last applied index from {} to {}", memberId, this.lastApplied, lastApplied,
112             LOG.isTraceEnabled() ? new Throwable() : null);
113         this.lastApplied = lastApplied;
114     }
115
116     @Override
117     public final long removeFrom(final long logEntryIndex) {
118         int adjustedIndex = adjustedIndex(logEntryIndex);
119         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
120             // physical index should be less than list size and >= 0
121             return -1;
122         }
123
124         for (int i = adjustedIndex; i < journal.size(); i++) {
125             dataSize -= journal.get(i).size();
126         }
127
128         journal.subList(adjustedIndex , journal.size()).clear();
129
130         return adjustedIndex;
131     }
132
133     @Override
134     public boolean append(final ReplicatedLogEntry replicatedLogEntry) {
135         final var entryIndex = replicatedLogEntry.index();
136         final var lastIndex = lastIndex();
137         if (entryIndex > lastIndex) {
138             journal.add(replicatedLogEntry);
139             dataSize += replicatedLogEntry.size();
140             return true;
141         }
142
143         LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}", memberId,
144             entryIndex, lastIndex, new Exception("stack trace"));
145         return false;
146     }
147
148     @Override
149     public final void increaseJournalLogCapacity(final int amount) {
150         journal.ensureCapacity(journal.size() + amount);
151     }
152
153     @Override
154     public final List<ReplicatedLogEntry> getFrom(final long logEntryIndex) {
155         return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
156     }
157
158     @Override
159     public final List<ReplicatedLogEntry> getFrom(final long logEntryIndex, final int maxEntries,
160             final long maxDataSize) {
161         int adjustedIndex = adjustedIndex(logEntryIndex);
162         int size = journal.size();
163         if (adjustedIndex < 0 || adjustedIndex >= size) {
164             return List.of();
165         }
166
167         // physical index should be less than list size and >= 0
168         int maxIndex = adjustedIndex + maxEntries;
169         if (maxIndex > size) {
170             maxIndex = size;
171         }
172
173         return maxDataSize == NO_MAX_SIZE ? new ArrayList<>(journal.subList(adjustedIndex, maxIndex))
174             : copyJournalEntries(adjustedIndex, maxIndex, maxDataSize);
175     }
176
177     private @NonNull List<ReplicatedLogEntry> copyJournalEntries(final int fromIndex, final int toIndex,
178             final long maxDataSize) {
179         final var retList = new ArrayList<ReplicatedLogEntry>(toIndex - fromIndex);
180         long totalSize = 0;
181         for (int i = fromIndex; i < toIndex; i++) {
182             final var entry = journal.get(i);
183             totalSize += entry.serializedSize();
184             if (totalSize > maxDataSize) {
185                 if (retList.isEmpty()) {
186                     // Edge case - the first entry's size exceeds the threshold. We need to return
187                     // at least the first entry so add it here.
188                     retList.add(entry);
189                 }
190                 break;
191             }
192
193             retList.add(entry);
194         }
195
196         return retList;
197     }
198
199     @Override
200     public final long size() {
201         return journal.size();
202     }
203
204     // Non-final for testing
205     @Override
206     public int dataSize() {
207         return dataSize;
208     }
209
210     @Override
211     public final boolean isPresent(final long logEntryIndex) {
212         if (logEntryIndex > lastIndex()) {
213             // if the request logical index is less than the last present in the list
214             return false;
215         }
216         int adjustedIndex = adjustedIndex(logEntryIndex);
217         return adjustedIndex >= 0;
218     }
219
220     @Override
221     public final boolean isInSnapshot(final long logEntryIndex) {
222         return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1;
223     }
224
225     @Override
226     public final long getSnapshotIndex() {
227         return snapshotIndex;
228     }
229
230     @Override
231     public final long getSnapshotTerm() {
232         return snapshotTerm;
233     }
234
235     @Override
236     public final void setSnapshotIndex(final long snapshotIndex) {
237         this.snapshotIndex = snapshotIndex;
238     }
239
240     @Override
241     public final void setSnapshotTerm(final long snapshotTerm) {
242         this.snapshotTerm = snapshotTerm;
243     }
244
245     @Override
246     public final void clear(final int startIndex, final int endIndex) {
247         journal.subList(startIndex, endIndex).clear();
248     }
249
250     @Override
251     public final void snapshotPreCommit(final long snapshotCapturedIndex, final long snapshotCapturedTerm) {
252         if (snapshotCapturedIndex < snapshotIndex) {
253             throw new IllegalArgumentException("snapshotCapturedIndex must be greater than or equal to snapshotIndex");
254         }
255
256         snapshottedJournal = new ArrayList<>(journal.size());
257
258         final var snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
259
260         snapshottedJournal.addAll(snapshotJournalEntries);
261         snapshotJournalEntries.clear();
262
263         previousSnapshotIndex = snapshotIndex;
264         setSnapshotIndex(snapshotCapturedIndex);
265
266         previousSnapshotTerm = snapshotTerm;
267         setSnapshotTerm(snapshotCapturedTerm);
268     }
269
270     @Override
271     public final void snapshotCommit(final boolean updateDataSize) {
272         snapshottedJournal = null;
273         previousSnapshotIndex = -1;
274         previousSnapshotTerm = -1;
275
276         if (updateDataSize) {
277             // need to recalc the datasize based on the entries left after precommit.
278             int newDataSize = 0;
279             for (ReplicatedLogEntry logEntry : journal) {
280                 newDataSize += logEntry.size();
281             }
282             LOG.trace("{}: Updated dataSize from {} to {}", memberId, dataSize, newDataSize);
283             dataSize = newDataSize;
284         }
285     }
286
287     @Override
288     public final void snapshotRollback() {
289         snapshottedJournal.addAll(journal);
290         journal = snapshottedJournal;
291         snapshottedJournal = null;
292
293         snapshotIndex = previousSnapshotIndex;
294         previousSnapshotIndex = -1;
295
296         snapshotTerm = previousSnapshotTerm;
297         previousSnapshotTerm = -1;
298     }
299
300     @VisibleForTesting
301     final ReplicatedLogEntry getAtPhysicalIndex(final int index) {
302         return journal.get(index);
303     }
304
305     @NonNullByDefault
306     static final RaftEntryMeta computeLastAppliedEntry(final ReplicatedLog log, final long originalIndex,
307             final @Nullable RaftEntryMeta lastLogEntry, final boolean hasFollowers) {
308         return hasFollowers ? compulateLastAppliedEntry(log, originalIndex)
309             : compulateLastAppliedEntry(log, lastLogEntry);
310     }
311
312     @NonNullByDefault
313     static final RaftEntryMeta compulateLastAppliedEntry(final ReplicatedLog log, final long originalIndex) {
314         final var entry = log.lookupMeta(originalIndex);
315         if (entry != null) {
316             return entry;
317         }
318
319         final var snapshotIndex = log.getSnapshotIndex();
320         return snapshotIndex > -1 ? ImmutableRaftEntryMeta.of(snapshotIndex, log.getSnapshotTerm())
321             : ImmutableRaftEntryMeta.of(-1, -1);
322     }
323
324     @NonNullByDefault
325     static final RaftEntryMeta compulateLastAppliedEntry(final ReplicatedLog log,
326             final @Nullable RaftEntryMeta lastLogEntry) {
327         if (lastLogEntry != null) {
328             // since we have persisted the last-log-entry to persistent journal before the capture, we would want
329             // to snapshot from this entry.
330             return lastLogEntry;
331         }
332         return ImmutableRaftEntryMeta.of(-1, -1);
333     }
334 }