Merge "Refactor snapshot code"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.japi.Procedure;
12 import akka.persistence.SnapshotSelectionCriteria;
13 import com.google.protobuf.ByteString;
14 import org.opendaylight.controller.cluster.DataPersistenceProvider;
15 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
16 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
17 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
18 import org.slf4j.Logger;
19
20 public class SnapshotManager implements SnapshotState {
21
22
    // The four state objects this manager switches between. They are inner-class
    // instances, so each one operates on this manager's context and fields.
    private final SnapshotState IDLE = new Idle();
    private final SnapshotState CAPTURING = new Capturing();
    private final SnapshotState PERSISTING = new Persisting();
    private final SnapshotState CREATING = new Creating();

    // Logger handed in through the constructor.
    private final Logger LOG;
    // Raft actor context handed in through the constructor; source of the replicated log,
    // configuration, peer addresses and the actor reference used by the states.
    private final RaftActorContext context;
    // Reusable helper, re-initialised by Idle on every capture, that resolves the
    // last-applied index/term pair.
    private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
            new LastAppliedTermInformationReader();
    // Reusable helper, re-initialised by Idle on every capture, that resolves the
    // replicated-to-all index/term pair.
    private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader =
            new ReplicatedToAllTermInformationReader();


    // State every public operation is delegated to; starts out as IDLE.
    private SnapshotState currentState = IDLE;
    // Message describing the capture in progress: assigned by Idle when a capture starts
    // and reset to null by Creating once persist() has used it.
    private CaptureSnapshot captureSnapshot;
38
    /**
     * Creates a snapshot manager that starts in the idle state.
     *
     * @param context the Raft actor context the states read from and act upon
     * @param logger the logger used for all messages emitted by this manager
     */
    public SnapshotManager(RaftActorContext context, Logger logger) {
        this.context = context;
        this.LOG = logger;
    }
43
    /**
     * Delegates to the current state.
     *
     * @return {@code true} while the current state is Capturing or Creating, {@code false} otherwise
     */
    @Override
    public boolean isCapturing() {
        return currentState.isCapturing();
    }
48
    /**
     * Asks the current state to start a snapshot capture that is flagged as "to install".
     * Only the idle state acts on this; the other states just log the call at debug level.
     *
     * @param lastLogEntry the last entry of the replicated log
     * @param replicatedToAllIndex the index to look up for the replicated-to-all information
     */
    @Override
    public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
        currentState.captureToInstall(lastLogEntry, replicatedToAllIndex);
    }
53
    /**
     * Asks the current state to start a regular snapshot capture.
     * Only the idle state acts on this; the other states just log the call at debug level.
     *
     * @param lastLogEntry the last entry of the replicated log
     * @param replicatedToAllIndex the index to look up for the replicated-to-all information
     */
    @Override
    public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
        currentState.capture(lastLogEntry, replicatedToAllIndex);
    }
58
    /**
     * Asks the current state to run the snapshot-creation callback.
     * Only the capturing state invokes the callback; the other states just log the call at debug level.
     *
     * @param callback procedure to run; the capturing state applies it with a {@code null} argument
     */
    @Override
    public void create(Procedure<Void> callback) {
        currentState.create(callback);
    }
63
    /**
     * Asks the current state to persist the given snapshot bytes.
     * Only the creating state acts on this; the other states just log the call at debug level.
     *
     * @param persistenceProvider provider the snapshot is saved through
     * @param snapshotBytes the serialized state to store in the snapshot
     * @param currentBehavior the current Raft behavior, updated and possibly messaged during persisting
     */
    @Override
    public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
        currentState.persist(persistenceProvider, snapshotBytes, currentBehavior);
    }
68
    /**
     * Asks the current state to commit the snapshot.
     * Only the persisting state acts on this; the other states just log the call at debug level.
     *
     * @param persistenceProvider provider used to delete older snapshots and messages
     * @param sequenceNumber sequence number passed on to the deletion calls
     */
    @Override
    public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
        currentState.commit(persistenceProvider, sequenceNumber);
    }
73
    /**
     * Asks the current state to roll the snapshot back.
     * Only the persisting state acts on this; the other states just log the call at debug level.
     */
    @Override
    public void rollback() {
        currentState.rollback();
    }
