BUG-8403: do not throttle purge requests
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / LocalProxyTransaction.java
index 9f4b18eaaa73e07d91d12d2f3639c202a1eee436..02f8e5a9532abd1a8ad45c2824c205694177a762 100644 (file)
@@ -70,7 +70,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
 
     abstract DataTreeSnapshot readOnlyView();
 
-    abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
+    abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request,
             @Nullable Consumer<Response<?, ?>> callback);
 
     abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
@@ -87,9 +87,8 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
     }
 
     @Override
-    final void doAbort() {
-        sendAbort(new AbortLocalTransactionRequest(identifier, localActor()),
-            response -> LOG.debug("Transaction {} abort completed with {}", identifier, response));
+    final AbortLocalTransactionRequest abortRequest() {
+        return new AbortLocalTransactionRequest(identifier, localActor());
     }
 
     @Override
@@ -104,15 +103,27 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
 
     private boolean handleReadRequest(final TransactionRequest<?> request,
             final @Nullable Consumer<Response<?, ?>> callback) {
+        // Note we delay completion of read requests to limit the scope at which the client can run, as they have
+        // listeners, which we do not want to execute while we are reconnecting.
         if (request instanceof ReadTransactionRequest) {
             final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
             final Optional<NormalizedNode<?, ?>> result = readOnlyView().readNode(path);
-            callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+            if (callback != null) {
+                // XXX: FB does not see that callback is final, on stack and has be check for non-null.
+                final Consumer<Response<?, ?>> fbIsStupid = Preconditions.checkNotNull(callback);
+                executeInActor(() -> fbIsStupid.accept(new ReadTransactionSuccess(request.getTarget(),
+                    request.getSequence(), result)));
+            }
             return true;
         } else if (request instanceof ExistsTransactionRequest) {
             final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
             final boolean result = readOnlyView().readNode(path).isPresent();
-            callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+            if (callback != null) {
+                // XXX: FB does not see that callback is final, on stack and has be check for non-null.
+                final Consumer<Response<?, ?>> fbIsStupid = Preconditions.checkNotNull(callback);
+                executeInActor(() -> fbIsStupid.accept(new ExistsTransactionSuccess(request.getTarget(),
+                    request.getSequence(), result)));
+            }
             return true;
         } else {
             return false;
@@ -127,7 +138,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         } else if (handleReadRequest(request, callback)) {
             // No-op
         } else if (request instanceof TransactionPurgeRequest) {
-            enqueuePurge(enqueuedTicks);
+            enqueuePurge(callback, enqueuedTicks);
         } else if (request instanceof IncrementTransactionSequenceRequest) {
             // Local transactions do not have non-replayable requests which would be visible to the backend,
             // hence we can skip sequence increments.
@@ -147,11 +158,11 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
      */
     void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         if (request instanceof ModifyTransactionRequest) {
-            applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+            applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback);
         } else if (handleReadRequest(request, callback)) {
             // No-op
         } else if (request instanceof TransactionPurgeRequest) {
-            sendPurge();
+            enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
@@ -191,7 +202,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
             successor.abort();
         } else if (request instanceof TransactionPurgeRequest) {
             LOG.debug("Forwarding purge {} to successor {}", request, successor);
-            successor.sendPurge();
+            successor.enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
@@ -203,7 +214,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         if (request instanceof AbortLocalTransactionRequest) {
             successor.sendAbort(request, callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            successor.sendPurge();
+            successor.enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }