2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import akka.japi.Procedure;
12 import akka.persistence.SnapshotSelectionCriteria;
13 import com.google.protobuf.ByteString;
14 import org.opendaylight.controller.cluster.DataPersistenceProvider;
15 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
16 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
17 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
18 import org.slf4j.Logger;
20 public class SnapshotManager implements SnapshotState {
// One instance of each snapshot state, created per manager; the public methods of this
// class delegate to whichever of these is the current state.
23 private final SnapshotState IDLE = new Idle();
24 private final SnapshotState CAPTURING = new Capturing();
25 private final SnapshotState PERSISTING = new Persisting();
26 private final SnapshotState CREATING = new Creating();
// Logger used by the manager and by its state classes.
28 private final Logger LOG;
// Raft actor context: supplies the replicated log, last-applied index, peer addresses,
// configuration parameters, term information and the actor reference used below.
29 private final RaftActorContext context;
// Reusable term/index readers; the Idle state's capture logic calls init(...) on them for each capture.
30 private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
31 new LastAppliedTermInformationReader();
32 private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader =
33 new ReplicatedToAllTermInformationReader();
// State that calls are currently delegated to; a new manager starts in IDLE.
36 private SnapshotState currentState = IDLE;
// Capture request of the snapshot in progress: assigned by the Idle state's capture logic,
// read by Creating.persist and reset to null at the end of that method.
37 private CaptureSnapshot captureSnapshot;
// Creates a snapshot manager bound to the given Raft actor context.
// NOTE(review): the final LOG field must be assigned during construction; presumably `logger`
// is stored into it on a line that is not visible in this view — confirm.
39 public SnapshotManager(RaftActorContext context, Logger logger) {
40 this.context = context;
45 public boolean isCapturing() {
46 return currentState.isCapturing();
50 public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
51 currentState.captureToInstall(lastLogEntry, replicatedToAllIndex);
55 public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
56 currentState.capture(lastLogEntry, replicatedToAllIndex);
60 public void create(Procedure<Void> callback) {
61 currentState.create(callback);
65 public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
66 currentState.persist(persistenceProvider, snapshotBytes, currentBehavior);
70 public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
71 currentState.commit(persistenceProvider, sequenceNumber);
75 public void rollback() {
76 currentState.rollback();
80 public long trimLog(long desiredTrimIndex) {
81 return currentState.trimLog(desiredTrimIndex);
84 private boolean hasFollowers(){
85 return context.getPeerAddresses().keySet().size() > 0;
88 private String persistenceId(){
89 return context.getId();
92 private class AbstractSnapshotState implements SnapshotState {
95 public boolean isCapturing() {
100 public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
101 LOG.debug("capture should not be called in state {}", this);
105 public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
106 LOG.debug("captureToInstall should not be called in state {}", this);
110 public void create(Procedure<Void> callback) {
111 LOG.debug("create should not be called in state {}", this);
115 public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
116 LOG.debug("persist should not be called in state {}", this);
120 public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
121 LOG.debug("commit should not be called in state {}", this);
125 public void rollback() {
126 LOG.debug("rollback should not be called in state {}", this);
// Default handler: records at debug level that trimLog was called in a state that does not support it.
// NOTE(review): the return statement is not visible in this view — confirm which value is returned.
130 public long trimLog(long desiredTrimIndex) {
131 LOG.debug("trimLog should not be called in state {}", this);
// Trims the in-memory replicated log without persisting a snapshot (the log message calls it a "fakeSnapshot").
// The trim target is the smaller of desiredTrimIndex and lastApplied - 1.
// NOTE(review): the return statements are not visible in this view; Idle.trimLog returns this method's result.
135 protected long doTrimLog(long desiredTrimIndex){
136 // we would want to keep the lastApplied as its used while capturing snapshots
137 long lastApplied = context.getLastApplied();
// Cap the target at lastApplied - 1; when lastApplied is -1 or lower the cap is -1, which fails the check below.
138 long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
// Only trim when the target is a real index and the entry is still present in the log.
140 if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
141 LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
142 context.getTermInformation().getCurrentTerm());
144 //use the term of the temp-min, since we check for isPresent, entry will not be null
145 ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
// Pre-commit immediately followed by commit: the trim is applied in one step.
146 context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
147 context.getReplicatedLog().snapshotCommit();
155 private class Idle extends AbstractSnapshotState {
// Shared implementation behind Idle's capture and captureToInstall: builds a CaptureSnapshot request from the
// last log entry, the last-applied index/term and the replicated-to-all index/term, switches the manager to
// CAPTURING and sends the request to the actor itself. toInstall is passed through to the CaptureSnapshot.
// NOTE(review): lastLogEntry is dereferenced unconditionally when building the CaptureSnapshot, while
// LastAppliedTermInformationReader.init null-checks it — confirm callers never pass null here.
157 private void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, boolean toInstall) {
// Resolve the index/term to record as "last applied" via the reusable reader.
158 TermInformationReader lastAppliedTermInfoReader =
159 lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
160 lastLogEntry, hasFollowers());
162 long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
163 long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
// Resolve the index/term to record as "replicated to all" via the second reusable reader.
165 TermInformationReader replicatedToAllTermInfoReader =
166 replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
168 long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
169 long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
171 // send a CaptureSnapshot to self to make the expensive operation async.
172 captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
173 lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
174 newReplicatedToAllIndex, newReplicatedToAllTerm, toInstall);
// The state changes before the message is sent, so isCapturing() is already true when the actor receives it.
176 SnapshotManager.this.currentState = CAPTURING;
// NOTE(review): the format string has three placeholders; the third argument is on a line not visible in this view.
178 LOG.info("{}: Initiating snapshot capture {}: {}", persistenceId(), toInstall ? "to install" : "",
// The actor is both recipient and sender of the CaptureSnapshot message.
181 context.getActor().tell(captureSnapshot, context.getActor());
185 public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
186 capture(lastLogEntry, replicatedToAllIndex, false);
190 public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
191 capture(lastLogEntry, replicatedToAllIndex, true);
195 public String toString() {
200 public long trimLog(long desiredTrimIndex) {
201 return doTrimLog(desiredTrimIndex);
205 private class Capturing extends AbstractSnapshotState {
208 public boolean isCapturing() {
213 public void create(Procedure<Void> callback) {
215 callback.apply(null);
216 SnapshotManager.this.currentState = CREATING;
217 } catch (Exception e) {
218 LOG.error("Unexpected error occurred", e);
223 public String toString() {
229 private class Creating extends AbstractSnapshotState {
232 public boolean isCapturing() {
237 public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
238 RaftActorBehavior currentBehavior) {
239 // create a snapshot object from the state provided and save it
240 // when snapshot is saved async, SaveSnapshotSuccess is raised.
242 Snapshot sn = Snapshot.create(snapshotBytes,
243 context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
244 captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
245 captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
247 persistenceProvider.saveSnapshot(sn);
249 LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
251 long dataThreshold = Runtime.getRuntime().totalMemory() *
252 context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
253 if (context.getReplicatedLog().dataSize() > dataThreshold) {
254 // if memory is less, clear the log based on lastApplied.
255 // this could/should only happen if one of the followers is down
256 // as normally we keep removing from the log when its replicated to all.
257 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
258 captureSnapshot.getLastAppliedTerm());
260 currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
261 } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
262 // clear the log based on replicatedToAllIndex
263 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
264 captureSnapshot.getReplicatedToAllTerm());
266 currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
268 // The replicatedToAllIndex was not found in the log
269 // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
270 // In this scenario we may need to save the snapshot to the akka persistence
271 // snapshot for recovery but we do not need to do the replicated log trimming.
272 context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(),
273 context.getReplicatedLog().getSnapshotTerm());
276 LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
277 "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
278 captureSnapshot.getLastAppliedTerm());
280 if (context.getId().equals(currentBehavior.getLeaderId())
281 && captureSnapshot.isInstallSnapshotInitiated()) {
282 // this would be call straight to the leader and won't initiate in serialization
283 currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(
284 ByteString.copyFrom(snapshotBytes)));
287 captureSnapshot = null;
288 SnapshotManager.this.currentState = PERSISTING;
292 public String toString() {
298 private class Persisting extends AbstractSnapshotState {
301 public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
302 context.getReplicatedLog().snapshotCommit();
303 persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
304 sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
306 persistenceProvider.deleteMessages(sequenceNumber);
308 SnapshotManager.this.currentState = IDLE;
312 public void rollback() {
313 context.getReplicatedLog().snapshotRollback();
315 LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
316 "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
317 context.getReplicatedLog().getSnapshotIndex(),
318 context.getReplicatedLog().getSnapshotTerm(),
319 context.getReplicatedLog().size());
321 SnapshotManager.this.currentState = IDLE;
325 public String toString() {
331 private static interface TermInformationReader {
336 private static class LastAppliedTermInformationReader implements TermInformationReader{
// (Re)initialises this reader's index and term from the log entry at originalIndex, the supplied
// last log entry, or the log's snapshot index/term.
// NOTE(review): several lines of this method are not visible in this view (between the lookup and the
// first condition, and after the last assignment), and hasFollowers is not used on any visible line —
// confirm how the branches below are nested and what the defaults and return value are.
340 public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex,
341 ReplicatedLogEntry lastLogEntry, boolean hasFollowers){
// Entry stored at originalIndex; the null check further down shows it may be absent.
342 ReplicatedLogEntry entry = log.get(originalIndex);
// Take index/term from the last log entry when one was supplied.
346 if(lastLogEntry != null) {
347 index = lastLogEntry.getIndex();
348 term = lastLogEntry.getTerm();
// Take index/term from the entry found at originalIndex.
350 } else if (entry != null) {
351 index = entry.getIndex();
352 term = entry.getTerm();
// originalIndex is exactly the snapshot index: use the snapshot's index/term.
353 } else if(originalIndex == log.getSnapshotIndex()){
354 index = log.getSnapshotIndex();
355 term = log.getSnapshotTerm();
361 public long getIndex(){
366 public long getTerm(){
371 private static class ReplicatedToAllTermInformationReader implements TermInformationReader{
// (Re)initialises this reader's index and term from the log entry found at originalIndex.
// NOTE(review): the lines between the lookup and the assignments, and everything after them, are not
// visible in this view — presumably they guard against a missing entry and set defaults; confirm.
375 ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){
376 ReplicatedLogEntry entry = log.get(originalIndex);
381 index = entry.getIndex();
382 term = entry.getTerm();
389 public long getIndex(){
394 public long getTerm(){