BUG-5280: introduce request/response Envelope 50/41150/10
authorRobert Varga <rovarga@cisco.com>
Thu, 30 Jun 2016 09:57:38 +0000 (11:57 +0200)
committerRobert Varga <rovarga@cisco.com>
Wed, 6 Jul 2016 09:52:24 +0000 (11:52 +0200)
This is a follow-up patch to move sequence information from
request/response structure and making it part of an Envelope,
which is allocated by the SequencedQueue.

Change-Id: I341118850d9c5835bab0b491f59b95264f31e5ef
Signed-off-by: Robert Varga <rovarga@cisco.com>
48 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbortLocalTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CommitLocalTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailure.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailureProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractMessageProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractRequestFailureProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractRequestProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseEnvelopeProxy.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractSuccessProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Envelope.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Message.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Request.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelopeProxy.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestFailure.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Response.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueue.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntry.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntryTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueTest.java

index 46351fa..1045306 100644 (file)
@@ -23,22 +23,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class AbortLocalTransactionRequest extends AbstractLocalTransactionRequest<AbortLocalTransactionRequest> {
     private static final long serialVersionUID = 1L;
 
-    public AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
+    public AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier,
             final @Nonnull ActorRef replyTo) {
-        this(identifier, sequence, 0, replyTo);
-    }
-
-    AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
-            final long retry, final @Nonnull ActorRef replyTo) {
-        super(identifier, sequence, retry, replyTo);
-    }
-
-    private AbortLocalTransactionRequest(final @Nonnull AbortLocalTransactionRequest request, final long retry) {
-        super(request, retry);
-    }
-
-    @Override
-    protected AbortLocalTransactionRequest cloneAsRetry(final long retry) {
-        return new AbortLocalTransactionRequest(this, retry);
+        super(identifier, replyTo);
     }
 }
\ No newline at end of file
index 9579ba4..7730c5d 100644 (file)
@@ -23,13 +23,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 abstract class AbstractLocalTransactionRequest<T extends AbstractLocalTransactionRequest<T>> extends TransactionRequest<T> {
     private static final long serialVersionUID = 1L;
 
-    AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final long sequence, final long retry,
-        final ActorRef replyTo) {
-        super(identifier, sequence, retry, replyTo);
-    }
-
-    AbstractLocalTransactionRequest(final T request, final long retry) {
-        super(request, retry);
+    AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) {
+        super(identifier, replyTo);
     }
 
     @Override
index ddc5fef..d23430f 100644 (file)
@@ -32,9 +32,9 @@ public abstract class AbstractReadTransactionRequest<T extends AbstractReadTrans
     private static final long serialVersionUID = 1L;
     private final YangInstanceIdentifier path;
 
-    AbstractReadTransactionRequest(final TransactionIdentifier identifier, final long sequence, final long retry,
-        final ActorRef replyTo, final YangInstanceIdentifier path) {
-        super(identifier, sequence, retry, replyTo);
+    AbstractReadTransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo,
+        final YangInstanceIdentifier path) {
+        super(identifier, replyTo);
         this.path = Preconditions.checkNotNull(path);
     }
 
@@ -43,11 +43,6 @@ public abstract class AbstractReadTransactionRequest<T extends AbstractReadTrans
         this.path = request.getPath();
     }
 
-    AbstractReadTransactionRequest(final T request, final long retry) {
-        super(request, retry);
-        this.path = request.getPath();
-    }
-
     public final @Nonnull YangInstanceIdentifier getPath() {
         return path;
     }
index 11665a7..13b03a2 100644 (file)
@@ -51,11 +51,9 @@ abstract class AbstractReadTransactionRequestProxyV1<T extends AbstractReadTrans
     }
 
     @Override
-    protected final T createRequest(final TransactionIdentifier target, final long sequence, final long retry,
-            final ActorRef replyTo) {
-        return createReadRequest(target, sequence, retry, replyTo, path);
+    protected final T createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+        return createReadRequest(target, replyTo, path);
     }
 
-    abstract T createReadRequest(TransactionIdentifier target, long sequence, long retry, ActorRef replyTo,
-            YangInstanceIdentifier path);
+    abstract T createReadRequest(TransactionIdentifier target, ActorRef replyTo, YangInstanceIdentifier path);
 }
index 97381cc..1f73951 100644 (file)
@@ -27,26 +27,13 @@ public final class CommitLocalTransactionRequest extends AbstractLocalTransactio
     private final DataTreeModification mod;
     private final boolean coordinated;
 
-    public CommitLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
+    public CommitLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier,
             final @Nonnull ActorRef replyTo, final @Nonnull DataTreeModification mod, final boolean coordinated) {
-        this(identifier, sequence, 0, replyTo, mod, coordinated);
-    }
-
-    CommitLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
-            final long retry, final @Nonnull ActorRef replyTo, final @Nonnull DataTreeModification mod,
-            final boolean coordinated) {
-        super(identifier, sequence, retry, replyTo);
+        super(identifier, replyTo);
         this.mod = Preconditions.checkNotNull(mod);
         this.coordinated = coordinated;
     }
 
-
-    private CommitLocalTransactionRequest(final CommitLocalTransactionRequest request, final long retry) {
-        super(request, retry);
-        this.mod = request.mod;
-        this.coordinated = request.coordinated;
-    }
-
     public DataTreeModification getModification() {
         return mod;
     }
@@ -67,9 +54,4 @@ public final class CommitLocalTransactionRequest extends AbstractLocalTransactio
     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
         return super.addToStringAttributes(toStringHelper).add("coordinated", coordinated);
     }
-
-    @Override
-    protected CommitLocalTransactionRequest cloneAsRetry(final long retry) {
-        return new CommitLocalTransactionRequest(this, retry);
-    }
 }
\ No newline at end of file
index 7da9df5..cbb612f 100644 (file)
@@ -23,24 +23,15 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public final class ExistsTransactionRequest extends AbstractReadTransactionRequest<ExistsTransactionRequest> {
     private static final long serialVersionUID = 1L;
 
-    public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
-            final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
-        this(identifier, sequence, 0, replyTo, path);
-    }
-
-    ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
-            final long retry, final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
-        super(identifier, sequence, retry, replyTo, path);
+    public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo,
+        final @Nonnull YangInstanceIdentifier path) {
+        super(identifier, replyTo, path);
     }
 
     private ExistsTransactionRequest(final ExistsTransactionRequest request, final ABIVersion version) {
         super(request, version);
     }
 
-    private ExistsTransactionRequest(final ExistsTransactionRequest request, final long retry) {
-        super(request, retry);
-    }
-
     @Override
     protected ExistsTransactionRequest cloneAsVersion(final ABIVersion version) {
         return new ExistsTransactionRequest(this, version);
@@ -50,9 +41,4 @@ public final class ExistsTransactionRequest extends AbstractReadTransactionReque
     protected ExistsTransactionRequestProxyV1 externalizableProxy(final ABIVersion version) {
         return new ExistsTransactionRequestProxyV1(this);
     }
-
-    @Override
-    protected ExistsTransactionRequest cloneAsRetry(final long retry) {
-        return new ExistsTransactionRequest(this, retry);
-    }
 }
