BUG-5280: TransactionAbortRequest is used for user aborts
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / LocalProxyTransaction.java
index 3869e66d3e90141b9390491de9c5c20afbd60a91..61f83c2db1e8172c9374665bfe41eba3e2e13d3d 100644 (file)
@@ -17,11 +17,18 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
@@ -118,8 +125,8 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
-        final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(),
-            modification, coordinated);
+        final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, nextSequence(),
+            localActor(), modification, coordinated);
         modification = new FailedDataTreeModification(this::submittedException);
         return ret;
     }
@@ -173,10 +180,24 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
     @Override
     void handleForwardedRemoteRequest(final TransactionRequest<?> request,
             final @Nullable Consumer<Response<?, ?>> callback) {
-        LOG.debug("Applying forwaded request {}", request);
+        LOG.debug("Applying forwarded request {}", request);
 
         if (request instanceof ModifyTransactionRequest) {
             applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+        } else if (request instanceof ReadTransactionRequest) {
+            final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
+            final Optional<NormalizedNode<?, ?>> result = modification.readNode(path);
+            callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+        } else if (request instanceof ExistsTransactionRequest) {
+            final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
+            final boolean result = modification.readNode(path).isPresent();
+            callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+        } else if (request instanceof TransactionPreCommitRequest) {
+            sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+        } else if (request instanceof TransactionDoCommitRequest) {
+            sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+        } else if (request instanceof TransactionAbortRequest) {
+            sendAbort(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }