Bug 2787: Batch AppendEntries to speed up follower sync
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / AbstractReplicatedLogImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
15
16 /**
17  * Abstract class handling the mapping of
18  * logical LogEntry Index and the physical list index.
19  */
20 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
21
22     // We define this as ArrayList so we can use ensureCapacity.
23     private ArrayList<ReplicatedLogEntry> journal;
24
25     private long snapshotIndex = -1;
26     private long snapshotTerm = -1;
27
28     // to be used for rollback during save snapshot failure
29     private ArrayList<ReplicatedLogEntry> snapshottedJournal;
30     private long previousSnapshotIndex = -1;
31     private long previousSnapshotTerm = -1;
32     private int dataSize = 0;
33
34     public AbstractReplicatedLogImpl(long snapshotIndex,
35         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
36         this.snapshotIndex = snapshotIndex;
37         this.snapshotTerm = snapshotTerm;
38         this.journal = new ArrayList<>(unAppliedEntries);
39
40         for(ReplicatedLogEntry entry: journal) {
41             dataSize += entry.size();
42         }
43     }
44
45     public AbstractReplicatedLogImpl() {
46         this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
47     }
48
49     protected int adjustedIndex(long logEntryIndex) {
50         if (snapshotIndex < 0) {
51             return (int) logEntryIndex;
52         }
53         return (int) (logEntryIndex - (snapshotIndex + 1));
54     }
55
56     @Override
57     public ReplicatedLogEntry get(long logEntryIndex) {
58         int adjustedIndex = adjustedIndex(logEntryIndex);
59
60         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
61             // physical index should be less than list size and >= 0
62             return null;
63         }
64
65         return journal.get(adjustedIndex);
66     }
67
68     @Override
69     public ReplicatedLogEntry last() {
70         if (journal.isEmpty()) {
71             return null;
72         }
73         // get the last entry directly from the physical index
74         return journal.get(journal.size() - 1);
75     }
76
77     @Override
78     public long lastIndex() {
79         if (journal.isEmpty()) {
80             // it can happen that after snapshot, all the entries of the
81             // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
82             return snapshotIndex;
83         }
84         return last().getIndex();
85     }
86
87     @Override
88     public long lastTerm() {
89         if (journal.isEmpty()) {
90             // it can happen that after snapshot, all the entries of the
91             // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
92             return snapshotTerm;
93         }
94         return last().getTerm();
95     }
96
97     @Override
98     public long removeFrom(long logEntryIndex) {
99         int adjustedIndex = adjustedIndex(logEntryIndex);
100         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
101             // physical index should be less than list size and >= 0
102             return -1;
103         }
104
105         for(int i = adjustedIndex; i < journal.size(); i++) {
106             dataSize -= journal.get(i).size();
107         }
108
109         journal.subList(adjustedIndex , journal.size()).clear();
110
111         return adjustedIndex;
112     }
113
114     @Override
115     public void append(ReplicatedLogEntry replicatedLogEntry) {
116         journal.add(replicatedLogEntry);
117         dataSize += replicatedLogEntry.size();
118     }
119
120     @Override
121     public void increaseJournalLogCapacity(int amount) {
122         journal.ensureCapacity(journal.size() + amount);
123     }
124
125     @Override
126     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
127         return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
128     }
129
130     @Override
131     public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int maxEntries, long maxDataSize) {
132         int adjustedIndex = adjustedIndex(logEntryIndex);
133         int size = journal.size();
134         if (adjustedIndex >= 0 && adjustedIndex < size) {
135             // physical index should be less than list size and >= 0
136             int maxIndex = adjustedIndex + maxEntries;
137             if(maxIndex > size){
138                 maxIndex = size;
139             }
140
141             if(maxDataSize == NO_MAX_SIZE) {
142                 return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
143             } else {
144                 List<ReplicatedLogEntry> retList = new ArrayList<>(maxIndex - adjustedIndex);
145                 long totalSize = 0;
146                 for(int i = adjustedIndex; i < maxIndex; i++) {
147                     ReplicatedLogEntry entry = journal.get(i);
148                     totalSize += entry.size();
149                     if(totalSize <= maxDataSize) {
150                         retList.add(entry);
151                     } else {
152                         if(retList.isEmpty()) {
153                             // Edge case - the first entry's size exceeds the threshold. We need to return
154                             // at least the first entry so add it here.
155                             retList.add(entry);
156                         }
157
158                         break;
159                     }
160                 }
161
162                 return retList;
163             }
164         } else {
165             return Collections.emptyList();
166         }
167     }
168
169     @Override
170     public long size() {
171        return journal.size();
172     }
173
174     @Override
175     public int dataSize() {
176         return dataSize;
177     }
178
179     @Override
180     public boolean isPresent(long logEntryIndex) {
181         if (logEntryIndex > lastIndex()) {
182             // if the request logical index is less than the last present in the list
183             return false;
184         }
185         int adjustedIndex = adjustedIndex(logEntryIndex);
186         return (adjustedIndex >= 0);
187     }
188
189     @Override
190     public boolean isInSnapshot(long logEntryIndex) {
191         return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
192     }
193
194     @Override
195     public long getSnapshotIndex() {
196         return snapshotIndex;
197     }
198
199     @Override
200     public long getSnapshotTerm() {
201         return snapshotTerm;
202     }
203
204     @Override
205     public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
206
207     @Override
208     public abstract void removeFromAndPersist(long index);
209
210     @Override
211     public void setSnapshotIndex(long snapshotIndex) {
212         this.snapshotIndex = snapshotIndex;
213     }
214
215     @Override
216     public void setSnapshotTerm(long snapshotTerm) {
217         this.snapshotTerm = snapshotTerm;
218     }
219
220     @Override
221     public void clear(int startIndex, int endIndex) {
222         journal.subList(startIndex, endIndex).clear();
223     }
224
225     @Override
226     public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
227         Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
228                 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
229
230         snapshottedJournal = new ArrayList<>(journal.size());
231
232         List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
233
234         snapshottedJournal.addAll(snapshotJournalEntries);
235         snapshotJournalEntries.clear();
236
237         previousSnapshotIndex = snapshotIndex;
238         setSnapshotIndex(snapshotCapturedIndex);
239
240         previousSnapshotTerm = snapshotTerm;
241         setSnapshotTerm(snapshotCapturedTerm);
242     }
243
244     @Override
245     public void snapshotCommit() {
246         snapshottedJournal = null;
247         previousSnapshotIndex = -1;
248         previousSnapshotTerm = -1;
249         dataSize = 0;
250         // need to recalc the datasize based on the entries left after precommit.
251         for(ReplicatedLogEntry logEntry : journal) {
252             dataSize += logEntry.size();
253         }
254
255     }
256
257     @Override
258     public void snapshotRollback() {
259         snapshottedJournal.addAll(journal);
260         journal = snapshottedJournal;
261         snapshottedJournal = null;
262
263         snapshotIndex = previousSnapshotIndex;
264         previousSnapshotIndex = -1;
265
266         snapshotTerm = previousSnapshotTerm;
267         previousSnapshotTerm = -1;
268     }
269
270     @VisibleForTesting
271     ReplicatedLogEntry getAtPhysicalIndex(int index) {
272         return journal.get(index);
273     }
274 }