*/
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
-import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Arrays;
import java.util.List;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.slf4j.Logger;
/**
* A cohort proxy implementation for a single-shard transaction commit. If the transaction was a direct commit
* to the shard, this implementation elides the CanCommitTransaction and CommitTransaction messages to the
- * shard as an optimization. Otherwise the 3-phase commit to the shard is delegated to a
- * ThreePhaseCommitCohortProxy instance (this is for backwards compatibility with pre-Lithium versions).
+ * shard as an optimization.
*
* @author Thomas Pantelis
*/
private final ActorContext actorContext;
private final Future<Object> cohortFuture;
- private final String transactionId;
+ private final TransactionIdentifier transactionId;
private volatile DOMStoreThreePhaseCommitCohort delegateCohort = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
private final OperationCallback.Reference operationCallbackRef;
- SingleCommitCohortProxy(ActorContext actorContext, Future<Object> cohortFuture, String transactionId,
+ SingleCommitCohortProxy(ActorContext actorContext, Future<Object> cohortFuture, TransactionIdentifier transactionId,
OperationCallback.Reference operationCallbackRef) {
this.actorContext = actorContext;
this.cohortFuture = cohortFuture;
- this.transactionId = transactionId;
+ this.transactionId = Preconditions.checkNotNull(transactionId);
this.operationCallbackRef = operationCallbackRef;
}
cohortFuture.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object cohortResponse) {
- if(failure != null) {
+ if (failure != null) {
operationCallbackRef.get().failure();
returnFuture.setException(failure);
return;
operationCallbackRef.get().success();
- if(cohortResponse instanceof ActorSelection) {
- handlePreLithiumActorCohort((ActorSelection)cohortResponse, returnFuture);
- return;
- }
-
LOG.debug("Tx {} successfully completed direct commit", transactionId);
// The Future was the result of a direct commit to the shard, essentially eliding the
List<Future<Object>> getCohortFutures() {
return Arrays.asList(cohortFuture);
}
-
- private void handlePreLithiumActorCohort(ActorSelection actorSelection, final SettableFuture<Boolean> returnFuture) {
- // Handle backwards compatibility. An ActorSelection response would be returned from a
- // pre-Lithium version. In this case delegate to a ThreePhaseCommitCohortProxy.
- delegateCohort = new ThreePhaseCommitCohortProxy(actorContext,
- Arrays.asList(Futures.successful(actorSelection)), transactionId);
- com.google.common.util.concurrent.Futures.addCallback(delegateCohort.canCommit(), new FutureCallback<Boolean>() {
- @Override
- public void onSuccess(Boolean canCommit) {
- returnFuture.set(canCommit);
- }
-
- @Override
- public void onFailure(Throwable t) {
- returnFuture.setException(t);
- }
- });
- }
}