/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorContext;
import akka.actor.ActorRef;
-import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
import org.slf4j.Logger;
/**
* @author Thomas Pantelis
*/
class ShardSnapshotCohort implements RaftActorSnapshotCohort {
+ private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
- private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
-
- private int createSnapshotTransactionCounter;
- private final ShardTransactionActorFactory transactionActorFactory;
- private final InMemoryDOMDataStore store;
- private final Logger log;
+ private final ActorRef snapshotActor;
+ private final ShardDataTree store;
private final String logId;
+ private final Logger log;
- ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store,
- Logger log, String logId) {
- this.transactionActorFactory = transactionActorFactory;
- this.store = store;
+ private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor,
+ final ShardDataTree store, final Logger log, final String logId) {
+ this.snapshotActor = Preconditions.checkNotNull(snapshotActor);
+ this.store = Preconditions.checkNotNull(store);
this.log = log;
this.logId = logId;
}
- @Override
- public void createSnapshot(ActorRef actorRef) {
- // Create a transaction actor. We are really going to treat the transaction as a worker
- // so that this actor does not get block building the snapshot. THe transaction actor will
- // after processing the CreateSnapshot message.
+ static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName,
+ final ShardDataTree store, final Logger log, final String logId) {
+ final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
+ FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
+ final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read";
- ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier(
- "createSnapshot" + ++createSnapshotTransactionCounter);
+ // Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all
+ // requests.
+ final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(), snapshotActorName);
- ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
- TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION);
+ return new ShardSnapshotCohort(applyHistoryId, snapshotActor, store, log, logId);
+ }
+
+ @Override
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
+ // Forward the request to the snapshot actor
+ final ShardDataTreeSnapshot snapshot = store.takeStateSnapshot();
+ log.debug("{}: requesting serialization of snapshot {}", logId, snapshot);
- createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
+ ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot, installSnapshotStream, actorRef);
}
@Override
- public void applySnapshot(byte[] snapshotBytes) {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void applySnapshot(final Snapshot.State snapshotState) {
+ if (!(snapshotState instanceof ShardSnapshotState)) {
+ log.debug("{}: applySnapshot ignoring snapshot: {}", snapshotState);
+ }
+
+ final ShardDataTreeSnapshot snapshot = ((ShardSnapshotState)snapshotState).getSnapshot();
+
// Since this will be done only on Recovery or when this actor is a Follower
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
log.info("{}: Applying snapshot", logId);
try {
- DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-
- NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
- // delete everything first
- transaction.delete(DATASTORE_ROOT);
-
- // Add everything from the remote node back
- transaction.write(DATASTORE_ROOT, node);
- syncCommitTransaction(transaction);
- } catch (InterruptedException | ExecutionException e) {
- log.error("{}: An exception occurred when applying snapshot", logId, e);
- } finally {
- log.info("{}: Done applying snapshot", logId);
+ store.applySnapshot(snapshot);
+ } catch (Exception e) {
+ log.error("{}: Failed to apply snapshot {}", logId, snapshot, e);
+ return;
}
+ log.info("{}: Done applying snapshot", logId);
}
- void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
- throws ExecutionException, InterruptedException {
- DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
- commitCohort.preCommit().get();
- commitCohort.commit().get();
+ @Override
+ public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
+ try (ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) {
+ return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in));
+ }
}
}