2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import akka.japi.Procedure;
11 import akka.persistence.SaveSnapshotFailure;
12 import akka.persistence.SaveSnapshotSuccess;
13 import org.opendaylight.controller.cluster.DataPersistenceProvider;
14 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
15 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
16 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
17 import org.slf4j.Logger;
20 * Handles snapshot related messages for a RaftActor.
22 * @author Thomas Pantelis
24 class RaftActorSnapshotMessageSupport {
25 static final String COMMIT_SNAPSHOT = "commit_snapshot";
27 private final DataPersistenceProvider persistence;
28 private final RaftActorContext context;
29 private final RaftActorBehavior currentBehavior;
30 private final RaftActorSnapshotCohort cohort;
31 private final Logger log;
33 private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
35 public void apply(Void notUsed) throws Exception {
36 cohort.createSnapshot(context.getActor());
40 RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
41 RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort) {
42 this.persistence = persistence;
43 this.context = context;
44 this.currentBehavior = currentBehavior;
46 this.log = context.getLogger();
48 context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
51 boolean handleSnapshotMessage(Object message) {
52 if(message instanceof ApplySnapshot ) {
53 onApplySnapshot(((ApplySnapshot) message).getSnapshot());
55 } else if (message instanceof SaveSnapshotSuccess) {
56 onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
58 } else if (message instanceof SaveSnapshotFailure) {
59 onSaveSnapshotFailure((SaveSnapshotFailure) message);
61 } else if (message instanceof CaptureSnapshotReply) {
62 onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
64 } else if (message.equals(COMMIT_SNAPSHOT)) {
65 context.getSnapshotManager().commit(persistence, -1);
72 private void onCaptureSnapshotReply(byte[] snapshotBytes) {
73 log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
75 context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
78 private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
79 log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
80 context.getId(), saveSnapshotFailure.cause());
82 context.getSnapshotManager().rollback();
85 private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
86 log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
88 long sequenceNumber = success.metadata().sequenceNr();
90 context.getSnapshotManager().commit(persistence, sequenceNumber);
93 private void onApplySnapshot(Snapshot snapshot) {
94 if(log.isDebugEnabled()) {
95 log.debug("{}: ApplySnapshot called on Follower Actor " +
96 "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
97 snapshot.getLastAppliedTerm());
100 cohort.applySnapshot(snapshot.getState());
102 //clears the followers log, sets the snapshot index to ensure adjusted-index works
103 context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
105 context.setLastApplied(snapshot.getLastAppliedIndex());