/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import scala.concurrent.duration.FiniteDuration;
/**
- * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
- * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
- * retry task after a short delay.
+ * Handles creation of TransactionContext instances for remote transactions. This class creates
+ * remote transactions, if necessary, by sending CreateTransaction messages with retries, up to a limit,
+ * if the shard doesn't have a leader yet. This is done by scheduling a retry task after a short delay.
* <p>
* The end result from a completed CreateTransaction message is a TransactionContext that is
* used to perform transaction operations. Transaction operations that occur before the
- * CreateTransaction completes are cache and executed once the CreateTransaction completes,
- * successfully or not.
+ * CreateTransaction completes are cache via a TransactionContextWrapper and executed once the
+ * CreateTransaction completes, successfully or not.
*/
-final class TransactionFutureCallback extends OnComplete<Object> {
- private static final Logger LOG = LoggerFactory.getLogger(TransactionFutureCallback.class);
+final class RemoteTransactionContextSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class);
/**
* Time interval in between transaction create retries.
*/
private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS);
- /**
- * The list of transaction operations to execute once the CreateTransaction completes.
- */
- @GuardedBy("txOperationsOnComplete")
- private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
- private final TransactionProxy proxy;
+ private final TransactionProxy parent;
private final String shardName;
- /**
- * The TransactionContext resulting from the CreateTransaction reply.
- */
- private volatile TransactionContext transactionContext;
-
/**
* The target primary shard.
*/
private volatile ActorSelection primaryShard;
private volatile int createTxTries;
- TransactionFutureCallback(final TransactionProxy proxy, final String shardName) {
- this.proxy = Preconditions.checkNotNull(proxy);
+ private final TransactionContextWrapper transactionContextAdapter;
+
+ RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextAdapter, final TransactionProxy parent,
+ final String shardName) {
+ this.parent = Preconditions.checkNotNull(parent);
this.shardName = shardName;
- createTxTries = (int) (proxy.getActorContext().getDatastoreContext().
+ this.transactionContextAdapter = transactionContextAdapter;
+ createTxTries = (int) (parent.getActorContext().getDatastoreContext().
getShardLeaderElectionTimeout().duration().toMillis() /
CREATE_TX_TRY_INTERVAL.toMillis());
}
return shardName;
}
- TransactionContext getTransactionContext() {
- return transactionContext;
- }
-
private TransactionType getTransactionType() {
- return proxy.getTransactionType();
- }
-
- private TransactionIdentifier getIdentifier() {
- return proxy.getIdentifier();
+ return parent.getType();
}
private ActorContext getActorContext() {
- return proxy.getActorContext();
+ return parent.getActorContext();
}
private Semaphore getOperationLimiter() {
- return proxy.getOperationLimiter();
+ return parent.getLimiter();
+ }
+
+ private TransactionIdentifier getIdentifier() {
+ return parent.getIdentifier();
}
/**
// For write-only Tx's we prepare the transaction modifications directly on the shard actor
// to avoid the overhead of creating a separate transaction actor.
// FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
- executeTxOperatonsOnComplete(proxy.createValidTransactionContext(this.primaryShard,
+ transactionContextAdapter.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard,
this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
} else {
tryCreateTransaction();
}
}
- /**
- * Adds a TransactionOperation to be executed after the CreateTransaction completes.
- */
- private void addTxOperationOnComplete(TransactionOperation operation) {
- boolean invokeOperation = true;
- synchronized(txOperationsOnComplete) {
- if(transactionContext == null) {
- LOG.debug("Tx {} Adding operation on complete", getIdentifier());
-
- invokeOperation = false;
- txOperationsOnComplete.add(operation);
- }
- }
-
- if(invokeOperation) {
- operation.invoke(transactionContext);
- }
- }
-
- void enqueueTransactionOperation(final TransactionOperation op) {
-
- if (transactionContext != null) {
- op.invoke(transactionContext);
- } else {
- // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
- // callback to be executed after the Tx is created.
- addTxOperationOnComplete(op);
- }
- }
-
/**
* Performs a CreateTransaction try async.
*/
}
Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
- getTransactionType().ordinal(), proxy.getTransactionChainId()).toSerializable();
+ getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable();
Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage);
- createTxFuture.onComplete(this, getActorContext().getClientDispatcher());
+ createTxFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ onCreateTransactionComplete(failure, response);
+ }
+ }, getActorContext().getClientDispatcher());
}
- @Override
- public void onComplete(Throwable failure, Object response) {
+ private void onCreateTransactionComplete(Throwable failure, Object response) {
if(failure instanceof NoShardLeaderException) {
// There's no leader for the shard yet - schedule and try again, unless we're out
// of retries. Note: createTxTries is volatile as it may be written by different
createTransactionContext(failure, response);
}
- void createTransactionContext(Throwable failure, Object response) {
- // Mainly checking for state violation here to perform a volatile read of "initialized" to
- // ensure updates to operationLimter et al are visible to this thread (ie we're doing
- // "piggy-back" synchronization here).
- proxy.ensureInitializied();
-
+ private void createTransactionContext(Throwable failure, Object response) {
// Create the TransactionContext from the response or failure. Store the new
// TransactionContext locally until we've completed invoking the
// TransactionOperations. This avoids thread timing issues which could cause
localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter());
}
- executeTxOperatonsOnComplete(localTransactionContext);
- }
-
- private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
- while(true) {
- // Access to txOperationsOnComplete and transactionContext must be protected and atomic
- // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
- // issues and ensure no TransactionOperation is missed and that they are processed
- // in the order they occurred.
-
- // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
- // in case a TransactionOperation results in another transaction operation being
- // queued (eg a put operation from a client read Future callback that is notified
- // synchronously).
- Collection<TransactionOperation> operationsBatch = null;
- synchronized(txOperationsOnComplete) {
- if(txOperationsOnComplete.isEmpty()) {
- // We're done invoking the TransactionOperations so we can now publish the
- // TransactionContext.
- transactionContext = localTransactionContext;
- break;
- }
-
- operationsBatch = new ArrayList<>(txOperationsOnComplete);
- txOperationsOnComplete.clear();
- }
-
- // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
- // A slight down-side is that we need to re-acquire the lock below but this should
- // be negligible.
- for(TransactionOperation oper: operationsBatch) {
- oper.invoke(localTransactionContext);
- }
- }
+ transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
}
private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
LOG.debug("Tx {} Received {}", getIdentifier(), reply);
- return proxy.createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()),
+ return createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()),
reply.getTransactionPath(), reply.getVersion());
}
-}
\ No newline at end of file
+ private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath,
+ short remoteTransactionVersion) {
+ // TxActor is always created where the leader of the shard is.
+ // Check if TxActor is created in the same node
+ boolean isTxActorLocal = getActorContext().isPathLocal(transactionPath);
+ final TransactionContext ret;
+
+ if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
+ ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
+ getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
+ } else {
+ ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(),
+ isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
+ }
+
+ TransactionContextCleanup.track(this, ret);
+ return ret;
+ }
+}
+