Use Empty instead of Void in cohorts
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / SingleCommitCohortProxy.java
index e340859321c6ae7231fcc52b7b4e6ab80a9efd30..c099c45cdb912a172b87f434378bfb77fe7869f9 100644 (file)
@@ -7,16 +7,17 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
+import static java.util.Objects.requireNonNull;
+
 import akka.dispatch.OnComplete;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.Arrays;
 import java.util.List;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -24,25 +25,24 @@ import scala.concurrent.Future;
 /**
  * A cohort proxy implementation for a single-shard transaction commit. If the transaction was a direct commit
  * to the shard, this implementation elides the CanCommitTransaction and CommitTransaction messages to the
- * shard as an optimization. Otherwise the 3-phase commit to the shard is delegated to a
- * ThreePhaseCommitCohortProxy instance (this is for backwards compatibility with pre-Lithium versions).
+ * shard as an optimization.
  *
  * @author Thomas Pantelis
  */
 class SingleCommitCohortProxy extends AbstractThreePhaseCommitCohort<Object> {
     private static final Logger LOG = LoggerFactory.getLogger(SingleCommitCohortProxy.class);
 
-    private final ActorContext actorContext;
+    private final ActorUtils actorUtils;
     private final Future<Object> cohortFuture;
-    private final String transactionId;
+    private final TransactionIdentifier transactionId;
     private volatile DOMStoreThreePhaseCommitCohort delegateCohort = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
     private final OperationCallback.Reference operationCallbackRef;
 
-    SingleCommitCohortProxy(ActorContext actorContext, Future<Object> cohortFuture, String transactionId,
-            OperationCallback.Reference operationCallbackRef) {
-        this.actorContext = actorContext;
+    SingleCommitCohortProxy(final ActorUtils actorUtils, final Future<Object> cohortFuture,
+            final TransactionIdentifier transactionId, final OperationCallback.Reference operationCallbackRef) {
+        this.actorUtils = actorUtils;
         this.cohortFuture = cohortFuture;
-        this.transactionId = transactionId;
+        this.transactionId = requireNonNull(transactionId);
         this.operationCallbackRef = operationCallbackRef;
     }
 
@@ -52,10 +52,10 @@ class SingleCommitCohortProxy extends AbstractThreePhaseCommitCohort<Object> {
 
         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
 
-        cohortFuture.onComplete(new OnComplete<Object>() {
+        cohortFuture.onComplete(new OnComplete<>() {
             @Override
-            public void onComplete(Throwable failure, Object cohortResponse) {
-                if(failure != null) {
+            public void onComplete(final Throwable failure, final Object cohortResponse) {
+                if (failure != null) {
                     operationCallbackRef.get().failure();
                     returnFuture.setException(failure);
                     return;
@@ -63,11 +63,6 @@ class SingleCommitCohortProxy extends AbstractThreePhaseCommitCohort<Object> {
 
                 operationCallbackRef.get().success();
 
-                if(cohortResponse instanceof ActorSelection) {
-                    handlePreLithiumActorCohort((ActorSelection)cohortResponse, returnFuture);
-                    return;
-                }
-
                 LOG.debug("Tx {} successfully completed direct commit", transactionId);
 
                 // The Future was the result of a direct commit to the shard, essentially eliding the
@@ -77,46 +72,28 @@ class SingleCommitCohortProxy extends AbstractThreePhaseCommitCohort<Object> {
                 // immediate success, to complete the 3PC for the front-end.
                 returnFuture.set(Boolean.TRUE);
             }
-        }, actorContext.getClientDispatcher());
+        }, actorUtils.getClientDispatcher());
 
         return returnFuture;
     }
 
     @Override
-    public ListenableFuture<Void> preCommit() {
+    public ListenableFuture<Empty> preCommit() {
         return delegateCohort.preCommit();
     }
 
     @Override
-    public ListenableFuture<Void> abort() {
+    public ListenableFuture<Empty> abort() {
         return delegateCohort.abort();
     }
 
     @Override
-    public ListenableFuture<Void> commit() {
+    public ListenableFuture<? extends CommitInfo> commit() {
         return delegateCohort.commit();
     }
 
     @Override
     List<Future<Object>> getCohortFutures() {
-        return Arrays.asList(cohortFuture);
-    }
-
-    private void handlePreLithiumActorCohort(ActorSelection actorSelection, final SettableFuture<Boolean> returnFuture) {
-        // Handle backwards compatibility. An ActorSelection response would be returned from a
-        // pre-Lithium version. In this case delegate to a ThreePhaseCommitCohortProxy.
-        delegateCohort = new ThreePhaseCommitCohortProxy(actorContext,
-                Arrays.asList(Futures.successful(actorSelection)), transactionId);
-        com.google.common.util.concurrent.Futures.addCallback(delegateCohort.canCommit(), new FutureCallback<Boolean>() {
-            @Override
-            public void onSuccess(Boolean canCommit) {
-                returnFuture.set(canCommit);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                returnFuture.setException(t);
-            }
-        });
+        return List.of(cohortFuture);
     }
 }