X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardSnapshotCohort.java;h=8bef15bbbaea1b663ca376359f602e7839b9a562;hb=a47dd7a5d21ca68804a6d0e2e3ca765f223c2ef4;hp=600509a26b87b480156f4baf843f2b887ed4f655;hpb=e20fff4d018e95cefd1934d2be31e5cd692fe7fa;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index 600509a26b..8bef15bbba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -7,12 +7,19 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Preconditions; +import akka.actor.ActorContext; import akka.actor.ActorRef; -import java.util.concurrent.ExecutionException; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; -import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.Optional; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendType; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor; +import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -24,69 +31,81 @@ import org.slf4j.Logger; * @author Thomas Pantelis */ class ShardSnapshotCohort implements RaftActorSnapshotCohort { + private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply"); - private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build(); - - private int createSnapshotTransactionCounter; - private final ShardTransactionActorFactory transactionActorFactory; + private final LocalHistoryIdentifier applyHistoryId; + private final ActorRef snapshotActor; private final ShardDataTree store; - private final Logger log; private final String logId; + private final Logger log; + + private long applyCounter; - ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, ShardDataTree store, - Logger log, String logId) { - this.transactionActorFactory = transactionActorFactory; + private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor, + final ShardDataTree store, final Logger log, final String logId) { + this.applyHistoryId = Preconditions.checkNotNull(applyHistoryId); + this.snapshotActor = Preconditions.checkNotNull(snapshotActor); this.store = Preconditions.checkNotNull(store); this.log = log; this.logId = logId; } - @Override - public void createSnapshot(ActorRef actorRef) { - // Create a transaction actor. We are really going to treat the transaction as a worker - // so that this actor does not get block building the snapshot. THe transaction actor will - // after processing the CreateSnapshot message. - - ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier( - "createSnapshot" + ++createSnapshotTransactionCounter); + static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName, + final ShardDataTree store, final Logger log, final String logId) { + final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create( + FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0); + final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read"; - ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction( - TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION); + // Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all + // requests. + final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(), snapshotActorName); - createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef); + return new ShardSnapshotCohort(applyHistoryId, snapshotActor, store, log, logId); } @Override - public void applySnapshot(byte[] snapshotBytes) { - // Since this will be done only on Recovery or when this actor is a Follower - // we can safely commit everything in here. We not need to worry about event notifications - // as they would have already been disabled on the follower - - log.info("{}: Applying snapshot", logId); + public void createSnapshot(final ActorRef actorRef) { + // Forward the request to the snapshot actor + ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeRecoverySnapshot(), actorRef); + } + private void deserializeAndApplySnapshot(final byte[] snapshotBytes) { + final ShardDataTreeSnapshot snapshot; try { - ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("snapshot-" + logId, null); + snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes); + } catch (IOException e) { + log.error("{}: Failed to deserialize snapshot", logId, e); + return; + } - NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); + try { + final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction( + new TransactionIdentifier(applyHistoryId, applyCounter++)); // delete everything first - transaction.getSnapshot().delete(DATASTORE_ROOT); + transaction.getSnapshot().delete(YangInstanceIdentifier.EMPTY); + + final Optional> maybeNode = snapshot.getRootNode(); + if (maybeNode.isPresent()) { + // Add everything from the remote node back + transaction.getSnapshot().write(YangInstanceIdentifier.EMPTY, maybeNode.get()); + } - // Add everything from the remote node back - transaction.getSnapshot().write(DATASTORE_ROOT, node); - syncCommitTransaction(transaction); - } catch (InterruptedException | ExecutionException e) { + store.applyRecoveryTransaction(transaction); + } catch (Exception e) { log.error("{}: An exception occurred when applying snapshot", logId, e); - } finally { - log.info("{}: Done applying snapshot", logId); } } - void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction) - throws ExecutionException, InterruptedException { - ShardDataTreeCohort commitCohort = store.finishTransaction(transaction); - commitCohort.preCommit().get(); - commitCohort.commit().get(); + @Override + public void applySnapshot(final byte[] snapshotBytes) { + // Since this will be done only on Recovery or when this actor is a Follower + // we can safely commit everything in here. We not need to worry about event notifications + // as they would have already been disabled on the follower + + log.info("{}: Applying snapshot", logId); + deserializeAndApplySnapshot(snapshotBytes); + log.info("{}: Done applying snapshot", logId); } }