X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardSnapshotCohort.java;h=126e28d9c8c6157794f3b7a12282fb5488d5e2dd;hp=6dc3f03081b71fe18c94e10dfb28d39c7d13d985;hb=987e2e706d0b343304142626bc870f3e8c909b51;hpb=925cb4a228d0fda99c7bfeb432eb25285a223887 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index 6dc3f03081..126e28d9c8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -7,10 +7,15 @@ */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorContext; import akka.actor.ActorRef; -import com.google.common.base.Preconditions; +import com.google.common.io.ByteSource; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.OutputStream; +import java.util.Optional; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendType; @@ -18,7 +23,10 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; import org.slf4j.Logger; /** @@ -26,7 +34,7 @@ import org.slf4j.Logger; * * @author Thomas Pantelis */ -class ShardSnapshotCohort implements RaftActorSnapshotCohort { +final class ShardSnapshotCohort implements RaftActorSnapshotCohort { private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply"); private final ActorRef snapshotActor; @@ -36,8 +44,8 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor, final ShardDataTree store, final Logger log, final String logId) { - this.snapshotActor = Preconditions.checkNotNull(snapshotActor); - this.store = Preconditions.checkNotNull(store); + this.snapshotActor = requireNonNull(snapshotActor); + this.store = requireNonNull(store); this.log = log; this.logId = logId; } @@ -56,28 +64,29 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { } @Override - public void createSnapshot(final ActorRef actorRef) { + public void createSnapshot(final ActorRef actorRef, final Optional installSnapshotStream) { // Forward the request to the snapshot actor - ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), actorRef); + final ShardDataTreeSnapshot snapshot = store.takeStateSnapshot(); + log.debug("{}: requesting serialization of snapshot {}", logId, snapshot); + + ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot, installSnapshotStream, actorRef); } @Override @SuppressWarnings("checkstyle:IllegalCatch") - public void applySnapshot(final byte[] snapshotBytes) { + public void applySnapshot(final Snapshot.State snapshotState) { + if (!(snapshotState instanceof ShardSnapshotState)) { + log.debug("{}: applySnapshot ignoring snapshot: {}", logId, snapshotState); + } + + final ShardDataTreeSnapshot snapshot = ((ShardSnapshotState)snapshotState).getSnapshot(); + // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower log.info("{}: Applying snapshot", logId); - final ShardDataTreeSnapshot snapshot; - try { - snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes); - } catch (IOException e) { - log.error("{}: Failed to deserialize snapshot", logId, e); - return; - } - try { store.applySnapshot(snapshot); } catch (Exception e) { @@ -87,4 +96,11 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { log.info("{}: Done applying snapshot", logId); } + + @Override + public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException { + try (ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) { + return ShardDataTreeSnapshot.deserialize(in); + } + } }