78
    /**
     * Asks the current state to trim the in-memory replicated log.
     * Only the idle state trims; the other states log the call at debug level and return -1.
     *
     * @param desiredTrimIndex the index the caller would like the log trimmed to
     * @return the index the log was actually trimmed to, or -1 if nothing was trimmed
     */
    @Override
    public long trimLog(long desiredTrimIndex) {
        return currentState.trimLog(desiredTrimIndex);
    }
83
84     private boolean hasFollowers(){
85         return context.getPeerAddresses().keySet().size() > 0;
86     }
87
    /**
     * Returns the id of the context; used as the prefix of this manager's log messages.
     */
    private String persistenceId(){
        return context.getId();
    }
91
92     private class AbstractSnapshotState implements SnapshotState {
93
94         @Override
95         public boolean isCapturing() {
96             return false;
97         }
98
99         @Override
100         public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
101             LOG.debug("capture should not be called in state {}", this);
102         }
103
104         @Override
105         public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
106             LOG.debug("captureToInstall should not be called in state {}", this);
107         }
108
109         @Override
110         public void create(Procedure<Void> callback) {
111             LOG.debug("create should not be called in state {}", this);
112         }
113
114         @Override
115         public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
116             LOG.debug("persist should not be called in state {}", this);
117         }
118
119         @Override
120         public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
121             LOG.debug("commit should not be called in state {}", this);
122         }
123
124         @Override
125         public void rollback() {
126             LOG.debug("rollback should not be called in state {}", this);
127         }
128
129         @Override
130         public long trimLog(long desiredTrimIndex) {
131             LOG.debug("trimLog should not be called in state {}", this);
132             return -1;
133         }
134
135         protected long doTrimLog(long desiredTrimIndex){
136             //  we would want to keep the lastApplied as its used while capturing snapshots
137             long lastApplied = context.getLastApplied();
138             long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
139
140             if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
141                 LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
142                         context.getTermInformation().getCurrentTerm());
143
144                 //use the term of the temp-min, since we check for isPresent, entry will not be null
145                 ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
146                 context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
147                 context.getReplicatedLog().snapshotCommit();
148                 return tempMin;
149             }
150
151             return -1;
152         }
153     }
154
155     private class Idle extends AbstractSnapshotState {
156
157         private void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, boolean toInstall) {
158             TermInformationReader lastAppliedTermInfoReader =
159                     lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
160                             lastLogEntry, hasFollowers());
161
162             long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
163             long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
164
165             TermInformationReader replicatedToAllTermInfoReader =
166                     replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
167
168             long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
169             long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
170
171             // send a CaptureSnapshot to self to make the expensive operation async.
172             captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
173                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
174                     newReplicatedToAllIndex, newReplicatedToAllTerm, toInstall);
175
176             SnapshotManager.this.currentState = CAPTURING;
177
178             LOG.info("{}: Initiating snapshot capture {}: {}", persistenceId(), toInstall ? "to install" : "",
179                     captureSnapshot);
180
181             context.getActor().tell(captureSnapshot, context.getActor());
182         }
183
184         @Override
185         public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
186             capture(lastLogEntry, replicatedToAllIndex, false);
187         }
188
189         @Override
190         public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
191             capture(lastLogEntry, replicatedToAllIndex, true);
192         }
193
194         @Override
195         public String toString() {
196             return "Idle";
197         }
198
199         @Override
200         public long trimLog(long desiredTrimIndex) {
201             return doTrimLog(desiredTrimIndex);
202         }
203     }
204
205     private class Capturing extends AbstractSnapshotState {
206
207         @Override
208         public boolean isCapturing() {
209             return true;
210         }
211
212         @Override
213         public void create(Procedure<Void> callback) {
214             try {
215                 callback.apply(null);
216                 SnapshotManager.this.currentState = CREATING;
217             } catch (Exception e) {
218                 LOG.error("Unexpected error occurred", e);
219             }
220         }
221
222         @Override
223         public String toString() {
224             return "Capturing";
225         }
226
227     }
228
229     private class Creating extends AbstractSnapshotState {
230
231         @Override
232         public boolean isCapturing() {
233             return true;
234         }
235
236         @Override
237         public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
238                             RaftActorBehavior currentBehavior) {
239             // create a snapshot object from the state provided and save it
240             // when snapshot is saved async, SaveSnapshotSuccess is raised.
241
242             Snapshot sn = Snapshot.create(snapshotBytes,
243                     context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
244                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
245                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
246
247             persistenceProvider.saveSnapshot(sn);
248
249             LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
250
251             long dataThreshold = Runtime.getRuntime().totalMemory() *
252                     context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
253             if (context.getReplicatedLog().dataSize() > dataThreshold) {
254                 // if memory is less, clear the log based on lastApplied.
255                 // this could/should only happen if one of the followers is down
256                 // as normally we keep removing from the log when its replicated to all.
257                 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
258                         captureSnapshot.getLastAppliedTerm());
259
260                 currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
261             } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
262                 // clear the log based on replicatedToAllIndex
263                 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
264                         captureSnapshot.getReplicatedToAllTerm());
265
266                 currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
267             } else {
268                 // The replicatedToAllIndex was not found in the log
269                 // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
270                 // In this scenario we may need to save the snapshot to the akka persistence
271                 // snapshot for recovery but we do not need to do the replicated log trimming.
272                 context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(),
273                         context.getReplicatedLog().getSnapshotTerm());
274             }
275
276             LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
277                             "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
278                     captureSnapshot.getLastAppliedTerm());
279
280             if (context.getId().equals(currentBehavior.getLeaderId())
281                     && captureSnapshot.isInstallSnapshotInitiated()) {
282                 // this would be call straight to the leader and won't initiate in serialization
283                 currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(
284                         ByteString.copyFrom(snapshotBytes)));
285             }
286
287             captureSnapshot = null;
288             SnapshotManager.this.currentState = PERSISTING;
289         }
290
291         @Override
292         public String toString() {
293             return "Creating";
294         }
295
296     }
297
298     private class Persisting extends AbstractSnapshotState {
299
300         @Override
301         public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
302             context.getReplicatedLog().snapshotCommit();
303             persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
304                     sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
305
306             persistenceProvider.deleteMessages(sequenceNumber);
307
308             SnapshotManager.this.currentState = IDLE;
309         }
310
311         @Override
312         public void rollback() {
313             context.getReplicatedLog().snapshotRollback();
314
315             LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
316                             "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
317                     context.getReplicatedLog().getSnapshotIndex(),
318                     context.getReplicatedLog().getSnapshotTerm(),
319                     context.getReplicatedLog().size());
320
321             SnapshotManager.this.currentState = IDLE;
322         }
323
324         @Override
325         public String toString() {
326             return "Persisting";
327         }
328
329     }
330
331     private static interface TermInformationReader {
332         long getIndex();
333         long getTerm();
334     }
335
336     private static class LastAppliedTermInformationReader implements TermInformationReader{
337         private long index;
338         private long term;
339
340         public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex,
341                                          ReplicatedLogEntry lastLogEntry, boolean hasFollowers){
342             ReplicatedLogEntry entry = log.get(originalIndex);
343             this.index = -1L;
344             this.term = -1L;
345             if (!hasFollowers) {
346                 if(lastLogEntry != null) {
347                     index = lastLogEntry.getIndex();
348                     term = lastLogEntry.getTerm();
349                 }
350             } else if (entry != null) {
351                 index = entry.getIndex();
352                 term = entry.getTerm();
353             } else if(originalIndex == log.getSnapshotIndex()){
354                 index = log.getSnapshotIndex();
355                 term = log.getSnapshotTerm();
356             }
357             return this;
358         }
359
360         @Override
361         public long getIndex(){
362             return this.index;
363         }
364
365         @Override
366         public long getTerm(){
367             return this.term;
368         }
369     }
370
371     private static class ReplicatedToAllTermInformationReader implements TermInformationReader{
372         private long index;
373         private long term;
374
375         ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){
376             ReplicatedLogEntry entry = log.get(originalIndex);
377             this.index = -1L;
378             this.term = -1L;
379
380             if (entry != null) {
381                 index = entry.getIndex();
382                 term = entry.getTerm();
383             }
384
385             return this;
386         }
387
388         @Override
389         public long getIndex(){
390             return this.index;
391         }
392
393         @Override
394         public long getTerm(){
395             return this.term;
396         }
397     }
398 }