Deprecate 3PC protobuff messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
index 57749a1a736bc935da74222ca984d834083d6206..1e59f0d0970cd247ce4fa4b5c62c9ad1c2d0323d 100644 (file)
@@ -40,6 +40,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
     private final List<Future<ActorSelection>> cohortFutures;
     private volatile List<ActorSelection> cohorts;
     private final String transactionId;
+    private volatile OperationCallback commitOperationCallback;
 
     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
             List<Future<ActorSelection>> cohortFutures, String transactionId) {
@@ -63,7 +64,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
                 }
                 return null;
             }
-        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
+        }, TransactionReadyReplyMapper.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -110,7 +111,12 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
             return;
         }
 
-        final Object message = new CanCommitTransaction(transactionId).toSerializable();
+        commitOperationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
+            new TransactionRateLimitingCallback(actorContext);
+
+        commitOperationCallback.run();
+
+        final Object message = new CanCommitTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable();
 
         final Iterator<ActorSelection> iterator = cohorts.iterator();
 
@@ -122,11 +128,16 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
                     }
                     returnFuture.setException(failure);
+                    commitOperationCallback.failure();
                     return;
                 }
 
+                // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
+                // this means we'll only time the first transaction canCommit which should be fine.
+                commitOperationCallback.pause();
+
                 boolean result = true;
-                if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+                if (CanCommitTransactionReply.isSerializedType(response)) {
                     CanCommitTransactionReply reply =
                             CanCommitTransactionReply.fromSerializable(response);
                     if (!reply.getCanCommit()) {
@@ -185,17 +196,17 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
         // exception then that exception will supersede and suppress the original exception. But
         // it's the original exception that is the root cause and of more interest to the client.
 
-        return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
-                AbortTransactionReply.SERIALIZABLE_CLASS, false);
+        return voidOperation("abort", new AbortTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable(),
+                AbortTransactionReply.class, false);
     }
 
     @Override
     public ListenableFuture<Void> commit() {
-        OperationCallback operationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
-                new TransactionRateLimitingCallback(actorContext);
+        OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
+            OperationCallback.NO_OP_CALLBACK;
 
-        return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
-                CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
+        return voidOperation("commit", new CommitTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable(),
+                CommitTransactionReply.class, true, operationCallback);
     }
 
     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
@@ -250,7 +261,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
             LOG.debug("Tx {} finish {}", transactionId, operationName);
         }
 
-        callback.run();
+        callback.resume();
 
         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);