X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=97a9ff0bf379ef3b8f6568ed37603ed285ba35a5;hp=a8b20c030e1dd34f274ce2e02b56669739824af3;hb=9f61e98b036119694dfef0759a7cafc56aae6e86;hpb=e71922c94cec22e9f37648a2d04bf2eb3274cf2f diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index a8b20c030e..97a9ff0bf3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -71,6 +71,11 @@ import java.util.concurrent.atomic.AtomicLong; *

*/ public class TransactionProxy implements DOMStoreReadWriteTransaction { + + private final TransactionChainProxy transactionChainProxy; + + + public enum TransactionType { READ_ONLY, WRITE_ONLY, @@ -177,12 +182,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private boolean inReadyState; public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { + this(actorContext, transactionType, null); + } + + @VisibleForTesting + List> getRecordedOperationFutures() { + List> recordedOperationFutures = Lists.newArrayList(); + for(TransactionContext transactionContext : remoteTransactionPaths.values()) { + recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures()); + } + + return recordedOperationFutures; + } + + public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) { this.actorContext = Preconditions.checkNotNull(actorContext, - "actorContext should not be null"); + "actorContext should not be null"); this.transactionType = Preconditions.checkNotNull(transactionType, - "transactionType should not be null"); + "transactionType should not be null"); this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), - "schemaContext should not be null"); + "schemaContext should not be null"); + this.transactionChainProxy = transactionChainProxy; String memberName = actorContext.getCurrentMemberName(); if(memberName == null){ @@ -190,7 +210,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } this.identifier = TransactionIdentifier.builder().memberName(memberName).counter( - counter.getAndIncrement()).build(); + counter.getAndIncrement()).build(); if(transactionType == TransactionType.READ_ONLY) { // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference @@ -201,23 +221,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { remoteTransactionActorsMB = new AtomicBoolean(); TransactionProxyCleanupPhantomReference cleanup = - new TransactionProxyCleanupPhantomReference(this); + new TransactionProxyCleanupPhantomReference(this); phantomReferenceCache.put(cleanup, cleanup); } LOG.debug("Created txn {} of type {}", identifier, transactionType); } - @VisibleForTesting - List> getRecordedOperationFutures() { - List> recordedOperationFutures = Lists.newArrayList(); - for(TransactionContext transactionContext : remoteTransactionPaths.values()) { - recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures()); - } - - return recordedOperationFutures; - } - @Override public CheckedFuture>, ReadFailedException> read( final YangInstanceIdentifier path) { @@ -308,6 +318,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { cohortPathFutures.add(transactionContext.readyTransaction()); } + if(transactionChainProxy != null){ + transactionChainProxy.onTransactionReady(cohortPathFutures); + } + return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, identifier.toString()); } @@ -340,20 +354,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return ShardStrategyFactory.getStrategy(path).findShard(path); } - private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) { + private void createTransactionIfMissing(ActorContext actorContext, + YangInstanceIdentifier path) { + + if(transactionChainProxy != null){ + transactionChainProxy.waitTillCurrentTransactionReady(); + } + String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); TransactionContext transactionContext = remoteTransactionPaths.get(shardName); - if(transactionContext != null){ + if (transactionContext != null) { // A transaction already exists with that shard return; } try { Object response = actorContext.executeShardOperation(shardName, - new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable()); + new CreateTransaction(identifier.toString(), this.transactionType.ordinal(), + getTransactionChainId()).toSerializable()); if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response); @@ -364,7 +385,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { ActorSelection transactionActor = actorContext.actorSelection(transactionPath); - if(transactionType == TransactionType.READ_ONLY) { + if (transactionType == TransactionType.READ_ONLY) { // Add the actor to the remoteTransactionActors list for access by the // cleanup PhantonReference. remoteTransactionActors.add(transactionActor); @@ -375,19 +396,28 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } transactionContext = new TransactionContextImpl(shardName, transactionPath, - transactionActor, identifier, actorContext, schemaContext); + transactionActor, identifier, actorContext, schemaContext); remoteTransactionPaths.put(shardName, transactionContext); } else { throw new IllegalArgumentException(String.format( - "Invalid reply type {} for CreateTransaction", response.getClass())); + "Invalid reply type {} for CreateTransaction", response.getClass())); } - } catch(Exception e){ + } catch (Exception e) { LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage()); - remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier)); + remoteTransactionPaths + .put(shardName, new NoOpTransactionContext(shardName, e, identifier)); } } + public String getTransactionChainId() { + if(transactionChainProxy == null){ + return ""; + } + return transactionChainProxy.getTransactionChainId(); + } + + private interface TransactionContext { String getShardName();