Add unit tests for RaftActorSnapshotMessageSupport
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorSnapshotMessageSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.japi.Procedure;
11 import akka.persistence.SaveSnapshotFailure;
12 import akka.persistence.SaveSnapshotSuccess;
13 import org.opendaylight.controller.cluster.DataPersistenceProvider;
14 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
15 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
16 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
17 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
18 import org.slf4j.Logger;
19
20 /**
21  * Handles snapshot related messages for a RaftActor.
22  *
23  * @author Thomas Pantelis
24  */
25 class RaftActorSnapshotMessageSupport {
26     static final String COMMIT_SNAPSHOT = "commit_snapshot";
27
28     private final DataPersistenceProvider persistence;
29     private final RaftActorContext context;
30     private final RaftActorBehavior currentBehavior;
31     private final RaftActorSnapshotCohort cohort;
32     private final Logger log;
33
34     private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
35         @Override
36         public void apply(Void notUsed) throws Exception {
37             cohort.createSnapshot(context.getActor());
38         }
39     };
40
41     RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
42             RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort) {
43         this.persistence = persistence;
44         this.context = context;
45         this.currentBehavior = currentBehavior;
46         this.cohort = cohort;
47         this.log = context.getLogger();
48     }
49
50     boolean handleSnapshotMessage(Object message) {
51         if(message instanceof ApplySnapshot ) {
52             onApplySnapshot(((ApplySnapshot) message).getSnapshot());
53             return true;
54         } else if (message instanceof SaveSnapshotSuccess) {
55             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
56             return true;
57         } else if (message instanceof SaveSnapshotFailure) {
58             onSaveSnapshotFailure((SaveSnapshotFailure) message);
59             return true;
60         } else if (message instanceof CaptureSnapshot) {
61             onCaptureSnapshot(message);
62             return true;
63         } else if (message instanceof CaptureSnapshotReply) {
64             onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
65             return true;
66         } else if (message.equals(COMMIT_SNAPSHOT)) {
67             context.getSnapshotManager().commit(persistence, -1);
68             return true;
69         } else {
70             return false;
71         }
72     }
73
74     private void onCaptureSnapshotReply(byte[] snapshotBytes) {
75         log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
76
77         context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
78     }
79
80     private void onCaptureSnapshot(Object message) {
81         log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
82
83         context.getSnapshotManager().create(createSnapshotProcedure);
84     }
85
86     private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
87         log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
88                 context.getId(), saveSnapshotFailure.cause());
89
90         context.getSnapshotManager().rollback();
91     }
92
93     private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
94         log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
95
96         long sequenceNumber = success.metadata().sequenceNr();
97
98         context.getSnapshotManager().commit(persistence, sequenceNumber);
99     }
100
101     private void onApplySnapshot(Snapshot snapshot) {
102         if(log.isDebugEnabled()) {
103             log.debug("{}: ApplySnapshot called on Follower Actor " +
104                     "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
105                 snapshot.getLastAppliedTerm());
106         }
107
108         cohort.applySnapshot(snapshot.getState());
109
110         //clears the followers log, sets the snapshot index to ensure adjusted-index works
111         context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
112                 currentBehavior));
113         context.setLastApplied(snapshot.getLastAppliedIndex());
114     }
115 }