/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import akka.actor.ActorRef;
11 import akka.japi.Procedure;
12 import akka.persistence.SaveSnapshotFailure;
13 import akka.persistence.SaveSnapshotSuccess;
14 import org.opendaylight.controller.cluster.DataPersistenceProvider;
15 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
16 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
17 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
18 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
19 import org.slf4j.Logger;
22 * Handles snapshot related messages for a RaftActor.
24 * @author Thomas Pantelis
26 class RaftActorSnapshotMessageSupport {
27 static final String COMMIT_SNAPSHOT = "commit_snapshot";
29 private final DataPersistenceProvider persistence;
30 private final RaftActorContext context;
31 private final RaftActorBehavior currentBehavior;
32 private final RaftActorSnapshotCohort cohort;
33 private final ActorRef raftActorRef;
34 private final Logger log;
36 private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
38 public void apply(Void notUsed) throws Exception {
39 cohort.createSnapshot(raftActorRef);
43 RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
44 RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort, ActorRef raftActorRef) {
45 this.persistence = persistence;
46 this.context = context;
47 this.currentBehavior = currentBehavior;
49 this.raftActorRef = raftActorRef;
50 this.log = context.getLogger();
53 boolean handleSnapshotMessage(Object message) {
54 if(message instanceof ApplySnapshot ) {
55 onApplySnapshot(((ApplySnapshot) message).getSnapshot());
57 } else if (message instanceof SaveSnapshotSuccess) {
58 onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
60 } else if (message instanceof SaveSnapshotFailure) {
61 onSaveSnapshotFailure((SaveSnapshotFailure) message);
63 } else if (message instanceof CaptureSnapshot) {
64 onCaptureSnapshot(message);
66 } else if (message instanceof CaptureSnapshotReply) {
67 onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
69 } else if (message.equals(COMMIT_SNAPSHOT)) {
70 context.getSnapshotManager().commit(persistence, -1);
77 private void onCaptureSnapshotReply(byte[] snapshotBytes) {
78 log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
80 context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
83 private void onCaptureSnapshot(Object message) {
84 log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
86 context.getSnapshotManager().create(createSnapshotProcedure);
89 private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
90 log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
91 context.getId(), saveSnapshotFailure.cause());
93 context.getSnapshotManager().rollback();
96 private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
97 log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
99 long sequenceNumber = success.metadata().sequenceNr();
101 context.getSnapshotManager().commit(persistence, sequenceNumber);
104 private void onApplySnapshot(Snapshot snapshot) {
105 if(log.isDebugEnabled()) {
106 log.debug("{}: ApplySnapshot called on Follower Actor " +
107 "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
108 snapshot.getLastAppliedTerm());
111 cohort.applySnapshot(snapshot.getState());
113 //clears the followers log, sets the snapshot index to ensure adjusted-index works
114 context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
116 context.setLastApplied(snapshot.getLastAppliedIndex());