*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Preconditions;
import akka.actor.ActorRef;
import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
private int createSnapshotTransactionCounter;
private final ShardTransactionActorFactory transactionActorFactory;
- private final InMemoryDOMDataStore store;
+ private final ShardDataTree store;
private final Logger log;
private final String logId;
- ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store,
+ ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
Logger log, String logId) {
this.transactionActorFactory = transactionActorFactory;
- this.store = store;
+ this.store = Preconditions.checkNotNull(store);
this.log = log;
this.logId = logId;
}
"createSnapshot" + ++createSnapshotTransactionCounter);
ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
- TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION);
+ TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION);
createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
}
log.info("{}: Applying snapshot", logId);
try {
- DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+ ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("snapshot-" + logId, null);
NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
// delete everything first
- transaction.delete(DATASTORE_ROOT);
+ transaction.getSnapshot().delete(DATASTORE_ROOT);
// Add everything from the remote node back
- transaction.write(DATASTORE_ROOT, node);
+ transaction.getSnapshot().write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
log.error("{}: An exception occurred when applying snapshot", logId, e);
}
- void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
+ void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction)
throws ExecutionException, InterruptedException {
- DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ ShardDataTreeCohort commitCohort = store.finishTransaction(transaction);
commitCohort.preCommit().get();
commitCohort.commit().get();
}