Bug 1831 Batch messages on journal recovery
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / AbstractReplicatedLogImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import com.google.protobuf.ByteString;
11
12 import java.util.ArrayList;
13 import java.util.List;
14
15 /**
16  * Abstract class handling the mapping of
17  * logical LogEntry Index and the physical list index.
18  */
19 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
20
21     // We define this as ArrayList so we can use ensureCapacity.
22     protected ArrayList<ReplicatedLogEntry> journal;
23     protected ByteString snapshot;
24     protected long snapshotIndex = -1;
25     protected long snapshotTerm = -1;
26
27     // to be used for rollback during save snapshot failure
28     protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
29     protected ByteString previousSnapshot;
30     protected long previousSnapshotIndex = -1;
31     protected long previousSnapshotTerm = -1;
32
33     public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
34         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
35         this.snapshot = state;
36         this.snapshotIndex = snapshotIndex;
37         this.snapshotTerm = snapshotTerm;
38         this.journal = new ArrayList<>(unAppliedEntries);
39     }
40
41
42     public AbstractReplicatedLogImpl() {
43         this.snapshot = null;
44         this.journal = new ArrayList<>();
45     }
46
47     protected int adjustedIndex(long logEntryIndex) {
48         if(snapshotIndex < 0){
49             return (int) logEntryIndex;
50         }
51         return (int) (logEntryIndex - (snapshotIndex + 1));
52     }
53
54     @Override
55     public ReplicatedLogEntry get(long logEntryIndex) {
56         int adjustedIndex = adjustedIndex(logEntryIndex);
57
58         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
59             // physical index should be less than list size and >= 0
60             return null;
61         }
62
63         return journal.get(adjustedIndex);
64     }
65
66     @Override
67     public ReplicatedLogEntry last() {
68         if (journal.isEmpty()) {
69             return null;
70         }
71         // get the last entry directly from the physical index
72         return journal.get(journal.size() - 1);
73     }
74
75     @Override
76     public long lastIndex() {
77         if (journal.isEmpty()) {
78             // it can happen that after snapshot, all the entries of the
79             // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
80             return snapshotIndex;
81         }
82         return last().getIndex();
83     }
84
85     @Override
86     public long lastTerm() {
87         if (journal.isEmpty()) {
88             // it can happen that after snapshot, all the entries of the
89             // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
90             return snapshotTerm;
91         }
92         return last().getTerm();
93     }
94
95     @Override
96     public void removeFrom(long logEntryIndex) {
97         int adjustedIndex = adjustedIndex(logEntryIndex);
98         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
99             // physical index should be less than list size and >= 0
100             return;
101         }
102         journal.subList(adjustedIndex , journal.size()).clear();
103     }
104
105     @Override
106     public void append(ReplicatedLogEntry replicatedLogEntry) {
107         journal.add(replicatedLogEntry);
108     }
109
110     @Override
111     public void increaseJournalLogCapacity(int amount) {
112         journal.ensureCapacity(journal.size() + amount);
113     }
114
115     @Override
116     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
117         return getFrom(logEntryIndex, journal.size());
118     }
119
120     @Override
121     public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
122         int adjustedIndex = adjustedIndex(logEntryIndex);
123         int size = journal.size();
124         List<ReplicatedLogEntry> entries = new ArrayList<>(100);
125         if (adjustedIndex >= 0 && adjustedIndex < size) {
126             // physical index should be less than list size and >= 0
127             int maxIndex = adjustedIndex + max;
128             if(maxIndex > size){
129                 maxIndex = size;
130             }
131             entries.addAll(journal.subList(adjustedIndex, maxIndex));
132         }
133         return entries;
134     }
135
136
137     @Override
138     public long size() {
139        return journal.size();
140     }
141
142     @Override
143     public boolean isPresent(long logEntryIndex) {
144         if (logEntryIndex > lastIndex()) {
145             // if the request logical index is less than the last present in the list
146             return false;
147         }
148         int adjustedIndex = adjustedIndex(logEntryIndex);
149         return (adjustedIndex >= 0);
150     }
151
152     @Override
153     public boolean isInSnapshot(long logEntryIndex) {
154         return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
155     }
156
157     @Override
158     public ByteString getSnapshot() {
159         return snapshot;
160     }
161
162     @Override
163     public long getSnapshotIndex() {
164         return snapshotIndex;
165     }
166
167     @Override
168     public long getSnapshotTerm() {
169         return snapshotTerm;
170     }
171
172     @Override
173     public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
174
175     @Override
176     public abstract void removeFromAndPersist(long index);
177
178     @Override
179     public void setSnapshotIndex(long snapshotIndex) {
180         this.snapshotIndex = snapshotIndex;
181     }
182
183     @Override
184     public void setSnapshotTerm(long snapshotTerm) {
185         this.snapshotTerm = snapshotTerm;
186     }
187
188     @Override
189     public void setSnapshot(ByteString snapshot) {
190         this.snapshot = snapshot;
191     }
192
193     @Override
194     public void clear(int startIndex, int endIndex) {
195         journal.subList(startIndex, endIndex).clear();
196     }
197
198     @Override
199     public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
200         snapshottedJournal = new ArrayList<>(journal.size());
201
202         snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
203         clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
204
205         previousSnapshotIndex = snapshotIndex;
206         setSnapshotIndex(snapshotCapturedIndex);
207
208         previousSnapshotTerm = snapshotTerm;
209         setSnapshotTerm(snapshotCapturedTerm);
210
211         previousSnapshot = getSnapshot();
212         setSnapshot(snapshot);
213     }
214
215     @Override
216     public void snapshotCommit() {
217         snapshottedJournal = null;
218         previousSnapshotIndex = -1;
219         previousSnapshotTerm = -1;
220         previousSnapshot = null;
221     }
222
223     @Override
224     public void snapshotRollback() {
225         snapshottedJournal.addAll(journal);
226         journal = snapshottedJournal;
227         snapshottedJournal = null;
228
229         snapshotIndex = previousSnapshotIndex;
230         previousSnapshotIndex = -1;
231
232         snapshotTerm = previousSnapshotTerm;
233         previousSnapshotTerm = -1;
234
235         snapshot = previousSnapshot;
236         previousSnapshot = null;
237
238     }
239 }