BUG-5280: implement message queueing
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardSnapshotCohort.java
index e49aa1e4f37d484d25439fd009affdeeba3a3cd8..d3d840923bd5bac4d7b599f05c7a3629a2fc5344 100644 (file)
@@ -10,7 +10,12 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
@@ -24,19 +29,30 @@ import org.slf4j.Logger;
  * @author Thomas Pantelis
  */
 class ShardSnapshotCohort implements RaftActorSnapshotCohort {
+    private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
+    private static final FrontendType SNAPSHOT_READ = FrontendType.forName("snapshot-read");
+
     private final ShardTransactionActorFactory transactionActorFactory;
+    private final LocalHistoryIdentifier applyHistoryId;
+    private final LocalHistoryIdentifier readHistoryId;
     private final ShardDataTree store;
     private final String logId;
     private final Logger log;
 
-    private int createSnapshotTransactionCounter;
+    private long applyCounter;
+    private long readCounter;
 
-    ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
+    ShardSnapshotCohort(MemberName memberName, ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
             Logger log, String logId) {
-        this.transactionActorFactory = transactionActorFactory;
+        this.transactionActorFactory = Preconditions.checkNotNull(transactionActorFactory);
         this.store = Preconditions.checkNotNull(store);
         this.log = log;
         this.logId = logId;
+
+        this.applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
+            FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
+        this.readHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
+            FrontendIdentifier.create(memberName, SNAPSHOT_READ), 0), 0);
     }
 
     @Override
@@ -45,11 +61,8 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
         // so that this actor does not get block building the snapshot. THe transaction actor will
         // after processing the CreateSnapshot message.
 
-        ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier(
-                "createSnapshot" + ++createSnapshotTransactionCounter);
-
         ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
-                TransactionType.READ_ONLY, transactionID, "");
+                TransactionType.READ_ONLY, new TransactionIdentifier(readHistoryId, readCounter++));
 
         createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
     }
@@ -63,7 +76,8 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
         log.info("{}: Applying snapshot", logId);
 
         try {
-            ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("snapshot-" + logId, null);
+            ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(
+                new TransactionIdentifier(applyHistoryId, applyCounter++));
 
             NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);