X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=7703f484c73e687391ff5dd9ffe1739afd201c0f;hp=239207a60ab58cbbbefc03ee150b9cc250230d86;hb=112e6b1bfeed9c7125a073d1015d05e31f006bbf;hpb=37036770b1cf71c888530adbda097feeea6cdf02 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 239207a60a..7703f484c7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -21,6 +21,14 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; @@ -34,6 +42,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -49,14 +58,6 @@ import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.concurrent.GuardedBy; /** * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard @@ -181,23 +182,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final TransactionType transactionType; private final ActorContext actorContext; private final TransactionIdentifier identifier; - private final TransactionChainProxy transactionChainProxy; + private final String transactionChainId; private final SchemaContext schemaContext; private boolean inReadyState; public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { - this(actorContext, transactionType, null); + this(actorContext, transactionType, ""); } public TransactionProxy(ActorContext actorContext, TransactionType transactionType, - TransactionChainProxy transactionChainProxy) { + String transactionChainId) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null"); this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), "schemaContext should not be null"); - this.transactionChainProxy = transactionChainProxy; + this.transactionChainId = transactionChainId; String memberName = actorContext.getCurrentMemberName(); if(memberName == null){ @@ -220,7 +221,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { phantomReferenceCache.put(cleanup, cleanup); } - LOG.debug("Created txn {} of type {}", identifier, transactionType); + LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId); } @VisibleForTesting @@ -236,9 +237,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return recordedOperationFutures; } + @VisibleForTesting + boolean hasTransactionContext() { + for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { + TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + if(transactionContext != null) { + return true; + } + } + + return false; + } + @Override - public CheckedFuture>, ReadFailedException> read( - final YangInstanceIdentifier path) { + public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, "Read operation on write-only transaction is not allowed"); @@ -246,37 +258,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} read {}", identifier, path); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - - CheckedFuture>, ReadFailedException> future; - if(transactionContext != null) { - future = transactionContext.readData(path); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - final SettableFuture>> proxyFuture = SettableFuture.create(); - txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - Futures.addCallback(transactionContext.readData(path), - new FutureCallback>>() { - @Override - public void onSuccess(Optional> data) { - proxyFuture.set(data); - } - - @Override - public void onFailure(Throwable t) { - proxyFuture.setException(t); - } - }); - } - }); - - future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); - } - - return future; + return txFutureCallback.enqueueReadOperation(new ReadOperation>>() { + @Override + public CheckedFuture>, ReadFailedException> invoke( + TransactionContext transactionContext) { + return transactionContext.readData(path); + } + }); } @Override @@ -288,39 +276,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} exists {}", identifier, path); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - - CheckedFuture future; - if(transactionContext != null) { - future = transactionContext.dataExists(path); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - final SettableFuture proxyFuture = SettableFuture.create(); - txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - Futures.addCallback(transactionContext.dataExists(path), - new FutureCallback() { - @Override - public void onSuccess(Boolean exists) { - proxyFuture.set(exists); - } - - @Override - public void onFailure(Throwable t) { - proxyFuture.setException(t); - } - }); - } - }); - - future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); - } - - return future; + return txFutureCallback.enqueueReadOperation(new ReadOperation() { + @Override + public CheckedFuture invoke(TransactionContext transactionContext) { + return transactionContext.dataExists(path); + } + }); } + private void checkModificationState() { Preconditions.checkState(transactionType != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed"); @@ -336,19 +300,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} write {}", identifier, path); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if(transactionContext != null) { - transactionContext.writeData(path, data); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.writeData(path, data); - } - }); - } + txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.writeData(path, data); + } + }); } @Override @@ -359,19 +316,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} merge {}", identifier, path); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if(transactionContext != null) { - transactionContext.mergeData(path, data); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.mergeData(path, data); - } - }); - } + txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.mergeData(path, data); + } + }); } @Override @@ -382,19 +332,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} delete {}", identifier, path); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if(transactionContext != null) { - transactionContext.deleteData(path); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.deleteData(path); - } - }); - } + txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.deleteData(path); + } + }); } @Override @@ -411,35 +354,45 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - LOG.debug("Tx {} Readying transaction for shard {}", identifier, - txFutureCallback.getShardName()); + LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier, + txFutureCallback.getShardName(), transactionChainId); - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if(transactionContext != null) { - cohortFutures.add(transactionContext.readyTransaction()); - } else { - // The shard Tx hasn't been created yet so create a promise to ready the Tx later - // after it's created. - final Promise cohortPromise = akka.dispatch.Futures.promise(); - txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - cohortPromise.completeWith(transactionContext.readyTransaction()); - } - }); + Future future = txFutureCallback.enqueueFutureOperation(new FutureOperation() { + @Override + public Future invoke(TransactionContext transactionContext) { + return transactionContext.readyTransaction(); + } + }); - cohortFutures.add(cohortPromise.future()); - } + cohortFutures.add(future); } - if(transactionChainProxy != null){ - transactionChainProxy.onTransactionReady(cohortFutures); - } + onTransactionReady(cohortFutures); return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, identifier.toString()); } + /** + * Method for derived classes to be notified when the transaction has been readied. + * + * @param cohortFutures the cohort Futures for each shard transaction. + */ + protected void onTransactionReady(List> cohortFutures) { + } + + /** + * Method called to send a CreateTransaction message to a shard. + * + * @param shard the shard actor to send to + * @param serializedCreateMessage the serialized message to send + * @return the response Future + */ + protected Future sendCreateTransaction(ActorSelection shard, + Object serializedCreateMessage) { + return actorContext.executeOperationAsync(shard, serializedCreateMessage); + } + @Override public Object getIdentifier() { return this.identifier; @@ -447,18 +400,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void close() { - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if(transactionContext != null) { - transactionContext.closeTransaction(); - } else { - txFutureCallback.addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.closeTransaction(); - } - }); - } + for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { + txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.closeTransaction(); + } + }); } txFutureCallbackMap.clear(); @@ -501,19 +449,34 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } public String getTransactionChainId() { - if(transactionChainProxy == null){ - return ""; - } - return transactionChainProxy.getTransactionChainId(); + return transactionChainId; + } + + protected ActorContext getActorContext() { + return actorContext; } /** - * Interface for a transaction operation to be invoked later. + * Interfaces for transaction operations to be invoked later. */ private static interface TransactionOperation { void invoke(TransactionContext transactionContext); } + /** + * This interface returns a Guava Future + */ + private static interface ReadOperation { + CheckedFuture invoke(TransactionContext transactionContext); + } + + /** + * This interface returns a Scala Future + */ + private static interface FutureOperation { + Future invoke(TransactionContext transactionContext); + } + /** * Implements a Future OnComplete callback for a CreateTransaction message. This class handles * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a @@ -586,11 +549,83 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } + + Future enqueueFutureOperation(final FutureOperation op) { + + Future future; + + if (transactionContext != null) { + future = op.invoke(transactionContext); + } else { + // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future + // callback to be executed after the Tx is created. + final Promise promise = akka.dispatch.Futures.promise(); + addTxOperationOnComplete(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(op.invoke(transactionContext)); + } + }); + + future = promise.future(); + } + + return future; + } + + CheckedFuture enqueueReadOperation(final ReadOperation op) { + + CheckedFuture future; + + if (transactionContext != null) { + future = op.invoke(transactionContext); + } else { + // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future + // callback to be executed after the Tx is created. + final SettableFuture proxyFuture = SettableFuture.create(); + addTxOperationOnComplete(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + Futures.addCallback(op.invoke(transactionContext), new FutureCallback() { + @Override + public void onSuccess(T data) { + proxyFuture.set(data); + } + + @Override + public void onFailure(Throwable t) { + proxyFuture.setException(t); + } + }); + } + }); + + future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); + } + + return future; + } + + void enqueueModifyOperation(final TransactionOperation op) { + + if (transactionContext != null) { + op.invoke(transactionContext); + } else { + // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future + // callback to be executed after the Tx is created. + addTxOperationOnComplete(op); + } + } + + + + + /** * Performs a CreateTransaction try async. */ private void tryCreateTransaction() { - Future createTxFuture = actorContext.executeOperationAsync(primaryShard, + Future createTxFuture = sendCreateTransaction(primaryShard, new CreateTransaction(identifier.toString(), TransactionProxy.this.transactionType.ordinal(), getTransactionChainId()).toSerializable()); @@ -625,29 +660,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // respect to #addTxOperationOnComplete to handle timing issues and ensure no // TransactionOperation is missed and that they are processed in the order they occurred. synchronized(txOperationsOnComplete) { + // Store the new TransactionContext locally until we've completed invoking the + // TransactionOperations. This avoids thread timing issues which could cause + // out-of-order TransactionOperations. Eg, on a modification operation, if the + // TransactionContext is non-null, then we directly call the TransactionContext. + // However, at the same time, the code may be executing the cached + // TransactionOperations. So to avoid thus timing, we don't publish the + // TransactionContext until after we've executed all cached TransactionOperations. + TransactionContext localTransactionContext; if(failure != null) { LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, failure.getMessage()); - transactionContext = new NoOpTransactionContext(failure, identifier); + localTransactionContext = new NoOpTransactionContext(failure, identifier); } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { - createValidTransactionContext(CreateTransactionReply.fromSerializable(response)); + localTransactionContext = createValidTransactionContext( + CreateTransactionReply.fromSerializable(response)); } else { IllegalArgumentException exception = new IllegalArgumentException(String.format( "Invalid reply type %s for CreateTransaction", response.getClass())); - transactionContext = new NoOpTransactionContext(exception, identifier); + localTransactionContext = new NoOpTransactionContext(exception, identifier); } for(TransactionOperation oper: txOperationsOnComplete) { - oper.invoke(transactionContext); + oper.invoke(localTransactionContext); } txOperationsOnComplete.clear(); + + // We're done invoking the TransactionOperations so we can now publish the + // TransactionContext. + transactionContext = localTransactionContext; } } - private void createValidTransactionContext(CreateTransactionReply reply) { + private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { String transactionPath = reply.getTransactionPath(); LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath); @@ -666,10 +714,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // TxActor is always created where the leader of the shard is. // Check if TxActor is created in the same node - boolean isTxActorLocal = actorContext.isLocalPath(transactionPath); + boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - transactionContext = new TransactionContextImpl(transactionActor, identifier, - actorContext, schemaContext, isTxActorLocal); + return new TransactionContextImpl(transactionPath, transactionActor, identifier, + actorContext, schemaContext, isTxActorLocal, reply.getVersion()); } } @@ -712,23 +760,31 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final ActorContext actorContext; private final SchemaContext schemaContext; + private final String transactionPath; private final ActorSelection actor; private final boolean isTxActorLocal; + private final int remoteTransactionVersion; - private TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, + private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, SchemaContext schemaContext, - boolean isTxActorLocal) { + boolean isTxActorLocal, int remoteTransactionVersion) { super(identifier); + this.transactionPath = transactionPath; this.actor = actor; this.actorContext = actorContext; this.schemaContext = schemaContext; this.isTxActorLocal = isTxActorLocal; + this.remoteTransactionVersion = remoteTransactionVersion; } private ActorSelection getActor() { return actor; } + private Future executeOperationAsync(SerializableMessage msg) { + return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()); + } + @Override public void closeTransaction() { LOG.debug("Tx {} closeTransaction called", identifier); @@ -743,9 +799,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Send the ReadyTransaction message to the Tx actor. - ReadyTransaction readyTransaction = new ReadyTransaction(); - final Future replyFuture = actorContext.executeOperationAsync(getActor(), - isTxActorLocal ? readyTransaction : readyTransaction.toSerializable()); + final Future replyFuture = executeOperationAsync(new ReadyTransaction()); // Combine all the previously recorded put/merge/delete operation reply Futures and the // ReadyTransactionReply Future into one Future. If any one fails then the combined @@ -783,7 +837,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply); - return actorContext.actorSelection(reply.getCohortPath()); + String cohortPath = reply.getCohortPath(); + + // In Helium we used to return the local path of the actor which represented + // a remote ThreePhaseCommitCohort. The local path would then be converted to + // a remote path using this resolvePath method. To maintain compatibility with + // a Helium node we need to continue to do this conversion. + // At some point in the future when upgrades from Helium are not supported + // we could remove this code to resolvePath and just use the cohortPath as the + // resolved cohortPath + if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) { + cohortPath = actorContext.resolvePath(transactionPath, cohortPath); + } + + return actorContext.actorSelection(cohortPath); } else { // Throwing an exception here will fail the Future. @@ -798,27 +865,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void deleteData(YangInstanceIdentifier path) { LOG.debug("Tx {} deleteData called path = {}", identifier, path); - DeleteData deleteData = new DeleteData(path); - recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), - isTxActorLocal ? deleteData : deleteData.toSerializable())); + recordedOperationFutures.add(executeOperationAsync(new DeleteData(path))); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} mergeData called path = {}", identifier, path); - MergeData mergeData = new MergeData(path, data, schemaContext); - recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), - isTxActorLocal ? mergeData : mergeData.toSerializable())); + recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data, schemaContext))); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} writeData called path = {}", identifier, path); - WriteData writeData = new WriteData(path, data, schemaContext); - recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), - isTxActorLocal ? writeData : writeData.toSerializable())); + recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data, schemaContext))); } @Override @@ -901,9 +962,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } }; - ReadData readData = new ReadData(path); - Future readFuture = actorContext.executeOperationAsync(getActor(), - isTxActorLocal ? readData : readData.toSerializable()); + Future readFuture = executeOperationAsync(new ReadData(path)); readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } @@ -985,9 +1044,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } }; - DataExists dataExists = new DataExists(path); - Future future = actorContext.executeOperationAsync(getActor(), - isTxActorLocal ? dataExists : dataExists.toSerializable()); + Future future = executeOperationAsync(new DataExists(path)); future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); }