X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=97a9ff0bf379ef3b8f6568ed37603ed285ba35a5;hb=9f61e98b036119694dfef0759a7cafc56aae6e86;hp=f447f3c718980285d32e79c5a9b2579e26309971;hpb=758e3e3b16e73298221a78872149814baf735c7d;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index f447f3c718..97a9ff0bf3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -71,6 +71,11 @@ import java.util.concurrent.atomic.AtomicLong; *

*/ public class TransactionProxy implements DOMStoreReadWriteTransaction { + + private final TransactionChainProxy transactionChainProxy; + + + public enum TransactionType { READ_ONLY, WRITE_ONLY, @@ -177,12 +182,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private boolean inReadyState; public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { + this(actorContext, transactionType, null); + } + + @VisibleForTesting + List> getRecordedOperationFutures() { + List> recordedOperationFutures = Lists.newArrayList(); + for(TransactionContext transactionContext : remoteTransactionPaths.values()) { + recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures()); + } + + return recordedOperationFutures; + } + + public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) { this.actorContext = Preconditions.checkNotNull(actorContext, - "actorContext should not be null"); + "actorContext should not be null"); this.transactionType = Preconditions.checkNotNull(transactionType, - "transactionType should not be null"); + "transactionType should not be null"); this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), - "schemaContext should not be null"); + "schemaContext should not be null"); + this.transactionChainProxy = transactionChainProxy; String memberName = actorContext.getCurrentMemberName(); if(memberName == null){ @@ -190,7 +210,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } this.identifier = TransactionIdentifier.builder().memberName(memberName).counter( - counter.getAndIncrement()).build(); + counter.getAndIncrement()).build(); if(transactionType == TransactionType.READ_ONLY) { // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference @@ -201,23 +221,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { remoteTransactionActorsMB = new AtomicBoolean(); TransactionProxyCleanupPhantomReference cleanup = - new TransactionProxyCleanupPhantomReference(this); + new TransactionProxyCleanupPhantomReference(this); phantomReferenceCache.put(cleanup, cleanup); } LOG.debug("Created txn {} of type {}", identifier, transactionType); } - @VisibleForTesting - List> getRecordedOperationFutures() { - List> recordedOperationFutures = Lists.newArrayList(); - for(TransactionContext transactionContext : remoteTransactionPaths.values()) { - recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures()); - } - - return recordedOperationFutures; - } - @Override public CheckedFuture>, ReadFailedException> read( final YangInstanceIdentifier path) { @@ -249,7 +259,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { Preconditions.checkState(transactionType != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed"); Preconditions.checkState(!inReadyState, - "Transaction is sealed - further modifications are allowed"); + "Transaction is sealed - further modifications are not allowed"); } @Override @@ -308,6 +318,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { cohortPathFutures.add(transactionContext.readyTransaction()); } + if(transactionChainProxy != null){ + transactionChainProxy.onTransactionReady(cohortPathFutures); + } + return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, identifier.toString()); } @@ -340,21 +354,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return ShardStrategyFactory.getStrategy(path).findShard(path); } - private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) { + private void createTransactionIfMissing(ActorContext actorContext, + YangInstanceIdentifier path) { + + if(transactionChainProxy != null){ + transactionChainProxy.waitTillCurrentTransactionReady(); + } + String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); TransactionContext transactionContext = remoteTransactionPaths.get(shardName); - if(transactionContext != null){ + if (transactionContext != null) { // A transaction already exists with that shard return; } try { Object response = actorContext.executeShardOperation(shardName, - new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(), - ActorContext.ASK_DURATION); + new CreateTransaction(identifier.toString(), this.transactionType.ordinal(), + getTransactionChainId()).toSerializable()); if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response); @@ -365,7 +385,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { ActorSelection transactionActor = actorContext.actorSelection(transactionPath); - if(transactionType == TransactionType.READ_ONLY) { + if (transactionType == TransactionType.READ_ONLY) { // Add the actor to the remoteTransactionActors list for access by the // cleanup PhantonReference. remoteTransactionActors.add(transactionActor); @@ -376,19 +396,28 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } transactionContext = new TransactionContextImpl(shardName, transactionPath, - transactionActor, identifier, actorContext, schemaContext); + transactionActor, identifier, actorContext, schemaContext); remoteTransactionPaths.put(shardName, transactionContext); } else { throw new IllegalArgumentException(String.format( - "Invalid reply type {} for CreateTransaction", response.getClass())); + "Invalid reply type {} for CreateTransaction", response.getClass())); } - } catch(Exception e){ + } catch (Exception e) { LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage()); - remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier)); + remoteTransactionPaths + .put(shardName, new NoOpTransactionContext(shardName, e, identifier)); } } + public String getTransactionChainId() { + if(transactionChainProxy == null){ + return ""; + } + return transactionChainProxy.getTransactionChainId(); + } + + private interface TransactionContext { String getShardName(); @@ -472,7 +501,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Send the ReadyTransaction message to the Tx actor. final Future replyFuture = actorContext.executeRemoteOperationAsync(getActor(), - new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION); + new ReadyTransaction().toSerializable()); // Combine all the previously recorded put/merge/delete operation reply Futures and the // ReadyTransactionReply Future into one Future. If any one fails then the combined @@ -532,23 +561,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void deleteData(YangInstanceIdentifier path) { LOG.debug("Tx {} deleteData called path = {}", identifier, path); recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), - new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION )); + new DeleteData(path).toSerializable() )); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} mergeData called path = {}", identifier, path); recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), - new MergeData(path, data, schemaContext).toSerializable(), - ActorContext.ASK_DURATION)); + new MergeData(path, data, schemaContext).toSerializable())); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} writeData called path = {}", identifier, path); recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), - new WriteData(path, data, schemaContext).toSerializable(), - ActorContext.ASK_DURATION)); + new WriteData(path, data, schemaContext).toSerializable())); } @Override @@ -612,6 +639,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { returnFuture.setException(new ReadFailedException( "Error reading data for path " + path, failure)); + } else { LOG.debug("Tx {} read operation succeeded", identifier, failure); @@ -633,7 +661,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { }; Future readFuture = actorContext.executeRemoteOperationAsync(getActor(), - new ReadData(path).toSerializable(), ActorContext.ASK_DURATION); + new ReadData(path).toSerializable()); readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } @@ -714,7 +742,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { }; Future future = actorContext.executeRemoteOperationAsync(getActor(), - new DataExists(path).toSerializable(), ActorContext.ASK_DURATION); + new DataExists(path).toSerializable()); future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } }