/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import akka.japi.Procedure;
11 import akka.persistence.SaveSnapshotFailure;
12 import akka.persistence.SaveSnapshotSuccess;
13 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
14 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
15 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
16 import org.slf4j.Logger;
19 * Handles snapshot related messages for a RaftActor.
21 * @author Thomas Pantelis
23 class RaftActorSnapshotMessageSupport {
24 static final String COMMIT_SNAPSHOT = "commit_snapshot";
26 private final RaftActorContext context;
27 private final RaftActorBehavior currentBehavior;
28 private final RaftActorSnapshotCohort cohort;
29 private final Logger log;
31 private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
33 public void apply(Void notUsed) throws Exception {
34 cohort.createSnapshot(context.getActor());
38 RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
39 RaftActorSnapshotCohort cohort) {
40 this.context = context;
41 this.currentBehavior = currentBehavior;
43 this.log = context.getLogger();
45 context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
48 boolean handleSnapshotMessage(Object message) {
49 if(message instanceof ApplySnapshot ) {
50 onApplySnapshot(((ApplySnapshot) message).getSnapshot());
52 } else if (message instanceof SaveSnapshotSuccess) {
53 onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
55 } else if (message instanceof SaveSnapshotFailure) {
56 onSaveSnapshotFailure((SaveSnapshotFailure) message);
58 } else if (message instanceof CaptureSnapshotReply) {
59 onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
61 } else if (message.equals(COMMIT_SNAPSHOT)) {
62 context.getSnapshotManager().commit(-1);
69 private void onCaptureSnapshotReply(byte[] snapshotBytes) {
70 log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
72 context.getSnapshotManager().persist(snapshotBytes, currentBehavior, context.getTotalMemory());
75 private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
76 log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
77 context.getId(), saveSnapshotFailure.cause());
79 context.getSnapshotManager().rollback();
82 private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
83 log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
85 long sequenceNumber = success.metadata().sequenceNr();
87 context.getSnapshotManager().commit(sequenceNumber);
90 private void onApplySnapshot(Snapshot snapshot) {
91 if(log.isDebugEnabled()) {
92 log.debug("{}: ApplySnapshot called on Follower Actor " +
93 "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
94 snapshot.getLastAppliedTerm());
97 cohort.applySnapshot(snapshot.getState());
99 //clears the followers log, sets the snapshot index to ensure adjusted-index works
100 context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
101 context.setLastApplied(snapshot.getLastAppliedIndex());