Separate out RaftEntryMeta
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / AbstractReplicatedLogImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
15 import org.eclipse.jdt.annotation.NonNull;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 /**
20  * Abstract class handling the mapping of
21  * logical LogEntry Index and the physical list index.
22  */
23 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
24     private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
25
26     private final String logContext;
27
28     // We define this as ArrayList so we can use ensureCapacity.
29     private ArrayList<ReplicatedLogEntry> journal;
30
31     private long snapshotIndex = -1;
32     private long snapshotTerm = -1;
33
34     // to be used for rollback during save snapshot failure
35     private ArrayList<ReplicatedLogEntry> snapshottedJournal;
36     private long previousSnapshotIndex = -1;
37     private long previousSnapshotTerm = -1;
38     private int dataSize = 0;
39
40     protected AbstractReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm,
41             final List<ReplicatedLogEntry> unAppliedEntries, final String logContext) {
42         this.snapshotIndex = snapshotIndex;
43         this.snapshotTerm = snapshotTerm;
44         this.logContext = logContext;
45
46         journal = new ArrayList<>(unAppliedEntries.size());
47         for (ReplicatedLogEntry entry: unAppliedEntries) {
48             append(entry);
49         }
50     }
51
52     protected AbstractReplicatedLogImpl() {
53         this(-1L, -1L, Collections.emptyList(), "");
54     }
55
56     protected int adjustedIndex(final long logEntryIndex) {
57         if (snapshotIndex < 0) {
58             return (int) logEntryIndex;
59         }
60         return (int) (logEntryIndex - (snapshotIndex + 1));
61     }
62
63     @Override
64     public ReplicatedLogEntry get(final long logEntryIndex) {
65         int adjustedIndex = adjustedIndex(logEntryIndex);
66
67         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
68             // physical index should be less than list size and >= 0
69             return null;
70         }
71
72         return journal.get(adjustedIndex);
73     }
74
75     @Override
76     public ReplicatedLogEntry last() {
77         if (journal.isEmpty()) {
78             return null;
79         }
80         // get the last entry directly from the physical index
81         return journal.get(journal.size() - 1);
82     }
83
84     @Override
85     public RaftEntryMeta lastMeta() {
86         return last();
87     }
88
89     @Override
90     public long lastIndex() {
91         final var last = last();
92         // it can happen that after snapshot, all the entries of the journal are trimmed till lastApplied,
93         // so lastIndex = snapshotIndex
94         return last != null ? last.index() : snapshotIndex;
95     }
96
97     @Override
98     public long lastTerm() {
99         final var last = last();
100         // it can happen that after snapshot, all the entries of the journal are trimmed till lastApplied,
101         // so lastTerm = snapshotTerm
102         return last != null ? last.term() : snapshotTerm;
103     }
104
105     @Override
106     public long removeFrom(final long logEntryIndex) {
107         int adjustedIndex = adjustedIndex(logEntryIndex);
108         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
109             // physical index should be less than list size and >= 0
110             return -1;
111         }
112
113         for (int i = adjustedIndex; i < journal.size(); i++) {
114             dataSize -= journal.get(i).size();
115         }
116
117         journal.subList(adjustedIndex , journal.size()).clear();
118
119         return adjustedIndex;
120     }
121
122     @Override
123     public boolean append(final ReplicatedLogEntry replicatedLogEntry) {
124         final var entryIndex = replicatedLogEntry.index();
125         final var lastIndex = lastIndex();
126         if (entryIndex > lastIndex) {
127             journal.add(replicatedLogEntry);
128             dataSize += replicatedLogEntry.size();
129             return true;
130         }
131
132         LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}", logContext,
133             entryIndex, lastIndex, new Exception("stack trace"));
134         return false;
135     }
136
137     @Override
138     public void increaseJournalLogCapacity(final int amount) {
139         journal.ensureCapacity(journal.size() + amount);
140     }
141
142     @Override
143     public List<ReplicatedLogEntry> getFrom(final long logEntryIndex) {
144         return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
145     }
146
147     @Override
148     public List<ReplicatedLogEntry> getFrom(final long logEntryIndex, final int maxEntries, final long maxDataSize) {
149         int adjustedIndex = adjustedIndex(logEntryIndex);
150         int size = journal.size();
151         if (adjustedIndex >= 0 && adjustedIndex < size) {
152             // physical index should be less than list size and >= 0
153             int maxIndex = adjustedIndex + maxEntries;
154             if (maxIndex > size) {
155                 maxIndex = size;
156             }
157
158             if (maxDataSize == NO_MAX_SIZE) {
159                 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
160             } else {
161                 return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize);
162             }
163         } else {
164             return Collections.emptyList();
165         }
166     }
167
168     private @NonNull List<ReplicatedLogEntry> copyJournalEntries(final int fromIndex, final int toIndex,
169             final long maxDataSize) {
170         final var retList = new ArrayList<ReplicatedLogEntry>(toIndex - fromIndex);
171         long totalSize = 0;
172         for (int i = fromIndex; i < toIndex; i++) {
173             final var entry = journal.get(i);
174             totalSize += entry.serializedSize();
175             if (totalSize <= maxDataSize) {
176                 retList.add(entry);
177             } else {
178                 if (retList.isEmpty()) {
179                     // Edge case - the first entry's size exceeds the threshold. We need to return
180                     // at least the first entry so add it here.
181                     retList.add(entry);
182                 }
183
184                 break;
185             }
186         }
187
188         return retList;
189     }
190
191     @Override
192     public long size() {
193         return journal.size();
194     }
195
196     @Override
197     public int dataSize() {
198         return dataSize;
199     }
200
201     @Override
202     public boolean isPresent(final long logEntryIndex) {
203         if (logEntryIndex > lastIndex()) {
204             // if the request logical index is less than the last present in the list
205             return false;
206         }
207         int adjustedIndex = adjustedIndex(logEntryIndex);
208         return adjustedIndex >= 0;
209     }
210
211     @Override
212     public boolean isInSnapshot(final long logEntryIndex) {
213         return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1;
214     }
215
216     @Override
217     public long getSnapshotIndex() {
218         return snapshotIndex;
219     }
220
221     @Override
222     public long getSnapshotTerm() {
223         return snapshotTerm;
224     }
225
226     @Override
227     public void setSnapshotIndex(final long snapshotIndex) {
228         this.snapshotIndex = snapshotIndex;
229     }
230
231     @Override
232     public void setSnapshotTerm(final long snapshotTerm) {
233         this.snapshotTerm = snapshotTerm;
234     }
235
236     @Override
237     public void clear(final int startIndex, final int endIndex) {
238         journal.subList(startIndex, endIndex).clear();
239     }
240
241     @Override
242     public void snapshotPreCommit(final long snapshotCapturedIndex, final long snapshotCapturedTerm) {
243         Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
244                 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
245
246         snapshottedJournal = new ArrayList<>(journal.size());
247
248         final var snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
249
250         snapshottedJournal.addAll(snapshotJournalEntries);
251         snapshotJournalEntries.clear();
252
253         previousSnapshotIndex = snapshotIndex;
254         setSnapshotIndex(snapshotCapturedIndex);
255
256         previousSnapshotTerm = snapshotTerm;
257         setSnapshotTerm(snapshotCapturedTerm);
258     }
259
260     @Override
261     public void snapshotCommit(final boolean updateDataSize) {
262         snapshottedJournal = null;
263         previousSnapshotIndex = -1;
264         previousSnapshotTerm = -1;
265
266         if (updateDataSize) {
267             // need to recalc the datasize based on the entries left after precommit.
268             int newDataSize = 0;
269             for (ReplicatedLogEntry logEntry : journal) {
270                 newDataSize += logEntry.size();
271             }
272             LOG.trace("{}: Updated dataSize from {} to {}", logContext, dataSize, newDataSize);
273             dataSize = newDataSize;
274         }
275     }
276
277     @Override
278     public void snapshotRollback() {
279         snapshottedJournal.addAll(journal);
280         journal = snapshottedJournal;
281         snapshottedJournal = null;
282
283         snapshotIndex = previousSnapshotIndex;
284         previousSnapshotIndex = -1;
285
286         snapshotTerm = previousSnapshotTerm;
287         previousSnapshotTerm = -1;
288     }
289
290     @VisibleForTesting
291     ReplicatedLogEntry getAtPhysicalIndex(final int index) {
292         return journal.get(index);
293     }
294 }