index ee6d3fd..4f5bd1c 100644 (file)
@@ -29,8 +29,8 @@ final class ExistsTransactionRequestProxyV1 extends AbstractReadTransactionReque
     }
 
     @Override
-    ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence,
-            final long retry, final ActorRef replyTo, final YangInstanceIdentifier path) {
-        return new ExistsTransactionRequest(target, sequence, retry, replyTo, path);
+    ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo,
+            final YangInstanceIdentifier path) {
+        return new ExistsTransactionRequest(target, replyTo, path);
     }
 }
\ No newline at end of file
index 5f081c7..3316792 100644 (file)
@@ -23,13 +23,8 @@ public final class ExistsTransactionSuccess extends TransactionSuccess<ExistsTra
     private static final long serialVersionUID = 1L;
     private final boolean exists;
 
-    public ExistsTransactionSuccess(final TransactionIdentifier target, final long sequence, final boolean exists) {
-        this(target, sequence, 0, exists);
-    }
-
-    ExistsTransactionSuccess(final TransactionIdentifier target, final long sequence, final long retry,
-            final boolean exists) {
-        super(target, sequence, retry);
+    public ExistsTransactionSuccess(final TransactionIdentifier target, final boolean exists) {
+        super(target);
         this.exists = exists;
     }
 
index 44a1897..6932d24 100644 (file)
@@ -44,8 +44,7 @@ final class ExistsTransactionSuccessProxyV1 extends AbstractTransactionSuccessPr
     }
 
     @Override
-    protected ExistsTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence,
-            final long retry) {
-        return new ExistsTransactionSuccess(target, sequence, retry, exists);
+    protected ExistsTransactionSuccess createSuccess(final TransactionIdentifier target) {
+        return new ExistsTransactionSuccess(target, exists);
     }
 }
index 171e5dc..fd1b1b8 100644 (file)
@@ -28,15 +28,9 @@ public final class ModifyTransactionRequest extends TransactionRequest<ModifyTra
     private final List<TransactionModification> modifications;
     private final PersistenceProtocol protocol;
 
-    private ModifyTransactionRequest(final ModifyTransactionRequest request, final long retry) {
-        super(request, retry);
-        this.modifications = request.modifications;
-        this.protocol = request.protocol;
-    }
-
-    ModifyTransactionRequest(final TransactionIdentifier target, final long sequence, final long retry,
-        final ActorRef replyTo, final List<TransactionModification> modifications, final PersistenceProtocol protocol) {
-        super(target, sequence, retry, replyTo);
+    ModifyTransactionRequest(final TransactionIdentifier target, final ActorRef replyTo,
+        final List<TransactionModification> modifications, final PersistenceProtocol protocol) {
+        super(target, replyTo);
         this.modifications = ImmutableList.copyOf(modifications);
         this.protocol = protocol;
     }
@@ -59,14 +53,8 @@ public final class ModifyTransactionRequest extends TransactionRequest<ModifyTra
         return new ModifyTransactionRequestProxyV1(this);
     }
 
-    @Override
-    protected ModifyTransactionRequest cloneAsRetry(final long retry) {
-        return new ModifyTransactionRequest(this, retry);
-    }
-
     @Override
     protected ModifyTransactionRequest cloneAsVersion(final ABIVersion version) {
         return this;
     }
-
 }
index 88c6a8e..336902e 100644 (file)
@@ -31,7 +31,6 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
     private final TransactionIdentifier identifier;
     private final ActorRef replyTo;
     private PersistenceProtocol protocol = null;
-    private long sequence;
 
     public ModifyTransactionRequestBuilder(final TransactionIdentifier identifier, final ActorRef replyTo) {
         this.identifier = Preconditions.checkNotNull(identifier);
@@ -47,12 +46,6 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
         Preconditions.checkState(protocol != null, "Batch has already been finished");
     }
 
-    public void setSequence(final long sequence) {
-        checkFinished();
-        Preconditions.checkState(modifications.isEmpty(), "Sequence must be set first");
-        this.sequence = sequence;
-    }
-
     public void addModification(final TransactionModification modification) {
         checkFinished();
         modifications.add(Preconditions.checkNotNull(modification));
@@ -76,12 +69,9 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
 
     @Override
     public ModifyTransactionRequest build() {
-        final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, sequence, 0, replyTo,
-            modifications, protocol);
+        final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, replyTo, modifications, protocol);
         modifications.clear();
         protocol = null;
-        sequence = 0;
         return ret;
     }
-
 }
index f493e13..6fbc035 100644 (file)
@@ -77,8 +77,7 @@ final class ModifyTransactionRequestProxyV1 extends AbstractTransactionRequestPr
     }
 
     @Override
-    protected ModifyTransactionRequest createRequest(final TransactionIdentifier target, final long sequence,
-            final long retry, final ActorRef replyTo) {
-        return new ModifyTransactionRequest(target, sequence, retry, replyTo, modifications, protocol.orElse(null));
+    protected ModifyTransactionRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+        return new ModifyTransactionRequest(target, replyTo, modifications, protocol.orElse(null));
     }
 }
index 0ee07c0..20b679f 100644 (file)
@@ -23,29 +23,15 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public final class ReadTransactionRequest extends AbstractReadTransactionRequest<ReadTransactionRequest> {
     private static final long serialVersionUID = 1L;
 
-    public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
-            final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
-        this(identifier, sequence, 0, replyTo, path);
-    }
-
-    ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
-            final long retry, final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
-        super(identifier, sequence, retry, replyTo, path);
+    public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo,
+            final @Nonnull YangInstanceIdentifier path) {
+        super(identifier, replyTo, path);
     }
 
     private ReadTransactionRequest(final ReadTransactionRequest request, final ABIVersion version) {
         super(request, version);
     }
 
-    private ReadTransactionRequest(final ReadTransactionRequest request, final long retry) {
-        super(request, retry);
-    }
-
-    @Override
-    protected ReadTransactionRequest cloneAsRetry(final long retry) {
-        return new ReadTransactionRequest(this, retry);
-    }
-
     @Override
     protected ReadTransactionRequest cloneAsVersion(final ABIVersion version) {
         return new ReadTransactionRequest(this, version);
index 461b1c2..ae0f6f6 100644 (file)
@@ -29,8 +29,8 @@ final class ReadTransactionRequestProxyV1 extends AbstractReadTransactionRequest
     }
 
     @Override
-    ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence, final long retry,
-            final ActorRef replyTo, final YangInstanceIdentifier path) {
-        return new ReadTransactionRequest(target, sequence, retry, replyTo, path);
+    ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo,
+            final YangInstanceIdentifier path) {
+        return new ReadTransactionRequest(target, replyTo, path);
     }
 }
