X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=74245c42592ca3d6743d0aef3b48bb2e6ace2b45;hp=32bb7d0951964975b850c8a1a685ce7d95c03f47;hb=c1362c86eb19e92e6c64d10099a45deb499c6db1;hpb=7325ba9a87ead9d9070b1ba7018ca20d60da987c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 32bb7d0951..74245c4259 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -34,7 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; /** @@ -50,7 +50,6 @@ import java.util.concurrent.atomic.AtomicLong; *

*/ public class TransactionProxy implements DOMStoreReadWriteTransaction { - public enum TransactionType { READ_ONLY, WRITE_ONLY, @@ -59,20 +58,24 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private static final AtomicLong counter = new AtomicLong(); - private final TransactionType readOnly; + private final TransactionType transactionType; private final ActorContext actorContext; private final Map remoteTransactionPaths = new HashMap<>(); private final String identifier; + private final ExecutorService executor; public TransactionProxy( ActorContext actorContext, - TransactionType readOnly) { + TransactionType transactionType, + ExecutorService executor + ) { - this.identifier = "transaction-" + counter.getAndIncrement(); - this.readOnly = readOnly; + this.identifier = "txn-" + counter.getAndIncrement(); + this.transactionType = transactionType; this.actorContext = actorContext; + this.executor = executor; - Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION); + Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(identifier), ActorContext.ASK_DURATION); if(response instanceof CreateTransactionReply){ CreateTransactionReply reply = (CreateTransactionReply) response; remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionPath())); @@ -91,6 +94,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { ActorContext.ASK_DURATION); if(response instanceof ReadDataReply){ ReadDataReply reply = (ReadDataReply) response; + if(reply.getNormalizedNode() == null){ + return Optional.absent(); + } //FIXME : A cast should not be required here ??? return (Optional>) Optional.of(reply.getNormalizedNode()); } @@ -102,8 +108,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { ListenableFutureTask>> future = ListenableFutureTask.create(call); - //FIXME : Use a thread pool here - Executors.newSingleThreadExecutor().submit(future); + executor.submit(future); return future; } @@ -142,7 +147,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - return new ThreePhaseCommitCohortProxy(cohortPaths); + return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor); } @Override