X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=fa98905a66968b111372d4b893ef35bef6e32bd4;hb=6faa900c307b5d97fc7726d31b6dad0e67077db8;hp=c85d32012fed9ee036a96b2e3663c061eabd0385;hpb=5ffd4b46ca00fc8f3d801050670c890117dc0811;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index c85d32012f..fa98905a66 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -13,9 +13,10 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.ListeningExecutorService; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; @@ -29,8 +30,10 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -42,7 +45,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; /** @@ -74,13 +76,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final ActorContext actorContext; private final Map remoteTransactionPaths = new HashMap<>(); private final String identifier; - private final ExecutorService executor; + private final ListeningExecutorService executor; private final SchemaContext schemaContext; public TransactionProxy( ActorContext actorContext, TransactionType transactionType, - ExecutorService executor, + ListeningExecutorService executor, SchemaContext schemaContext ) { @@ -94,7 +96,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } @Override - public ListenableFuture>> read(final YangInstanceIdentifier path) { + public CheckedFuture>, ReadFailedException> read( + final YangInstanceIdentifier path) { createTransactionIfMissing(actorContext, path); @@ -177,7 +180,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { try { Object response = actorContext.executeShardOperation(shardName, - new CreateTransaction(identifier).toSerializable(), + new CreateTransaction(identifier,this.transactionType.ordinal() ).toSerializable(), ActorContext.ASK_DURATION); if (response.getClass() .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { @@ -196,8 +199,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { remoteTransactionPaths.put(shardName, transactionContext); } - } catch(TimeoutException e){ - remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName)); + } catch(TimeoutException | PrimaryNotFoundException e){ + LOG.error("Creating NoOpTransaction because of : {}", e.getMessage()); + remoteTransactionPaths.put(shardName, + new NoOpTransactionContext(shardName)); } } @@ -214,7 +219,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { void mergeData(YangInstanceIdentifier path, NormalizedNode data); - ListenableFuture>> readData(final YangInstanceIdentifier path); + CheckedFuture>, ReadFailedException> readData( + final YangInstanceIdentifier path); void writeData(YangInstanceIdentifier path, NormalizedNode data); } @@ -266,9 +272,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null); } - @Override public ListenableFuture>> readData(final YangInstanceIdentifier path) { + @Override public CheckedFuture>, ReadFailedException> readData( + final YangInstanceIdentifier path) { - Callable>> call = new Callable() { + Callable>> call = new Callable>>() { @Override public Optional> call() throws Exception { Object response = actorContext @@ -279,20 +286,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(reply.getNormalizedNode() == null){ return Optional.absent(); } - //FIXME : A cast should not be required here ??? - return (Optional>) Optional.of(reply.getNormalizedNode()); + return Optional.>of(reply.getNormalizedNode()); } return Optional.absent(); } }; - ListenableFutureTask>> - future = ListenableFutureTask.create(call); - - executor.submit(future); - - return future; + return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { @@ -342,10 +343,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } @Override - public ListenableFuture>> readData( + public CheckedFuture>, ReadFailedException> readData( YangInstanceIdentifier path) { LOG.error("readData called path = {}", path); - return Futures.immediateFuture( + return Futures.immediateCheckedFuture( Optional.>absent()); }