\ No newline at end of file
index e83adee..45e7631 100644 (file)
@@ -24,14 +24,8 @@ public final class ReadTransactionSuccess extends TransactionSuccess<ReadTransac
     private static final long serialVersionUID = 1L;
     private final Optional<NormalizedNode<?, ?>> data;
 
-    public ReadTransactionSuccess(final TransactionIdentifier identifier, final long sequence,
-            final Optional<NormalizedNode<?, ?>> data) {
-        this(identifier, sequence, 0, data);
-    }
-
-    ReadTransactionSuccess(final TransactionIdentifier identifier, final long sequence, final long retry,
-            final Optional<NormalizedNode<?, ?>> data) {
-        super(identifier, sequence, retry);
+    public ReadTransactionSuccess(final TransactionIdentifier identifier, final Optional<NormalizedNode<?, ?>> data) {
+        super(identifier);
         this.data = Preconditions.checkNotNull(data);
     }
 
index 13e1cf0..9232842 100644 (file)
@@ -63,8 +63,7 @@ final class ReadTransactionSuccessProxyV1 extends AbstractTransactionSuccessProx
     }
 
     @Override
-    protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence,
-            final long retry) {
-        return new ReadTransactionSuccess(target, sequence, retry, data);
+    protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target) {
+        return new ReadTransactionSuccess(target, data);
     }
 }
index 7dc4b19..e31d75f 100644 (file)
@@ -22,9 +22,8 @@ public final class TransactionCanCommitSuccess extends TransactionSuccess<Transa
     private static final long serialVersionUID = 1L;
     private final ActorRef cohort;
 
-    public TransactionCanCommitSuccess(final TransactionIdentifier identifier, final long sequence, final long retry,
-            final ActorRef cohort) {
-        super(identifier, sequence, retry);
+    public TransactionCanCommitSuccess(final TransactionIdentifier identifier, final ActorRef cohort) {
+        super(identifier);
         this.cohort = Preconditions.checkNotNull(cohort);
     }
 
index 87a6c9a..a6d54f5 100644 (file)
@@ -47,8 +47,7 @@ final class TransactionCanCommitSuccessProxyV1 extends AbstractTransactionSucces
     }
 
     @Override
-    protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence,
-            final long retry) {
-        return new TransactionCanCommitSuccess(target, sequence, retry, cohort);
+    protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target) {
+        return new TransactionCanCommitSuccess(target, cohort);
     }
 }
