X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardSnapshotCohort.java;h=93e1d873579f0c02446b666f897cfcae640db525;hp=c59085d61c57961feb915077e996262b92e2ce17;hb=1d7e8fd9d781f630dee9dfb1b509067dd7fb9caa;hpb=f89552de4942d3709d6ee84415e672c6c7de489f diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index c59085d61c..93e1d87357 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -7,17 +7,25 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorContext; import akka.actor.ActorRef; -import java.util.concurrent.ExecutionException; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; -import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import com.google.common.base.Preconditions; +import com.google.common.io.ByteSource; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.OutputStream; +import java.util.Optional; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendType; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor; +import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; import org.slf4j.Logger; /** @@ -26,40 +34,49 @@ import org.slf4j.Logger; * @author Thomas Pantelis */ class ShardSnapshotCohort implements RaftActorSnapshotCohort { + private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply"); - private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build(); - - private int createSnapshotTransactionCounter; - private final ShardTransactionActorFactory transactionActorFactory; - private final InMemoryDOMDataStore store; - private final Logger log; + private final ActorRef snapshotActor; + private final ShardDataTree store; private final String logId; + private final Logger log; - ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store, - Logger log, String logId) { - this.transactionActorFactory = transactionActorFactory; - this.store = store; + private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor, + final ShardDataTree store, final Logger log, final String logId) { + this.snapshotActor = Preconditions.checkNotNull(snapshotActor); + this.store = Preconditions.checkNotNull(store); this.log = log; this.logId = logId; } - @Override - public void createSnapshot(ActorRef actorRef) { - // Create a transaction actor. We are really going to treat the transaction as a worker - // so that this actor does not get block building the snapshot. THe transaction actor will - // after processing the CreateSnapshot message. + static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName, + final ShardDataTree store, final Logger log, final String logId) { + final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create( + FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0); + final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read"; - ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier( - "createSnapshot" + ++createSnapshotTransactionCounter); + // Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all + // requests. + final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(), snapshotActorName); - ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction( - TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION); + return new ShardSnapshotCohort(applyHistoryId, snapshotActor, store, log, logId); + } - createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef); + @Override + public void createSnapshot(final ActorRef actorRef, final Optional installSnapshotStream) { + // Forward the request to the snapshot actor + ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), installSnapshotStream, actorRef); } @Override - public void applySnapshot(byte[] snapshotBytes) { + @SuppressWarnings("checkstyle:IllegalCatch") + public void applySnapshot(final Snapshot.State snapshotState) { + if (!(snapshotState instanceof ShardSnapshotState)) { + log.debug("{}: applySnapshot ignoring snapshot: {}", snapshotState); + } + + final ShardDataTreeSnapshot snapshot = ((ShardSnapshotState)snapshotState).getSnapshot(); + // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower @@ -67,28 +84,19 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { log.info("{}: Applying snapshot", logId); try { - DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); - - NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); - - // delete everything first - transaction.delete(DATASTORE_ROOT); - - // Add everything from the remote node back - transaction.write(DATASTORE_ROOT, node); - syncCommitTransaction(transaction); - } catch (InterruptedException | ExecutionException e) { - log.error("{}: An exception occurred when applying snapshot", logId, e); - } finally { - log.info("{}: Done applying snapshot", logId); + store.applySnapshot(snapshot); + } catch (Exception e) { + log.error("{}: Failed to apply snapshot {}", logId, snapshot, e); + return; } + log.info("{}: Done applying snapshot", logId); } - void syncCommitTransaction(final DOMStoreWriteTransaction transaction) - throws ExecutionException, InterruptedException { - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); - commitCohort.preCommit().get(); - commitCohort.commit().get(); + @Override + public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException { + try (final ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) { + return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in)); + } } }