Bug 8606: Continue leadership transfer on pauseLeader timeout
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / AbstractReplicatedLogImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 /**
19  * Abstract class handling the mapping of
20  * logical LogEntry Index and the physical list index.
21  */
22 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
23     private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
24
25     private final String logContext;
26
27     // We define this as ArrayList so we can use ensureCapacity.
28     private ArrayList<ReplicatedLogEntry> journal;
29
30     private long snapshotIndex = -1;
31     private long snapshotTerm = -1;
32
33     // to be used for rollback during save snapshot failure
34     private ArrayList<ReplicatedLogEntry> snapshottedJournal;
35     private long previousSnapshotIndex = -1;
36     private long previousSnapshotTerm = -1;
37     private int dataSize = 0;
38
39     protected AbstractReplicatedLogImpl(long snapshotIndex, long snapshotTerm,
40             List<ReplicatedLogEntry> unAppliedEntries, String logContext) {
41         this.snapshotIndex = snapshotIndex;
42         this.snapshotTerm = snapshotTerm;
43         this.logContext = logContext;
44
45         this.journal = new ArrayList<>(unAppliedEntries.size());
46         for (ReplicatedLogEntry entry: unAppliedEntries) {
47             append(entry);
48         }
49     }
50
51     protected AbstractReplicatedLogImpl() {
52         this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), "");
53     }
54
55     protected int adjustedIndex(long logEntryIndex) {
56         if (snapshotIndex < 0) {
57             return (int) logEntryIndex;
58         }
59         return (int) (logEntryIndex - (snapshotIndex + 1));
60     }
61
62     @Override
63     public ReplicatedLogEntry get(long logEntryIndex) {
64         int adjustedIndex = adjustedIndex(logEntryIndex);
65
66         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
67             // physical index should be less than list size and >= 0
68             return null;
69         }
70
71         return journal.get(adjustedIndex);
72     }
73
74     @Override
75     public ReplicatedLogEntry last() {
76         if (journal.isEmpty()) {
77             return null;
78         }
79         // get the last entry directly from the physical index
80         return journal.get(journal.size() - 1);
81     }
82
83     @Override
84     public long lastIndex() {
85         if (journal.isEmpty()) {
86             // it can happen that after snapshot, all the entries of the
87             // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
88             return snapshotIndex;
89         }
90         return last().getIndex();
91     }
92
93     @Override
94     public long lastTerm() {
95         if (journal.isEmpty()) {
96             // it can happen that after snapshot, all the entries of the
97             // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
98             return snapshotTerm;
99         }
100         return last().getTerm();
101     }
102
103     @Override
104     public long removeFrom(long logEntryIndex) {
105         int adjustedIndex = adjustedIndex(logEntryIndex);
106         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
107             // physical index should be less than list size and >= 0
108             return -1;
109         }
110
111         for (int i = adjustedIndex; i < journal.size(); i++) {
112             dataSize -= journal.get(i).size();
113         }
114
115         journal.subList(adjustedIndex , journal.size()).clear();
116
117         return adjustedIndex;
118     }
119
120     @Override
121     public boolean append(ReplicatedLogEntry replicatedLogEntry) {
122         if (replicatedLogEntry.getIndex() > lastIndex()) {
123             journal.add(replicatedLogEntry);
124             dataSize += replicatedLogEntry.size();
125             return true;
126         } else {
127             LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}",
128                     logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace"));
129             return false;
130         }
131     }
132
133     @Override
134     public void increaseJournalLogCapacity(int amount) {
135         journal.ensureCapacity(journal.size() + amount);
136     }
137
138     @Override
139     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
140         return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
141     }
142
143     @Override
144     public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int maxEntries, long maxDataSize) {
145         int adjustedIndex = adjustedIndex(logEntryIndex);
146         int size = journal.size();
147         if (adjustedIndex >= 0 && adjustedIndex < size) {
148             // physical index should be less than list size and >= 0
149             int maxIndex = adjustedIndex + maxEntries;
150             if (maxIndex > size) {
151                 maxIndex = size;
152             }
153
154             if (maxDataSize == NO_MAX_SIZE) {
155                 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
156             } else {
157                 return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize);
158             }
159         } else {
160             return Collections.emptyList();
161         }
162     }
163
164     private List<ReplicatedLogEntry> copyJournalEntries(int fromIndex, int toIndex, long maxDataSize) {
165         List<ReplicatedLogEntry> retList = new ArrayList<>(toIndex - fromIndex);
166         long totalSize = 0;
167         for (int i = fromIndex; i < toIndex; i++) {
168             ReplicatedLogEntry entry = journal.get(i);
169             totalSize += entry.size();
170             if (totalSize <= maxDataSize) {
171                 retList.add(entry);
172             } else {
173                 if (retList.isEmpty()) {
174                     // Edge case - the first entry's size exceeds the threshold. We need to return
175                     // at least the first entry so add it here.
176                     retList.add(entry);
177                 }
178
179                 break;
180             }
181         }
182
183         return retList;
184     }
185
186     @Override
187     public long size() {
188         return journal.size();
189     }
190
191     @Override
192     public int dataSize() {
193         return dataSize;
194     }
195
196     @Override
197     public boolean isPresent(long logEntryIndex) {
198         if (logEntryIndex > lastIndex()) {
199             // if the request logical index is less than the last present in the list
200             return false;
201         }
202         int adjustedIndex = adjustedIndex(logEntryIndex);
203         return adjustedIndex >= 0;
204     }
205
206     @Override
207     public boolean isInSnapshot(long logEntryIndex) {
208         return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
209     }
210
211     @Override
212     public long getSnapshotIndex() {
213         return snapshotIndex;
214     }
215
216     @Override
217     public long getSnapshotTerm() {
218         return snapshotTerm;
219     }
220
221     @Override
222     public void setSnapshotIndex(long snapshotIndex) {
223         this.snapshotIndex = snapshotIndex;
224     }
225
226     @Override
227     public void setSnapshotTerm(long snapshotTerm) {
228         this.snapshotTerm = snapshotTerm;
229     }
230
231     @Override
232     public void clear(int startIndex, int endIndex) {
233         journal.subList(startIndex, endIndex).clear();
234     }
235
236     @Override
237     public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
238         Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
239                 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
240
241         snapshottedJournal = new ArrayList<>(journal.size());
242
243         List<ReplicatedLogEntry> snapshotJournalEntries =
244                 journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
245
246         snapshottedJournal.addAll(snapshotJournalEntries);
247         snapshotJournalEntries.clear();
248
249         previousSnapshotIndex = snapshotIndex;
250         setSnapshotIndex(snapshotCapturedIndex);
251
252         previousSnapshotTerm = snapshotTerm;
253         setSnapshotTerm(snapshotCapturedTerm);
254     }
255
256     @Override
257     public void snapshotCommit() {
258         snapshottedJournal = null;
259         previousSnapshotIndex = -1;
260         previousSnapshotTerm = -1;
261         dataSize = 0;
262         // need to recalc the datasize based on the entries left after precommit.
263         for (ReplicatedLogEntry logEntry : journal) {
264             dataSize += logEntry.size();
265         }
266
267     }
268
269     @Override
270     public void snapshotRollback() {
271         snapshottedJournal.addAll(journal);
272         journal = snapshottedJournal;
273         snapshottedJournal = null;
274
275         snapshotIndex = previousSnapshotIndex;
276         previousSnapshotIndex = -1;
277
278         snapshotTerm = previousSnapshotTerm;
279         previousSnapshotTerm = -1;
280     }
281
282     @VisibleForTesting
283     ReplicatedLogEntry getAtPhysicalIndex(int index) {
284         return journal.get(index);
285     }
286 }