index d3a8611..750d327 100644 (file)
@@ -22,9 +22,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionFailure extends RequestFailure<TransactionIdentifier, TransactionFailure> {
     private static final long serialVersionUID = 1L;
 
-    TransactionFailure(final TransactionIdentifier target, final long sequence, final long retry,
-        final RequestException cause) {
-        super(target, sequence, retry, cause);
+    TransactionFailure(final TransactionIdentifier target, final RequestException cause) {
+        super(target, cause);
     }
 
     @Override
index 1a35ec1..fd99749 100644 (file)
@@ -31,9 +31,8 @@ final class TransactionFailureProxyV1 extends AbstractRequestFailureProxy<Transa
     }
 
     @Override
-    protected TransactionFailure createFailure(final TransactionIdentifier target, final long sequence,
-            final long retry,  final RequestException cause) {
-        return new TransactionFailure(target, sequence, retry, cause);
+    protected TransactionFailure createFailure(final TransactionIdentifier target, final RequestException cause) {
+        return new TransactionFailure(target, cause);
     }
 
     @Override
index 6a4a26b..154d4b3 100644 (file)
@@ -26,22 +26,17 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public abstract class TransactionRequest<T extends TransactionRequest<T>> extends Request<TransactionIdentifier, T> {
     private static final long serialVersionUID = 1L;
 
-    TransactionRequest(final TransactionIdentifier identifier, final long sequence, final long retry,
-        final ActorRef replyTo) {
-        super(identifier, sequence, retry, replyTo);
+    TransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) {
+        super(identifier, replyTo);
     }
 
     TransactionRequest(final T request, final ABIVersion version) {
         super(request, version);
     }
 
-    TransactionRequest(final T request, final long retry) {
-        super(request, retry);
-    }
-
     @Override
     public final TransactionFailure toRequestFailure(final RequestException cause) {
-        return new TransactionFailure(getTarget(), getSequence(), getRetry(), cause);
+        return new TransactionFailure(getTarget(), cause);
     }
 
     @Override
index ff56340..daa4ba1 100644 (file)
@@ -24,8 +24,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public abstract class TransactionSuccess<T extends TransactionSuccess<T>> extends RequestSuccess<TransactionIdentifier, T> {
     private static final long serialVersionUID = 1L;
 
-    TransactionSuccess(final TransactionIdentifier identifier, final long sequence, final long retry) {
-        super(identifier, sequence, retry);
+    TransactionSuccess(final TransactionIdentifier identifier) {
+        super(identifier);
     }
 
     @Override
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java
new file mode 100644 (file)
index 0000000..89458b8
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.yangtools.concepts.WritableObjects;
+
+abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externalizable {
+    private static final long serialVersionUID = 1L;
+    private T message;
+    private long sequence;
+    private long retry;
+
+    public AbstractEnvelopeProxy() {
+        // for Externalizable
+    }
+
+    AbstractEnvelopeProxy(final Envelope<T> envelope) {
+        message = envelope.getMessage();
+        sequence = envelope.getSequence();
+        retry = envelope.getRetry();
+    }
+
+    @Override
+    public final void writeExternal(final ObjectOutput out) throws IOException {
+        WritableObjects.writeLongs(out, sequence, retry);
+        out.writeObject(message);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+        final byte header = WritableObjects.readLongHeader(in);
+        sequence = WritableObjects.readFirstLong(in, header);
+        retry = WritableObjects.readSecondLong(in, header);
+        message = (T) in.readObject();
+    }
+
+    abstract Envelope<T> createEnvelope(T message, long sequence, long retry);
+
+    final Object readResolve() {
+        return createEnvelope(message, sequence, retry);
+    }
+}
\ No newline at end of file
index 04e7037..4b60aec 100644 (file)
@@ -15,7 +15,6 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import javax.annotation.Nonnull;
 import org.opendaylight.yangtools.concepts.WritableIdentifier;
-import org.opendaylight.yangtools.concepts.WritableObjects;
 
 /**
  * Abstract Externalizable proxy for use with {@link Message} subclasses.
@@ -28,8 +27,6 @@ import org.opendaylight.yangtools.concepts.WritableObjects;
 abstract class AbstractMessageProxy<T extends WritableIdentifier, C extends Message<T, C>> implements Externalizable {
     private static final long serialVersionUID = 1L;
     private T target;
-    private long sequence;
-    private long retry;
 
     protected AbstractMessageProxy() {
         // For Externalizable
@@ -37,29 +34,22 @@ abstract class AbstractMessageProxy<T extends WritableIdentifier, C extends Mess
 
     AbstractMessageProxy(final @Nonnull C message) {
         this.target = message.getTarget();
-        this.sequence = message.getSequence();
-        this.retry = message.getRetry();
     }
 
     @Override
     public void writeExternal(final ObjectOutput out) throws IOException {
         target.writeTo(out);
-        WritableObjects.writeLongs(out, sequence, retry);
     }
 
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         target = Verify.verifyNotNull(readTarget(in));
-
-        final byte header = WritableObjects.readLongHeader(in);
-        sequence = WritableObjects.readFirstLong(in, header);
-        retry = WritableObjects.readSecondLong(in, header);
     }
 
     protected final Object readResolve() {
-        return Verify.verifyNotNull(createMessage(target, sequence, retry));
+        return Verify.verifyNotNull(createMessage(target));
     }
 
     protected abstract @Nonnull T readTarget(@Nonnull DataInput in) throws IOException;
-    abstract @Nonnull C createMessage(@Nonnull T target, long sequence, long retry);
+    abstract @Nonnull C createMessage(@Nonnull T target);
 }
\ No newline at end of file
index 51e1c72..97a5f05 100644 (file)
@@ -49,10 +49,9 @@ public abstract class AbstractRequestFailureProxy<T extends WritableIdentifier,
     }
 
     @Override
-    final C createResponse(final T target, final long sequence, final long retry) {
-        return createFailure(target, sequence, retry, cause);
+    final C createResponse(final T target) {
+        return createFailure(target, cause);
     }
 
-    protected abstract @Nonnull C createFailure(@Nonnull T target, long sequence, long retry,
-            @Nonnull RequestException cause);
+    protected abstract @Nonnull C createFailure(@Nonnull T target, @Nonnull RequestException cause);
 }
\ No newline at end of file
index 48b1602..c0bf959 100644 (file)
@@ -52,9 +52,9 @@ public abstract class AbstractRequestProxy<T extends WritableIdentifier, C exten
     }
 
     @Override
-    protected final C createMessage(final T target, final long sequence, final long retry) {
-        return createRequest(target, sequence, retry, replyTo);
+    final @Nonnull C createMessage(@Nonnull final T target) {
+        return createRequest(target, replyTo);
     }
 
-    protected abstract @Nonnull C createRequest(@Nonnull T target, long sequence, long retry, @Nonnull ActorRef replyTo);
+    protected abstract @Nonnull C createRequest(@Nonnull T target, @Nonnull ActorRef replyTo);
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseEnvelopeProxy.java
new file mode 100644 (file)
index 0000000..7363368
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+abstract class AbstractResponseEnvelopeProxy<T extends Response<?, ?>> extends AbstractEnvelopeProxy<T> {
+    private static final long serialVersionUID = 1L;
+
+    public AbstractResponseEnvelopeProxy() {
+        // for Externalizable
+    }
+
+    AbstractResponseEnvelopeProxy(final ResponseEnvelope<T> envelope) {
+        super(envelope);
+    }
+
+    @Override
+    abstract ResponseEnvelope<T> createEnvelope(T message, long sequence, long retry);
+}
\ No newline at end of file
index 08c3135..8262bb2 100644 (file)
@@ -31,9 +31,9 @@ abstract class AbstractResponseProxy<T extends WritableIdentifier, C extends Res
     }
 
     @Override
-    final C createMessage(final T target, final long sequence, final long retry) {
-        return createResponse(target, sequence, retry);
+    final C createMessage(final T target) {
+        return createResponse(target);
     }
 
-    abstract @Nonnull C createResponse(@Nonnull T target, long sequence, long retry);
+    abstract @Nonnull C createResponse(@Nonnull T target);
 }
index bb58785..639b20c 100644 (file)
@@ -33,9 +33,9 @@ public abstract class AbstractSuccessProxy<T extends WritableIdentifier, C exten
     }
 
     @Override
-    final C createResponse(final T target, final long sequence, final long retry) {
-        return createSuccess(target, sequence, retry);
+    final C createResponse(final T target) {
+        return createSuccess(target);
     }
 
-    protected abstract @Nonnull C createSuccess(@Nonnull T target, long sequence, long retry);
+    protected abstract @Nonnull C createSuccess(@Nonnull T target);
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Envelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Envelope.java
new file mode 100644 (file)
index 0000000..cd674b4
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import org.opendaylight.yangtools.concepts.Immutable;
+
+public abstract class Envelope<T extends Message<?, ?>> implements Immutable, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final T message;
+    private final long sequence;
+    private final long retry;
+
+    Envelope(final T message, final long sequence, final long retry) {
+        this.message = Preconditions.checkNotNull(message);
+        this.sequence = sequence;
+        this.retry = retry;
+    }
+
+    /**
+     * Get the enclosed message
+     *
+     * @return enclose message
+     */
+    public T getMessage() {
+        return message;
+    }
+
+    /**
+     * Get the message sequence of this envelope.
+     *
+     * @return Message sequence
+     */
+    public long getSequence() {
+        return sequence;
+    }
+
+    /**
+     * Get the message retry counter.
+     *
+     * @return Retry counter
+     */
+    public long getRetry() {
+        return retry;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(Envelope.class).add("sequence", Long.toUnsignedString(sequence, 16)).
+                add("retry", retry).add("message", message).toString();
+    }
+
+    final Object writeReplace() {
+        return createProxy();
+    }
+
+    abstract AbstractEnvelopeProxy<T> createProxy();
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java
new file mode 100644 (file)
index 0000000..8da54f5
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+public final class FailureEnvelope extends ResponseEnvelope<RequestFailure<?, ?>> {
+    private static final long serialVersionUID = 1L;
+
+    public FailureEnvelope(final RequestFailure<?, ?> message, final long sequence, final long retry) {
+        super(message, sequence, retry);
+    }
+
+    @Override
+    FailureEnvelopeProxy createProxy() {
+        return new FailureEnvelopeProxy(this);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java
new file mode 100644 (file)
index 0000000..a14e698
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+final class FailureEnvelopeProxy extends AbstractResponseEnvelopeProxy<RequestFailure<?, ?>> {
+    private static final long serialVersionUID = 1L;
+
+    public FailureEnvelopeProxy() {
+        // for Externalizable
+    }
+
+    FailureEnvelopeProxy(final FailureEnvelope envelope) {
+        super(envelope);
+    }
+
+    @Override
+    FailureEnvelope createEnvelope(final RequestFailure<?, ?> message, final long sequence, final long retry) {
+        return new FailureEnvelope(message, sequence, retry);
+    }
+}
index 87b0e6e..5070b7c 100644 (file)
@@ -51,27 +51,19 @@ public abstract class Message<T extends WritableIdentifier, C extends Message<T,
         Serializable {
     private static final long serialVersionUID = 1L;
     private final T target;
-    private final long sequence;
     private final ABIVersion version;
-    private final long retry;
 
-    private Message(final ABIVersion version, final T target, final long sequence, final long retry) {
+    private Message(final ABIVersion version, final T target) {
         this.target = Preconditions.checkNotNull(target);
         this.version = Preconditions.checkNotNull(version);
-        this.sequence = sequence;
-        this.retry = retry;
     }
 
-    Message(final T target, final long sequence, final long retry) {
-        this(ABIVersion.current(), target, sequence, retry);
+    Message(final T target) {
+        this(ABIVersion.current(), target);
     }
 
     Message(final C msg, final ABIVersion version) {
-        this(version, msg.getTarget(), msg.getSequence(), msg.getRetry());
-    }
-
-    Message(final C msg, final long retry) {
-        this(msg.getVersion(), msg.getTarget(), msg.getSequence(), retry);
+        this(version, msg.getTarget());
     }
 
     /**
@@ -83,29 +75,11 @@ public abstract class Message<T extends WritableIdentifier, C extends Message<T,
         return target;
     }
 
-    /**
-     * Get the message sequence of this message.
-     *
-     * @return Message sequence
-     */
-    public final long getSequence() {
-        return sequence;
-    }
-
     @VisibleForTesting
     public final ABIVersion getVersion() {
         return version;
     }
 
-    /**
-     * Get the message retry counter.
-     *
-     * @return Retry counter
-     */
-    public final long getRetry() {
-        return retry;
-    }
-
     /**
      * Return a message which will end up being serialized in the specified {@link ABIVersion}.
      *
@@ -141,24 +115,6 @@ public abstract class Message<T extends WritableIdentifier, C extends Message<T,
      */
     protected abstract @Nonnull C cloneAsVersion(@Nonnull ABIVersion version);
 
-    /**
-     * Return a message which will have the retry counter incremented by one.
-     *
-     * @return A message with the specified retry counter
-     */
-    public final @Nonnull C incrementRetry() {
-        return Verify.verifyNotNull(cloneAsRetry(retry +1));
-    }
-
-    /**
-     * Create a copy of this message which will have its retry count bumped. This method should be implemented by
-     * the concrete final message class and should invoked the equivalent of {@link #Message(Message, long)}.
-     *
-     * @param retry new retry count
-     * @return A message with the specified retry counter
-     */
-    protected abstract @Nonnull C cloneAsRetry(long retry);
-
     @Override
     public final String toString() {
         return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues()).toString();
@@ -173,7 +129,7 @@ public abstract class Message<T extends WritableIdentifier, C extends Message<T,
      * @throws NullPointerException if toStringHelper is null
      */
     protected @Nonnull ToStringHelper addToStringAttributes(final @Nonnull ToStringHelper toStringHelper) {
-        return toStringHelper.add("target", target).add("sequence", Long.toUnsignedString(sequence, 16));
+        return toStringHelper.add("target", target);
     }
 
     /**
index d0d6324..0892368 100644 (file)
@@ -29,8 +29,8 @@ public abstract class Request<T extends WritableIdentifier, C extends Request<T,
     private static final long serialVersionUID = 1L;
     private final ActorRef replyTo;
 
-    protected Request(final @Nonnull T target, final long sequence, final long retry, final @Nonnull ActorRef replyTo) {
-        super(target, sequence, retry);
+    protected Request(final @Nonnull T target, final @Nonnull ActorRef replyTo) {
+        super(target);
         this.replyTo = Preconditions.checkNotNull(replyTo);
     }
 
@@ -39,11 +39,6 @@ public abstract class Request<T extends WritableIdentifier, C extends Request<T,
         this.replyTo = Preconditions.checkNotNull(request.getReplyTo());
     }
 
-    protected Request(final C request, final long retry) {
-        super(request, retry);
-        this.replyTo = Preconditions.checkNotNull(request.getReplyTo());
-    }
-
     /**
      * Return the return address where responses to this request should be directed to.
      *
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java
new file mode 100644 (file)
index 0000000..263664b
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+import akka.actor.ActorRef;
+
+public final class RequestEnvelope extends Envelope<Request<?, ?>> {
+    private static final long serialVersionUID = 1L;
+
+    public RequestEnvelope(final Request<?, ?> message, final long sequence, final long retry) {
+        super(message, sequence, retry);
+    }
+
+    @Override
+    RequestEnvelopeProxy createProxy() {
+        return new RequestEnvelopeProxy(this);
+    }
+
+    /**
+     * Respond to this envelope with a {@link RequestFailure} caused by specified {@link RequestException}.
+     *
+     * @param cause Cause of this {@link RequestFailure}
+     * @throws NullPointerException if cause is null
+     */
+    public void sendFailure(final RequestException cause) {
+        sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSequence(), getRetry()));
+    }
+
+    /**
+     * Respond to this envelope with a {@link RequestSuccess}.
+     *
+     * @param success Successful response
+     * @throws NullPointerException if success is null
+     */
+    public void sendSuccess(final RequestSuccess<?, ?> success) {
+        sendResponse(new SuccessEnvelope(success, getSequence(), getRetry()));
+    }
+
+    private void sendResponse(final ResponseEnvelope<?> envelope) {
+        getMessage().getReplyTo().tell(envelope, ActorRef.noSender());
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelopeProxy.java
new file mode 100644 (file)
index 0000000..1b499d0
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+final class RequestEnvelopeProxy extends AbstractEnvelopeProxy<Request<?, ?>> {
+    private static final long serialVersionUID = 1L;
+
+    public RequestEnvelopeProxy() {
+        // for Externalizable
+    }
+
+    RequestEnvelopeProxy(final RequestEnvelope envelope) {
+        super(envelope);
+    }
+
+    @Override
+    RequestEnvelope createEnvelope(final Request<?, ?> message, final long sequence, final long retry) {
+        return new RequestEnvelope(message, sequence, retry);
+    }
+}
index 7fd9009..301b54e 100644 (file)
@@ -32,9 +32,8 @@ public abstract class RequestFailure<T extends WritableIdentifier, C extends Req
         this.cause = Preconditions.checkNotNull(failure.getCause());
     }
 
-    protected RequestFailure(final @Nonnull T target, final long sequence, final long retry,
-            final @Nonnull RequestException cause) {
-        super(target, sequence, retry);
+    protected RequestFailure(final @Nonnull T target, final @Nonnull RequestException cause) {
+        super(target);
         this.cause = Preconditions.checkNotNull(cause);
     }
 
index 04f8ca3..6ed4598 100644 (file)
@@ -28,8 +28,8 @@ public abstract class RequestSuccess<T extends WritableIdentifier, C extends Req
         super(success, version);
     }
 
-    protected RequestSuccess(final @Nonnull T target, final long sequence, final long retry) {
-        super(target, sequence, retry);
+    protected RequestSuccess(final @Nonnull T target) {
+        super(target);
     }
 
     @Override
index 07acb27..1284525 100644 (file)
@@ -26,17 +26,12 @@ import org.opendaylight.yangtools.concepts.WritableIdentifier;
 public abstract class Response<T extends WritableIdentifier, C extends Response<T, C>> extends Message<T, C> {
     private static final long serialVersionUID = 1L;
 
-    Response(final @Nonnull C response, final @Nonnull ABIVersion version) {
-        super(response, version);
-    }
-
-    Response(final @Nonnull T target, final long sequence, final long retry) {
-        super(target, sequence, retry);
+    Response(final @Nonnull T target) {
+        super(target);
     }
 
-    @Override
-    protected final C cloneAsRetry(final long retry) {
-        throw new UnsupportedOperationException("Responses do not support retries");
+    Response(final @Nonnull C response, final @Nonnull ABIVersion version) {
+        super(response, version);
     }
 
     @Override
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java
new file mode 100644 (file)
index 0000000..a3625f6
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+public abstract class ResponseEnvelope<T extends Response<?, ?>> extends Envelope<T> {
+    private static final long serialVersionUID = 1L;
+
+    ResponseEnvelope(final T message, final long sequence, final long retry) {
+        super(message, sequence, retry);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java
new file mode 100644 (file)
index 0000000..be37dae
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+public final class SuccessEnvelope extends ResponseEnvelope<RequestSuccess<?, ?>> {
+    private static final long serialVersionUID = 1L;
+
+    public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sequence, final long retry) {
+        super(message, sequence, retry);
+    }
+
+    @Override
+    SuccessEnvelopeProxy createProxy() {
+        return new SuccessEnvelopeProxy(this);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java
new file mode 100644 (file)
index 0000000..9f0fc2f
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+final class SuccessEnvelopeProxy extends AbstractResponseEnvelopeProxy<RequestSuccess<?, ?>> {
+    private static final long serialVersionUID = 1L;
+
+    public SuccessEnvelopeProxy() {
+        // for Externalizable
+    }
+
+    SuccessEnvelopeProxy(final SuccessEnvelope envelope) {
+        super(envelope);
+    }
+
+    @Override
+    SuccessEnvelope createEnvelope(final RequestSuccess<?, ?> message, final long sequence, final long retry) {
+        return new SuccessEnvelope(message, sequence, retry);
+    }
+}
index c3df604..110a414 100644 (file)
@@ -14,10 +14,11 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
-import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
+import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,21 +50,22 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
         if (command instanceof InternalCommand) {
             return ((InternalCommand) command).execute(this);
         }
-        if (command instanceof RequestSuccess) {
-            return onRequestSuccess((RequestSuccess<?, ?>) command);
+        if (command instanceof SuccessEnvelope) {
+            return onRequestSuccess((SuccessEnvelope) command);
         }
-        if (command instanceof RequestFailure) {
-            return onRequestFailure((RequestFailure<?, ?>) command);
+        if (command instanceof FailureEnvelope) {
+            return onRequestFailure((FailureEnvelope) command);
         }
 
         return onCommand(command);
     }
 
-    private ClientActorBehavior onRequestSuccess(final RequestSuccess<?, ?> success) {
-        return context().completeRequest(this, success);
+    private ClientActorBehavior onRequestSuccess(final SuccessEnvelope command) {
+        return context().completeRequest(this, command);
     }
 
-    private ClientActorBehavior onRequestFailure(final RequestFailure<?, ?> failure) {
+    private ClientActorBehavior onRequestFailure(final FailureEnvelope command) {
+        final RequestFailure<?, ?> failure = command.getMessage();
         final RequestException cause = failure.getCause();
         if (cause instanceof RetiredGenerationException) {
             LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
@@ -73,22 +75,23 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
         }
 
         if (failure.isHardFailure()) {
-            return context().completeRequest(this, failure);
+            return context().completeRequest(this, command);
         }
 
         // TODO: add instanceof checks on cause to detect more problems
 
-        LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), failure);
-        return context().completeRequest(this, failure);
+        LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), command);
+        return context().completeRequest(this, command);
     }
 
     // This method is executing in the actor context, hence we can safely interact with the queue
-    private ClientActorBehavior doSendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
+    private ClientActorBehavior doSendRequest(final long sequence, final TransactionRequest<?> request,
+            final RequestCallback callback) {
         // Get or allocate queue for the request
         final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie());
 
         // Note this is a tri-state return and can be null
-        final Optional<FiniteDuration> result = queue.enqueueRequest(request, callback);
+        final Optional<FiniteDuration> result = queue.enqueueRequest(sequence, request, callback);
         if (result == null) {
             // Happy path: we are done here
             return this;
@@ -188,7 +191,7 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
      * @param request Request to send
      * @param callback Callback to invoke
      */
-    public final void sendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
-        context().executeInActor(cb -> cb.doSendRequest(request, callback));
+    public final void sendRequest(final long sequence, final TransactionRequest<?> request, final RequestCallback callback) {
+        context().executeInActor(cb -> cb.doSendRequest(sequence, request, callback));
     }
 }
index 9f4fd13..530d7e2 100644 (file)
@@ -19,7 +19,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.WritableIdentifier;
@@ -94,8 +94,8 @@ public class ClientActorContext extends AbstractClientActorContext implements Id
         queues.remove(queue.getCookie(), queue);
     }
 
-    ClientActorBehavior completeRequest(final ClientActorBehavior current, final Response<?, ?> response) {
-        final WritableIdentifier id = response.getTarget();
+    ClientActorBehavior completeRequest(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
+        final WritableIdentifier id = response.getMessage().getTarget();
 
         // FIXME: this will need to be updated for other Request/Response types to extract cookie
         Preconditions.checkArgument(id instanceof TransactionIdentifier);
index 8cae0e1..50c600f 100644 (file)
@@ -20,7 +20,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
@@ -105,12 +105,13 @@ final class SequencedQueue {
      * @param callback Callback to be invoked
      * @return Optional duration with semantics described above.
      */
-    @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
+    @Nullable Optional<FiniteDuration> enqueueRequest(final long sequence, final Request<?, ?> request,
+            final RequestCallback callback) {
+        checkNotClosed();
+
         final long now = ticker.read();
-        final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
+        final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now);
 
-        // We could have check first, but argument checking needs to happen first
-        checkNotClosed();
         queue.add(e);
         LOG.debug("Enqueued request {} to queue {}", request, this);
 
@@ -127,7 +128,7 @@ final class SequencedQueue {
         }
     }
 
-    ClientActorBehavior complete(final ClientActorBehavior current, final Response<?, ?> response) {
+    ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
         // Responses to different targets may arrive out of order, hence we use an iterator
         final Iterator<SequencedQueueEntry> it = queue.iterator();
         while (it.hasNext()) {
@@ -136,7 +137,7 @@ final class SequencedQueue {
                 lastProgress = ticker.read();
                 it.remove();
                 LOG.debug("Completing request {} with {}", e, response);
-                return e.complete(response);
+                return e.complete(response.getMessage());
             }
         }
 
index 54940cd..b99b1a3 100644 (file)
@@ -12,8 +12,10 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,12 +28,12 @@ import org.slf4j.LoggerFactory;
  */
 final class SequencedQueueEntry {
     private static final class LastTry {
-        final Request<?, ?> request;
         final long timeTicks;
+        final long retry;
 
-        LastTry(final Request<?, ?> request, final long when) {
-            this.request = Preconditions.checkNotNull(request);
-            this.timeTicks = when;
+        LastTry(final long retry, final long timeTicks) {
+            this.retry = retry;
+            this.timeTicks = timeTicks;
         }
     }
 
@@ -40,26 +42,28 @@ final class SequencedQueueEntry {
     private final Request<?, ?> request;
     private final RequestCallback callback;
     private final long enqueuedTicks;
+    private final long sequence;
 
     private Optional<LastTry> lastTry = Optional.empty();
 
-    SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback, final long now) {
+    SequencedQueueEntry(final Request<?, ?> request, final long sequence, final RequestCallback callback,
+        final long now) {
         this.request = Preconditions.checkNotNull(request);
         this.callback = Preconditions.checkNotNull(callback);
         this.enqueuedTicks = now;
+        this.sequence = sequence;
     }
 
     long getSequence() {
-        return request.getSequence();
+        return sequence;
     }
 
-    boolean acceptsResponse(final Response<?, ?> response) {
-        return getSequence() == response.getSequence() && request.getTarget().equals(response.getTarget());
+    boolean acceptsResponse(final ResponseEnvelope<?> response) {
+        return getSequence() == response.getSequence() && request.getTarget().equals(response.getMessage().getTarget());
     }
 
     long getCurrentTry() {
-        final Request<?, ?> req = lastTry.isPresent() ? lastTry.get().request : request;
-        return req.getRetry();
+        return lastTry.isPresent() ? lastTry.get().retry : 0;
      }
 
     ClientActorBehavior complete(final Response<?, ?> response) {
@@ -73,20 +77,16 @@ final class SequencedQueueEntry {
     }
 
     boolean isTimedOut(final long now, final long timeoutNanos) {
-        final Request<?, ?> req;
         final long elapsed;
 
         if (lastTry.isPresent()) {
-            final LastTry t = lastTry.get();
-            elapsed = now - t.timeTicks;
-            req = t.request;
+            elapsed = now - lastTry.get().timeTicks;
         } else {
             elapsed = now - enqueuedTicks;
-            req = request;
         }
 
         if (elapsed >= timeoutNanos) {
-            LOG.debug("Request {} timed out after {}ns", req, elapsed);
+            LOG.debug("Request {} timed out after {}ns", request, elapsed);
             return true;
         } else {
             return false;
@@ -94,13 +94,13 @@ final class SequencedQueueEntry {
     }
 
     void retransmit(final BackendInfo backend, final long now) {
-        final Request<?, ?> nextTry = lastTry.isPresent() ? lastTry.get().request.incrementRetry() : request;
-        final Request<?, ?> toSend = nextTry.toVersion(backend.getVersion());
-        final ActorRef actor = backend.getActor();
+        final long retry = lastTry.isPresent() ? lastTry.get().retry + 1 : 0;
+        final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), sequence, retry);
 
+        final ActorRef actor = backend.getActor();
         LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
         actor.tell(toSend, ActorRef.noSender());
-        lastTry = Optional.of(new LastTry(toSend, now));
+        lastTry = Optional.of(new LastTry(retry, now));
     }
 
     @Override
index 68a665e..86e0531 100644 (file)
@@ -31,6 +31,7 @@ import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
 import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.Response;
@@ -47,8 +48,8 @@ public class SequencedQueueEntryTest {
     private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
         private static final long serialVersionUID = 1L;
 
-        MockFailure(final WritableIdentifier target, final long sequence, final long retry, final RequestException cause) {
-            super(target, sequence, retry, cause);
+        MockFailure(final WritableIdentifier target, final RequestException cause) {
+            super(target, cause);
         }
 
         @Override
@@ -65,18 +66,13 @@ public class SequencedQueueEntryTest {
     private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
         private static final long serialVersionUID = 1L;
 
-        MockRequest(final WritableIdentifier target, final long sequence, final ActorRef replyTo) {
-            super(target, sequence, 0, replyTo);
-        }
-
-
-        MockRequest(final MockRequest request, final long retry) {
-            super(request, retry);
+        MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
+            super(target, replyTo);
         }
 
         @Override
         public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
-            return new MockFailure(getTarget(), getSequence(), getRetry(), cause);
+            return new MockFailure(getTarget(), cause);
         }
 
         @Override
@@ -88,11 +84,6 @@ public class SequencedQueueEntryTest {
         protected MockRequest cloneAsVersion(final ABIVersion version) {
             return this;
         }
-
-        @Override
-        protected MockRequest cloneAsRetry(final long retry) {
-            return new MockRequest(this, retry);
-        }
     };
 
     @Mock
@@ -137,10 +128,10 @@ public class SequencedQueueEntryTest {
 
         mockActor = TestProbe.apply(actorSystem);
         mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
-        mockRequest = new MockRequest(mockIdentifier, ThreadLocalRandom.current().nextLong(), mockReplyTo);
+        mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
         mockResponse = mockRequest.toRequestFailure(mockCause);
 
-        entry = new SequencedQueueEntry(mockRequest, mockCallback, ticker.read());
+        entry = new SequencedQueueEntry(mockRequest, 0, mockCallback, ticker.read());
     }
 
     @After
@@ -150,7 +141,7 @@ public class SequencedQueueEntryTest {
 
     @Test
     public void testGetSequence() {
-        assertEquals(mockRequest.getSequence(), entry.getSequence());
+        assertEquals(0, entry.getSequence());
     }
 
     @Test
@@ -207,9 +198,11 @@ public class SequencedQueueEntryTest {
     }
 
      private static void assertRequestEquals(final Request<?, ?> expected, final Object o) {
-         final Request<?, ?> actual = (Request<?, ?>) o;
-         assertEquals(expected.getRetry(), actual.getRetry());
-         assertEquals(expected.getSequence(), actual.getSequence());
-         assertEquals(expected.getTarget(), actual.getTarget());
+         assertTrue(o instanceof RequestEnvelope);
+
+         final RequestEnvelope actual = (RequestEnvelope) o;
+         assertEquals(0, actual.getRetry());
+         assertEquals(0, actual.getSequence());
+         assertEquals(expected.getTarget(), actual.getMessage().getTarget());
     }
 }
index f1caeb5..1762650 100644 (file)
@@ -35,10 +35,11 @@ import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
+import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
-import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.common.actor.TestTicker;
 import org.opendaylight.yangtools.concepts.WritableIdentifier;
 import scala.concurrent.duration.FiniteDuration;
@@ -52,8 +53,8 @@ public class SequencedQueueTest {
     private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
         private static final long serialVersionUID = 1L;
 
-        MockFailure(final WritableIdentifier target, final long sequence, final long retry, final RequestException cause) {
-            super(target, sequence, retry, cause);
+        MockFailure(final WritableIdentifier target, final RequestException cause) {
+            super(target, cause);
         }
 
         @Override
@@ -70,18 +71,13 @@ public class SequencedQueueTest {
     private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
         private static final long serialVersionUID = 1L;
 
-        MockRequest(final WritableIdentifier target, final long sequence, final ActorRef replyTo) {
-            super(target, sequence, 0, replyTo);
-        }
-
-
-        MockRequest(final MockRequest request, final long retry) {
-            super(request, retry);
+        MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
+            super(target, replyTo);
         }
 
         @Override
         public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
-            return new MockFailure(getTarget(), getSequence(), getRetry(), cause);
+            return new MockFailure(getTarget(), cause);
         }
 
         @Override
@@ -93,11 +89,6 @@ public class SequencedQueueTest {
         protected MockRequest cloneAsVersion(final ABIVersion version) {
             return this;
         }
-
-        @Override
-        protected MockRequest cloneAsRetry(final long retry) {
-            return new MockRequest(this, retry);
-        }
     };
 
     @Mock
@@ -115,7 +106,8 @@ public class SequencedQueueTest {
     private BackendInfo mockBackendInfo;
     private MockRequest mockRequest;
     private MockRequest mockRequest2;
-    private Response<WritableIdentifier, ?> mockResponse;
+    private RequestFailure<WritableIdentifier, ?> mockResponse;
+    private FailureEnvelope mockResponseEnvelope;
     private Long mockCookie;
 
     private static ActorSystem actorSystem;
@@ -144,9 +136,10 @@ public class SequencedQueueTest {
 
         mockActor = TestProbe.apply(actorSystem);
         mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
-        mockRequest = new MockRequest(mockIdentifier, ThreadLocalRandom.current().nextLong(), mockReplyTo);
-        mockRequest2 = new MockRequest(mockIdentifier, mockRequest.getSequence() + 1, mockReplyTo);
+        mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
+        mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
         mockResponse = mockRequest.toRequestFailure(mockCause);
+        mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0);
         mockCookie = ThreadLocalRandom.current().nextLong();
 
         queue = new SequencedQueue(mockCookie, ticker);
@@ -174,7 +167,7 @@ public class SequencedQueueTest {
         queue.close();
 
         // Kaboom
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
     }
 
     @Test
@@ -185,7 +178,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testPoison() {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
         queue.poison(mockCause);
 
         final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
@@ -199,7 +192,7 @@ public class SequencedQueueTest {
         queue.poison(mockCause);
 
         // Kaboom
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
     }
 
     @Test
@@ -210,7 +203,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testEnqueueRequestNeedsBackend() {
-        final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+        final Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
 
         assertNotNull(ret);
         assertFalse(ret.isPresent());
@@ -232,7 +225,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testSetBackendWithNoResolution() {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
 
         final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
         final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
@@ -242,7 +235,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testSetBackendWithWrongProof() {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
 
         final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
         assertTrue(queue.expectProof(proof));
@@ -260,7 +253,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testSetbackedWithRequestsNoTimer() {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
 
         final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
         assertTrue(queue.expectProof(proof));
@@ -270,17 +263,17 @@ public class SequencedQueueTest {
         assertNotNull(ret);
         assertTrue(ret.isPresent());
 
-        assertTransmit(mockRequest);
+        assertTransmit(mockRequest, 0);
     }
 
     @Test
     public void testEnqueueRequestNeedsTimer() {
         setupBackend();
 
-        final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+        final Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
         assertNotNull(ret);
         assertTrue(ret.isPresent());
-        assertTransmit(mockRequest);
+        assertTransmit(mockRequest, 0);
     }
 
     @Test
@@ -288,15 +281,15 @@ public class SequencedQueueTest {
         setupBackend();
 
         // First request
-        Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+        Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
         assertNotNull(ret);
         assertTrue(ret.isPresent());
-        assertTransmit(mockRequest);
+        assertTransmit(mockRequest, 0);
 
         // Second request, no timer fired
-        ret = queue.enqueueRequest(mockRequest2, mockCallback);
+        ret = queue.enqueueRequest(1, mockRequest2, mockCallback);
         assertNull(ret);
-        assertTransmit(mockRequest2);
+        assertTransmit(mockRequest2, 1);
     }
 
     @Test
@@ -307,14 +300,14 @@ public class SequencedQueueTest {
 
     @Test
     public void testRunTimeoutWithoutShift() throws NoProgressException {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
         final boolean ret = queue.runTimeout();
         assertFalse(ret);
     }
 
     @Test
     public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1);
 
@@ -324,7 +317,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS);
 
@@ -334,7 +327,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1);
 
@@ -344,7 +337,7 @@ public class SequencedQueueTest {
 
     @Test(expected=NoProgressException.class)
     public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
 
@@ -354,7 +347,7 @@ public class SequencedQueueTest {
 
     @Test(expected=NoProgressException.class)
     public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
 
@@ -382,31 +375,31 @@ public class SequencedQueueTest {
 
     @Test
     public void testCompleteEmpty() {
-        final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse);
+        final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
         assertSame(mockBehavior, ret);
         verifyNoMoreInteractions(mockCallback);
     }
 
     @Test
     public void testCompleteSingle() {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
 
-        ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse);
+        ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
         verify(mockCallback).complete(mockResponse);
         assertSame(mockBehavior, ret);
 
-        ret = queue.complete(mockBehavior, mockResponse);
+        ret = queue.complete(mockBehavior, mockResponseEnvelope);
         assertSame(mockBehavior, ret);
         verifyNoMoreInteractions(mockCallback);
     }
 
     @Test
     public void testCompleteNull() {
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
 
         doReturn(null).when(mockCallback).complete(mockResponse);
 
-        ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse);
+        ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
         verify(mockCallback).complete(mockResponse);
         assertNull(ret);
     }
@@ -415,11 +408,11 @@ public class SequencedQueueTest {
     public void testProgressRecord() throws NoProgressException {
         setupBackend();
 
-        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.enqueueRequest(0, mockRequest, mockCallback);
 
         ticker.increment(10);
-        queue.enqueueRequest(mockRequest2, mockCallback);
-        queue.complete(mockBehavior, mockResponse);
+        queue.enqueueRequest(1, mockRequest2, mockCallback);
+        queue.complete(mockBehavior, mockResponseEnvelope);
 
         ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11);
         assertTrue(queue.runTimeout());
@@ -434,15 +427,17 @@ public class SequencedQueueTest {
         assertFalse(mockActor.msgAvailable());
     }
 
-    private void assertTransmit(final Request<?, ?> expected) {
+    private void assertTransmit(final Request<?, ?> expected, final long sequence) {
         assertTrue(mockActor.msgAvailable());
-        assertRequestEquals(expected, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
+        assertRequestEquals(expected, sequence, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
     }
 
-    private static void assertRequestEquals(final Request<?, ?> expected, final Object o) {
-        final Request<?, ?> actual = (Request<?, ?>) o;
-        assertEquals(expected.getRetry(), actual.getRetry());
-        assertEquals(expected.getSequence(), actual.getSequence());
-        assertEquals(expected.getTarget(), actual.getTarget());
+    private static void assertRequestEquals(final Request<?, ?> expected, final long sequence, final Object o) {
+        assertTrue(o instanceof RequestEnvelope);
+
+        final RequestEnvelope actual = (RequestEnvelope) o;
+        assertEquals(0, actual.getRetry());
+        assertEquals(sequence, actual.getSequence());
+        assertSame(expected, actual.getMessage());
     }
 }