Merge "Bug 1576: Handle remote failures for write Tx async"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 6183c489c4cdbc56a1ba9ac0c3c3939963277edb..2e7b2feb85bd97c7b80d09b8156f4b44c6fb343e 100644 (file)
@@ -9,13 +9,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorPath;
-import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.actor.Props;
 import akka.dispatch.OnComplete;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
@@ -45,9 +45,10 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.Function1;
 import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -72,6 +73,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         READ_WRITE
     }
 
+    static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
+                                                                          Throwable, Throwable>() {
+        @Override
+        public Throwable apply(Throwable failure) {
+            return failure;
+        }
+    };
+
     private static final AtomicLong counter = new AtomicLong();
 
     private static final Logger
@@ -103,6 +112,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
     }
 
+    @VisibleForTesting
+    List<Future<Object>> getRecordedOperationFutures() {
+        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+        for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+            recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+        }
+
+        return recordedOperationFutures;
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
             final YangInstanceIdentifier path) {
@@ -110,7 +129,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Read operation on write-only transaction is not allowed");
 
-        LOG.debug("txn {} read {}", identifier, path);
+        LOG.debug("Tx {} read {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -123,7 +142,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Exists operation on write-only transaction is not allowed");
 
-        LOG.debug("txn {} exists {}", identifier, path);
+        LOG.debug("Tx {} exists {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -142,7 +161,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("txn {} write {}", identifier, path);
+        LOG.debug("Tx {} write {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -154,7 +173,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("txn {} merge {}", identifier, path);
+        LOG.debug("Tx {} merge {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -166,7 +185,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("txn {} delete {}", identifier, path);
+        LOG.debug("Tx {} delete {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -180,31 +199,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         inReadyState = true;
 
-        List<ActorPath> cohortPaths = new ArrayList<>();
-
-        LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier,
+        LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
                 remoteTransactionPaths.size());
 
+        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+
         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
 
-            LOG.debug("txn {} Readying transaction for shard {}", identifier,
+            LOG.debug("Tx {} Readying transaction for shard {}", identifier,
                     transactionContext.getShardName());
 
-            Object result = transactionContext.readyTransaction();
-
-            if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
-                ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
-                        actorContext.getActorSystem(),result);
-                String resolvedCohortPath = transactionContext.getResolvedCohortPath(
-                        reply.getCohortPath().toString());
-                cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
-            } else {
-                LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS,
-                        result.getClass());
-            }
+            cohortPathFutures.add(transactionContext.readyTransaction());
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString());
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
+                identifier.toString());
     }
 
     @Override
@@ -249,7 +258,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 String transactionPath = reply.getTransactionPath();
 
-                LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
+                LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
 
                 ActorSelection transactionActor =
                     actorContext.actorSelection(transactionPath);
@@ -259,11 +268,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 remoteTransactionPaths.put(shardName, transactionContext);
             } else {
-                LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS,
-                        response.getClass());
+                throw new IllegalArgumentException(String.format(
+                        "Invalid reply type {} for CreateTransaction", response.getClass()));
             }
         } catch(Exception e){
-            LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+            LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
             remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
         }
     }
@@ -271,11 +280,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private interface TransactionContext {
         String getShardName();
 
-        String getResolvedCohortPath(String cohortPath);
+        void closeTransaction();
 
-        public void closeTransaction();
+        Future<ActorPath> readyTransaction();
 
-        public Object readyTransaction();
+        void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
         void deleteData(YangInstanceIdentifier path);
 
@@ -284,23 +293,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
                 final YangInstanceIdentifier path);
 
-        void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
-
         CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
-    }
 
+        List<Future<Object>> getRecordedOperationFutures();
+    }
 
