import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
return transactionContext(path).readData(path);
}
+ @Override public CheckedFuture<Boolean, ReadFailedException> exists(
+ YangInstanceIdentifier path) {
+ LOG.debug("txn {} exists {}", identifier, path);
+
+ createTransactionIfMissing(actorContext, path);
+
+ return transactionContext(path).dataExists(path);
+ }
+
@Override
public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
final YangInstanceIdentifier path);
void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
+
+ CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
}
- private class TransactionContextImpl implements TransactionContext{
+ private class TransactionContextImpl implements TransactionContext {
private final String shardName;
private final String actorPath;
- private final ActorSelection actor;
+ private final ActorSelection actor;
private TransactionContextImpl(String shardName, String actorPath,
return actor;
}
- @Override public String getResolvedCohortPath(String cohortPath){
+ @Override public String getResolvedCohortPath(String cohortPath) {
return actorContext.resolvePath(actorPath, cohortPath);
}
getActor().tell(new DeleteData(path).toSerializable(), null);
}
- @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data){
- getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
+ @Override public void mergeData(YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data) {
+ getActor()
+ .tell(new MergeData(path, data, schemaContext).toSerializable(),
+ null);
}
- @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
- final YangInstanceIdentifier path) {
-
- Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ final YangInstanceIdentifier path) {
- @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
- Object response = actorContext
- .executeRemoteOperation(getActor(), new ReadData(path).toSerializable(),
- ActorContext.ASK_DURATION);
- if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
- ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
- if(reply.getNormalizedNode() == null){
- return Optional.absent();
+ Callable<Optional<NormalizedNode<?, ?>>> call =
+ new Callable<Optional<NormalizedNode<?, ?>>>() {
+
+ @Override public Optional<NormalizedNode<?, ?>> call()
+ throws Exception {
+ Object response = actorContext
+ .executeRemoteOperation(getActor(),
+ new ReadData(path).toSerializable(),
+ ActorContext.ASK_DURATION);
+ if (response.getClass()
+ .equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+ ReadDataReply reply = ReadDataReply
+ .fromSerializable(schemaContext, path,
+ response);
+ if (reply.getNormalizedNode() == null) {
+ return Optional.absent();
+ }
+ return Optional.<NormalizedNode<?, ?>>of(
+ reply.getNormalizedNode());
}
- return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
- }
- return Optional.absent();
- }
- };
+ throw new ReadFailedException("Read Failed " + path);
+ }
+ };
- return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
+ return MappingCheckedFuture
+ .create(executor.submit(call), ReadFailedException.MAPPER);
}
- @Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- getActor().tell(new WriteData(path, data, schemaContext).toSerializable(), null);
+ @Override public void writeData(YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data) {
+ getActor()
+ .tell(new WriteData(path, data, schemaContext).toSerializable(),
+ null);
}
+ @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
+ final YangInstanceIdentifier path) {
+
+ Callable<Boolean> call = new Callable<Boolean>() {
+
+ @Override public Boolean call() throws Exception {
+ Object o = actorContext.executeRemoteOperation(getActor(),
+ new DataExists(path).toSerializable(),
+ ActorContext.ASK_DURATION
+ );
+
+
+ if (DataExistsReply.SERIALIZABLE_CLASS
+ .equals(o.getClass())) {
+ return DataExistsReply.fromSerializable(o).exists();
+ }
+
+ throw new ReadFailedException("Exists Failed " + path);
+ }
+ };
+ return MappingCheckedFuture
+ .create(executor.submit(call), ReadFailedException.MAPPER);
+ }
}
private class NoOpTransactionContext implements TransactionContext {
NormalizedNode<?, ?> data) {
LOG.warn("txn {} writeData called path = {}", identifier, path);
}
+
+ @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
+ YangInstanceIdentifier path) {
+ LOG.warn("txn {} dataExists called path = {}", identifier, path);
+
+ // Returning false instead of an exception to keep this aligned with
+ // read
+ return Futures.immediateCheckedFuture(false);
+ }
}