BUG-8494: fix throttling during reconnect
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransaction.java
index ac91f2d0413ad8aca71391ba86921c92dc1b008b..1eff70f6d9299ab9f782bbe1e8ed54dbca259f4d 100644 (file)
@@ -248,11 +248,18 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * variable. It uses pre-allocated objects for fast paths (i.e. no successor present) and a per-transition object
      * for slow paths (when successor is injected/present).
      */
-    private volatile int sealed = 0;
-    private volatile State state = OPEN;
+    private volatile int sealed;
+    private volatile State state;
 
-    AbstractProxyTransaction(final ProxyHistory parent) {
+    AbstractProxyTransaction(final ProxyHistory parent, final boolean isDone) {
         this.parent = Preconditions.checkNotNull(parent);
+        if (isDone) {
+            state = DONE;
+            // DONE implies previous seal operation completed
+            sealed = 1;
+        } else {
+            state = OPEN;
+        }
     }
 
     final void executeInActor(final Runnable command) {
@@ -619,9 +626,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         final SuccessorState local = getSuccessorState();
         final State prevState = local.getPrevState();
 
+        final boolean isDone = DONE.equals(state)
+                || state instanceof SuccessorState && ((SuccessorState) state).isDone();
         final AbstractProxyTransaction successor = successorHistory.createTransactionProxy(getIdentifier(),
-            isSnapshotOnly());
-        LOG.debug("{} created successor transaction proxy {}", this, successor);
+            isSnapshotOnly(), isDone);
+        LOG.debug("{} created successor {}", this, successor);
         local.setSuccessor(successor);
 
         // Replay successful requests first
@@ -636,11 +645,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             for (Object obj : successfulRequests) {
                 if (obj instanceof TransactionRequest) {
                     LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
-                    successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
+                    successor.doReplayRequest((TransactionRequest<?>) obj, resp -> { }, now);
                 } else {
                     Verify.verify(obj instanceof IncrementSequence);
                     final IncrementSequence increment = (IncrementSequence) obj;
-                    successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
+                    successor.doReplayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
                         increment.getSequence(), localActor(), isSnapshotOnly(), increment.getDelta()), resp -> { },
                         now);
                     LOG.debug("Incrementing sequence {} to successor {}", obj, successor);
@@ -659,7 +668,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             if (getIdentifier().equals(req.getTarget())) {
                 Verify.verify(req instanceof TransactionRequest, "Unhandled request %s", req);
                 LOG.debug("Replaying queued request {} to successor {}", req, successor);
-                successor.replayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
+                successor.doReplayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
                 it.remove();
             }
         }
@@ -687,7 +696,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @param callback Callback to be invoked once the request completes
      * @param enqueuedTicks ticker-based time stamp when the request was enqueued
      */
-    private void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+    private void doReplayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
             final long enqueuedTicks) {
         if (request instanceof AbstractLocalTransactionRequest) {
             handleReplayedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback, enqueuedTicks);
@@ -727,6 +736,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         }
     }
 
+    final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        getSuccessorState().getSuccessor().doReplayRequest(request, callback, enqueuedTicks);
+    }
+
     abstract boolean isSnapshotOnly();
 
     abstract void doDelete(YangInstanceIdentifier path);