X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=e30076d09e05930f6441a9454b791a83cfd180d8;hb=c6618cb0df8fc41ccfcb7578b11f5ecfa0403f39;hp=32bb7d0951964975b850c8a1a685ce7d95c03f47;hpb=e2df8aa67238c153f1038eb45f7799442861985b;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 32bb7d0951..e30076d09e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -15,7 +15,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFutureTask; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.ReadData; @@ -24,17 +23,19 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; /** @@ -50,7 +51,6 @@ import java.util.concurrent.atomic.AtomicLong; *

*/ public class TransactionProxy implements DOMStoreReadWriteTransaction { - public enum TransactionType { READ_ONLY, WRITE_ONLY, @@ -59,23 +59,30 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private static final AtomicLong counter = new AtomicLong(); - private final TransactionType readOnly; + private final TransactionType transactionType; private final ActorContext actorContext; private final Map remoteTransactionPaths = new HashMap<>(); private final String identifier; + private final ExecutorService executor; + private final SchemaContext schemaContext; public TransactionProxy( ActorContext actorContext, - TransactionType readOnly) { + TransactionType transactionType, + ExecutorService executor, + SchemaContext schemaContext + ) { - this.identifier = "transaction-" + counter.getAndIncrement(); - this.readOnly = readOnly; + this.identifier = "txn-" + counter.getAndIncrement(); + this.transactionType = transactionType; this.actorContext = actorContext; + this.executor = executor; + this.schemaContext = schemaContext; - Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION); + Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(identifier), ActorContext.ASK_DURATION); if(response instanceof CreateTransactionReply){ CreateTransactionReply reply = (CreateTransactionReply) response; - remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionPath())); + remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionActorPath())); } } @@ -87,10 +94,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public Optional> call() throws Exception { Object response = actorContext - .executeRemoteOperation(remoteTransaction, new ReadData(path), + .executeRemoteOperation(remoteTransaction, new ReadData(path).toSerializable(), ActorContext.ASK_DURATION); - if(response instanceof ReadDataReply){ - ReadDataReply reply = (ReadDataReply) response; + if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){ + ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response); + if(reply.getNormalizedNode() == null){ + return Optional.absent(); + } //FIXME : A cast should not be required here ??? return (Optional>) Optional.of(reply.getNormalizedNode()); } @@ -102,8 +112,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { ListenableFutureTask>> future = ListenableFutureTask.create(call); - //FIXME : Use a thread pool here - Executors.newSingleThreadExecutor().submit(future); + executor.submit(future); return future; } @@ -111,19 +120,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void write(InstanceIdentifier path, NormalizedNode data) { final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path); - remoteTransaction.tell(new WriteData(path, data), null); + remoteTransaction.tell(new WriteData(path, data, schemaContext).toSerializable(), null); } @Override public void merge(InstanceIdentifier path, NormalizedNode data) { final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path); - remoteTransaction.tell(new MergeData(path, data), null); + remoteTransaction.tell(new MergeData(path, data, schemaContext).toSerializable(), null); } @Override public void delete(InstanceIdentifier path) { final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path); - remoteTransaction.tell(new DeleteData(path), null); + remoteTransaction.tell(new DeleteData(path).toSerializable(), null); } @Override @@ -142,7 +151,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - return new ThreePhaseCommitCohortProxy(cohortPaths); + return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor); } @Override