X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardSnapshotCohort.java;h=c7bc20f7546599876d4f99a9ca07fc174ed836ed;hp=7812d70917757083dce3dace6d52b14d2c1994ec;hb=3402cfce32b05957219e54754dd7ca5b0a54cd0e;hpb=b70b396725749d3fd6ca761f02f4b630f6f4f1ce diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index 7812d70917..c7bc20f754 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -7,23 +7,27 @@ */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorContext; import akka.actor.ActorRef; -import com.google.common.base.Preconditions; +import com.google.common.io.ByteSource; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.OutputStream; import java.util.Optional; -import java.util.concurrent.ExecutionException; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendType; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.io.InputOutputStreamFactory; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; import org.slf4j.Logger; /** @@ -31,89 +35,78 @@ import org.slf4j.Logger; * * @author Thomas Pantelis */ -class ShardSnapshotCohort implements RaftActorSnapshotCohort { +final class ShardSnapshotCohort implements RaftActorSnapshotCohort { private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply"); - private final LocalHistoryIdentifier applyHistoryId; + private final InputOutputStreamFactory streamFactory; private final ActorRef snapshotActor; private final ShardDataTree store; private final String logId; private final Logger log; - private long applyCounter; - - private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor, - final ShardDataTree store, final Logger log, final String logId) { - this.applyHistoryId = Preconditions.checkNotNull(applyHistoryId); - this.snapshotActor = Preconditions.checkNotNull(snapshotActor); - this.store = Preconditions.checkNotNull(store); + ShardSnapshotCohort(final InputOutputStreamFactory streamFactory, final LocalHistoryIdentifier applyHistoryId, + final ActorRef snapshotActor, final ShardDataTree store, final Logger log, final String logId) { + this.streamFactory = requireNonNull(streamFactory); + this.snapshotActor = requireNonNull(snapshotActor); + this.store = requireNonNull(store); this.log = log; this.logId = logId; } static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName, - final ShardDataTree store, final Logger log, final String logId) { + final ShardDataTree store, final Logger log, final String logId, final DatastoreContext context) { final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create( FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0); final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read"; + final InputOutputStreamFactory streamFactory = context.isUseLz4Compression() + ? InputOutputStreamFactory.lz4("256KB") : InputOutputStreamFactory.simple(); // Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all // requests. - final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(), snapshotActorName); + final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(streamFactory), + snapshotActorName); - return new ShardSnapshotCohort(applyHistoryId, snapshotActor, store, log, logId); + return new ShardSnapshotCohort(streamFactory, applyHistoryId, snapshotActor, store, log, logId); } @Override - public void createSnapshot(final ActorRef actorRef) { + public void createSnapshot(final ActorRef actorRef, final Optional installSnapshotStream) { // Forward the request to the snapshot actor - ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeRecoverySnapshot(), actorRef); + final ShardDataTreeSnapshot snapshot = store.takeStateSnapshot(); + log.debug("{}: requesting serialization of snapshot {}", logId, snapshot); + + ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot, installSnapshotStream, actorRef); } - private void deserializeAndApplySnapshot(final byte[] snapshotBytes) { - final ShardDataTreeSnapshot snapshot; - try { - snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes); - } catch (IOException e) { - log.error("{}: Failed to deserialize snapshot", logId, e); - return; + @Override + @SuppressWarnings("checkstyle:IllegalCatch") + public void applySnapshot(final Snapshot.State snapshotState) { + if (!(snapshotState instanceof ShardSnapshotState)) { + log.debug("{}: applySnapshot ignoring snapshot: {}", logId, snapshotState); } - try { - final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction( - new TransactionIdentifier(applyHistoryId, applyCounter++)); + final ShardDataTreeSnapshot snapshot = ((ShardSnapshotState)snapshotState).getSnapshot(); - // delete everything first - transaction.getSnapshot().delete(YangInstanceIdentifier.EMPTY); + // Since this will be done only on Recovery or when this actor is a Follower + // we can safely commit everything in here. We not need to worry about event notifications + // as they would have already been disabled on the follower - final Optional> maybeNode = snapshot.getRootNode(); - if (maybeNode.isPresent()) { - // Add everything from the remote node back - transaction.getSnapshot().write(YangInstanceIdentifier.EMPTY, maybeNode.get()); - } + log.info("{}: Applying snapshot", logId); - store.applyRecoveryTransaction(transaction); + try { + store.applySnapshot(snapshot); } catch (Exception e) { - log.error("{}: An exception occurred when applying snapshot", logId, e); + log.error("{}: Failed to apply snapshot {}", logId, snapshot, e); + return; } - } - - void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction) - throws ExecutionException, InterruptedException { - ShardDataTreeCohort commitCohort = store.finishTransaction(transaction); - commitCohort.preCommit().get(); - commitCohort.commit().get(); + log.info("{}: Done applying snapshot", logId); } @Override - public void applySnapshot(final byte[] snapshotBytes) { - // Since this will be done only on Recovery or when this actor is a Follower - // we can safely commit everything in here. We not need to worry about event notifications - // as they would have already been disabled on the follower - - log.info("{}: Applying snapshot", logId); - deserializeAndApplySnapshot(snapshotBytes); - log.info("{}: Done applying snapshot", logId); + public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException { + try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(snapshotBytes))) { + return ShardDataTreeSnapshot.deserialize(in); + } } }