BUG-5280: implement message queueing
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardSnapshotCohort.java
index c59085d61c57961feb915077e996262b92e2ce17..d3d840923bd5bac4d7b599f05c7a3629a2fc5344 100644 (file)
@@ -8,14 +8,17 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
 import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
@@ -26,21 +29,30 @@ import org.slf4j.Logger;
  * @author Thomas Pantelis
  */
 class ShardSnapshotCohort implements RaftActorSnapshotCohort {
+    private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
+    private static final FrontendType SNAPSHOT_READ = FrontendType.forName("snapshot-read");
 
-    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
-
-    private int createSnapshotTransactionCounter;
     private final ShardTransactionActorFactory transactionActorFactory;
-    private final InMemoryDOMDataStore store;
-    private final Logger log;
+    private final LocalHistoryIdentifier applyHistoryId;
+    private final LocalHistoryIdentifier readHistoryId;
+    private final ShardDataTree store;
     private final String logId;
+    private final Logger log;
 
-    ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store,
+    private long applyCounter;
+    private long readCounter;
+
+    ShardSnapshotCohort(MemberName memberName, ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
             Logger log, String logId) {
-        this.transactionActorFactory = transactionActorFactory;
-        this.store = store;
+        this.transactionActorFactory = Preconditions.checkNotNull(transactionActorFactory);
+        this.store = Preconditions.checkNotNull(store);
         this.log = log;
         this.logId = logId;
+
+        this.applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
+            FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
+        this.readHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
+            FrontendIdentifier.create(memberName, SNAPSHOT_READ), 0), 0);
     }
 
     @Override
@@ -49,11 +61,8 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
         // so that this actor does not get block building the snapshot. THe transaction actor will
         // after processing the CreateSnapshot message.
 
-        ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier(
-                "createSnapshot" + ++createSnapshotTransactionCounter);
-
         ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
-                TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION);
+                TransactionType.READ_ONLY, new TransactionIdentifier(readHistoryId, readCounter++));
 
         createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
     }
@@ -67,15 +76,16 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
         log.info("{}: Applying snapshot", logId);
 
         try {
-            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+            ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(
+                new TransactionIdentifier(applyHistoryId, applyCounter++));
 
             NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
             // delete everything first
-            transaction.delete(DATASTORE_ROOT);
+            transaction.getSnapshot().delete(YangInstanceIdentifier.EMPTY);
 
             // Add everything from the remote node back
-            transaction.write(DATASTORE_ROOT, node);
+            transaction.getSnapshot().write(YangInstanceIdentifier.EMPTY, node);
             syncCommitTransaction(transaction);
         } catch (InterruptedException | ExecutionException e) {
             log.error("{}: An exception occurred when applying snapshot", logId, e);
@@ -85,9 +95,9 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
     }
 
-    void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
+    void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction)
             throws ExecutionException, InterruptedException {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        ShardDataTreeCohort commitCohort = store.finishTransaction(transaction);
         commitCohort.preCommit().get();
         commitCohort.commit().get();
     }