X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=f1ba4eabb999a6455688d93532877ae490df88df;hp=e5119cf299c4d5e8b10974c25e5f9f02cc12c7f4;hb=08dd5c2c443ff53f56af88a0e8dc8f34e36d2245;hpb=95589305be51630beab2f6b80c098ebf72bca4b9 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index e5119cf299..f1ba4eabb9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -70,7 +70,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public static enum TransactionType { READ_ONLY, WRITE_ONLY, - READ_WRITE + READ_WRITE; + + public static TransactionType fromInt(int type) { + if(type == WRITE_ONLY.ordinal()) { + return WRITE_ONLY; + } else if(type == READ_WRITE.ordinal()) { + return READ_WRITE; + } else if(type == READ_ONLY.ordinal()) { + return READ_ONLY; + } else { + throw new IllegalArgumentException("In TransactionType enum value" + type); + } + } } static final Mapper SAME_FAILURE_TRANSFORMER = @@ -381,6 +393,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(txFutureCallbackMap.size() == 0) { onTransactionReady(Collections.>emptyList()); + TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext); return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; } @@ -426,18 +439,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { protected void onTransactionReady(List> cohortFutures) { } - /** - * Method called to send a CreateTransaction message to a shard. - * - * @param shard the shard actor to send to - * @param serializedCreateMessage the serialized message to send - * @return the response Future - */ - protected Future sendCreateTransaction(ActorSelection shard, - Object serializedCreateMessage) { - return actorContext.executeOperationAsync(shard, serializedCreateMessage); - } - @Override public Object getIdentifier() { return this.identifier; @@ -466,14 +467,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return ShardStrategyFactory.getStrategy(path).findShard(path); } + protected Future sendFindPrimaryShardAsync(String shardName) { + return actorContext.findPrimaryShardAsync(shardName); + } + private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) { String shardName = shardNameFromIdentifier(path); TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName); if(txFutureCallback == null) { - Future findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName); + Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); - final TransactionFutureCallback newTxFutureCallback = - new TransactionFutureCallback(shardName); + final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName); txFutureCallback = newTxFutureCallback; txFutureCallbackMap.put(shardName, txFutureCallback); @@ -482,7 +486,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void onComplete(Throwable failure, ActorSelection primaryShard) { if(failure != null) { - newTxFutureCallback.onComplete(failure, null); + newTxFutureCallback.createTransactionContext(failure, null); } else { newTxFutureCallback.setPrimaryShard(primaryShard); } @@ -559,10 +563,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Sets the target primary shard and initiates a CreateTransaction try. */ void setPrimaryShard(ActorSelection primaryShard) { - LOG.debug("Tx {} Primary shard found - trying create transaction", identifier); - this.primaryShard = primaryShard; - tryCreateTransaction(); + + if(transactionType == TransactionType.WRITE_ONLY && + actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { + LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", + identifier, primaryShard); + + // For write-only Tx's we prepare the transaction modifications directly on the shard actor + // to avoid the overhead of creating a separate transaction actor. + // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow. + executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard, + this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION)); + } else { + tryCreateTransaction(); + } } /** @@ -572,7 +587,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { boolean invokeOperation = true; synchronized(txOperationsOnComplete) { if(transactionContext == null) { - LOG.debug("Tx {} Adding operation on complete {}", identifier); + LOG.debug("Tx {} Adding operation on complete", identifier); invokeOperation = false; txOperationsOnComplete.add(operation); @@ -599,10 +614,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Performs a CreateTransaction try async. */ private void tryCreateTransaction() { - Future createTxFuture = sendCreateTransaction(primaryShard, - new CreateTransaction(identifier.toString(), - TransactionProxy.this.transactionType.ordinal(), - getTransactionChainId()).toSerializable()); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard); + } + + Object serializedCreateMessage = new CreateTransaction(identifier.toString(), + TransactionProxy.this.transactionType.ordinal(), + getTransactionChainId()).toSerializable(); + + Future createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage); createTxFuture.onComplete(this, actorContext.getClientDispatcher()); } @@ -629,6 +649,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } + createTransactionContext(failure, response); + } + + private void createTransactionContext(Throwable failure, Object response) { // Mainly checking for state violation here to perform a volatile read of "initialized" to // ensure updates to operationLimter et al are visible to this thread (ie we're doing // "piggy-back" synchronization here). @@ -644,8 +668,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // TransactionContext until after we've executed all cached TransactionOperations. TransactionContext localTransactionContext; if(failure != null) { - LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, - failure.getMessage()); + LOG.debug("Tx {} Creating NoOpTransaction because of error", identifier, failure); localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter); } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { @@ -695,11 +718,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { - String transactionPath = reply.getTransactionPath(); - LOG.debug("Tx {} Received {}", identifier, reply); - ActorSelection transactionActor = actorContext.actorSelection(transactionPath); + return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()), + reply.getTransactionPath(), reply.getVersion()); + } + + private TransactionContext createValidTransactionContext(ActorSelection transactionActor, + String transactionPath, short remoteTransactionVersion) { if (transactionType == TransactionType.READ_ONLY) { // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference @@ -728,12 +754,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Check if TxActor is created in the same node boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) { - return new TransactionContextImpl(transactionPath, transactionActor, identifier, - actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); - } else { + if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier, - actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); + transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, + operationCompleter); + } else if (transactionType == TransactionType.WRITE_ONLY && + actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { + return new WriteOnlyTransactionContextImpl(transactionActor, identifier, transactionChainId, + actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); + } else { + return new TransactionContextImpl(transactionActor, identifier, transactionChainId, + actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); } } }