X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardSnapshotCohort.java;h=7812d70917757083dce3dace6d52b14d2c1994ec;hp=600509a26b87b480156f4baf843f2b887ed4f655;hb=b70b396725749d3fd6ca761f02f4b630f6f4f1ce;hpb=2b2517144e4eb9c17d9b41e9d9ec20d0264f5e12 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index 600509a26b..7812d70917 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -7,12 +7,20 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Preconditions; +import akka.actor.ActorContext; import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.Optional; import java.util.concurrent.ExecutionException; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; -import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendType; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor; +import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -24,61 +32,69 @@ import org.slf4j.Logger; * @author Thomas Pantelis */ class ShardSnapshotCohort implements RaftActorSnapshotCohort { + private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply"); - private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build(); - - private int createSnapshotTransactionCounter; - private final ShardTransactionActorFactory transactionActorFactory; + private final LocalHistoryIdentifier applyHistoryId; + private final ActorRef snapshotActor; private final ShardDataTree store; - private final Logger log; private final String logId; + private final Logger log; + + private long applyCounter; - ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, ShardDataTree store, - Logger log, String logId) { - this.transactionActorFactory = transactionActorFactory; + private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor, + final ShardDataTree store, final Logger log, final String logId) { + this.applyHistoryId = Preconditions.checkNotNull(applyHistoryId); + this.snapshotActor = Preconditions.checkNotNull(snapshotActor); this.store = Preconditions.checkNotNull(store); this.log = log; this.logId = logId; } - @Override - public void createSnapshot(ActorRef actorRef) { - // Create a transaction actor. We are really going to treat the transaction as a worker - // so that this actor does not get block building the snapshot. THe transaction actor will - // after processing the CreateSnapshot message. + static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName, + final ShardDataTree store, final Logger log, final String logId) { + final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create( + FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0); + final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read"; - ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier( - "createSnapshot" + ++createSnapshotTransactionCounter); + // Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all + // requests. + final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(), snapshotActorName); - ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction( - TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION); - - createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef); + return new ShardSnapshotCohort(applyHistoryId, snapshotActor, store, log, logId); } @Override - public void applySnapshot(byte[] snapshotBytes) { - // Since this will be done only on Recovery or when this actor is a Follower - // we can safely commit everything in here. We not need to worry about event notifications - // as they would have already been disabled on the follower - - log.info("{}: Applying snapshot", logId); + public void createSnapshot(final ActorRef actorRef) { + // Forward the request to the snapshot actor + ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeRecoverySnapshot(), actorRef); + } + private void deserializeAndApplySnapshot(final byte[] snapshotBytes) { + final ShardDataTreeSnapshot snapshot; try { - ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("snapshot-" + logId, null); + snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes); + } catch (IOException e) { + log.error("{}: Failed to deserialize snapshot", logId, e); + return; + } - NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); + try { + final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction( + new TransactionIdentifier(applyHistoryId, applyCounter++)); // delete everything first - transaction.getSnapshot().delete(DATASTORE_ROOT); + transaction.getSnapshot().delete(YangInstanceIdentifier.EMPTY); - // Add everything from the remote node back - transaction.getSnapshot().write(DATASTORE_ROOT, node); - syncCommitTransaction(transaction); - } catch (InterruptedException | ExecutionException e) { + final Optional> maybeNode = snapshot.getRootNode(); + if (maybeNode.isPresent()) { + // Add everything from the remote node back + transaction.getSnapshot().write(YangInstanceIdentifier.EMPTY, maybeNode.get()); + } + + store.applyRecoveryTransaction(transaction); + } catch (Exception e) { log.error("{}: An exception occurred when applying snapshot", logId, e); - } finally { - log.info("{}: Done applying snapshot", logId); } } @@ -89,4 +105,15 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { commitCohort.preCommit().get(); commitCohort.commit().get(); } + + @Override + public void applySnapshot(final byte[] snapshotBytes) { + // Since this will be done only on Recovery or when this actor is a Follower + // we can safely commit everything in here. We not need to worry about event notifications + // as they would have already been disabled on the follower + + log.info("{}: Applying snapshot", logId); + deserializeAndApplySnapshot(snapshotBytes); + log.info("{}: Done applying snapshot", logId); + } }