X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=95862ae9d93670de67dbdd8bd99074a9ba49e8b8;hp=5f9f1f83c4dbf15b38ea5a3d13d534354b4527e2;hb=a1ea6554ff025f333b171637b37a9ad87c7846ea;hpb=bc7b4edec3c868a14e0a0de3a3b8e1af2406448b diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 5f9f1f83c4..95862ae9d9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -23,6 +23,8 @@ import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIden import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; +import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.ReadData; @@ -113,6 +115,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return transactionContext(path).readData(path); } + @Override public CheckedFuture exists( + YangInstanceIdentifier path) { + LOG.debug("txn {} exists {}", identifier, path); + + createTransactionIfMissing(actorContext, path); + + return transactionContext(path).dataExists(path); + } + @Override public void write(YangInstanceIdentifier path, NormalizedNode data) { @@ -243,13 +254,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { final YangInstanceIdentifier path); void writeData(YangInstanceIdentifier path, NormalizedNode data); + + CheckedFuture dataExists(YangInstanceIdentifier path); } - private class TransactionContextImpl implements TransactionContext{ + private class TransactionContextImpl implements TransactionContext { private final String shardName; private final String actorPath; - private final ActorSelection actor; + private final ActorSelection actor; private TransactionContextImpl(String shardName, String actorPath, @@ -267,7 +280,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return actor; } - @Override public String getResolvedCohortPath(String cohortPath){ + @Override public String getResolvedCohortPath(String cohortPath) { return actorContext.resolvePath(actorPath, cohortPath); } @@ -288,38 +301,76 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { getActor().tell(new DeleteData(path).toSerializable(), null); } - @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data){ - getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null); + @Override public void mergeData(YangInstanceIdentifier path, + NormalizedNode data) { + getActor() + .tell(new MergeData(path, data, schemaContext).toSerializable(), + null); } - @Override public CheckedFuture>, ReadFailedException> readData( - final YangInstanceIdentifier path) { - - Callable>> call = new Callable>>() { + @Override + public CheckedFuture>, ReadFailedException> readData( + final YangInstanceIdentifier path) { - @Override public Optional> call() throws Exception { - Object response = actorContext - .executeRemoteOperation(getActor(), new ReadData(path).toSerializable(), - ActorContext.ASK_DURATION); - if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){ - ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response); - if(reply.getNormalizedNode() == null){ - return Optional.absent(); + Callable>> call = + new Callable>>() { + + @Override public Optional> call() + throws Exception { + Object response = actorContext + .executeRemoteOperation(getActor(), + new ReadData(path).toSerializable(), + ActorContext.ASK_DURATION); + if (response.getClass() + .equals(ReadDataReply.SERIALIZABLE_CLASS)) { + ReadDataReply reply = ReadDataReply + .fromSerializable(schemaContext, path, + response); + if (reply.getNormalizedNode() == null) { + return Optional.absent(); + } + return Optional.>of( + reply.getNormalizedNode()); } - return Optional.>of(reply.getNormalizedNode()); - } - return Optional.absent(); - } - }; + throw new ReadFailedException("Read Failed " + path); + } + }; - return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER); + return MappingCheckedFuture + .create(executor.submit(call), ReadFailedException.MAPPER); } - @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - getActor().tell(new WriteData(path, data, schemaContext).toSerializable(), null); + @Override public void writeData(YangInstanceIdentifier path, + NormalizedNode data) { + getActor() + .tell(new WriteData(path, data, schemaContext).toSerializable(), + null); } + @Override public CheckedFuture dataExists( + final YangInstanceIdentifier path) { + + Callable call = new Callable() { + + @Override public Boolean call() throws Exception { + Object o = actorContext.executeRemoteOperation(getActor(), + new DataExists(path).toSerializable(), + ActorContext.ASK_DURATION + ); + + + if (DataExistsReply.SERIALIZABLE_CLASS + .equals(o.getClass())) { + return DataExistsReply.fromSerializable(o).exists(); + } + + throw new ReadFailedException("Exists Failed " + path); + } + }; + return MappingCheckedFuture + .create(executor.submit(call), ReadFailedException.MAPPER); + } } private class NoOpTransactionContext implements TransactionContext { @@ -374,6 +425,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { NormalizedNode data) { LOG.warn("txn {} writeData called path = {}", identifier, path); } + + @Override public CheckedFuture dataExists( + YangInstanceIdentifier path) { + LOG.warn("txn {} dataExists called path = {}", identifier, path); + + // Returning false instead of an exception to keep this aligned with + // read + return Futures.immediateCheckedFuture(false); + } }