bf0fc10aad944ae8539c8b3b2a4da70020500944
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorSnapshotMessageSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.japi.Procedure;
11 import akka.persistence.SaveSnapshotFailure;
12 import akka.persistence.SaveSnapshotSuccess;
13 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
14 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
15 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
16 import org.slf4j.Logger;
17
18 /**
19  * Handles snapshot related messages for a RaftActor.
20  *
21  * @author Thomas Pantelis
22  */
23 class RaftActorSnapshotMessageSupport {
24     static final String COMMIT_SNAPSHOT = "commit_snapshot";
25
26     private final RaftActorContext context;
27     private final RaftActorBehavior currentBehavior;
28     private final RaftActorSnapshotCohort cohort;
29     private final Logger log;
30
31     private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
32         @Override
33         public void apply(Void notUsed) {
34             cohort.createSnapshot(context.getActor());
35         }
36     };
37
38     private final Procedure<byte[]> applySnapshotProcedure = new Procedure<byte[]>() {
39         @Override
40         public void apply(byte[] state) {
41             cohort.applySnapshot(state);
42         }
43     };
44
45     RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
46             RaftActorSnapshotCohort cohort) {
47         this.context = context;
48         this.currentBehavior = currentBehavior;
49         this.cohort = cohort;
50         this.log = context.getLogger();
51
52         context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
53         context.getSnapshotManager().setApplySnapshotProcedure(applySnapshotProcedure);
54     }
55
56     boolean handleSnapshotMessage(Object message) {
57         if(message instanceof ApplySnapshot ) {
58             onApplySnapshot((ApplySnapshot) message);
59             return true;
60         } else if (message instanceof SaveSnapshotSuccess) {
61             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
62             return true;
63         } else if (message instanceof SaveSnapshotFailure) {
64             onSaveSnapshotFailure((SaveSnapshotFailure) message);
65             return true;
66         } else if (message instanceof CaptureSnapshotReply) {
67             onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
68             return true;
69         } else if (message.equals(COMMIT_SNAPSHOT)) {
70             context.getSnapshotManager().commit(-1, currentBehavior);
71             return true;
72         } else {
73             return false;
74         }
75     }
76
77     private void onCaptureSnapshotReply(byte[] snapshotBytes) {
78         log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
79
80         context.getSnapshotManager().persist(snapshotBytes, currentBehavior, context.getTotalMemory());
81     }
82
83     private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
84         log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
85                 context.getId(), saveSnapshotFailure.cause());
86
87         context.getSnapshotManager().rollback();
88     }
89
90     private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
91         log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
92
93         long sequenceNumber = success.metadata().sequenceNr();
94
95         context.getSnapshotManager().commit(sequenceNumber, currentBehavior);
96     }
97
98     private void onApplySnapshot(ApplySnapshot message) {
99         log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(),
100                 message.getSnapshot().getLastAppliedIndex(), message.getSnapshot().getLastAppliedTerm());
101
102         context.getSnapshotManager().apply(message);
103     }
104 }