2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorContext;
11 import akka.actor.ActorRef;
12 import com.google.common.base.Preconditions;
13 import com.google.common.io.ByteSource;
14 import java.io.IOException;
15 import java.io.InputStream;
16 import java.io.OutputStream;
17 import java.util.Optional;
18 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
19 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
20 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
21 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
22 import org.opendaylight.controller.cluster.access.concepts.MemberName;
23 import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor;
24 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
25 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
26 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
27 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
28 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
29 import org.slf4j.Logger;
32 * Participates in raft snapshotting on behalf of a Shard actor.
34 * @author Thomas Pantelis
36 class ShardSnapshotCohort implements RaftActorSnapshotCohort {
37 private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
39 private final ActorRef snapshotActor;
40 private final ShardDataTree store;
41 private final String logId;
42 private final Logger log;
44 private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor,
45 final ShardDataTree store, final Logger log, final String logId) {
46 this.snapshotActor = Preconditions.checkNotNull(snapshotActor);
47 this.store = Preconditions.checkNotNull(store);
52 static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName,
53 final ShardDataTree store, final Logger log, final String logId) {
54 final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
55 FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
56 final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read";
58 // Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all
60 final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(), snapshotActorName);
62 return new ShardSnapshotCohort(applyHistoryId, snapshotActor, store, log, logId);
66 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
67 // Forward the request to the snapshot actor
68 ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), installSnapshotStream, actorRef);
72 @SuppressWarnings("checkstyle:IllegalCatch")
73 public void applySnapshot(final Snapshot.State snapshotState) {
74 if (!(snapshotState instanceof ShardSnapshotState)) {
75 log.debug("{}: applySnapshot ignoring snapshot: {}", snapshotState);
78 final ShardDataTreeSnapshot snapshot = ((ShardSnapshotState)snapshotState).getSnapshot();
80 // Since this will be done only on Recovery or when this actor is a Follower
81 // we can safely commit everything in here. We not need to worry about event notifications
82 // as they would have already been disabled on the follower
84 log.info("{}: Applying snapshot", logId);
87 store.applySnapshot(snapshot);
88 } catch (Exception e) {
89 log.error("{}: Failed to apply snapshot {}", logId, snapshot, e);
93 log.info("{}: Done applying snapshot", logId);
97 public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
98 try (final InputStream is = snapshotBytes.openStream()) {
99 return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(is));