BUG-5280: introduce request/response Envelope
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / actors / client / SequencedQueueEntry.java
index 54940cd8ff9ac9fe5cf98130175a9e2d1b6392a8..b99b1a3ceb3c9e8cb03a51c252f6e7a41c40e690 100644 (file)
@@ -12,8 +12,10 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,12 +28,12 @@ import org.slf4j.LoggerFactory;
  */
 final class SequencedQueueEntry {
     private static final class LastTry {
-        final Request<?, ?> request;
         final long timeTicks;
+        final long retry;
 
-        LastTry(final Request<?, ?> request, final long when) {
-            this.request = Preconditions.checkNotNull(request);
-            this.timeTicks = when;
+        LastTry(final long retry, final long timeTicks) {
+            this.retry = retry;
+            this.timeTicks = timeTicks;
         }
     }
 
@@ -40,26 +42,28 @@ final class SequencedQueueEntry {
     private final Request<?, ?> request;
     private final RequestCallback callback;
     private final long enqueuedTicks;
+    private final long sequence;
 
     private Optional<LastTry> lastTry = Optional.empty();
 
-    SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback, final long now) {
+    SequencedQueueEntry(final Request<?, ?> request, final long sequence, final RequestCallback callback,
+        final long now) {
         this.request = Preconditions.checkNotNull(request);
         this.callback = Preconditions.checkNotNull(callback);
         this.enqueuedTicks = now;
+        this.sequence = sequence;
     }
 
     long getSequence() {
-        return request.getSequence();
+        return sequence;
     }
 
-    boolean acceptsResponse(final Response<?, ?> response) {
-        return getSequence() == response.getSequence() && request.getTarget().equals(response.getTarget());
+    boolean acceptsResponse(final ResponseEnvelope<?> response) {
+        return getSequence() == response.getSequence() && request.getTarget().equals(response.getMessage().getTarget());
     }
 
     long getCurrentTry() {
-        final Request<?, ?> req = lastTry.isPresent() ? lastTry.get().request : request;
-        return req.getRetry();
+        return lastTry.isPresent() ? lastTry.get().retry : 0;
      }
 
     ClientActorBehavior complete(final Response<?, ?> response) {
@@ -73,20 +77,16 @@ final class SequencedQueueEntry {
     }
 
     boolean isTimedOut(final long now, final long timeoutNanos) {
-        final Request<?, ?> req;
         final long elapsed;
 
         if (lastTry.isPresent()) {
-            final LastTry t = lastTry.get();
-            elapsed = now - t.timeTicks;
-            req = t.request;
+            elapsed = now - lastTry.get().timeTicks;
         } else {
             elapsed = now - enqueuedTicks;
-            req = request;
         }
 
         if (elapsed >= timeoutNanos) {
-            LOG.debug("Request {} timed out after {}ns", req, elapsed);
+            LOG.debug("Request {} timed out after {}ns", request, elapsed);
             return true;
         } else {
             return false;
@@ -94,13 +94,13 @@ final class SequencedQueueEntry {
     }
 
     void retransmit(final BackendInfo backend, final long now) {
-        final Request<?, ?> nextTry = lastTry.isPresent() ? lastTry.get().request.incrementRetry() : request;
-        final Request<?, ?> toSend = nextTry.toVersion(backend.getVersion());
-        final ActorRef actor = backend.getActor();
+        final long retry = lastTry.isPresent() ? lastTry.get().retry + 1 : 0;
+        final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), sequence, retry);
 
+        final ActorRef actor = backend.getActor();
         LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
         actor.tell(toSend, ActorRef.noSender());
-        lastTry = Optional.of(new LastTry(toSend, now));
+        lastTry = Optional.of(new LastTry(retry, now));
     }
 
     @Override