Bug 6587: Retain state when transitioning between Leader and IsolatedLeader
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / AbstractReplicatedLogImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 /**
19  * Abstract class handling the mapping of
20  * logical LogEntry Index and the physical list index.
21  */
22 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
23     private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
24
25     private final String logContext;
26
27     // We define this as ArrayList so we can use ensureCapacity.
28     private ArrayList<ReplicatedLogEntry> journal;
29
30     private long snapshotIndex = -1;
31     private long snapshotTerm = -1;
32
33     // to be used for rollback during save snapshot failure
34     private ArrayList<ReplicatedLogEntry> snapshottedJournal;
35     private long previousSnapshotIndex = -1;
36     private long previousSnapshotTerm = -1;
37     private int dataSize = 0;
38
39     public AbstractReplicatedLogImpl(long snapshotIndex,
40         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries, String logContext) {
41         this.snapshotIndex = snapshotIndex;
42         this.snapshotTerm = snapshotTerm;
43         this.logContext = logContext;
44
45         this.journal = new ArrayList<>(unAppliedEntries.size());
46         for(ReplicatedLogEntry entry: unAppliedEntries) {
47             append(entry);
48         }
49     }
50
51     public AbstractReplicatedLogImpl() {
52         this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), "");
53     }
54
55     protected int adjustedIndex(long logEntryIndex) {
56         if (snapshotIndex < 0) {
57             return (int) logEntryIndex;
58         }
59         return (int) (logEntryIndex - (snapshotIndex + 1));
60     }
61
62     @Override
63     public ReplicatedLogEntry get(long logEntryIndex) {
64         int adjustedIndex = adjustedIndex(logEntryIndex);
65
66         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
67             // physical index should be less than list size and >= 0
68             return null;
69         }
70
71         return journal.get(adjustedIndex);
72     }
73
74     @Override
75     public ReplicatedLogEntry last() {
76         if (journal.isEmpty()) {
77             return null;
78         }
79         // get the last entry directly from the physical index
80         return journal.get(journal.size() - 1);
81     }
82
83     @Override
84     public long lastIndex() {
85         if (journal.isEmpty()) {
86             // it can happen that after snapshot, all the entries of the
87             // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
88             return snapshotIndex;
89         }
90         return last().getIndex();
91     }
92
93     @Override
94     public long lastTerm() {
95         if (journal.isEmpty()) {
96             // it can happen that after snapshot, all the entries of the
97             // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
98             return snapshotTerm;
99         }
100         return last().getTerm();
101     }
102
103     @Override
104     public long removeFrom(long logEntryIndex) {
105         int adjustedIndex = adjustedIndex(logEntryIndex);
106         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
107             // physical index should be less than list size and >= 0
108             return -1;
109         }
110
111         for(int i = adjustedIndex; i < journal.size(); i++) {
112             dataSize -= journal.get(i).size();
113         }
114
115         journal.subList(adjustedIndex , journal.size()).clear();
116
117         return adjustedIndex;
118     }
119
120     @Override
121     public boolean append(ReplicatedLogEntry replicatedLogEntry) {
122         if(replicatedLogEntry.getIndex() > lastIndex()) {
123             journal.add(replicatedLogEntry);
124             dataSize += replicatedLogEntry.size();
125             return true;
126         } else {
127             LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}",
128                     logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace"));
129             return false;
130         }
131     }
132
133     @Override
134     public void increaseJournalLogCapacity(int amount) {
135         journal.ensureCapacity(journal.size() + amount);
136     }
137
138     @Override
139     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
140         return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
141     }
142
143     @Override
144     public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int maxEntries, long maxDataSize) {
145         int adjustedIndex = adjustedIndex(logEntryIndex);
146         int size = journal.size();
147         if (adjustedIndex >= 0 && adjustedIndex < size) {
148             // physical index should be less than list size and >= 0
149             int maxIndex = adjustedIndex + maxEntries;
150             if(maxIndex > size){
151                 maxIndex = size;
152             }
153
154             if(maxDataSize == NO_MAX_SIZE) {
155                 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
156             } else {
157                 List<ReplicatedLogEntry> retList = new ArrayList<>(maxIndex - adjustedIndex);
158                 long totalSize = 0;
159                 for(int i = adjustedIndex; i < maxIndex; i++) {
160                     ReplicatedLogEntry entry = journal.get(i);
161                     totalSize += entry.size();
162                     if(totalSize <= maxDataSize) {
163                         retList.add(entry);
164                     } else {
165                         if(retList.isEmpty()) {
166                             // Edge case - the first entry's size exceeds the threshold. We need to return
167                             // at least the first entry so add it here.
168                             retList.add(entry);
169                         }
170
171                         break;
172                     }
173                 }
174
175                 return retList;
176             }
177         } else {
178             return Collections.emptyList();
179         }
180     }
181
182     @Override
183     public long size() {
184        return journal.size();
185     }
186
187     @Override
188     public int dataSize() {
189         return dataSize;
190     }
191
192     @Override
193     public boolean isPresent(long logEntryIndex) {
194         if (logEntryIndex > lastIndex()) {
195             // if the request logical index is less than the last present in the list
196             return false;
197         }
198         int adjustedIndex = adjustedIndex(logEntryIndex);
199         return (adjustedIndex >= 0);
200     }
201
202     @Override
203     public boolean isInSnapshot(long logEntryIndex) {
204         return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
205     }
206
207     @Override
208     public long getSnapshotIndex() {
209         return snapshotIndex;
210     }
211
212     @Override
213     public long getSnapshotTerm() {
214         return snapshotTerm;
215     }
216
217     @Override
218     public void setSnapshotIndex(long snapshotIndex) {
219         this.snapshotIndex = snapshotIndex;
220     }
221
222     @Override
223     public void setSnapshotTerm(long snapshotTerm) {
224         this.snapshotTerm = snapshotTerm;
225     }
226
227     @Override
228     public void clear(int startIndex, int endIndex) {
229         journal.subList(startIndex, endIndex).clear();
230     }
231
232     @Override
233     public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
234         Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
235                 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
236
237         snapshottedJournal = new ArrayList<>(journal.size());
238
239         List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
240
241         snapshottedJournal.addAll(snapshotJournalEntries);
242         snapshotJournalEntries.clear();
243
244         previousSnapshotIndex = snapshotIndex;
245         setSnapshotIndex(snapshotCapturedIndex);
246
247         previousSnapshotTerm = snapshotTerm;
248         setSnapshotTerm(snapshotCapturedTerm);
249     }
250
251     @Override
252     public void snapshotCommit() {
253         snapshottedJournal = null;
254         previousSnapshotIndex = -1;
255         previousSnapshotTerm = -1;
256         dataSize = 0;
257         // need to recalc the datasize based on the entries left after precommit.
258         for(ReplicatedLogEntry logEntry : journal) {
259             dataSize += logEntry.size();
260         }
261
262     }
263
264     @Override
265     public void snapshotRollback() {
266         snapshottedJournal.addAll(journal);
267         journal = snapshottedJournal;
268         snapshottedJournal = null;
269
270         snapshotIndex = previousSnapshotIndex;
271         previousSnapshotIndex = -1;
272
273         snapshotTerm = previousSnapshotTerm;
274         previousSnapshotTerm = -1;
275     }
276
277     @VisibleForTesting
278     ReplicatedLogEntry getAtPhysicalIndex(int index) {
279         return journal.get(index);
280     }
281 }