BUG-8618: add pause/unpause mechanics for tell-based protocol
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / FrontendReadOnlyTransaction.java
index 465cfcbee10af4a97b47e9cee17e84869433bcd6..b245ceee2d249ae0f841a69342412ce1eab8272e 100644 (file)
@@ -13,18 +13,19 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Read-only frontend transaction state as observed by the shard leader.
@@ -33,6 +34,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  */
 @NotThreadSafe
 final class FrontendReadOnlyTransaction extends FrontendTransaction {
+    private static final Logger LOG = LoggerFactory.getLogger(FrontendReadOnlyTransaction.class);
+
     private final ReadOnlyShardDataTreeTransaction openTransaction;
 
     private FrontendReadOnlyTransaction(final AbstractFrontendHistory history,
@@ -48,26 +51,35 @@ final class FrontendReadOnlyTransaction extends FrontendTransaction {
 
     // Sequence has already been checked
     @Override
-    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+    @Nullable TransactionSuccess<?> doHandleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
             final long now) throws RequestException {
         if (request instanceof ExistsTransactionRequest) {
             return handleExistsTransaction((ExistsTransactionRequest) request);
         } else if (request instanceof ReadTransactionRequest) {
             return handleReadTransaction((ReadTransactionRequest) request);
-        } else if (request instanceof TransactionAbortRequest) {
-            return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
-        } else if (request instanceof TransactionPurgeRequest) {
-            // No-op for now
-            return new TransactionPurgeResponse(request.getTarget(), request.getSequence());
+        } else if (request instanceof ModifyTransactionRequest) {
+            handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
+            return null;
         } else {
+            LOG.warn("Rejecting unsupported request {}", request);
             throw new UnsupportedRequestException(request);
         }
     }
 
-    private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        openTransaction.abort();
-        return new TransactionAbortSuccess(openTransaction.getIdentifier(), request.getSequence());
+    @Override
+    void retire() {
+        // No-op
+    }
+
+    private void handleModifyTransaction(final ModifyTransactionRequest request, final RequestEnvelope envelope,
+            final long now) {
+        // The only valid request here is with abort protocol
+        final java.util.Optional<PersistenceProtocol> optProto = request.getPersistenceProtocol();
+        Preconditions.checkArgument(optProto.isPresent(), "Commit protocol is missing in %s", request);
+        Preconditions.checkArgument(optProto.get() == PersistenceProtocol.ABORT, "Unsupported commit protocol in %s",
+                request);
+        openTransaction.abort(() -> recordAndSendSuccess(envelope, now,
+            new ModifyTransactionSuccess(request.getTarget(), request.getSequence())));
     }
 
     private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)