Merge "BUG-2288: deprecate old binding Notification API elements"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorSnapshotMessageSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.japi.Procedure;
11 import akka.persistence.SaveSnapshotFailure;
12 import akka.persistence.SaveSnapshotSuccess;
13 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
14 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
15 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
16 import org.slf4j.Logger;
17
18 /**
19  * Handles snapshot related messages for a RaftActor.
20  *
21  * @author Thomas Pantelis
22  */
23 class RaftActorSnapshotMessageSupport {
24     static final String COMMIT_SNAPSHOT = "commit_snapshot";
25
26     private final RaftActorContext context;
27     private final RaftActorBehavior currentBehavior;
28     private final RaftActorSnapshotCohort cohort;
29     private final Logger log;
30
31     private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
32         @Override
33         public void apply(Void notUsed) throws Exception {
34             cohort.createSnapshot(context.getActor());
35         }
36     };
37
38     RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
39             RaftActorSnapshotCohort cohort) {
40         this.context = context;
41         this.currentBehavior = currentBehavior;
42         this.cohort = cohort;
43         this.log = context.getLogger();
44
45         context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
46     }
47
48     boolean handleSnapshotMessage(Object message) {
49         if(message instanceof ApplySnapshot ) {
50             onApplySnapshot(((ApplySnapshot) message).getSnapshot());
51             return true;
52         } else if (message instanceof SaveSnapshotSuccess) {
53             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
54             return true;
55         } else if (message instanceof SaveSnapshotFailure) {
56             onSaveSnapshotFailure((SaveSnapshotFailure) message);
57             return true;
58         } else if (message instanceof CaptureSnapshotReply) {
59             onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
60             return true;
61         } else if (message.equals(COMMIT_SNAPSHOT)) {
62             context.getSnapshotManager().commit(-1);
63             return true;
64         } else {
65             return false;
66         }
67     }
68
69     private void onCaptureSnapshotReply(byte[] snapshotBytes) {
70         log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
71
72         context.getSnapshotManager().persist(snapshotBytes, currentBehavior, context.getTotalMemory());
73     }
74
75     private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
76         log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
77                 context.getId(), saveSnapshotFailure.cause());
78
79         context.getSnapshotManager().rollback();
80     }
81
82     private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
83         log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
84
85         long sequenceNumber = success.metadata().sequenceNr();
86
87         context.getSnapshotManager().commit(sequenceNumber);
88     }
89
90     private void onApplySnapshot(Snapshot snapshot) {
91         if(log.isDebugEnabled()) {
92             log.debug("{}: ApplySnapshot called on Follower Actor " +
93                     "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
94                 snapshot.getLastAppliedTerm());
95         }
96
97         cohort.applySnapshot(snapshot.getState());
98
99         //clears the followers log, sets the snapshot index to ensure adjusted-index works
100         context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
101         context.setLastApplied(snapshot.getLastAppliedIndex());
102     }
103 }