X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=58b37be2a2727babd9b0305e868d85d90c079052;hp=d79cd6f69f4b0e3e4f1171a332b45c36adbbd515;hb=6405fa8d6b47e406cdf566b26b15f980d802cad4;hpb=4a2e88ee3ff4d07a980083586328d3a52639e204 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index d79cd6f69f..58b37be2a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -18,8 +18,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; @@ -257,14 +255,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); + final SettableFuture>> proxyFuture = SettableFuture.create(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - return txFutureCallback.enqueueReadOperation(new ReadOperation>>() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override - public CheckedFuture>, ReadFailedException> invoke( - TransactionContext transactionContext) { - return transactionContext.readData(path); + public void invoke(TransactionContext transactionContext) { + transactionContext.readData(path, proxyFuture); } }); + + return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } @Override @@ -277,15 +278,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); + final SettableFuture proxyFuture = SettableFuture.create(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - return txFutureCallback.enqueueReadOperation(new ReadOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override - public CheckedFuture invoke(TransactionContext transactionContext) { - return transactionContext.dataExists(path); + public void invoke(TransactionContext transactionContext) { + transactionContext.dataExists(path, proxyFuture); } }); - } + return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); + } private void checkModificationState() { Preconditions.checkState(transactionType != TransactionType.READ_ONLY, @@ -300,7 +304,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private void throttleOperation(int acquirePermits) { try { - if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ + if(!operationLimiter.tryAcquire(acquirePermits, + actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); } } catch (InterruptedException e) { @@ -323,7 +328,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.writeData(path, data); @@ -341,7 +346,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.mergeData(path, data); @@ -359,7 +364,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.deleteData(path); @@ -386,12 +391,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier, txFutureCallback.getShardName(), transactionChainId); - Future future = txFutureCallback.enqueueFutureOperation(new FutureOperation() { - @Override - public Future invoke(TransactionContext transactionContext) { - return transactionContext.readyTransaction(); - } - }); + final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + final Future future; + if (transactionContext != null) { + // avoid the creation of a promise and a TransactionOperation + future = transactionContext.readyTransaction(); + } else { + final Promise promise = akka.dispatch.Futures.promise(); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(transactionContext.readyTransaction()); + } + }); + future = promise.future(); + } cohortFutures.add(future); } @@ -430,7 +444,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void close() { for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.closeTransaction(); @@ -471,7 +485,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { newTxFutureCallback.setPrimaryShard(primaryShard); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } return txFutureCallback; @@ -492,20 +506,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { void invoke(TransactionContext transactionContext); } - /** - * This interface returns a Guava Future - */ - private static interface ReadOperation { - CheckedFuture invoke(TransactionContext transactionContext); - } - - /** - * This interface returns a Scala Future - */ - private static interface FutureOperation { - Future invoke(TransactionContext transactionContext); - } - /** * Implements a Future OnComplete callback for a CreateTransaction message. This class handles * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a @@ -582,64 +582,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - - Future enqueueFutureOperation(final FutureOperation op) { - - Future future; - - if (transactionContext != null) { - future = op.invoke(transactionContext); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - final Promise promise = akka.dispatch.Futures.promise(); - addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - promise.completeWith(op.invoke(transactionContext)); - } - }); - - future = promise.future(); - } - - return future; - } - - CheckedFuture enqueueReadOperation(final ReadOperation op) { - - CheckedFuture future; - - if (transactionContext != null) { - future = op.invoke(transactionContext); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - final SettableFuture proxyFuture = SettableFuture.create(); - addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - Futures.addCallback(op.invoke(transactionContext), new FutureCallback() { - @Override - public void onSuccess(T data) { - proxyFuture.set(data); - } - - @Override - public void onFailure(Throwable t) { - proxyFuture.setException(t); - } - }); - } - }); - - future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); - } - - return future; - } - - void enqueueModifyOperation(final TransactionOperation op) { + void enqueueTransactionOperation(final TransactionOperation op) { if (transactionContext != null) { op.invoke(transactionContext); @@ -659,7 +602,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { TransactionProxy.this.transactionType.ordinal(), getTransactionChainId()).toSerializable()); - createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher()); + createTxFuture.onComplete(this, actorContext.getClientDispatcher()); } @Override @@ -679,7 +622,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void run() { tryCreateTransaction(); } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); return; } } @@ -747,7 +690,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { String transactionPath = reply.getTransactionPath(); - LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath); + LOG.debug("Tx {} Received {}", identifier, reply); ActorSelection transactionActor = actorContext.actorSelection(transactionPath); @@ -765,8 +708,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Check if TxActor is created in the same node boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - return new TransactionContextImpl(transactionPath, transactionActor, identifier, - actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); + if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) { + return new TransactionContextImpl(transactionPath, transactionActor, identifier, + actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); + } else { + return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier, + actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); + } } } }