*/
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
+import static java.util.Objects.requireNonNull;
+
import akka.dispatch.OnComplete;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Arrays;
import java.util.List;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
/**
* A cohort proxy implementation for a single-shard transaction commit. If the transaction was a direct commit
* to the shard, this implementation elides the CanCommitTransaction and CommitTransaction messages to the
- * shard as an optimization. Otherwise the 3-phase commit to the shard is delegated to a
- * ThreePhaseCommitCohortProxy instance (this is for backwards compatibility with pre-Lithium versions).
+ * shard as an optimization.
*
* @author Thomas Pantelis
*/
class SingleCommitCohortProxy extends AbstractThreePhaseCommitCohort<Object> {
private static final Logger LOG = LoggerFactory.getLogger(SingleCommitCohortProxy.class);
- private final ActorContext actorContext;
+ private final ActorUtils actorUtils;
private final Future<Object> cohortFuture;
- private final String transactionId;
+ private final TransactionIdentifier transactionId;
private volatile DOMStoreThreePhaseCommitCohort delegateCohort = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
private final OperationCallback.Reference operationCallbackRef;
- SingleCommitCohortProxy(ActorContext actorContext, Future<Object> cohortFuture, String transactionId,
+ SingleCommitCohortProxy(ActorUtils actorUtils, Future<Object> cohortFuture, TransactionIdentifier transactionId,
OperationCallback.Reference operationCallbackRef) {
- this.actorContext = actorContext;
+ this.actorUtils = actorUtils;
this.cohortFuture = cohortFuture;
- this.transactionId = transactionId;
+ this.transactionId = requireNonNull(transactionId);
this.operationCallbackRef = operationCallbackRef;
}
cohortFuture.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object cohortResponse) {
- if(failure != null) {
+ if (failure != null) {
operationCallbackRef.get().failure();
returnFuture.setException(failure);
return;
operationCallbackRef.get().success();
- if(cohortResponse instanceof ActorSelection) {
- handlePreLithiumActorCohort((ActorSelection)cohortResponse, returnFuture);
- return;
- }
-
LOG.debug("Tx {} successfully completed direct commit", transactionId);
// The Future was the result of a direct commit to the shard, essentially eliding the
// immediate success, to complete the 3PC for the front-end.
returnFuture.set(Boolean.TRUE);
}
- }, actorContext.getClientDispatcher());
+ }, actorUtils.getClientDispatcher());
return returnFuture;
}
List<Future<Object>> getCohortFutures() {
return Arrays.asList(cohortFuture);
}
-
- private void handlePreLithiumActorCohort(ActorSelection actorSelection, final SettableFuture<Boolean> returnFuture) {
- // Handle backwards compatibility. An ActorSelection response would be returned from a
- // pre-Lithium version. In this case delegate to a ThreePhaseCommitCohortProxy.
- delegateCohort = new ThreePhaseCommitCohortProxy(actorContext,
- Arrays.asList(Futures.successful(actorSelection)), transactionId);
- com.google.common.util.concurrent.Futures.addCallback(delegateCohort.canCommit(), new FutureCallback<Boolean>() {
- @Override
- public void onSuccess(Boolean canCommit) {
- returnFuture.set(canCommit);
- }
-
- @Override
- public void onFailure(Throwable t) {
- returnFuture.setException(t);
- }
- });
- }
}