X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=58b37be2a2727babd9b0305e868d85d90c079052;hp=f28a1e5f73d4823fe31ba20d14fa4d986f9700da;hb=6405fa8d6b47e406cdf566b26b15f980d802cad4;hpb=37f0504d391efd8b7d61403759fcc22a6dd3a093 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index f28a1e5f73..58b37be2a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -18,9 +18,9 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -255,14 +255,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); + final SettableFuture>> proxyFuture = SettableFuture.create(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - return txFutureCallback.enqueueReadOperation(new ReadOperation>>() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override - public CheckedFuture>, ReadFailedException> invoke( - TransactionContext transactionContext) { - return transactionContext.readData(path); + public void invoke(TransactionContext transactionContext) { + transactionContext.readData(path, proxyFuture); } }); + + return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } @Override @@ -275,15 +278,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); + final SettableFuture proxyFuture = SettableFuture.create(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - return txFutureCallback.enqueueReadOperation(new ReadOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override - public CheckedFuture invoke(TransactionContext transactionContext) { - return transactionContext.dataExists(path); + public void invoke(TransactionContext transactionContext) { + transactionContext.dataExists(path, proxyFuture); } }); - } + return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); + } private void checkModificationState() { Preconditions.checkState(transactionType != TransactionType.READ_ONLY, @@ -298,7 +304,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private void throttleOperation(int acquirePermits) { try { - if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ + if(!operationLimiter.tryAcquire(acquirePermits, + actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); } } catch (InterruptedException e) { @@ -321,7 +328,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.writeData(path, data); @@ -339,7 +346,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.mergeData(path, data); @@ -357,7 +364,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.deleteData(path); @@ -384,12 +391,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier, txFutureCallback.getShardName(), transactionChainId); - Future future = txFutureCallback.enqueueFutureOperation(new FutureOperation() { - @Override - public Future invoke(TransactionContext transactionContext) { - return transactionContext.readyTransaction(); - } - }); + final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + final Future future; + if (transactionContext != null) { + // avoid the creation of a promise and a TransactionOperation + future = transactionContext.readyTransaction(); + } else { + final Promise promise = akka.dispatch.Futures.promise(); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(transactionContext.readyTransaction()); + } + }); + future = promise.future(); + } cohortFutures.add(future); } @@ -428,7 +444,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void close() { for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.closeTransaction(); @@ -469,7 +485,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { newTxFutureCallback.setPrimaryShard(primaryShard); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } return txFutureCallback; @@ -490,20 +506,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { void invoke(TransactionContext transactionContext); } - /** - * This interface returns a Guava Future - */ - private static interface ReadOperation { - CheckedFuture invoke(TransactionContext transactionContext); - } - - /** - * This interface returns a Scala Future - */ - private static interface FutureOperation { - Future invoke(TransactionContext transactionContext); - } - /** * Implements a Future OnComplete callback for a CreateTransaction message. This class handles * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a @@ -565,75 +567,22 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Adds a TransactionOperation to be executed after the CreateTransaction completes. */ void addTxOperationOnComplete(TransactionOperation operation) { + boolean invokeOperation = true; synchronized(txOperationsOnComplete) { if(transactionContext == null) { LOG.debug("Tx {} Adding operation on complete {}", identifier); + invokeOperation = false; txOperationsOnComplete.add(operation); - } else { - operation.invoke(transactionContext); } } - } - - - Future enqueueFutureOperation(final FutureOperation op) { - - Future future; - - if (transactionContext != null) { - future = op.invoke(transactionContext); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - final Promise promise = akka.dispatch.Futures.promise(); - addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - promise.completeWith(op.invoke(transactionContext)); - } - }); - - future = promise.future(); - } - - return future; - } - - CheckedFuture enqueueReadOperation(final ReadOperation op) { - - CheckedFuture future; - - if (transactionContext != null) { - future = op.invoke(transactionContext); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - final SettableFuture proxyFuture = SettableFuture.create(); - addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - Futures.addCallback(op.invoke(transactionContext), new FutureCallback() { - @Override - public void onSuccess(T data) { - proxyFuture.set(data); - } - - @Override - public void onFailure(Throwable t) { - proxyFuture.setException(t); - } - }); - } - }); - future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); + if(invokeOperation) { + operation.invoke(transactionContext); } - - return future; } - void enqueueModifyOperation(final TransactionOperation op) { + void enqueueTransactionOperation(final TransactionOperation op) { if (transactionContext != null) { op.invoke(transactionContext); @@ -653,7 +602,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { TransactionProxy.this.transactionType.ordinal(), getTransactionChainId()).toSerializable()); - createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher()); + createTxFuture.onComplete(this, actorContext.getClientDispatcher()); } @Override @@ -673,55 +622,75 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void run() { tryCreateTransaction(); } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); return; } } - // Create the TransactionContext from the response or failure and execute delayed - // TransactionOperations. This entire section is done atomically (ie synchronized) with - // respect to #addTxOperationOnComplete to handle timing issues and ensure no - // TransactionOperation is missed and that they are processed in the order they occurred. - synchronized(txOperationsOnComplete) { - // Store the new TransactionContext locally until we've completed invoking the - // TransactionOperations. This avoids thread timing issues which could cause - // out-of-order TransactionOperations. Eg, on a modification operation, if the - // TransactionContext is non-null, then we directly call the TransactionContext. - // However, at the same time, the code may be executing the cached - // TransactionOperations. So to avoid thus timing, we don't publish the - // TransactionContext until after we've executed all cached TransactionOperations. - TransactionContext localTransactionContext; - if(failure != null) { - LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, - failure.getMessage()); - - localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter); - } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { - localTransactionContext = createValidTransactionContext( - CreateTransactionReply.fromSerializable(response)); - } else { - IllegalArgumentException exception = new IllegalArgumentException(String.format( + // Create the TransactionContext from the response or failure. Store the new + // TransactionContext locally until we've completed invoking the + // TransactionOperations. This avoids thread timing issues which could cause + // out-of-order TransactionOperations. Eg, on a modification operation, if the + // TransactionContext is non-null, then we directly call the TransactionContext. + // However, at the same time, the code may be executing the cached + // TransactionOperations. So to avoid thus timing, we don't publish the + // TransactionContext until after we've executed all cached TransactionOperations. + TransactionContext localTransactionContext; + if(failure != null) { + LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, + failure.getMessage()); + + localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter); + } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { + localTransactionContext = createValidTransactionContext( + CreateTransactionReply.fromSerializable(response)); + } else { + IllegalArgumentException exception = new IllegalArgumentException(String.format( "Invalid reply type %s for CreateTransaction", response.getClass())); - localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter); + localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter); + } + + executeTxOperatonsOnComplete(localTransactionContext); + } + + private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) { + while(true) { + // Access to txOperationsOnComplete and transactionContext must be protected and atomic + // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing + // issues and ensure no TransactionOperation is missed and that they are processed + // in the order they occurred. + + // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy + // in case a TransactionOperation results in another transaction operation being + // queued (eg a put operation from a client read Future callback that is notified + // synchronously). + Collection operationsBatch = null; + synchronized(txOperationsOnComplete) { + if(txOperationsOnComplete.isEmpty()) { + // We're done invoking the TransactionOperations so we can now publish the + // TransactionContext. + transactionContext = localTransactionContext; + break; + } + + operationsBatch = new ArrayList<>(txOperationsOnComplete); + txOperationsOnComplete.clear(); } - for(TransactionOperation oper: txOperationsOnComplete) { + // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. + // A slight down-side is that we need to re-acquire the lock below but this should + // be negligible. + for(TransactionOperation oper: operationsBatch) { oper.invoke(localTransactionContext); } - - txOperationsOnComplete.clear(); - - // We're done invoking the TransactionOperations so we can now publish the - // TransactionContext. - transactionContext = localTransactionContext; } } private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { String transactionPath = reply.getTransactionPath(); - LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath); + LOG.debug("Tx {} Received {}", identifier, reply); ActorSelection transactionActor = actorContext.actorSelection(transactionPath); @@ -739,8 +708,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Check if TxActor is created in the same node boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - return new TransactionContextImpl(transactionPath, transactionActor, identifier, - actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); + if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) { + return new TransactionContextImpl(transactionPath, transactionActor, identifier, + actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); + } else { + return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier, + actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); + } } } }