-    private class TransactionContextImpl implements TransactionContext {
-        private final String shardName;
-        private final String actorPath;
-        private final ActorSelection actor;
+    private abstract class AbstractTransactionContext implements TransactionContext {
 
+        protected final String shardName;
+        protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
 
-        private TransactionContextImpl(String shardName, String actorPath,
-            ActorSelection actor) {
+        AbstractTransactionContext(String shardName) {
             this.shardName = shardName;
-            this.actorPath = actorPath;
-            this.actor = actor;
         }
 
         @Override
@@ -308,53 +312,193 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             return shardName;
         }
 
+        @Override
+        public List<Future<Object>> getRecordedOperationFutures() {
+            return recordedOperationFutures;
+        }
+    }
+
+    private class TransactionContextImpl extends AbstractTransactionContext {
+        private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
+
+        private final String actorPath;
+        private final ActorSelection actor;
+
+        private TransactionContextImpl(String shardName, String actorPath,
+            ActorSelection actor) {
+            super(shardName);
+            this.actorPath = actorPath;
+            this.actor = actor;
+        }
+
         private ActorSelection getActor() {
             return actor;
         }
 
-        @Override
-        public String getResolvedCohortPath(String cohortPath) {
+        private String getResolvedCohortPath(String cohortPath) {
             return actorContext.resolvePath(actorPath, cohortPath);
         }
 
         @Override
         public void closeTransaction() {
+            LOG.debug("Tx {} closeTransaction called", identifier);
             actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
         }
 
         @Override
-        public Object readyTransaction() {
-            return actorContext.executeRemoteOperation(getActor(),
+        public Future<ActorPath> readyTransaction() {
+            LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+                    identifier, recordedOperationFutures.size());
+
+            // Send the ReadyTransaction message to the Tx actor.
+
+            final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
                     new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+
+            // Combine all the previously recorded put/merge/delete operation reply Futures and the
+            // ReadyTransactionReply Future into one Future. If any one fails then the combined
+            // Future will fail. We need all prior operations and the ready operation to succeed
+            // in order to attempt commit.
+
+            List<Future<Object>> futureList =
+                    Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
+            futureList.addAll(recordedOperationFutures);
+            futureList.add(replyFuture);
+
+            Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
+                    actorContext.getActorSystem().dispatcher());
+
+            // Transform the combined Future into a Future that returns the cohort actor path from
+            // the ReadyTransactionReply. That's the end result of the ready operation.
+
+            return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
+                @Override
+                public ActorPath apply(Iterable<Object> notUsed) {
+
+                    LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
+                            identifier);
+
+                    // At this point all the Futures succeeded and we need to extract the cohort
+                    // actor path from the ReadyTransactionReply. For the recorded operations, they
+                    // don't return any data so we're only interested that they completed
+                    // successfully. We could be paranoid and verify the correct reply types but
+                    // that really should never happen so it's not worth the overhead of
+                    // de-serializing each reply.
+
+                    // Note the Future get call here won't block as it's complete.
+                    Object serializedReadyReply = replyFuture.value().get().get();
+                    if(serializedReadyReply.getClass().equals(
+                                                     ReadyTransactionReply.SERIALIZABLE_CLASS)) {
+                        ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
+                                actorContext.getActorSystem(), serializedReadyReply);
+
+                        String resolvedCohortPath = getResolvedCohortPath(
+                                reply.getCohortPath().toString());
+
+                        LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
+                                identifier, resolvedCohortPath);
+
+                        return actorContext.actorFor(resolvedCohortPath);
+                    } else {
+                        // Throwing an exception here will fail the Future.
+
+                        throw new IllegalArgumentException(String.format("Invalid reply type {}",
+                                serializedReadyReply.getClass()));
+                    }
+                }
+            }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
         }
 
         @Override
         public void deleteData(YangInstanceIdentifier path) {
-            actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() );
+            LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+                    new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            actorContext.sendRemoteOperationAsync(getActor(),
-                    new MergeData(path, data, schemaContext).toSerializable());
+            LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+                    new MergeData(path, data, schemaContext).toSerializable(),
+                    ActorContext.ASK_DURATION));
+        }
+
+        @Override
+        public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+                    new WriteData(path, data, schemaContext).toSerializable(),
+                    ActorContext.ASK_DURATION));
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
-            final YangInstanceIdentifier path) {
+                final YangInstanceIdentifier path) {
+
+            LOG.debug("Tx {} readData called path = {}", identifier, path);
 
             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
 
+            // If there were any previous recorded put/merge/delete operation reply Futures then we
+            // must wait for them to successfully complete. This is necessary to honor the read
+            // uncommitted semantics of the public API contract. If any one fails then fail the read.
+
+            if(recordedOperationFutures.isEmpty()) {
+                finishReadData(path, returnFuture);
+            } else {
+                LOG.debug("Tx {} readData: verifying {} previous recorded operations",
+                        identifier, recordedOperationFutures.size());
+
+                // Note: we make a copy of recordedOperationFutures to be on the safe side in case
+                // Futures#sequence accesses the passed List on a different thread, as
+                // recordedOperationFutures is not synchronized.
+
+                Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
+                        Lists.newArrayList(recordedOperationFutures),
+                        actorContext.getActorSystem().dispatcher());
+                OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
+                    @Override
+                    public void onComplete(Throwable failure, Iterable<Object> notUsed)
+                            throws Throwable {
+                        if(failure != null) {
+                            LOG.debug("Tx {} readData: a recorded operation failed: {}",
+                                    identifier, failure);
+
+                            returnFuture.setException(new ReadFailedException(
+                                    "The read could not be performed because a previous put, merge,"
+                                    + "or delete operation failed", failure));
+                        } else {
+                            finishReadData(path, returnFuture);
+                        }
+                    }
+                };
+
+                combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+            }
+
+            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
+        }
+
+        private void finishReadData(final YangInstanceIdentifier path,
+                final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
+
+            LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+
             OnComplete<Object> onComplete = new OnComplete<Object>() {
                 @Override
-                public void onComplete(Throwable failure, Object response) throws Throwable {
+                public void onComplete(Throwable failure, Object readResponse) throws Throwable {
                     if(failure != null) {
+                        LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+
                         returnFuture.setException(new ReadFailedException(
                                 "Error reading data for path " + path, failure));
                     } else {
-                        if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+                        LOG.debug("Tx {} read operation succeeded", identifier, failure);
+
+                        if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
-                                    path, response);
+                                    path, readResponse);
                             if (reply.getNormalizedNode() == null) {
                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
                             } else {
@@ -369,32 +513,76 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             };
 
-            Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+            Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
                     new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
-            future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
-
-            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
-        }
-
-        @Override
-        public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            actorContext.sendRemoteOperationAsync(getActor(),
-                    new WriteData(path, data, schemaContext).toSerializable());
+            readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
 
         @Override
         public CheckedFuture<Boolean, ReadFailedException> dataExists(
                 final YangInstanceIdentifier path) {
 
+            LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+
             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
 
+            // If there were any previous recorded put/merge/delete operation reply Futures then we
+            // must wait for them to successfully complete. This is necessary to honor the read
+            // uncommitted semantics of the public API contract. If any one fails then fail this
+            // request.
+
+            if(recordedOperationFutures.isEmpty()) {
+                finishDataExists(path, returnFuture);
+            } else {
+                LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
+                        identifier, recordedOperationFutures.size());
+
+                // Note: we make a copy of recordedOperationFutures to be on the safe side in case
+                // Futures#sequence accesses the passed List on a different thread, as
+                // recordedOperationFutures is not synchronized.
+
+                Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
+                        Lists.newArrayList(recordedOperationFutures),
+                        actorContext.getActorSystem().dispatcher());
+                OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
+                    @Override
+                    public void onComplete(Throwable failure, Iterable<Object> notUsed)
+                            throws Throwable {
+                        if(failure != null) {
+                            LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
+                                    identifier, failure);
+
+                            returnFuture.setException(new ReadFailedException(
+                                    "The data exists could not be performed because a previous "
+                                    + "put, merge, or delete operation failed", failure));
+                        } else {
+                            finishDataExists(path, returnFuture);
+                        }
+                    }
+                };
+
+                combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+            }
+
+            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
+        }
+
+        private void finishDataExists(final YangInstanceIdentifier path,
+                final SettableFuture<Boolean> returnFuture) {
+
+            LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+
             OnComplete<Object> onComplete = new OnComplete<Object>() {
                 @Override
                 public void onComplete(Throwable failure, Object response) throws Throwable {
                     if(failure != null) {
+                        LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+
                         returnFuture.setException(new ReadFailedException(
-                                "Error checking exists for path " + path, failure));
+                                "Error checking data exists for path " + path, failure));
                     } else {
+                        LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+
                         if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
                             returnFuture.set(Boolean.valueOf(DataExistsReply.
                                         fromSerializable(response).exists()));
@@ -409,80 +597,60 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
                     new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
-
-            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
         }
     }
 
