import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
*/
public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
- public static enum TransactionType {
- READ_ONLY,
- WRITE_ONLY,
- READ_WRITE;
-
- // Cache all values
- private static final TransactionType[] VALUES = values();
-
- public static TransactionType fromInt(final int type) {
- try {
- return VALUES[type];
- } catch (IndexOutOfBoundsException e) {
- throw new IllegalArgumentException("In TransactionType enum value " + type, e);
- }
- }
- }
-
private static enum TransactionState {
OPEN,
READY,
private final TransactionType transactionType;
final ActorContext actorContext;
- private final String transactionChainId;
private final SchemaContext schemaContext;
private TransactionState state = TransactionState.OPEN;
}
public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
- super(createIdentifier(actorContext));
+ super(createIdentifier(actorContext, transactionChainId));
this.actorContext = Preconditions.checkNotNull(actorContext,
"actorContext should not be null");
this.transactionType = Preconditions.checkNotNull(transactionType,
"transactionType should not be null");
this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
"schemaContext should not be null");
- this.transactionChainId = transactionChainId;
LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
}
- private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
+ private static TransactionIdentifier createIdentifier(ActorContext actorContext, String transactionChainId) {
String memberName = actorContext.getCurrentMemberName();
if (memberName == null) {
memberName = "UNKNOWN-MEMBER";
}
- return new TransactionIdentifier(memberName, counter.getAndIncrement());
+ return TransactionIdentifier.create(memberName, counter.getAndIncrement(), transactionChainId);
}
@VisibleForTesting
return false;
}
- private boolean isRootPath(YangInstanceIdentifier path){
+ private static boolean isRootPath(YangInstanceIdentifier path) {
return !path.getPathArguments().iterator().hasNext();
}
}
@Override
- public AbstractThreePhaseCommitCohort ready() {
+ public AbstractThreePhaseCommitCohort<?> ready() {
Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
"Read-only transactions cannot be readied");
throttleOperation(txFutureCallbackMap.size());
+ final boolean isSingleShard = txFutureCallbackMap.size() == 1;
+ return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
+ TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
+
+ LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
+ txFutureCallback.getShardName(), getTransactionChainId());
+
+ final OperationCallback.Reference operationCallbackRef =
+ new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
+ final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ final Future future;
+ if (transactionContext != null) {
+ // avoid the creation of a promise and a TransactionOperation
+ future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
+ } else {
+ final Promise promise = akka.dispatch.Futures.promise();
+ txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
+ }
+ });
+ future = promise.future();
+ }
+
+ return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
+ }
+
+ private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
+ OperationCallback.Reference operationCallbackRef) {
+ if(transactionContext.supportsDirectCommit()) {
+ TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
+ operationCallbackRef.set(rateLimitingCallback);
+ rateLimitingCallback.run();
+ return transactionContext.directCommit();
+ } else {
+ return transactionContext.readyTransaction();
+ }
+ }
+
+ private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
- LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
- txFutureCallback.getShardName(), transactionChainId);
+ LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
+ txFutureCallback.getShardName(), getTransactionChainId());
final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
final Future<ActorSelection> future;
cohortFutures.add(future);
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
- getIdentifier().toString());
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
}
@Override
return ShardStrategyFactory.getStrategy(path).findShard(path);
}
- protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
+ protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(String shardName) {
return actorContext.findPrimaryShardAsync(shardName);
}
private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
if(txFutureCallback == null) {
- Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
+ Future<PrimaryShardInfo> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
txFutureCallback = newTxFutureCallback;
txFutureCallbackMap.put(shardName, txFutureCallback);
- findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
- public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
if(failure != null) {
newTxFutureCallback.createTransactionContext(failure, null);
} else {
- newTxFutureCallback.setPrimaryShard(primaryShard);
+ newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
}
}
}, actorContext.getClientDispatcher());
}
String getTransactionChainId() {
- return transactionChainId;
+ return getIdentifier().getChainId();
}
protected ActorContext getActorContext() {
if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
- transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+ actorContext, isTxActorLocal, remoteTransactionVersion,
operationCompleter);
} else {
- return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
- actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+ return new TransactionContextImpl(transactionActor, getIdentifier(),
+ actorContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
}
}
}