BUG-5280: implement message queueing
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardSnapshotCohort.java
index c4ac727cb3b59fa3efa77b2ef3ccf7427c2a81a7..d3d840923bd5bac4d7b599f05c7a3629a2fc5344 100644 (file)
@@ -7,10 +7,15 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Preconditions;
 import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
 import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
@@ -24,21 +29,30 @@ import org.slf4j.Logger;
  * @author Thomas Pantelis
  */
 class ShardSnapshotCohort implements RaftActorSnapshotCohort {
+    private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
+    private static final FrontendType SNAPSHOT_READ = FrontendType.forName("snapshot-read");
 
-    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
-
-    private int createSnapshotTransactionCounter;
     private final ShardTransactionActorFactory transactionActorFactory;
+    private final LocalHistoryIdentifier applyHistoryId;
+    private final LocalHistoryIdentifier readHistoryId;
     private final ShardDataTree store;
-    private final Logger log;
     private final String logId;
+    private final Logger log;
+
+    private long applyCounter;
+    private long readCounter;
 
-    ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
+    ShardSnapshotCohort(MemberName memberName, ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
             Logger log, String logId) {
-        this.transactionActorFactory = transactionActorFactory;
+        this.transactionActorFactory = Preconditions.checkNotNull(transactionActorFactory);
         this.store = Preconditions.checkNotNull(store);
         this.log = log;
         this.logId = logId;
+
+        this.applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
+            FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
+        this.readHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
+            FrontendIdentifier.create(memberName, SNAPSHOT_READ), 0), 0);
     }
 
     @Override
@@ -47,11 +61,8 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
         // so that this actor does not get block building the snapshot. THe transaction actor will
         // after processing the CreateSnapshot message.
 
-        ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier(
-                "createSnapshot" + ++createSnapshotTransactionCounter);
-
         ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
-                TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION);
+                TransactionType.READ_ONLY, new TransactionIdentifier(readHistoryId, readCounter++));
 
         createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
     }
@@ -65,15 +76,16 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
         log.info("{}: Applying snapshot", logId);
 
         try {
-            ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("snapshot-" + logId, null);
+            ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(
+                new TransactionIdentifier(applyHistoryId, applyCounter++));
 
             NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
             // delete everything first
-            transaction.getSnapshot().delete(DATASTORE_ROOT);
+            transaction.getSnapshot().delete(YangInstanceIdentifier.EMPTY);
 
             // Add everything from the remote node back
-            transaction.getSnapshot().write(DATASTORE_ROOT, node);
+            transaction.getSnapshot().write(YangInstanceIdentifier.EMPTY, node);
             syncCommitTransaction(transaction);
         } catch (InterruptedException | ExecutionException e) {
             log.error("{}: An exception occurred when applying snapshot", logId, e);