-    private class NoOpTransactionContext implements TransactionContext {
+    private class NoOpTransactionContext extends AbstractTransactionContext {
 
-        private final Logger
-            LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
+        private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
-        private final String shardName;
         private final Exception failure;
 
-        private ActorRef cohort;
-
         public NoOpTransactionContext(String shardName, Exception failure){
-            this.shardName = shardName;
+            super(shardName);
             this.failure = failure;
         }
 
         @Override
-        public String getShardName() {
-            return  shardName;
-
+        public void closeTransaction() {
+            LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
         }
 
         @Override
-        public String getResolvedCohortPath(String cohortPath) {
-            return cohort.path().toString();
+        public Future<ActorPath> readyTransaction() {
+            LOG.debug("Tx {} readyTransaction called", identifier);
+            return akka.dispatch.Futures.failed(failure);
         }
 
         @Override
-        public void closeTransaction() {
-            LOG.warn("txn {} closeTransaction called", identifier);
-        }
-
-        @Override public Object readyTransaction() {
-            LOG.warn("txn {} readyTransaction called", identifier);
-            cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
-            return new ReadyTransactionReply(cohort.path()).toSerializable();
+        public void deleteData(YangInstanceIdentifier path) {
+            LOG.debug("Tx {} deleteData called path = {}", identifier, path);
         }
 
         @Override
-        public void deleteData(YangInstanceIdentifier path) {
-            LOG.warn("txt {} deleteData called path = {}", identifier, path);
+        public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            LOG.debug("Tx {} mergeData called path = {}", identifier, path);
         }
 
         @Override
-        public void mergeData(YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data) {
-            LOG.warn("txn {} mergeData called path = {}", identifier, path);
+        public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            LOG.debug("Tx {} writeData called path = {}", identifier, path);
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
             YangInstanceIdentifier path) {
-            LOG.warn("txn {} readData called path = {}", identifier, path);
+            LOG.debug("Tx {} readData called path = {}", identifier, path);
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error reading data for path " + path, failure));
         }
 
-        @Override public void writeData(YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data) {
-            LOG.warn("txn {} writeData called path = {}", identifier, path);
-        }
-
-        @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
+        @Override
+        public CheckedFuture<Boolean, ReadFailedException> dataExists(
             YangInstanceIdentifier path) {
-            LOG.warn("txn {} dataExists called path = {}", identifier, path);
+            LOG.debug("Tx {} dataExists called path = {}", identifier, path);
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error checking exists for path " + path, failure));
         }
     }
-
-
-
 }