Merge "BUG 2676 : Use transaction-dispatcher for ShardTransaction"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / AbstractReplicatedLogImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import com.google.common.base.Preconditions;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14
15 /**
16  * Abstract class handling the mapping of
17  * logical LogEntry Index and the physical list index.
18  */
19 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
20
21     // We define this as ArrayList so we can use ensureCapacity.
22     protected ArrayList<ReplicatedLogEntry> journal;
23
24     private long snapshotIndex = -1;
25     private long snapshotTerm = -1;
26
27     // to be used for rollback during save snapshot failure
28     private ArrayList<ReplicatedLogEntry> snapshottedJournal;
29     private long previousSnapshotIndex = -1;
30     private long previousSnapshotTerm = -1;
31     protected int dataSize = 0;
32
33     public AbstractReplicatedLogImpl(long snapshotIndex,
34         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
35         this.snapshotIndex = snapshotIndex;
36         this.snapshotTerm = snapshotTerm;
37         this.journal = new ArrayList<>(unAppliedEntries);
38     }
39
40     public AbstractReplicatedLogImpl() {
41         this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
42     }
43
44     protected int adjustedIndex(long logEntryIndex) {
45         if (snapshotIndex < 0) {
46             return (int) logEntryIndex;
47         }
48         return (int) (logEntryIndex - (snapshotIndex + 1));
49     }
50
51     @Override
52     public ReplicatedLogEntry get(long logEntryIndex) {
53         int adjustedIndex = adjustedIndex(logEntryIndex);
54
55         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
56             // physical index should be less than list size and >= 0
57             return null;
58         }
59
60         return journal.get(adjustedIndex);
61     }
62
63     @Override
64     public ReplicatedLogEntry last() {
65         if (journal.isEmpty()) {
66             return null;
67         }
68         // get the last entry directly from the physical index
69         return journal.get(journal.size() - 1);
70     }
71
72     @Override
73     public long lastIndex() {
74         if (journal.isEmpty()) {
75             // it can happen that after snapshot, all the entries of the
76             // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
77             return snapshotIndex;
78         }
79         return last().getIndex();
80     }
81
82     @Override
83     public long lastTerm() {
84         if (journal.isEmpty()) {
85             // it can happen that after snapshot, all the entries of the
86             // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
87             return snapshotTerm;
88         }
89         return last().getTerm();
90     }
91
92     @Override
93     public void removeFrom(long logEntryIndex) {
94         int adjustedIndex = adjustedIndex(logEntryIndex);
95         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
96             // physical index should be less than list size and >= 0
97             return;
98         }
99         journal.subList(adjustedIndex , journal.size()).clear();
100     }
101
102     @Override
103     public void append(ReplicatedLogEntry replicatedLogEntry) {
104         journal.add(replicatedLogEntry);
105     }
106
107     @Override
108     public void increaseJournalLogCapacity(int amount) {
109         journal.ensureCapacity(journal.size() + amount);
110     }
111
112     @Override
113     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
114         return getFrom(logEntryIndex, journal.size());
115     }
116
117     @Override
118     public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
119         int adjustedIndex = adjustedIndex(logEntryIndex);
120         int size = journal.size();
121         if (adjustedIndex >= 0 && adjustedIndex < size) {
122             // physical index should be less than list size and >= 0
123             int maxIndex = adjustedIndex + max;
124             if(maxIndex > size){
125                 maxIndex = size;
126             }
127             return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
128         } else {
129             return Collections.emptyList();
130         }
131     }
132
133     @Override
134     public long size() {
135        return journal.size();
136     }
137
138     @Override
139     public int dataSize() {
140         return dataSize;
141     }
142
143     @Override
144     public boolean isPresent(long logEntryIndex) {
145         if (logEntryIndex > lastIndex()) {
146             // if the request logical index is less than the last present in the list
147             return false;
148         }
149         int adjustedIndex = adjustedIndex(logEntryIndex);
150         return (adjustedIndex >= 0);
151     }
152
153     @Override
154     public boolean isInSnapshot(long logEntryIndex) {
155         return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
156     }
157
158     @Override
159     public long getSnapshotIndex() {
160         return snapshotIndex;
161     }
162
163     @Override
164     public long getSnapshotTerm() {
165         return snapshotTerm;
166     }
167
168     @Override
169     public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
170
171     @Override
172     public abstract void removeFromAndPersist(long index);
173
174     @Override
175     public void setSnapshotIndex(long snapshotIndex) {
176         this.snapshotIndex = snapshotIndex;
177     }
178
179     @Override
180     public void setSnapshotTerm(long snapshotTerm) {
181         this.snapshotTerm = snapshotTerm;
182     }
183
184     @Override
185     public void clear(int startIndex, int endIndex) {
186         journal.subList(startIndex, endIndex).clear();
187     }
188
189     @Override
190     public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
191         Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
192                 "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
193
194         snapshottedJournal = new ArrayList<>(journal.size());
195
196         List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
197
198         snapshottedJournal.addAll(snapshotJournalEntries);
199         clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
200
201         previousSnapshotIndex = snapshotIndex;
202         setSnapshotIndex(snapshotCapturedIndex);
203
204         previousSnapshotTerm = snapshotTerm;
205         setSnapshotTerm(snapshotCapturedTerm);
206     }
207
208     @Override
209     public void snapshotCommit() {
210         snapshottedJournal = null;
211         previousSnapshotIndex = -1;
212         previousSnapshotTerm = -1;
213         dataSize = 0;
214         // need to recalc the datasize based on the entries left after precommit.
215         for(ReplicatedLogEntry logEntry : journal) {
216             dataSize += logEntry.size();
217         }
218
219     }
220
221     @Override
222     public void snapshotRollback() {
223         snapshottedJournal.addAll(journal);
224         journal = snapshottedJournal;
225         snapshottedJournal = null;
226
227         snapshotIndex = previousSnapshotIndex;
228         previousSnapshotIndex = -1;
229
230         snapshotTerm = previousSnapshotTerm;
231         previousSnapshotTerm = -1;
232     }
233 }