/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import akka.japi.Procedure;
11 import akka.persistence.SaveSnapshotFailure;
12 import akka.persistence.SaveSnapshotSuccess;
13 import org.opendaylight.controller.cluster.DataPersistenceProvider;
14 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
15 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
16 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
17 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
18 import org.slf4j.Logger;
/**
 * Handles snapshot-related messages for a RaftActor.
 *
 * @author Thomas Pantelis
 */
25 class RaftActorSnapshotMessageSupport {
26 static final String COMMIT_SNAPSHOT = "commit_snapshot";
28 private final DataPersistenceProvider persistence;
29 private final RaftActorContext context;
30 private final RaftActorBehavior currentBehavior;
31 private final RaftActorSnapshotCohort cohort;
32 private final Logger log;
34 private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
36 public void apply(Void notUsed) throws Exception {
37 cohort.createSnapshot(context.getActor());
/**
 * Creates the snapshot message support for a RaftActor.
 *
 * @param persistence     provider passed to the snapshot manager for persist/commit operations
 * @param context         actor context; also the source of the logger used by this class
 * @param currentBehavior behavior passed along when a captured snapshot is persisted
 * @param cohort          cohort that creates snapshots and applies received snapshot state
 */
RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
        RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort) {
    this.persistence = persistence;
    this.context = context;
    this.currentBehavior = currentBehavior;
    // The cohort is a final field used by createSnapshotProcedure and onApplySnapshot,
    // so it must be assigned here; leaving it out leaves the field uninitialized.
    this.cohort = cohort;
    this.log = context.getLogger();
}
50 boolean handleSnapshotMessage(Object message) {
51 if(message instanceof ApplySnapshot ) {
52 onApplySnapshot(((ApplySnapshot) message).getSnapshot());
54 } else if (message instanceof SaveSnapshotSuccess) {
55 onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
57 } else if (message instanceof SaveSnapshotFailure) {
58 onSaveSnapshotFailure((SaveSnapshotFailure) message);
60 } else if (message instanceof CaptureSnapshot) {
61 onCaptureSnapshot(message);
63 } else if (message instanceof CaptureSnapshotReply) {
64 onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
66 } else if (message.equals(COMMIT_SNAPSHOT)) {
67 context.getSnapshotManager().commit(persistence, -1);
74 private void onCaptureSnapshotReply(byte[] snapshotBytes) {
75 log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
77 context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
80 private void onCaptureSnapshot(Object message) {
81 log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
83 context.getSnapshotManager().create(createSnapshotProcedure);
86 private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
87 log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
88 context.getId(), saveSnapshotFailure.cause());
90 context.getSnapshotManager().rollback();
93 private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
94 log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
96 long sequenceNumber = success.metadata().sequenceNr();
98 context.getSnapshotManager().commit(persistence, sequenceNumber);
101 private void onApplySnapshot(Snapshot snapshot) {
102 if(log.isDebugEnabled()) {
103 log.debug("{}: ApplySnapshot called on Follower Actor " +
104 "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
105 snapshot.getLastAppliedTerm());
108 cohort.applySnapshot(snapshot.getState());
110 //clears the followers log, sets the snapshot index to ensure adjusted-index works
111 context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
113 context.setLastApplied(snapshot.getLastAppliedIndex());