Merge changes I114cbac1,I45c2e7cd
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorSnapshotMessageSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.japi.Procedure;
12 import akka.persistence.SaveSnapshotFailure;
13 import akka.persistence.SaveSnapshotSuccess;
14 import org.opendaylight.controller.cluster.DataPersistenceProvider;
15 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
16 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
17 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
18 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
19 import org.slf4j.Logger;
20
21 /**
22  * Handles snapshot related messages for a RaftActor.
23  *
24  * @author Thomas Pantelis
25  */
26 class RaftActorSnapshotMessageSupport {
27     static final String COMMIT_SNAPSHOT = "commit_snapshot";
28
29     private final DataPersistenceProvider persistence;
30     private final RaftActorContext context;
31     private final RaftActorBehavior currentBehavior;
32     private final RaftActorSnapshotCohort cohort;
33     private final ActorRef raftActorRef;
34     private final Logger log;
35
36     private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
37         @Override
38         public void apply(Void notUsed) throws Exception {
39             cohort.createSnapshot(raftActorRef);
40         }
41     };
42
43     RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
44             RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort, ActorRef raftActorRef) {
45         this.persistence = persistence;
46         this.context = context;
47         this.currentBehavior = currentBehavior;
48         this.cohort = cohort;
49         this.raftActorRef = raftActorRef;
50         this.log = context.getLogger();
51     }
52
53     boolean handleSnapshotMessage(Object message) {
54         if(message instanceof ApplySnapshot ) {
55             onApplySnapshot(((ApplySnapshot) message).getSnapshot());
56             return true;
57         } else if (message instanceof SaveSnapshotSuccess) {
58             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
59             return true;
60         } else if (message instanceof SaveSnapshotFailure) {
61             onSaveSnapshotFailure((SaveSnapshotFailure) message);
62             return true;
63         } else if (message instanceof CaptureSnapshot) {
64             onCaptureSnapshot(message);
65             return true;
66         } else if (message instanceof CaptureSnapshotReply) {
67             onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
68             return true;
69         } else if (message.equals(COMMIT_SNAPSHOT)) {
70             context.getSnapshotManager().commit(persistence, -1);
71             return true;
72         } else {
73             return false;
74         }
75     }
76
77     private void onCaptureSnapshotReply(byte[] snapshotBytes) {
78         log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
79
80         context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
81     }
82
83     private void onCaptureSnapshot(Object message) {
84         log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
85
86         context.getSnapshotManager().create(createSnapshotProcedure);
87     }
88
89     private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
90         log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
91                 context.getId(), saveSnapshotFailure.cause());
92
93         context.getSnapshotManager().rollback();
94     }
95
96     private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
97         log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
98
99         long sequenceNumber = success.metadata().sequenceNr();
100
101         context.getSnapshotManager().commit(persistence, sequenceNumber);
102     }
103
104     private void onApplySnapshot(Snapshot snapshot) {
105         if(log.isDebugEnabled()) {
106             log.debug("{}: ApplySnapshot called on Follower Actor " +
107                     "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
108                 snapshot.getLastAppliedTerm());
109         }
110
111         cohort.applySnapshot(snapshot.getState());
112
113         //clears the followers log, sets the snapshot index to ensure adjusted-index works
114         context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
115                 currentBehavior));
116         context.setLastApplied(snapshot.getLastAppliedIndex());
117     }
118 }