Install snapshot and Reply
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / AbstractReplicatedLogImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import com.google.protobuf.ByteString;
11
12 import java.util.ArrayList;
13 import java.util.List;
14
15 /**
16  * Abstract class handling the mapping of
17  * logical LogEntry Index and the physical list index.
18  */
19 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
20
21     protected List<ReplicatedLogEntry> journal;
22     protected ByteString snapshot;
23     protected long snapshotIndex = -1;
24     protected long snapshotTerm = -1;
25
26     // to be used for rollback during save snapshot failure
27     protected List<ReplicatedLogEntry> snapshottedJournal;
28     protected ByteString previousSnapshot;
29     protected long previousSnapshotIndex = -1;
30     protected long previousSnapshotTerm = -1;
31
32     public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
33         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
34         this.snapshot = state;
35         this.snapshotIndex = snapshotIndex;
36         this.snapshotTerm = snapshotTerm;
37         this.journal = new ArrayList<>(unAppliedEntries);
38     }
39
40
41     public AbstractReplicatedLogImpl() {
42         this.snapshot = null;
43         this.journal = new ArrayList<>();
44     }
45
46     protected int adjustedIndex(long logEntryIndex) {
47         if(snapshotIndex < 0){
48             return (int) logEntryIndex;
49         }
50         return (int) (logEntryIndex - (snapshotIndex + 1));
51     }
52
53     @Override
54     public ReplicatedLogEntry get(long logEntryIndex) {
55         int adjustedIndex = adjustedIndex(logEntryIndex);
56
57         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
58             // physical index should be less than list size and >= 0
59             return null;
60         }
61
62         return journal.get(adjustedIndex);
63     }
64
65     @Override
66     public ReplicatedLogEntry last() {
67         if (journal.isEmpty()) {
68             return null;
69         }
70         // get the last entry directly from the physical index
71         return journal.get(journal.size() - 1);
72     }
73
74     @Override
75     public long lastIndex() {
76         if (journal.isEmpty()) {
77             // it can happen that after snapshot, all the entries of the
78             // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
79             return snapshotIndex;
80         }
81         return last().getIndex();
82     }
83
84     @Override
85     public long lastTerm() {
86         if (journal.isEmpty()) {
87             // it can happen that after snapshot, all the entries of the
88             // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
89             return snapshotTerm;
90         }
91         return last().getTerm();
92     }
93
94     @Override
95     public void removeFrom(long logEntryIndex) {
96         int adjustedIndex = adjustedIndex(logEntryIndex);
97         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
98             // physical index should be less than list size and >= 0
99             return;
100         }
101         journal.subList(adjustedIndex , journal.size()).clear();
102     }
103
104     @Override
105     public void append(ReplicatedLogEntry replicatedLogEntry) {
106         journal.add(replicatedLogEntry);
107     }
108
109     @Override
110     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
111         return getFrom(logEntryIndex, journal.size());
112     }
113
114     @Override
115     public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
116         int adjustedIndex = adjustedIndex(logEntryIndex);
117         int size = journal.size();
118         List<ReplicatedLogEntry> entries = new ArrayList<>(100);
119         if (adjustedIndex >= 0 && adjustedIndex < size) {
120             // physical index should be less than list size and >= 0
121             int maxIndex = adjustedIndex + max;
122             if(maxIndex > size){
123                 maxIndex = size;
124             }
125             entries.addAll(journal.subList(adjustedIndex, maxIndex));
126         }
127         return entries;
128     }
129
130
131     @Override
132     public long size() {
133        return journal.size();
134     }
135
136     @Override
137     public boolean isPresent(long logEntryIndex) {
138         if (logEntryIndex > lastIndex()) {
139             // if the request logical index is less than the last present in the list
140             return false;
141         }
142         int adjustedIndex = adjustedIndex(logEntryIndex);
143         return (adjustedIndex >= 0);
144     }
145
146     @Override
147     public boolean isInSnapshot(long logEntryIndex) {
148         return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
149     }
150
151     @Override
152     public ByteString getSnapshot() {
153         return snapshot;
154     }
155
156     @Override
157     public long getSnapshotIndex() {
158         return snapshotIndex;
159     }
160
161     @Override
162     public long getSnapshotTerm() {
163         return snapshotTerm;
164     }
165
166     @Override
167     public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
168
169     @Override
170     public abstract void removeFromAndPersist(long index);
171
172     @Override
173     public void setSnapshotIndex(long snapshotIndex) {
174         this.snapshotIndex = snapshotIndex;
175     }
176
177     @Override
178     public void setSnapshotTerm(long snapshotTerm) {
179         this.snapshotTerm = snapshotTerm;
180     }
181
182     @Override
183     public void setSnapshot(ByteString snapshot) {
184         this.snapshot = snapshot;
185     }
186
187     @Override
188     public void clear(int startIndex, int endIndex) {
189         journal.subList(startIndex, endIndex).clear();
190     }
191
192     @Override
193     public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
194         snapshottedJournal = new ArrayList<>(journal.size());
195
196         snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
197         clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
198
199         previousSnapshotIndex = snapshotIndex;
200         setSnapshotIndex(snapshotCapturedIndex);
201
202         previousSnapshotTerm = snapshotTerm;
203         setSnapshotTerm(snapshotCapturedTerm);
204
205         previousSnapshot = getSnapshot();
206         setSnapshot(snapshot);
207     }
208
209     @Override
210     public void snapshotCommit() {
211         snapshottedJournal.clear();
212         snapshottedJournal = null;
213         previousSnapshotIndex = -1;
214         previousSnapshotTerm = -1;
215         previousSnapshot = null;
216     }
217
218     @Override
219     public void snapshotRollback() {
220         snapshottedJournal.addAll(journal);
221         journal.clear();
222         journal = snapshottedJournal;
223         snapshottedJournal = null;
224
225         snapshotIndex = previousSnapshotIndex;
226         previousSnapshotIndex = -1;
227
228         snapshotTerm = previousSnapshotTerm;
229         previousSnapshotTerm = -1;
230
231         snapshot = previousSnapshot;
232         previousSnapshot = null;
233
234     }
235 }