Merge "Address comment in gerrit 17266"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorSnapshotMessageSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.japi.Procedure;
11 import akka.persistence.SaveSnapshotFailure;
12 import akka.persistence.SaveSnapshotSuccess;
13 import org.opendaylight.controller.cluster.DataPersistenceProvider;
14 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
15 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
16 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
17 import org.slf4j.Logger;
18
19 /**
20  * Handles snapshot related messages for a RaftActor.
21  *
22  * @author Thomas Pantelis
23  */
24 class RaftActorSnapshotMessageSupport {
25     static final String COMMIT_SNAPSHOT = "commit_snapshot";
26
27     private final DataPersistenceProvider persistence;
28     private final RaftActorContext context;
29     private final RaftActorBehavior currentBehavior;
30     private final RaftActorSnapshotCohort cohort;
31     private final Logger log;
32
33     private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
34         @Override
35         public void apply(Void notUsed) throws Exception {
36             cohort.createSnapshot(context.getActor());
37         }
38     };
39
40     RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
41             RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort) {
42         this.persistence = persistence;
43         this.context = context;
44         this.currentBehavior = currentBehavior;
45         this.cohort = cohort;
46         this.log = context.getLogger();
47
48         context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
49     }
50
51     boolean handleSnapshotMessage(Object message) {
52         if(message instanceof ApplySnapshot ) {
53             onApplySnapshot(((ApplySnapshot) message).getSnapshot());
54             return true;
55         } else if (message instanceof SaveSnapshotSuccess) {
56             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
57             return true;
58         } else if (message instanceof SaveSnapshotFailure) {
59             onSaveSnapshotFailure((SaveSnapshotFailure) message);
60             return true;
61         } else if (message instanceof CaptureSnapshotReply) {
62             onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
63             return true;
64         } else if (message.equals(COMMIT_SNAPSHOT)) {
65             context.getSnapshotManager().commit(persistence, -1);
66             return true;
67         } else {
68             return false;
69         }
70     }
71
72     private void onCaptureSnapshotReply(byte[] snapshotBytes) {
73         log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
74
75         context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
76     }
77
78     private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
79         log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
80                 context.getId(), saveSnapshotFailure.cause());
81
82         context.getSnapshotManager().rollback();
83     }
84
85     private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
86         log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
87
88         long sequenceNumber = success.metadata().sequenceNr();
89
90         context.getSnapshotManager().commit(persistence, sequenceNumber);
91     }
92
93     private void onApplySnapshot(Snapshot snapshot) {
94         if(log.isDebugEnabled()) {
95             log.debug("{}: ApplySnapshot called on Follower Actor " +
96                     "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
97                 snapshot.getLastAppliedTerm());
98         }
99
100         cohort.applySnapshot(snapshot.getState());
101
102         //clears the followers log, sets the snapshot index to ensure adjusted-index works
103         context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
104                 currentBehavior));
105         context.setLastApplied(snapshot.getLastAppliedIndex());
106     }
107 }