BUG-5280: separate request sequence and transmit sequence 33/43333/28
authorRobert Varga <rovarga@cisco.com>
Mon, 8 Aug 2016 15:04:54 +0000 (17:04 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 24 Aug 2016 01:51:03 +0000 (01:51 +0000)
Clean up the confusion in sequence numbers. There are actually
two sequences:
- logical request sequence, which indicates the order of requests
  in which they should be applied to the target entity. It is
  assigned by logic emitting the request.
- transmit sequence within a connection to the backend, as
  initiated by ConnectClientRequest. It is assigned by SequencedQueue
  as it is transmitting requests and reset when a new connection
  to the backend is made.

This requires establishing the concept of a session, which is
a single conversation between frontend and backend. It is severed
whenever the frontend times out and re-established when the leader
is found and it responds with ConnectClientSuccess.

The sending of ConnectClientRequest is not done via the queue,
as it is part of backend resolution process. Since this is not
a performance-critical path, we use simple Patterns.ask() to
send the request and get completion notification -- which we then
translate to ShardBackendInfo.

ConnectClientSuccess gives us backend-preferred version and
backend-specified cap on the number of outstanding messages then
it can handle concurrently. This maximum is used to limit the
transmit path of SequencedQueue, so that it does not attempt
to send more requests at any given time.

Internal queue for unsent requests is kept unbounded for now,
subject to a Semaphore-driven throttle in a follow-up patch.

Change-Id: I61663073bf6632c1ed8c036dee37f1ac39cf7794
Signed-off-by: Robert Varga <rovarga@cisco.com>
85 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbortLocalTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CommitLocalTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientFailure.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientFailureProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailure.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailure.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailureProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractMessageProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractRequestFailureProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractRequestProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractSuccessProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Envelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Message.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Request.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelopeProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestFailure.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Response.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfo.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TxDetails.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntryTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueTest.java
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ShardBackendInfo.java

index 104530651dd56598f4fcf460274544aef38b4280..5dc630cac5f0796025f3b90ab25b7bae88288cab 100644 (file)
@@ -25,6 +25,6 @@ public final class AbortLocalTransactionRequest extends AbstractLocalTransaction
 
     public AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier,
             final @Nonnull ActorRef replyTo) {
 
     public AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier,
             final @Nonnull ActorRef replyTo) {
-        super(identifier, replyTo);
+        super(identifier, 0, replyTo);
     }
 }
\ No newline at end of file
     }
 }
\ No newline at end of file
index 7730c5d615e2b27d628030ee99ba50856a5ebef9..bc9bb6225b6ac99a998f257c124b031e3b0a29e4 100644 (file)
@@ -23,8 +23,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 abstract class AbstractLocalTransactionRequest<T extends AbstractLocalTransactionRequest<T>> extends TransactionRequest<T> {
     private static final long serialVersionUID = 1L;
 
 abstract class AbstractLocalTransactionRequest<T extends AbstractLocalTransactionRequest<T>> extends TransactionRequest<T> {
     private static final long serialVersionUID = 1L;
 
-    AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) {
-        super(identifier, replyTo);
+    AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo) {
+        super(identifier, sequence, replyTo);
     }
 
     @Override
     }
 
     @Override
index d23430fa038b435cfeb13e3a316d7f034e717af5..a8fbbcfb3300433b2176ae9bbabdac1da8d70226 100644 (file)
@@ -32,9 +32,9 @@ public abstract class AbstractReadTransactionRequest<T extends AbstractReadTrans
     private static final long serialVersionUID = 1L;
     private final YangInstanceIdentifier path;
 
     private static final long serialVersionUID = 1L;
     private final YangInstanceIdentifier path;
 
-    AbstractReadTransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo,
+    AbstractReadTransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo,
         final YangInstanceIdentifier path) {
         final YangInstanceIdentifier path) {
-        super(identifier, replyTo);
+        super(identifier, sequence, replyTo);
         this.path = Preconditions.checkNotNull(path);
     }
 
         this.path = Preconditions.checkNotNull(path);
     }
 
index 13b03a2454e612d88da83c92b30eeeb9359f96ae..e2d07f13b604e03bea930e5fef25f240e404ecf2 100644 (file)
@@ -51,9 +51,10 @@ abstract class AbstractReadTransactionRequestProxyV1<T extends AbstractReadTrans
     }
 
     @Override
     }
 
     @Override
-    protected final T createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        return createReadRequest(target, replyTo, path);
+    protected final T createRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+        return createReadRequest(target, sequence, replyTo, path);
     }
 
     }
 
-    abstract T createReadRequest(TransactionIdentifier target, ActorRef replyTo, YangInstanceIdentifier path);
+    abstract T createReadRequest(TransactionIdentifier target, long sequence, ActorRef replyTo,
+            YangInstanceIdentifier path);
 }
 }
index 1f739515c99c6f0c6eb1c4e960a97ebc4f3b113f..1cb7bb1c695e91b1dfeb6367c65150465e1118bb 100644 (file)
@@ -29,7 +29,7 @@ public final class CommitLocalTransactionRequest extends AbstractLocalTransactio
 
     public CommitLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier,
             final @Nonnull ActorRef replyTo, final @Nonnull DataTreeModification mod, final boolean coordinated) {
 
     public CommitLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier,
             final @Nonnull ActorRef replyTo, final @Nonnull DataTreeModification mod, final boolean coordinated) {
-        super(identifier, replyTo);
+        super(identifier, 0, replyTo);
         this.mod = Preconditions.checkNotNull(mod);
         this.coordinated = coordinated;
     }
         this.mod = Preconditions.checkNotNull(mod);
         this.coordinated = coordinated;
     }
index c1380cc980c640168ddf36dbe19157807ac5427a..46b460ac0d364ec137113fcb65cea599ac724058 100644 (file)
@@ -23,8 +23,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 public final class ConnectClientFailure extends RequestFailure<ClientIdentifier, ConnectClientFailure> {
     private static final long serialVersionUID = 1L;
 
 public final class ConnectClientFailure extends RequestFailure<ClientIdentifier, ConnectClientFailure> {
     private static final long serialVersionUID = 1L;
 
-    ConnectClientFailure(final ClientIdentifier target, final RequestException cause) {
-        super(target, cause);
+    ConnectClientFailure(final ClientIdentifier target, final long sequence, final RequestException cause) {
+        super(target, sequence, cause);
     }
 
     private ConnectClientFailure(final ConnectClientFailure failure, final ABIVersion version) {
     }
 
     private ConnectClientFailure(final ConnectClientFailure failure, final ABIVersion version) {
index 5635754aee3f6cfeb1169d222e4a0709c0c0e42b..7059fe7ec1180430ee197a2da4971b6e6c28f625 100644 (file)
@@ -29,8 +29,9 @@ final class ConnectClientFailureProxyV1 extends AbstractRequestFailureProxy<Clie
     }
 
     @Override
     }
 
     @Override
-    protected ConnectClientFailure createFailure(final ClientIdentifier target, final RequestException cause) {
-        return new ConnectClientFailure(target, cause);
+    protected ConnectClientFailure createFailure(final ClientIdentifier target, final long sequence,
+            final RequestException cause) {
+        return new ConnectClientFailure(target, sequence, cause);
     }
 
     @Override
     }
 
     @Override
index 4800cea5e2a292db468103c1585ad7353185e2e2..45e7ba89b6c8ce8e78155bf190415e7a4ec01048 100644 (file)
@@ -34,26 +34,23 @@ public final class ConnectClientRequest extends Request<ClientIdentifier, Connec
 
     private final ABIVersion minVersion;
     private final ABIVersion maxVersion;
 
     private final ABIVersion minVersion;
     private final ABIVersion maxVersion;
-    private final long resumeSequence;
 
 
-    public ConnectClientRequest(final ClientIdentifier identifier, final ActorRef replyTo, final ABIVersion minVersion,
-            final ABIVersion maxVersion) {
-        this(identifier, replyTo, minVersion, maxVersion, 0);
+    ConnectClientRequest(final ClientIdentifier identifier, final long txSequence, final ActorRef replyTo,
+            final ABIVersion minVersion, final ABIVersion maxVersion) {
+        super(identifier, txSequence, replyTo);
+        this.minVersion = Preconditions.checkNotNull(minVersion);
+        this.maxVersion = Preconditions.checkNotNull(maxVersion);
     }
 
     public ConnectClientRequest(final ClientIdentifier identifier, final ActorRef replyTo, final ABIVersion minVersion,
     }
 
     public ConnectClientRequest(final ClientIdentifier identifier, final ActorRef replyTo, final ABIVersion minVersion,
-            final ABIVersion maxVersion, final long resumeSequence) {
-        super(identifier, replyTo);
-        this.minVersion = Preconditions.checkNotNull(minVersion);
-        this.maxVersion = Preconditions.checkNotNull(maxVersion);
-        this.resumeSequence = resumeSequence;
+            final ABIVersion maxVersion) {
+        this(identifier, 0, replyTo, minVersion, maxVersion);
     }
 
     private ConnectClientRequest(final ConnectClientRequest request, final ABIVersion version) {
         super(request, version);
         this.minVersion = request.minVersion;
         this.maxVersion = request.maxVersion;
     }
 
     private ConnectClientRequest(final ConnectClientRequest request, final ABIVersion version) {
         super(request, version);
         this.minVersion = request.minVersion;
         this.maxVersion = request.maxVersion;
-        this.resumeSequence = request.resumeSequence;
     }
 
     public ABIVersion getMinVersion() {
     }
 
     public ABIVersion getMinVersion() {
@@ -64,13 +61,9 @@ public final class ConnectClientRequest extends Request<ClientIdentifier, Connec
         return maxVersion;
     }
 
         return maxVersion;
     }
 
-    public long getResumeSequence() {
-        return resumeSequence;
-    }
-
     @Override
     public final ConnectClientFailure toRequestFailure(final RequestException cause) {
     @Override
     public final ConnectClientFailure toRequestFailure(final RequestException cause) {
-        return new ConnectClientFailure(getTarget(), cause);
+        return new ConnectClientFailure(getTarget(), getSequence(), cause);
     }
 
     @Override
     }
 
     @Override
@@ -85,7 +78,6 @@ public final class ConnectClientRequest extends Request<ClientIdentifier, Connec
 
     @Override
     protected @Nonnull ToStringHelper addToStringAttributes(final @Nonnull ToStringHelper toStringHelper) {
 
     @Override
     protected @Nonnull ToStringHelper addToStringAttributes(final @Nonnull ToStringHelper toStringHelper) {
-        return super.addToStringAttributes(toStringHelper).add("minVersion", minVersion).add("maxVersion", maxVersion)
-                .add("resumeSequence", resumeSequence);
+        return super.addToStringAttributes(toStringHelper).add("minVersion", minVersion).add("maxVersion", maxVersion);
     }
 }
     }
 }
index 5fb26d071b4e59ff8f2d9a7c2aa7c0bf54daa69e..d4c8b111b3c6b2b81d42aebc31f354bef43f8944 100644 (file)
@@ -15,7 +15,6 @@ import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.yangtools.concepts.WritableObjects;
 
 /**
  * Externalizable proxy for use with {@link ConnectClientRequest}. It implements the initial (Boron) serialization
 
 /**
  * Externalizable proxy for use with {@link ConnectClientRequest}. It implements the initial (Boron) serialization
@@ -26,7 +25,6 @@ import org.opendaylight.yangtools.concepts.WritableObjects;
 final class ConnectClientRequestProxyV1 extends AbstractRequestProxy<ClientIdentifier, ConnectClientRequest> {
     private ABIVersion minVersion;
     private ABIVersion maxVersion;
 final class ConnectClientRequestProxyV1 extends AbstractRequestProxy<ClientIdentifier, ConnectClientRequest> {
     private ABIVersion minVersion;
     private ABIVersion maxVersion;
-    private long resumeSequence;
 
     public ConnectClientRequestProxyV1() {
         // for Externalizable
 
     public ConnectClientRequestProxyV1() {
         // for Externalizable
@@ -36,7 +34,6 @@ final class ConnectClientRequestProxyV1 extends AbstractRequestProxy<ClientIdent
         super(request);
         this.minVersion = request.getMinVersion();
         this.maxVersion = request.getMaxVersion();
         super(request);
         this.minVersion = request.getMinVersion();
         this.maxVersion = request.getMaxVersion();
-        this.resumeSequence = request.getResumeSequence();
     }
 
     @Override
     }
 
     @Override
@@ -44,7 +41,6 @@ final class ConnectClientRequestProxyV1 extends AbstractRequestProxy<ClientIdent
         super.writeExternal(out);
         minVersion.writeTo(out);
         maxVersion.writeTo(out);
         super.writeExternal(out);
         minVersion.writeTo(out);
         maxVersion.writeTo(out);
-        WritableObjects.writeLong(out, resumeSequence);
     }
 
     @Override
     }
 
     @Override
@@ -52,12 +48,12 @@ final class ConnectClientRequestProxyV1 extends AbstractRequestProxy<ClientIdent
         super.readExternal(in);
         minVersion = ABIVersion.inexactReadFrom(in);
         maxVersion = ABIVersion.inexactReadFrom(in);
         super.readExternal(in);
         minVersion = ABIVersion.inexactReadFrom(in);
         maxVersion = ABIVersion.inexactReadFrom(in);
-        resumeSequence = WritableObjects.readLong(in);
     }
 
     @Override
     }
 
     @Override
-    protected ConnectClientRequest createRequest(final ClientIdentifier target, final ActorRef replyTo) {
-        return new ConnectClientRequest(target, replyTo, minVersion, maxVersion, resumeSequence);
+    protected ConnectClientRequest createRequest(final ClientIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new ConnectClientRequest(target, sequence, replyTo, minVersion, maxVersion);
     }
 
     @Override
     }
 
     @Override
index 7b2ea076f3c4c113a4eccd43d5f7d8603a4a70a8..65389ec17c4ca589ecc15c4895c2c69471eb9b2e 100644 (file)
@@ -35,11 +35,11 @@ public final class ConnectClientSuccess extends RequestSuccess<ClientIdentifier,
     private final List<ActorSelection> alternates;
     private final DataTree dataTree;
     private final ActorRef backend;
     private final List<ActorSelection> alternates;
     private final DataTree dataTree;
     private final ActorRef backend;
-    private final long maxMessages;
+    private final int maxMessages;
 
 
-    ConnectClientSuccess(final ClientIdentifier target, final ActorRef backend, final List<ActorSelection> alternates,
-        final Optional<DataTree> dataTree, final long maxMessages) {
-        super(target);
+    ConnectClientSuccess(final ClientIdentifier target, final long sequence, final ActorRef backend,
+        final List<ActorSelection> alternates, final Optional<DataTree> dataTree, final int maxMessages) {
+        super(target, sequence);
         this.backend = Preconditions.checkNotNull(backend);
         this.alternates = ImmutableList.copyOf(alternates);
         this.dataTree = dataTree.orElse(null);
         this.backend = Preconditions.checkNotNull(backend);
         this.alternates = ImmutableList.copyOf(alternates);
         this.dataTree = dataTree.orElse(null);
@@ -47,9 +47,10 @@ public final class ConnectClientSuccess extends RequestSuccess<ClientIdentifier,
         this.maxMessages = maxMessages;
     }
 
         this.maxMessages = maxMessages;
     }
 
-    public ConnectClientSuccess(final @Nonnull ClientIdentifier target, final @Nonnull ActorRef backend,
-            final @Nonnull List<ActorSelection> alternates, final @Nonnull DataTree dataTree, final long maxMessages) {
-        this(target, backend, alternates, Optional.of(dataTree), maxMessages);
+    public ConnectClientSuccess(final @Nonnull ClientIdentifier target, final long sequence,
+            final @Nonnull ActorRef backend, final @Nonnull List<ActorSelection> alternates,
+            final @Nonnull DataTree dataTree, final int maxMessages) {
+        this(target, sequence, backend, alternates, Optional.of(dataTree), maxMessages);
     }
 
     /**
     }
 
     /**
@@ -69,7 +70,7 @@ public final class ConnectClientSuccess extends RequestSuccess<ClientIdentifier,
         return Optional.ofNullable(dataTree);
     }
 
         return Optional.ofNullable(dataTree);
     }
 
-    public long getMaxMessages() {
+    public int getMaxMessages() {
         return maxMessages;
     }
 
         return maxMessages;
     }
 
@@ -85,6 +86,7 @@ public final class ConnectClientSuccess extends RequestSuccess<ClientIdentifier,
 
     @Override
     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
 
     @Override
     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-        return super.addToStringAttributes(toStringHelper).add("alternates", alternates).add("dataTree", dataTree);
+        return super.addToStringAttributes(toStringHelper).add("alternates", alternates).add("dataTree", dataTree)
+                .add("maxMessages", maxMessages);
     }
 }
     }
 }
index 6c9ae2a19ad4eb4840d9e5ab903b98d4995f286a..f9fddb72766a72f3400ccb1569b330bc2e1701f8 100644 (file)
@@ -32,7 +32,7 @@ final class ConnectClientSuccessProxyV1 extends AbstractSuccessProxy<ClientIdent
 
     private List<ActorSelection> alternates;
     private ActorRef backend;
 
     private List<ActorSelection> alternates;
     private ActorRef backend;
-    private long maxMessages;
+    private int maxMessages;
 
     public ConnectClientSuccessProxyV1() {
         // For Externalizable
 
     public ConnectClientSuccessProxyV1() {
         // For Externalizable
@@ -42,6 +42,7 @@ final class ConnectClientSuccessProxyV1 extends AbstractSuccessProxy<ClientIdent
         super(success);
         this.alternates = success.getAlternates();
         this.backend = success.getBackend();
         super(success);
         this.alternates = success.getAlternates();
         this.backend = success.getBackend();
+        this.maxMessages = success.getMaxMessages();
         // We are ignoring the DataTree, it is not serializable anyway
     }
 
         // We are ignoring the DataTree, it is not serializable anyway
     }
 
@@ -49,38 +50,32 @@ final class ConnectClientSuccessProxyV1 extends AbstractSuccessProxy<ClientIdent
     public void writeExternal(final ObjectOutput out) throws IOException {
         super.writeExternal(out);
 
     public void writeExternal(final ObjectOutput out) throws IOException {
         super.writeExternal(out);
 
-        out.writeUTF(Serialization.serializedActorPath(backend));
+        out.writeObject(Serialization.serializedActorPath(backend));
+        out.writeInt(maxMessages);
 
         out.writeInt(alternates.size());
         for (ActorSelection b : alternates) {
             out.writeObject(b.toSerializationFormat());
         }
 
         out.writeInt(alternates.size());
         for (ActorSelection b : alternates) {
             out.writeObject(b.toSerializationFormat());
         }
-
-        out.writeLong(maxMessages);
     }
 
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
 
     }
 
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
 
-        backend = JavaSerializer.currentSystem().value().provider().resolveActorRef(in.readUTF());
-
-        final int backendsSize = in.readInt();
-        if (backendsSize < 1) {
-            throw new IOException("Illegal number of backends " + backendsSize);
-        }
+        backend = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
+        maxMessages = in.readInt();
 
 
-        alternates = new ArrayList<>(backendsSize);
-        for (int i = 0; i < backendsSize; ++i) {
+        final int alternatesSize = in.readInt();
+        alternates = new ArrayList<>(alternatesSize);
+        for (int i = 0; i < alternatesSize; ++i) {
             alternates.add(ActorSelection.apply(ActorRef.noSender(), (String)in.readObject()));
         }
             alternates.add(ActorSelection.apply(ActorRef.noSender(), (String)in.readObject()));
         }
-
-        maxMessages = in.readLong();
     }
 
     @Override
     }
 
     @Override
-    protected ConnectClientSuccess createSuccess(final ClientIdentifier target) {
-        return new ConnectClientSuccess(target, backend, alternates, Optional.empty(), maxMessages);
+    protected ConnectClientSuccess createSuccess(final ClientIdentifier target, final long sequence) {
+        return new ConnectClientSuccess(target, sequence, backend, alternates, Optional.empty(), maxMessages);
     }
 
     @Override
     }
 
     @Override
index 28a77c4426cbdfe31f2a74f42ea620d1f2e095a2..eebcf36c4db0b20295bc28ced551cdc036cd7d8e 100644 (file)
@@ -22,7 +22,11 @@ public final class CreateLocalHistoryRequest extends LocalHistoryRequest<CreateL
     private static final long serialVersionUID = 1L;
 
     public CreateLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
     private static final long serialVersionUID = 1L;
 
     public CreateLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+        this(target, 0, replyTo);
+    }
+
+    CreateLocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     private CreateLocalHistoryRequest(final CreateLocalHistoryRequest request, final ABIVersion version) {
     }
 
     private CreateLocalHistoryRequest(final CreateLocalHistoryRequest request, final ABIVersion version) {
index 234e477ee18f79ebb2f4762ecc7fddc5005d8071..8a17e909f11b38b31f8ef051a5dae3a6525e5505 100644 (file)
@@ -28,7 +28,8 @@ final class CreateLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequest
     }
 
     @Override
     }
 
     @Override
-    protected CreateLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        return new CreateLocalHistoryRequest(target, replyTo);
+    protected CreateLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new CreateLocalHistoryRequest(target, sequence, replyTo);
     }
 }
     }
 }
index 1997ddd74ace16a73bc2258c08bcdf552a59984c..505ad37ac3bc36dd3657669b25aedb92e2a7044c 100644 (file)
@@ -21,8 +21,8 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 public final class DestroyLocalHistoryRequest extends LocalHistoryRequest<DestroyLocalHistoryRequest> {
     private static final long serialVersionUID = 1L;
 
 public final class DestroyLocalHistoryRequest extends LocalHistoryRequest<DestroyLocalHistoryRequest> {
     private static final long serialVersionUID = 1L;
 
-    public DestroyLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+    public DestroyLocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     private DestroyLocalHistoryRequest(final DestroyLocalHistoryRequest request, final ABIVersion version) {
     }
 
     private DestroyLocalHistoryRequest(final DestroyLocalHistoryRequest request, final ABIVersion version) {
index 9e953ceb210fd806a904c02718a2c0d7f5330beb..845d0193e2d9b98c3b432168ce1b692fa7bbee76 100644 (file)
@@ -28,7 +28,8 @@ final class DestroyLocalHistoryRequestProxyV1 extends AbstractLocalHistoryReques
     }
 
     @Override
     }
 
     @Override
-    protected DestroyLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        return new DestroyLocalHistoryRequest(target, replyTo);
+    protected DestroyLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new DestroyLocalHistoryRequest(target, sequence, replyTo);
     }
 }
     }
 }
index cbb612f9ec0ecf4c95bd6b6ae95e70ac44bbca13..f689e2552ff00d88fd72ab575ad7a32f9654b2bc 100644 (file)
@@ -23,9 +23,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public final class ExistsTransactionRequest extends AbstractReadTransactionRequest<ExistsTransactionRequest> {
     private static final long serialVersionUID = 1L;
 
 public final class ExistsTransactionRequest extends AbstractReadTransactionRequest<ExistsTransactionRequest> {
     private static final long serialVersionUID = 1L;
 
-    public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo,
-        final @Nonnull YangInstanceIdentifier path) {
-        super(identifier, replyTo, path);
+    public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
+            final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
+        super(identifier, sequence, replyTo, path);
     }
 
     private ExistsTransactionRequest(final ExistsTransactionRequest request, final ABIVersion version) {
     }
 
     private ExistsTransactionRequest(final ExistsTransactionRequest request, final ABIVersion version) {
index 4f5bd1c42fc052729aec97c2a209a2e96cf10da9..84f74004fe59aa6626f724c0565e6eea2449b576 100644 (file)
@@ -29,8 +29,8 @@ final class ExistsTransactionRequestProxyV1 extends AbstractReadTransactionReque
     }
 
     @Override
     }
 
     @Override
-    ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo,
-            final YangInstanceIdentifier path) {
-        return new ExistsTransactionRequest(target, replyTo, path);
+    ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo, final YangInstanceIdentifier path) {
+        return new ExistsTransactionRequest(target, sequence, replyTo, path);
     }
 }
\ No newline at end of file
     }
 }
\ No newline at end of file
index 5481d0de40ff45bf26f2527af2249ba46222fee6..8a1704de763725d3e2aaa0a2aad8baba43c00f36 100644 (file)
@@ -23,8 +23,8 @@ public final class ExistsTransactionSuccess extends TransactionSuccess<ExistsTra
     private static final long serialVersionUID = 1L;
     private final boolean exists;
 
     private static final long serialVersionUID = 1L;
     private final boolean exists;
 
-    public ExistsTransactionSuccess(final TransactionIdentifier target, final boolean exists) {
-        super(target);
+    public ExistsTransactionSuccess(final TransactionIdentifier target, final long sequence, final boolean exists) {
+        super(target, sequence);
         this.exists = exists;
     }
 
         this.exists = exists;
     }
 
index 6932d2414e5611c4289da34365b4e07bd0af9395..538c1ffa7fbee1c4a9298ed8aa0e36002c151169 100644 (file)
@@ -44,7 +44,7 @@ final class ExistsTransactionSuccessProxyV1 extends AbstractTransactionSuccessPr
     }
 
     @Override
     }
 
     @Override
-    protected ExistsTransactionSuccess createSuccess(final TransactionIdentifier target) {
-        return new ExistsTransactionSuccess(target, exists);
+    protected ExistsTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new ExistsTransactionSuccess(target, sequence, exists);
     }
 }
     }
 }
index 795468c03e027e2332c868a499cb36e17be1295a..4fd69c24cee0e2ec2948f768975683a683038faa 100644 (file)
@@ -22,8 +22,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 public final class LocalHistoryFailure extends RequestFailure<LocalHistoryIdentifier, LocalHistoryFailure> {
     private static final long serialVersionUID = 1L;
 
 public final class LocalHistoryFailure extends RequestFailure<LocalHistoryIdentifier, LocalHistoryFailure> {
     private static final long serialVersionUID = 1L;
 
-    LocalHistoryFailure(final LocalHistoryIdentifier target, final RequestException cause) {
-        super(target, cause);
+    LocalHistoryFailure(final LocalHistoryIdentifier target, final long sequence, final RequestException cause) {
+        super(target, sequence, cause);
     }
 
     @Override
     }
 
     @Override
index e8cdf6d19f32460d3fed5f99252bb5ce1d2a6e87..0f3e533ba2a766a48a52c4ece0ab52dd439ec6ac 100644 (file)
@@ -31,8 +31,9 @@ final class LocalHistoryFailureProxyV1 extends AbstractRequestFailureProxy<Local
     }
 
     @Override
     }
 
     @Override
-    protected LocalHistoryFailure createFailure(final LocalHistoryIdentifier target, final RequestException cause) {
-        return new LocalHistoryFailure(target, cause);
+    protected LocalHistoryFailure createFailure(final LocalHistoryIdentifier target, final long sequence,
+            final RequestException cause) {
+        return new LocalHistoryFailure(target, sequence, cause);
     }
 
     @Override
     }
 
     @Override
index badb763629a75494af4a56091fb9d42f674eb967..c74711618cd9533b23119ff50b5ba3feb402eaf4 100644 (file)
@@ -26,8 +26,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 public abstract class LocalHistoryRequest<T extends LocalHistoryRequest<T>> extends Request<LocalHistoryIdentifier, T> {
     private static final long serialVersionUID = 1L;
 
 public abstract class LocalHistoryRequest<T extends LocalHistoryRequest<T>> extends Request<LocalHistoryIdentifier, T> {
     private static final long serialVersionUID = 1L;
 
-    LocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+    LocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     LocalHistoryRequest(final T request, final ABIVersion version) {
     }
 
     LocalHistoryRequest(final T request, final ABIVersion version) {
@@ -36,7 +36,7 @@ public abstract class LocalHistoryRequest<T extends LocalHistoryRequest<T>> exte
 
     @Override
     public final LocalHistoryFailure toRequestFailure(final RequestException cause) {
 
     @Override
     public final LocalHistoryFailure toRequestFailure(final RequestException cause) {
-        return new LocalHistoryFailure(getTarget(), cause);
+        return new LocalHistoryFailure(getTarget(), getSequence(), cause);
     }
 
     @Override
     }
 
     @Override
index 4e588cc20006680b5066bd99b814ce4fe4207bac..3b8ed35816ede5bb36a0e41ddaa57f7d74ae5971 100644 (file)
@@ -22,8 +22,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 public final class LocalHistorySuccess extends RequestSuccess<LocalHistoryIdentifier, LocalHistorySuccess> {
     private static final long serialVersionUID = 1L;
 
 public final class LocalHistorySuccess extends RequestSuccess<LocalHistoryIdentifier, LocalHistorySuccess> {
     private static final long serialVersionUID = 1L;
 
-    public LocalHistorySuccess(final LocalHistoryIdentifier target) {
-        super(target);
+    public LocalHistorySuccess(final LocalHistoryIdentifier target, final long sequence) {
+        super(target, sequence);
     }
 
     private LocalHistorySuccess(final LocalHistorySuccess success, final ABIVersion version) {
     }
 
     private LocalHistorySuccess(final LocalHistorySuccess success, final ABIVersion version) {
index 7806c333546bfbddefc21c335cf5f47493b40a57..23858b20b2221bb55b99b1477001329782fc0da1 100644 (file)
@@ -34,7 +34,7 @@ final class LocalHistorySuccessProxyV1 extends AbstractSuccessProxy<LocalHistory
     }
 
     @Override
     }
 
     @Override
-    protected LocalHistorySuccess createSuccess(final LocalHistoryIdentifier target) {
-        return new LocalHistorySuccess(target);
+    protected LocalHistorySuccess createSuccess(final LocalHistoryIdentifier target, final long sequence) {
+        return new LocalHistorySuccess(target, sequence);
     }
 }
     }
 }
index fd1b1b8e995c65fc4b299e55b4ba2ed2e0861249..416297d81a0fe5bc8d03dcceb052cb6a1b1f6fc4 100644 (file)
@@ -28,9 +28,9 @@ public final class ModifyTransactionRequest extends TransactionRequest<ModifyTra
     private final List<TransactionModification> modifications;
     private final PersistenceProtocol protocol;
 
     private final List<TransactionModification> modifications;
     private final PersistenceProtocol protocol;
 
-    ModifyTransactionRequest(final TransactionIdentifier target, final ActorRef replyTo,
+    ModifyTransactionRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo,
         final List<TransactionModification> modifications, final PersistenceProtocol protocol) {
         final List<TransactionModification> modifications, final PersistenceProtocol protocol) {
-        super(target, replyTo);
+        super(target, sequence, replyTo);
         this.modifications = ImmutableList.copyOf(modifications);
         this.protocol = protocol;
     }
         this.modifications = ImmutableList.copyOf(modifications);
         this.protocol = protocol;
     }
index 336902e192fe0ce0eebf687d1fc202b20181fbed..4e2a8cec2a26ae1a1485016b698446397cb55142 100644 (file)
@@ -30,7 +30,8 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
     private final List<TransactionModification> modifications = new ArrayList<>(1);
     private final TransactionIdentifier identifier;
     private final ActorRef replyTo;
     private final List<TransactionModification> modifications = new ArrayList<>(1);
     private final TransactionIdentifier identifier;
     private final ActorRef replyTo;
-    private PersistenceProtocol protocol = null;
+    private PersistenceProtocol protocol;
+    private Long sequence;
 
     public ModifyTransactionRequestBuilder(final TransactionIdentifier identifier, final ActorRef replyTo) {
         this.identifier = Preconditions.checkNotNull(identifier);
 
     public ModifyTransactionRequestBuilder(final TransactionIdentifier identifier, final ActorRef replyTo) {
         this.identifier = Preconditions.checkNotNull(identifier);
@@ -42,24 +43,28 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
         return identifier;
     }
 
         return identifier;
     }
 
-    private void checkFinished() {
-        Preconditions.checkState(protocol != null, "Batch has already been finished");
+    private void checkNotFinished() {
+        Preconditions.checkState(protocol == null, "Batch has already been finished");
     }
 
     public void addModification(final TransactionModification modification) {
     }
 
     public void addModification(final TransactionModification modification) {
-        checkFinished();
+        checkNotFinished();
         modifications.add(Preconditions.checkNotNull(modification));
     }
 
         modifications.add(Preconditions.checkNotNull(modification));
     }
 
+    public void setSequence(final long sequence) {
+        this.sequence = sequence;
+    }
+
     public void setAbort() {
     public void setAbort() {
-        checkFinished();
+        checkNotFinished();
         // Transaction is being aborted, no need to transmit operations
         modifications.clear();
         protocol = PersistenceProtocol.ABORT;
     }
 
     public void setCommit(final boolean coordinated) {
         // Transaction is being aborted, no need to transmit operations
         modifications.clear();
         protocol = PersistenceProtocol.ABORT;
     }
 
     public void setCommit(final boolean coordinated) {
-        checkFinished();
+        checkNotFinished();
         protocol = coordinated ? PersistenceProtocol.THREE_PHASE : PersistenceProtocol.SIMPLE;
     }
 
         protocol = coordinated ? PersistenceProtocol.THREE_PHASE : PersistenceProtocol.SIMPLE;
     }
 
@@ -69,9 +74,13 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
 
     @Override
     public ModifyTransactionRequest build() {
 
     @Override
     public ModifyTransactionRequest build() {
-        final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, replyTo, modifications, protocol);
+        Preconditions.checkState(sequence != null, "Request sequence has not been set");
+
+        final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, sequence, replyTo, modifications,
+            protocol);
         modifications.clear();
         protocol = null;
         modifications.clear();
         protocol = null;
+        sequence = null;
         return ret;
     }
 }
         return ret;
     }
 }
index 57b50505762a2d69f09e77c73c41639c0e8e9b24..d9de3afc694eb608c2b2f3868f5e2ece31562c79 100644 (file)
@@ -76,7 +76,8 @@ final class ModifyTransactionRequestProxyV1 extends AbstractTransactionRequestPr
     }
 
     @Override
     }
 
     @Override
-    protected ModifyTransactionRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        return new ModifyTransactionRequest(target, replyTo, modifications, protocol.orElse(null));
+    protected ModifyTransactionRequest createRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new ModifyTransactionRequest(target, sequence, replyTo, modifications, protocol.orElse(null));
     }
 }
     }
 }
index 4390d17e5d40edcebfddd933ad8771f1d7725053..ecbd749dd1f8ada03f0f8a1b83beabc0ba0ef7a8 100644 (file)
@@ -22,8 +22,8 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 public final class PurgeLocalHistoryRequest extends LocalHistoryRequest<PurgeLocalHistoryRequest> {
     private static final long serialVersionUID = 1L;
 
 public final class PurgeLocalHistoryRequest extends LocalHistoryRequest<PurgeLocalHistoryRequest> {
     private static final long serialVersionUID = 1L;
 
-    public PurgeLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+    public PurgeLocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     private PurgeLocalHistoryRequest(final PurgeLocalHistoryRequest request, final ABIVersion version) {
     }
 
     private PurgeLocalHistoryRequest(final PurgeLocalHistoryRequest request, final ABIVersion version) {
index 0aaac6e32a672ca0715523d1b92bc9cfa6c36321..1ac90daa213ec6b0804b00149ea9959ccea13369 100644 (file)
@@ -28,7 +28,8 @@ final class PurgeLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequestP
     }
 
     @Override
     }
 
     @Override
-    protected PurgeLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        return new PurgeLocalHistoryRequest(target, replyTo);
+    protected PurgeLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new PurgeLocalHistoryRequest(target, sequence, replyTo);
     }
 }
     }
 }
index 20b679f420284f70cadb57cf0b51bc290cfdc65d..2d754cbb18a60d2a84613869729f395ba15ec4e4 100644 (file)
@@ -23,9 +23,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public final class ReadTransactionRequest extends AbstractReadTransactionRequest<ReadTransactionRequest> {
     private static final long serialVersionUID = 1L;
 
 public final class ReadTransactionRequest extends AbstractReadTransactionRequest<ReadTransactionRequest> {
     private static final long serialVersionUID = 1L;
 
-    public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo,
-            final @Nonnull YangInstanceIdentifier path) {
-        super(identifier, replyTo, path);
+    public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
+            final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
+        super(identifier, sequence, replyTo, path);
     }
 
     private ReadTransactionRequest(final ReadTransactionRequest request, final ABIVersion version) {
     }
 
     private ReadTransactionRequest(final ReadTransactionRequest request, final ABIVersion version) {
index ae0f6f6470d34f78b712bbb08d023850f2c60d33..0c60b2b015380afb9d2d345b835e6a5baac9044b 100644 (file)
@@ -29,8 +29,8 @@ final class ReadTransactionRequestProxyV1 extends AbstractReadTransactionRequest
     }
 
     @Override
     }
 
     @Override
-    ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo,
-            final YangInstanceIdentifier path) {
-        return new ReadTransactionRequest(target, replyTo, path);
+    ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo, final YangInstanceIdentifier path) {
+        return new ReadTransactionRequest(target, sequence, replyTo, path);
     }
 }
\ No newline at end of file
     }
 }
\ No newline at end of file
index 45e7631580bf0d7d89d1c3679a9ed55b8ea6717a..f3d8395d6f952b51c69b16a78ca32f9078283b0b 100644 (file)
@@ -24,8 +24,9 @@ public final class ReadTransactionSuccess extends TransactionSuccess<ReadTransac
     private static final long serialVersionUID = 1L;
     private final Optional<NormalizedNode<?, ?>> data;
 
     private static final long serialVersionUID = 1L;
     private final Optional<NormalizedNode<?, ?>> data;
 
-    public ReadTransactionSuccess(final TransactionIdentifier identifier, final Optional<NormalizedNode<?, ?>> data) {
-        super(identifier);
+    public ReadTransactionSuccess(final TransactionIdentifier identifier, final long sequence,
+            final Optional<NormalizedNode<?, ?>> data) {
+        super(identifier, sequence);
         this.data = Preconditions.checkNotNull(data);
     }
 
         this.data = Preconditions.checkNotNull(data);
     }
 
index 923284278fec5f37f502f0ed1bf7268a27a599e3..ed45695d1078d768f7bd6e5c0d8e0cab36087e11 100644 (file)
@@ -63,7 +63,7 @@ final class ReadTransactionSuccessProxyV1 extends AbstractTransactionSuccessProx
     }
 
     @Override
     }
 
     @Override
-    protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target) {
-        return new ReadTransactionSuccess(target, data);
+    protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new ReadTransactionSuccess(target, sequence, data);
     }
 }
     }
 }
index a7f132a6096619b6d65d65cc120a0f420573a5b9..b8499cc2a25dacaccd126c568d37ef2aedfbfcee 100644 (file)
@@ -21,8 +21,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionAbortRequest extends TransactionRequest<TransactionAbortRequest> {
     private static final long serialVersionUID = 1L;
 
 public final class TransactionAbortRequest extends TransactionRequest<TransactionAbortRequest> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionAbortRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+    public TransactionAbortRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     @Override
     }
 
     @Override
index bc1fc582f419db3afdd5bfe42d63277b714a691b..0743e3e24eeec95e27d1177450bec436a3610f47 100644 (file)
@@ -28,7 +28,8 @@ final class TransactionAbortRequestProxyV1 extends AbstractTransactionRequestPro
     }
 
     @Override
     }
 
     @Override
-    protected TransactionAbortRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        return new TransactionAbortRequest(target, replyTo);
+    protected TransactionAbortRequest createRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new TransactionAbortRequest(target, sequence, replyTo);
     }
 }
     }
 }
index c9625afa68e782b3c590d9defee639a33933cbe3..69c6dddd8f9c49f84801cbb0f938c71b256ec8d3 100644 (file)
@@ -19,8 +19,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionAbortSuccess extends TransactionSuccess<TransactionAbortSuccess> {
     private static final long serialVersionUID = 1L;
 
 public final class TransactionAbortSuccess extends TransactionSuccess<TransactionAbortSuccess> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionAbortSuccess(final TransactionIdentifier identifier) {
-        super(identifier);
+    public TransactionAbortSuccess(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
     }
 
     @Override
     }
 
     @Override
index 2c347371e33d492724ad874e395005e528123751..3cf513ae3ca5af95970736be8b9a04dc07d55517 100644 (file)
@@ -27,7 +27,7 @@ final class TransactionAbortSuccessProxyV1 extends AbstractTransactionSuccessPro
     }
 
     @Override
     }
 
     @Override
-    protected TransactionAbortSuccess createSuccess(final TransactionIdentifier target) {
-        return new TransactionAbortSuccess(target);
+    protected TransactionAbortSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new TransactionAbortSuccess(target, sequence);
     }
 }
     }
 }
index c7d417626a5c1494ba14dfcf4becb3c2f5ed36ed..4e689b2ace39db9712c5f182c96fd358fd1b7380 100644 (file)
@@ -19,8 +19,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionCanCommitSuccess extends TransactionSuccess<TransactionCanCommitSuccess> {
     private static final long serialVersionUID = 1L;
 
 public final class TransactionCanCommitSuccess extends TransactionSuccess<TransactionCanCommitSuccess> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionCanCommitSuccess(final TransactionIdentifier identifier) {
-        super(identifier);
+    public TransactionCanCommitSuccess(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
     }
 
     @Override
     }
 
     @Override
index a8af4af2c14ef5b16d88ad801c5dc3154299c565..b645d68885093f6f72577d74867ad8aff45ec29f 100644 (file)
@@ -40,7 +40,7 @@ final class TransactionCanCommitSuccessProxyV1 extends AbstractTransactionSucces
     }
 
     @Override
     }
 
     @Override
-    protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target) {
-        return new TransactionCanCommitSuccess(target);
+    protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new TransactionCanCommitSuccess(target, sequence);
     }
 }
     }
 }
index 275b5cf1e836224669557c076f23416299962dca..6b28244484e3eb739881696429e9119db14dc47e 100644 (file)
@@ -19,8 +19,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionCommitSuccess extends TransactionSuccess<TransactionCommitSuccess> {
     private static final long serialVersionUID = 1L;
 
 public final class TransactionCommitSuccess extends TransactionSuccess<TransactionCommitSuccess> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionCommitSuccess(final TransactionIdentifier identifier) {
-        super(identifier);
+    public TransactionCommitSuccess(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
     }
 
     @Override
     }
 
     @Override
index 4628a9d7b66612bb63f85f681eb55fc30c15ef59..aaf07c26e1301af7d69a343b97e5fafb39e36a7d 100644 (file)
@@ -27,7 +27,7 @@ final class TransactionCommitSuccessProxyV1 extends AbstractTransactionSuccessPr
     }
 
     @Override
     }
 
     @Override
-    protected TransactionCommitSuccess createSuccess(final TransactionIdentifier target) {
-        return new TransactionCommitSuccess(target);
+    protected TransactionCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new TransactionCommitSuccess(target, sequence);
     }
 }
     }
 }
index 6707aa199d81f7354888a138b4cfd7575df64efc..955c2680086db7553969857e0656238e37c179f8 100644 (file)
@@ -21,8 +21,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionDoCommitRequest extends TransactionRequest<TransactionDoCommitRequest> {
     private static final long serialVersionUID = 1L;
 
 public final class TransactionDoCommitRequest extends TransactionRequest<TransactionDoCommitRequest> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionDoCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+    public TransactionDoCommitRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     @Override
     }
 
     @Override
index f7718446dcee2ad1226159c57459c298432aeb28..ce9ca9b0043e80e607544578958f44b248b8ee02 100644 (file)
@@ -28,7 +28,8 @@ final class TransactionDoCommitRequestProxyV1 extends AbstractTransactionRequest
     }
 
     @Override
     }
 
     @Override
-    protected TransactionDoCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        return new TransactionDoCommitRequest(target, replyTo);
+    protected TransactionDoCommitRequest createRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new TransactionDoCommitRequest(target, sequence, replyTo);
     }
 }
     }
 }
index 750d327e416d9f62e29c41ca7e12ec8cf11f299a..e0b6a5998795c271453d761edb158700a8e9a0b6 100644 (file)
@@ -22,8 +22,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionFailure extends RequestFailure<TransactionIdentifier, TransactionFailure> {
     private static final long serialVersionUID = 1L;
 
 public final class TransactionFailure extends RequestFailure<TransactionIdentifier, TransactionFailure> {
     private static final long serialVersionUID = 1L;
 
-    TransactionFailure(final TransactionIdentifier target, final RequestException cause) {
-        super(target, cause);
+    TransactionFailure(final TransactionIdentifier target, final long sequence, final RequestException cause) {
+        super(target, sequence, cause);
     }
 
     @Override
     }
 
     @Override
index fd99749bed401d92d8fdcfdbbdeafede088fd67f..15cf09c5b7677b99ca2c6e1a9390e3895f4e41fb 100644 (file)
@@ -31,8 +31,9 @@ final class TransactionFailureProxyV1 extends AbstractRequestFailureProxy<Transa
     }
 
     @Override
     }
 
     @Override
-    protected TransactionFailure createFailure(final TransactionIdentifier target, final RequestException cause) {
-        return new TransactionFailure(target, cause);
+    protected TransactionFailure createFailure(final TransactionIdentifier target, final long sequence,
+            final RequestException cause) {
+        return new TransactionFailure(target, sequence, cause);
     }
 
     @Override
     }
 
     @Override
index 2e73f4777200ed6cee9bdcbc6ea44ebda57d6fc8..d79dbea6423254be0a54a999f3550389f0fab022 100644 (file)
@@ -21,8 +21,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionPreCommitRequest extends TransactionRequest<TransactionPreCommitRequest> {
     private static final long serialVersionUID = 1L;
 
 public final class TransactionPreCommitRequest extends TransactionRequest<TransactionPreCommitRequest> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionPreCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+    public TransactionPreCommitRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     @Override
     }
 
     @Override
index dd41e6ab3a36944001ee48494236f87fde7742bf..649db56a57a4a00faf7fd882fd60f83bb340af32 100644 (file)
@@ -28,7 +28,8 @@ final class TransactionPreCommitRequestProxyV1 extends AbstractTransactionReques
     }
 
     @Override
     }
 
     @Override
-    protected TransactionPreCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        return new TransactionPreCommitRequest(target, replyTo);
+    protected TransactionPreCommitRequest createRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new TransactionPreCommitRequest(target, sequence, replyTo);
     }
 }
     }
 }
index 8a7da4e61b57eab350aa4871c9c441f182bdedd6..1cf00e668fb7f3811f7959c6ec5926021bedfeb1 100644 (file)
@@ -18,8 +18,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionPreCommitSuccess extends TransactionSuccess<TransactionPreCommitSuccess> {
     private static final long serialVersionUID = 1L;
 
 public final class TransactionPreCommitSuccess extends TransactionSuccess<TransactionPreCommitSuccess> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionPreCommitSuccess(final TransactionIdentifier identifier) {
-        super(identifier);
+    public TransactionPreCommitSuccess(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
     }
 
     @Override
     }
 
     @Override
index 2c0cdea17e2e8c11402ff9eed294858c285746f8..387a60c8e9a7ae8f75205829d309a4b6bf0a3d77 100644 (file)
@@ -27,7 +27,7 @@ final class TransactionPreCommitSuccessProxyV1 extends AbstractTransactionSucces
     }
 
     @Override
     }
 
     @Override
-    protected TransactionPreCommitSuccess createSuccess(final TransactionIdentifier target) {
-        return new TransactionPreCommitSuccess(target);
+    protected TransactionPreCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new TransactionPreCommitSuccess(target, sequence);
     }
 }
     }
 }
index 154d4b39e094fb557124b1cdbf5f956be0e2e736..7ae6b81e3ba9db52c9e2f24f79e3226b65eaad68 100644 (file)
@@ -26,8 +26,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public abstract class TransactionRequest<T extends TransactionRequest<T>> extends Request<TransactionIdentifier, T> {
     private static final long serialVersionUID = 1L;
 
 public abstract class TransactionRequest<T extends TransactionRequest<T>> extends Request<TransactionIdentifier, T> {
     private static final long serialVersionUID = 1L;
 
-    TransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) {
-        super(identifier, replyTo);
+    TransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo) {
+        super(identifier, sequence, replyTo);
     }
 
     TransactionRequest(final T request, final ABIVersion version) {
     }
 
     TransactionRequest(final T request, final ABIVersion version) {
@@ -36,7 +36,7 @@ public abstract class TransactionRequest<T extends TransactionRequest<T>> extend
 
     @Override
     public final TransactionFailure toRequestFailure(final RequestException cause) {
 
     @Override
     public final TransactionFailure toRequestFailure(final RequestException cause) {
-        return new TransactionFailure(getTarget(), cause);
+        return new TransactionFailure(getTarget(), getSequence(), cause);
     }
 
     @Override
     }
 
     @Override
index daa4ba1b8b7a3fd73249d19eb5775c835b21a9aa..77a6b56d1e44839f1e9a4c1f2d2825439a7f9894 100644 (file)
@@ -24,8 +24,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public abstract class TransactionSuccess<T extends TransactionSuccess<T>> extends RequestSuccess<TransactionIdentifier, T> {
     private static final long serialVersionUID = 1L;
 
 public abstract class TransactionSuccess<T extends TransactionSuccess<T>> extends RequestSuccess<TransactionIdentifier, T> {
     private static final long serialVersionUID = 1L;
 
-    TransactionSuccess(final TransactionIdentifier identifier) {
-        super(identifier);
+    TransactionSuccess(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
     }
 
     @Override
     }
 
     @Override
index 89458b8a07df2a698bcf6278492ac33396c4a7c2..69fc29305c2dbacad7e9e432a80d14ce77fb5bf8 100644 (file)
@@ -15,9 +15,10 @@ import org.opendaylight.yangtools.concepts.WritableObjects;
 
 abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externalizable {
     private static final long serialVersionUID = 1L;
 
 abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externalizable {
     private static final long serialVersionUID = 1L;
+
     private T message;
     private T message;
-    private long sequence;
-    private long retry;
+    private long sessionId;
+    private long txSequence;
 
     public AbstractEnvelopeProxy() {
         // for Externalizable
 
     public AbstractEnvelopeProxy() {
         // for Externalizable
@@ -25,13 +26,13 @@ abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externa
 
     AbstractEnvelopeProxy(final Envelope<T> envelope) {
         message = envelope.getMessage();
 
     AbstractEnvelopeProxy(final Envelope<T> envelope) {
         message = envelope.getMessage();
-        sequence = envelope.getSequence();
-        retry = envelope.getRetry();
+        txSequence = envelope.getTxSequence();
+        sessionId = envelope.getSessionId();
     }
 
     @Override
     public final void writeExternal(final ObjectOutput out) throws IOException {
     }
 
     @Override
     public final void writeExternal(final ObjectOutput out) throws IOException {
-        WritableObjects.writeLongs(out, sequence, retry);
+        WritableObjects.writeLongs(out, sessionId, txSequence);
         out.writeObject(message);
     }
 
         out.writeObject(message);
     }
 
@@ -39,14 +40,14 @@ abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externa
     @Override
     public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         final byte header = WritableObjects.readLongHeader(in);
     @Override
     public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         final byte header = WritableObjects.readLongHeader(in);
-        sequence = WritableObjects.readFirstLong(in, header);
-        retry = WritableObjects.readSecondLong(in, header);
+        sessionId = WritableObjects.readFirstLong(in, header);
+        txSequence = WritableObjects.readSecondLong(in, header);
         message = (T) in.readObject();
     }
 
         message = (T) in.readObject();
     }
 
-    abstract Envelope<T> createEnvelope(T message, long sequence, long retry);
+    abstract Envelope<T> createEnvelope(T message, long sessionId, long txSequence);
 
     final Object readResolve() {
 
     final Object readResolve() {
-        return createEnvelope(message, sequence, retry);
+        return createEnvelope(message, sessionId, txSequence);
     }
 }
\ No newline at end of file
     }
 }
\ No newline at end of file
index 4b60aecefa7e01d146a9c3afb9387d5a564a66c8..48ad0d39bbc333e8f6f84f63c611c3387d84213d 100644 (file)
@@ -15,6 +15,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import javax.annotation.Nonnull;
 import org.opendaylight.yangtools.concepts.WritableIdentifier;
 import java.io.ObjectOutput;
 import javax.annotation.Nonnull;
 import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import org.opendaylight.yangtools.concepts.WritableObjects;
 
 /**
  * Abstract Externalizable proxy for use with {@link Message} subclasses.
 
 /**
  * Abstract Externalizable proxy for use with {@link Message} subclasses.
@@ -27,6 +28,7 @@ import org.opendaylight.yangtools.concepts.WritableIdentifier;
 abstract class AbstractMessageProxy<T extends WritableIdentifier, C extends Message<T, C>> implements Externalizable {
     private static final long serialVersionUID = 1L;
     private T target;
 abstract class AbstractMessageProxy<T extends WritableIdentifier, C extends Message<T, C>> implements Externalizable {
     private static final long serialVersionUID = 1L;
     private T target;
+    private long sequence;
 
     protected AbstractMessageProxy() {
         // For Externalizable
 
     protected AbstractMessageProxy() {
         // For Externalizable
@@ -34,22 +36,25 @@ abstract class AbstractMessageProxy<T extends WritableIdentifier, C extends Mess
 
     AbstractMessageProxy(final @Nonnull C message) {
         this.target = message.getTarget();
 
     AbstractMessageProxy(final @Nonnull C message) {
         this.target = message.getTarget();
+        this.sequence = message.getSequence();
     }
 
     @Override
     public void writeExternal(final ObjectOutput out) throws IOException {
         target.writeTo(out);
     }
 
     @Override
     public void writeExternal(final ObjectOutput out) throws IOException {
         target.writeTo(out);
+        WritableObjects.writeLong(out, sequence);
     }
 
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         target = Verify.verifyNotNull(readTarget(in));
     }
 
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         target = Verify.verifyNotNull(readTarget(in));
+        sequence = WritableObjects.readLong(in);
     }
 
     protected final Object readResolve() {
     }
 
     protected final Object readResolve() {
-        return Verify.verifyNotNull(createMessage(target));
+        return Verify.verifyNotNull(createMessage(target, sequence));
     }
 
     protected abstract @Nonnull T readTarget(@Nonnull DataInput in) throws IOException;
     }
 
     protected abstract @Nonnull T readTarget(@Nonnull DataInput in) throws IOException;
-    abstract @Nonnull C createMessage(@Nonnull T target);
+    abstract @Nonnull C createMessage(@Nonnull T target, long sequence);
 }
\ No newline at end of file
 }
\ No newline at end of file
index 97a5f05cd1631904fbcbad6a5fc8e4077dfb5771..a7624a4a6335c8eed661d6190723e6893cd1836e 100644 (file)
@@ -49,9 +49,9 @@ public abstract class AbstractRequestFailureProxy<T extends WritableIdentifier,
     }
 
     @Override
     }
 
     @Override
-    final C createResponse(final T target) {
-        return createFailure(target, cause);
+    final C createResponse(final T target, final long sequence) {
+        return createFailure(target, sequence, cause);
     }
 
     }
 
-    protected abstract @Nonnull C createFailure(@Nonnull T target, @Nonnull RequestException cause);
+    protected abstract @Nonnull C createFailure(@Nonnull T target, long sequence, @Nonnull RequestException cause);
 }
\ No newline at end of file
 }
\ No newline at end of file
index c0bf959dfc294309f5476cd79b0145e091fd4abf..f55b85b85945b8906243f5b62e5d2a0721372e4c 100644 (file)
@@ -42,19 +42,19 @@ public abstract class AbstractRequestProxy<T extends WritableIdentifier, C exten
     @Override
     public void writeExternal(final ObjectOutput out) throws IOException {
         super.writeExternal(out);
     @Override
     public void writeExternal(final ObjectOutput out) throws IOException {
         super.writeExternal(out);
-        out.writeUTF(Serialization.serializedActorPath(replyTo));
+        out.writeObject(Serialization.serializedActorPath(replyTo));
     }
 
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
     }
 
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
-        replyTo = JavaSerializer.currentSystem().value().provider().resolveActorRef(in.readUTF());
+        replyTo = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
     }
 
     @Override
     }
 
     @Override
-    final @Nonnull C createMessage(@Nonnull final T target) {
-        return createRequest(target, replyTo);
+    final @Nonnull C createMessage(@Nonnull final T target, final long sequence) {
+        return createRequest(target, sequence, replyTo);
     }
 
     }
 
-    protected abstract @Nonnull C createRequest(@Nonnull T target, @Nonnull ActorRef replyTo);
+    protected abstract @Nonnull C createRequest(@Nonnull T target, long sequence, @Nonnull ActorRef replyTo);
 }
\ No newline at end of file
 }
\ No newline at end of file
index 8262bb29f96db8a4ecb1c9b2a80dcc1123e81bd9..351a6f882b3dd0e03ddc92ef458e0ff03198962a 100644 (file)
@@ -31,9 +31,9 @@ abstract class AbstractResponseProxy<T extends WritableIdentifier, C extends Res
     }
 
     @Override
     }
 
     @Override
-    final C createMessage(final T target) {
-        return createResponse(target);
+    final C createMessage(final T target, final long sequence) {
+        return createResponse(target, sequence);
     }
 
     }
 
-    abstract @Nonnull C createResponse(@Nonnull T target);
+    abstract @Nonnull C createResponse(@Nonnull T target, long sequence);
 }
 }
index 639b20c4f7b733684aeee08501b36e79091eca87..f1786e07f94b11a0eefeb4569876d55dc5b61eaa 100644 (file)
@@ -33,9 +33,9 @@ public abstract class AbstractSuccessProxy<T extends WritableIdentifier, C exten
     }
 
     @Override
     }
 
     @Override
-    final C createResponse(final T target) {
-        return createSuccess(target);
+    final C createResponse(final T target, final long sequence) {
+        return createSuccess(target, sequence);
     }
 
     }
 
-    protected abstract @Nonnull C createSuccess(@Nonnull T target);
+    protected abstract @Nonnull C createSuccess(@Nonnull T target, long sequence);
 }
\ No newline at end of file
 }
\ No newline at end of file
index cd674b48569e109ba1bd60be9dd386641e770820..5f2d15d60797b184d978dc0620aa5ccf467446d1 100644 (file)
@@ -16,13 +16,13 @@ public abstract class Envelope<T extends Message<?, ?>> implements Immutable, Se
     private static final long serialVersionUID = 1L;
 
     private final T message;
     private static final long serialVersionUID = 1L;
 
     private final T message;
-    private final long sequence;
-    private final long retry;
+    private final long txSequence;
+    private final long sessionId;
 
 
-    Envelope(final T message, final long sequence, final long retry) {
+    Envelope(final T message, final long sessionId, final long txSequence) {
         this.message = Preconditions.checkNotNull(message);
         this.message = Preconditions.checkNotNull(message);
-        this.sequence = sequence;
-        this.retry = retry;
+        this.sessionId = sessionId;
+        this.txSequence = txSequence;
     }
 
     /**
     }
 
     /**
@@ -35,27 +35,27 @@ public abstract class Envelope<T extends Message<?, ?>> implements Immutable, Se
     }
 
     /**
     }
 
     /**
-     * Get the message sequence of this envelope.
+     * Get the message transmission sequence of this envelope.
      *
      * @return Message sequence
      */
      *
      * @return Message sequence
      */
-    public long getSequence() {
-        return sequence;
+    public long getTxSequence() {
+        return txSequence;
     }
 
     /**
     }
 
     /**
-     * Get the message retry counter.
+     * Get the session identifier.
      *
      *
-     * @return Retry counter
+     * @return Session identifier
      */
      */
-    public long getRetry() {
-        return retry;
+    public long getSessionId() {
+        return sessionId;
     }
 
     @Override
     public String toString() {
     }
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(Envelope.class).add("sequence", Long.toUnsignedString(sequence, 16)).
-                add("retry", retry).add("message", message).toString();
+        return MoreObjects.toStringHelper(Envelope.class).add("sessionId", Long.toHexString(sessionId))
+                .add("txSequence", Long.toHexString(txSequence)).add("message", message).toString();
     }
 
     final Object writeReplace() {
     }
 
     final Object writeReplace() {
index 8da54f5954acc78757672de70241d3181155f8c2..6c32ae25541de11bbb4416eb36bd7bc928c3d045 100644 (file)
@@ -10,8 +10,8 @@ package org.opendaylight.controller.cluster.access.concepts;
 public final class FailureEnvelope extends ResponseEnvelope<RequestFailure<?, ?>> {
     private static final long serialVersionUID = 1L;
 
 public final class FailureEnvelope extends ResponseEnvelope<RequestFailure<?, ?>> {
     private static final long serialVersionUID = 1L;
 
-    public FailureEnvelope(final RequestFailure<?, ?> message, final long sequence, final long retry) {
-        super(message, sequence, retry);
+    public FailureEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
+        super(message, sessionId, txSequence);
     }
 
     @Override
     }
 
     @Override
index a14e69875f0a1508d8953fb1b460c2a8fc5e66dd..892b44d33bd70f4b8d8a5608c1534bd864d9d700 100644 (file)
@@ -19,7 +19,7 @@ final class FailureEnvelopeProxy extends AbstractResponseEnvelopeProxy<RequestFa
     }
 
     @Override
     }
 
     @Override
-    FailureEnvelope createEnvelope(final RequestFailure<?, ?> message, final long sequence, final long retry) {
-        return new FailureEnvelope(message, sequence, retry);
+    FailureEnvelope createEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
+        return new FailureEnvelope(message, sessionId, txSequence);
     }
 }
     }
 }
index 5070b7cf71ef518b6387415f78b3d58bd40b897e..30e631eed7b0e9f46ba5e5fd2a72e985d9981233 100644 (file)
@@ -50,20 +50,23 @@ import org.opendaylight.yangtools.concepts.WritableIdentifier;
 public abstract class Message<T extends WritableIdentifier, C extends Message<T, C>> implements Immutable,
         Serializable {
     private static final long serialVersionUID = 1L;
 public abstract class Message<T extends WritableIdentifier, C extends Message<T, C>> implements Immutable,
         Serializable {
     private static final long serialVersionUID = 1L;
-    private final T target;
+
     private final ABIVersion version;
     private final ABIVersion version;
+    private final long sequence;
+    private final T target;
 
 
-    private Message(final ABIVersion version, final T target) {
+    private Message(final ABIVersion version, final T target, final long sequence) {
         this.target = Preconditions.checkNotNull(target);
         this.version = Preconditions.checkNotNull(version);
         this.target = Preconditions.checkNotNull(target);
         this.version = Preconditions.checkNotNull(version);
+        this.sequence = sequence;
     }
 
     }
 
-    Message(final T target) {
-        this(ABIVersion.current(), target);
+    Message(final T target, final long sequence) {
+        this(ABIVersion.current(), target, sequence);
     }
 
     Message(final C msg, final ABIVersion version) {
     }
 
     Message(final C msg, final ABIVersion version) {
-        this(version, msg.getTarget());
+        this(version, msg.getTarget(), msg.getSequence());
     }
 
     /**
     }
 
     /**
@@ -75,8 +78,17 @@ public abstract class Message<T extends WritableIdentifier, C extends Message<T,
         return target;
     }
 
         return target;
     }
 
+    /**
+     * Get the logical sequence number.
+     *
+     * @return logical sequence number
+     */
+    public final long getSequence() {
+        return sequence;
+    }
+
     @VisibleForTesting
     @VisibleForTesting
-    public final ABIVersion getVersion() {
+    public final @Nonnull ABIVersion getVersion() {
         return version;
     }
 
         return version;
     }
 
@@ -129,7 +141,7 @@ public abstract class Message<T extends WritableIdentifier, C extends Message<T,
      * @throws NullPointerException if toStringHelper is null
      */
     protected @Nonnull ToStringHelper addToStringAttributes(final @Nonnull ToStringHelper toStringHelper) {
      * @throws NullPointerException if toStringHelper is null
      */
     protected @Nonnull ToStringHelper addToStringAttributes(final @Nonnull ToStringHelper toStringHelper) {
-        return toStringHelper.add("target", target);
+        return toStringHelper.add("target", target).add("sequence", Long.toUnsignedString(sequence));
     }
 
     /**
     }
 
     /**
index 08923688939821cdf0b519bc924fcf9b3a95ff07..2f5cb4f4becaa84e1fd6677d93dd7d2375d27442 100644 (file)
@@ -29,8 +29,8 @@ public abstract class Request<T extends WritableIdentifier, C extends Request<T,
     private static final long serialVersionUID = 1L;
     private final ActorRef replyTo;
 
     private static final long serialVersionUID = 1L;
     private final ActorRef replyTo;
 
-    protected Request(final @Nonnull T target, final @Nonnull ActorRef replyTo) {
-        super(target);
+    protected Request(final @Nonnull T target, final long sequence, final @Nonnull ActorRef replyTo) {
+        super(target, sequence);
         this.replyTo = Preconditions.checkNotNull(replyTo);
     }
 
         this.replyTo = Preconditions.checkNotNull(replyTo);
     }
 
index 263664bd06e839147628965b8ec903e362ee5fd6..1c6e72c59a6596ea498cad2dbc432fd63934101b 100644 (file)
@@ -12,8 +12,8 @@ import akka.actor.ActorRef;
 public final class RequestEnvelope extends Envelope<Request<?, ?>> {
     private static final long serialVersionUID = 1L;
 
 public final class RequestEnvelope extends Envelope<Request<?, ?>> {
     private static final long serialVersionUID = 1L;
 
-    public RequestEnvelope(final Request<?, ?> message, final long sequence, final long retry) {
-        super(message, sequence, retry);
+    public RequestEnvelope(final Request<?, ?> message, final long sessionId, final long txSequence) {
+        super(message, sessionId, txSequence);
     }
 
     @Override
     }
 
     @Override
@@ -28,7 +28,7 @@ public final class RequestEnvelope extends Envelope<Request<?, ?>> {
      * @throws NullPointerException if cause is null
      */
     public void sendFailure(final RequestException cause) {
      * @throws NullPointerException if cause is null
      */
     public void sendFailure(final RequestException cause) {
-        sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSequence(), getRetry()));
+        sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence()));
     }
 
     /**
     }
 
     /**
@@ -38,7 +38,7 @@ public final class RequestEnvelope extends Envelope<Request<?, ?>> {
      * @throws NullPointerException if success is null
      */
     public void sendSuccess(final RequestSuccess<?, ?> success) {
      * @throws NullPointerException if success is null
      */
     public void sendSuccess(final RequestSuccess<?, ?> success) {
-        sendResponse(new SuccessEnvelope(success, getSequence(), getRetry()));
+        sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence()));
     }
 
     private void sendResponse(final ResponseEnvelope<?> envelope) {
     }
 
     private void sendResponse(final ResponseEnvelope<?> envelope) {
index 1b499d003ff5385d3db628ed116672e630a947ff..8ab450d8d34adb950824dd3e11c058e6e22d6ec1 100644 (file)
@@ -19,7 +19,7 @@ final class RequestEnvelopeProxy extends AbstractEnvelopeProxy<Request<?, ?>> {
     }
 
     @Override
     }
 
     @Override
-    RequestEnvelope createEnvelope(final Request<?, ?> message, final long sequence, final long retry) {
-        return new RequestEnvelope(message, sequence, retry);
+    RequestEnvelope createEnvelope(final Request<?, ?> message, final long sessionId, final long txSequence) {
+        return new RequestEnvelope(message, sessionId, txSequence);
     }
 }
     }
 }
index 301b54ee1ffb7a8c241ab6d26fc71630e7e80a8a..ecd2ff495542a78699948167fbe48db176e07c29 100644 (file)
@@ -32,8 +32,8 @@ public abstract class RequestFailure<T extends WritableIdentifier, C extends Req
         this.cause = Preconditions.checkNotNull(failure.getCause());
     }
 
         this.cause = Preconditions.checkNotNull(failure.getCause());
     }
 
-    protected RequestFailure(final @Nonnull T target, final @Nonnull RequestException cause) {
-        super(target);
+    protected RequestFailure(final @Nonnull T target, final long sequence, final @Nonnull RequestException cause) {
+        super(target, sequence);
         this.cause = Preconditions.checkNotNull(cause);
     }
 
         this.cause = Preconditions.checkNotNull(cause);
     }
 
index 6ed4598e24f6eb7f10ad4559a0220db08cc167dd..ad9402f93825af3ff8d271d6285600ddb1ca3756 100644 (file)
@@ -28,8 +28,8 @@ public abstract class RequestSuccess<T extends WritableIdentifier, C extends Req
         super(success, version);
     }
 
         super(success, version);
     }
 
-    protected RequestSuccess(final @Nonnull T target) {
-        super(target);
+    protected RequestSuccess(final @Nonnull T target, final long sequence) {
+        super(target, sequence);
     }
 
     @Override
     }
 
     @Override
index 12845256c8ef98a3787e3821e9dc6656002de5e1..c520da19b70e5f8d9b628d05f5dd209f97d49c71 100644 (file)
@@ -26,8 +26,8 @@ import org.opendaylight.yangtools.concepts.WritableIdentifier;
 public abstract class Response<T extends WritableIdentifier, C extends Response<T, C>> extends Message<T, C> {
     private static final long serialVersionUID = 1L;
 
 public abstract class Response<T extends WritableIdentifier, C extends Response<T, C>> extends Message<T, C> {
     private static final long serialVersionUID = 1L;
 
-    Response(final @Nonnull T target) {
-        super(target);
+    Response(final @Nonnull T target, final long sequence) {
+        super(target, sequence);
     }
 
     Response(final @Nonnull C response, final @Nonnull ABIVersion version) {
     }
 
     Response(final @Nonnull C response, final @Nonnull ABIVersion version) {
index a3625f665a218998fb3229c90e727b834f663e28..9f998e7fac49b81d71e97487ed306c7205a0dda8 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.access.concepts;
 public abstract class ResponseEnvelope<T extends Response<?, ?>> extends Envelope<T> {
     private static final long serialVersionUID = 1L;
 
 public abstract class ResponseEnvelope<T extends Response<?, ?>> extends Envelope<T> {
     private static final long serialVersionUID = 1L;
 
-    ResponseEnvelope(final T message, final long sequence, final long retry) {
-        super(message, sequence, retry);
+    ResponseEnvelope(final T message, final long sessionId, final long txSequence) {
+        super(message, sessionId, txSequence);
     }
 }
     }
 }
index be37dae1109b5cace90c6dde458fd38975d5cfe2..d98e257ce0ac57715120e6c8c87272b4b7e57cfb 100644 (file)
@@ -10,8 +10,8 @@ package org.opendaylight.controller.cluster.access.concepts;
 public final class SuccessEnvelope extends ResponseEnvelope<RequestSuccess<?, ?>> {
     private static final long serialVersionUID = 1L;
 
 public final class SuccessEnvelope extends ResponseEnvelope<RequestSuccess<?, ?>> {
     private static final long serialVersionUID = 1L;
 
-    public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sequence, final long retry) {
-        super(message, sequence, retry);
+    public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
+        super(message, sessionId, txSequence);
     }
 
     @Override
     }
 
     @Override
index 9f0fc2f325c7ca2b86ae20f8cfe7d61bf3dcb8ee..50df24771f1a533c06abbe1bcaf21c2a7c11d334 100644 (file)
@@ -19,7 +19,7 @@ final class SuccessEnvelopeProxy extends AbstractResponseEnvelopeProxy<RequestSu
     }
 
     @Override
     }
 
     @Override
-    SuccessEnvelope createEnvelope(final RequestSuccess<?, ?> message, final long sequence, final long retry) {
-        return new SuccessEnvelope(message, sequence, retry);
+    SuccessEnvelope createEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
+        return new SuccessEnvelope(message, sessionId, txSequence);
     }
 }
     }
 }
index efdfa04b1613feec35894f9899a13445c206ea40..bdffb08c3eb81bf576d3420520db7ef814942013 100644 (file)
@@ -26,10 +26,15 @@ import org.opendaylight.controller.cluster.access.ABIVersion;
 public class BackendInfo {
     private final ABIVersion version;
     private final ActorRef actor;
 public class BackendInfo {
     private final ABIVersion version;
     private final ActorRef actor;
+    private final int maxMessages;
+    private final long sessionId;
 
 
-    protected BackendInfo(final ActorRef actor, final ABIVersion version) {
+    protected BackendInfo(final ActorRef actor, final long sessionId, final ABIVersion version, final int maxMessages) {
         this.version = Preconditions.checkNotNull(version);
         this.actor = Preconditions.checkNotNull(actor);
         this.version = Preconditions.checkNotNull(version);
         this.actor = Preconditions.checkNotNull(actor);
+        Preconditions.checkArgument(maxMessages > 0, "Maximum messages has to be positive, not %s", maxMessages);
+        this.maxMessages = maxMessages;
+        this.sessionId = sessionId;
     }
 
     public final ActorRef getActor() {
     }
 
     public final ActorRef getActor() {
@@ -40,6 +45,14 @@ public class BackendInfo {
         return version;
     }
 
         return version;
     }
 
+    public final int getMaxMessages() {
+        return maxMessages;
+    }
+
+    public final long getSessionId() {
+        return sessionId;
+    }
+
     @Override
     public final int hashCode() {
         return super.hashCode();
     @Override
     public final int hashCode() {
         return super.hashCode();
@@ -56,6 +69,7 @@ public class BackendInfo {
     }
 
     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
     }
 
     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-        return toStringHelper.add("actor", actor).add("version", version);
+        return toStringHelper.add("actor", actor).add("sessionId", sessionId).add("version", version)
+                .add("maxMessages", maxMessages);
     }
 }
     }
 }
index 3cdda596ade0739a679add4b55c1ade989d3aec3..8939ec977e16dabcc6f9507950aa07a3d95c3d4c 100644 (file)
@@ -83,13 +83,12 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
     }
 
     // This method is executing in the actor context, hence we can safely interact with the queue
     }
 
     // This method is executing in the actor context, hence we can safely interact with the queue
-    private ClientActorBehavior doSendRequest(final long sequence, final TransactionRequest<?> request,
-            final RequestCallback callback) {
+    private ClientActorBehavior doSendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
         // Get or allocate queue for the request
         final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie());
 
         // Note this is a tri-state return and can be null
         // Get or allocate queue for the request
         final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie());
 
         // Note this is a tri-state return and can be null
-        final Optional<FiniteDuration> result = queue.enqueueRequest(sequence, request, callback);
+        final Optional<FiniteDuration> result = queue.enqueueRequest(request, callback);
         if (result == null) {
             // Happy path: we are done here
             return this;
         if (result == null) {
             // Happy path: we are done here
             return this;
@@ -189,7 +188,7 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
      * @param request Request to send
      * @param callback Callback to invoke
      */
      * @param request Request to send
      * @param callback Callback to invoke
      */
-    public final void sendRequest(final long sequence, final TransactionRequest<?> request, final RequestCallback callback) {
-        context().executeInActor(cb -> cb.doSendRequest(sequence, request, callback));
+    public final void sendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
+        context().executeInActor(cb -> cb.doSendRequest(request, callback));
     }
 }
     }
 }
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java
new file mode 100644 (file)
index 0000000..31d863a
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import java.util.AbstractQueue;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ * A specialized always-empty implementation of {@link java.util.Queue}. This implementation will always refuse new
+ * elements in its {@link #offer(Object)} method.
+
+ * @author Robert Varga
+ *
+ * @param <E> the type of elements held in this collection
+ */
+// TODO: move this class into yangtools.util
+final class EmptyQueue<T> extends AbstractQueue<T> {
+    private static final EmptyQueue<?> INSTANCE = new EmptyQueue<>();
+
+    private EmptyQueue() {
+        // No instances
+    }
+
+    @SuppressWarnings("unchecked")
+    static <T> Queue<T> getInstance() {
+        return (Queue<T>) INSTANCE;
+    }
+
+    @Override
+    public boolean offer(final T e) {
+        return false;
+    }
+
+    @Override
+    public T poll() {
+        return null;
+    }
+
+    @Override
+    public T peek() {
+        return null;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        return Collections.emptyIterator();
+    }
+
+    @Override
+    public int size() {
+        return 0;
+    }
+}
\ No newline at end of file
index 513a3f936b9ddabac4131d519f82ccf3461e89d3..5cf7873a4b95efadc64ba335dc3004aea9333bb4 100644 (file)
@@ -10,16 +10,18 @@ package org.opendaylight.controller.cluster.access.client;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
-import java.util.Deque;
+import com.google.common.base.Verify;
+import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Optional;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,18 +44,32 @@ final class SequencedQueue {
         TimeUnit.NANOSECONDS);
 
     /**
         TimeUnit.NANOSECONDS);
 
     /**
-     * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can
-     * progress at different speeds, these may be completed out of order.
-     *
-     * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to
-     *       figure out a more efficient lookup mechanism to deal with responses which do not match the queue
-     *       order.
+     * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
+     * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
+     * resolution.
      */
      */
-    private final Deque<SequencedQueueEntry> queue = new LinkedList<>();
+    private static final int DEFAULT_TX_LIMIT = 1000;
+
     private final Ticker ticker;
     private final Long cookie;
 
     private final Ticker ticker;
     private final Long cookie;
 
-    // Updated/consulted from actor context only
+    /*
+     * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
+     * with the limit changing between reconnects (which imply retransmission).
+     *
+     * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
+     * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
+     * and one for requests which have not been transmitted at all.
+     *
+     * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
+     * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
+     * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
+     * to retransmit -- hence the second queue.
+     */
+    private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
+    private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
+    private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
+
     /**
      * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
      * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
     /**
      * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
      * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
@@ -62,6 +78,11 @@ final class SequencedQueue {
     private CompletionStage<? extends BackendInfo> backendProof;
     private BackendInfo backend;
 
     private CompletionStage<? extends BackendInfo> backendProof;
     private BackendInfo backend;
 
+    // This is not final because we need to be able to replace it.
+    private long txSequence;
+
+    private int lastTxLimit = DEFAULT_TX_LIMIT;
+
     /**
      * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
      */
     /**
      * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
      */
@@ -86,12 +107,16 @@ final class SequencedQueue {
         Preconditions.checkState(notClosed, "Queue %s is closed", this);
     }
 
         Preconditions.checkState(notClosed, "Queue %s is closed", this);
     }
 
+    private long nextTxSequence() {
+        return txSequence++;
+    }
+
     /**
      * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
      * the following scenarios:
      * 1) The request has been enqueued and transmitted. No further actions are necessary
      * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
     /**
      * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
      * the following scenarios:
      * 1) The request has been enqueued and transmitted. No further actions are necessary
      * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
-     * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that
+     * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
      *    process needs to complete before transmission occurs
      *
      * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
      *    process needs to complete before transmission occurs
      *
      * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
@@ -105,21 +130,32 @@ final class SequencedQueue {
      * @param callback Callback to be invoked
      * @return Optional duration with semantics described above.
      */
      * @param callback Callback to be invoked
      * @return Optional duration with semantics described above.
      */
-    @Nullable Optional<FiniteDuration> enqueueRequest(final long sequence, final Request<?, ?> request,
-            final RequestCallback callback) {
+    @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
         checkNotClosed();
 
         final long now = ticker.read();
         checkNotClosed();
 
         final long now = ticker.read();
-        final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now);
-
-        queue.add(e);
-        LOG.debug("Enqueued request {} to queue {}", request, this);
-
+        final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
         if (backend == null) {
         if (backend == null) {
+            LOG.debug("No backend available, request resolution");
+            pending.add(e);
             return Optional.empty();
         }
             return Optional.empty();
         }
+        if (!lastInflight.isEmpty()) {
+            LOG.debug("Retransmit not yet complete, delaying request {}", request);
+            pending.add(e);
+            return null;
+        }
+        if (currentInflight.size() >= lastTxLimit) {
+            LOG.debug("Queue is at capacity, delayed sending of request {}", request);
+            pending.add(e);
+            return null;
+        }
 
 
-        e.retransmit(backend, now);
+        // Ready to transmit
+        currentInflight.offer(e);
+        LOG.debug("Enqueued request {} to queue {}", request, this);
+
+        e.retransmit(backend, nextTxSequence(), now);
         if (expectingTimer == null) {
             expectingTimer = now + REQUEST_TIMEOUT_NANOS;
             return Optional.of(INITIAL_REQUEST_TIMEOUT);
         if (expectingTimer == null) {
             expectingTimer = now + REQUEST_TIMEOUT_NANOS;
             return Optional.of(INITIAL_REQUEST_TIMEOUT);
@@ -128,53 +164,162 @@ final class SequencedQueue {
         }
     }
 
         }
     }
 
-    ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
-        // Responses to different targets may arrive out of order, hence we use an iterator
+    /*
+     * We are using tri-state return here to indicate one of three conditions:
+     * - if a matching entry is found, return an Optional containing it
+     * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
+     * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
+     */
+    private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
+            final ResponseEnvelope<?> envelope) {
+        // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
+        // to use an iterator
         final Iterator<SequencedQueueEntry> it = queue.iterator();
         while (it.hasNext()) {
             final SequencedQueueEntry e = it.next();
         final Iterator<SequencedQueueEntry> it = queue.iterator();
         while (it.hasNext()) {
             final SequencedQueueEntry e = it.next();
-            if (e.acceptsResponse(response)) {
-                lastProgress = ticker.read();
-                it.remove();
-                LOG.debug("Completing request {} with {}", e, response);
-                return e.complete(response.getMessage());
+            final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
+
+            final Request<?, ?> request = e.getRequest();
+            final Response<?, ?> response = envelope.getMessage();
+
+            // First check for matching target, or move to next entry
+            if (!request.getTarget().equals(response.getTarget())) {
+                continue;
+            }
+
+            // Sanity-check logical sequence, ignore any out-of-order messages
+            if (request.getSequence() != response.getSequence()) {
+                LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
+                return Optional.empty();
+            }
+
+            // Now check session match
+            if (envelope.getSessionId() != txDetails.getSessionId()) {
+                LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
+                return Optional.empty();
+            }
+            if (envelope.getTxSequence() != txDetails.getTxSequence()) {
+                LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
+                return Optional.empty();
             }
             }
+
+            LOG.debug("Completing request {} with {}", request, envelope);
+            it.remove();
+            return Optional.of(e);
         }
 
         }
 
-        LOG.debug("No request matching {} found", response);
-        return current;
+        return null;
+    }
+
+    ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
+        Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
+        if (maybeEntry == null) {
+            maybeEntry = findMatchingEntry(lastInflight, envelope);
+        }
+
+        if (maybeEntry == null || !maybeEntry.isPresent()) {
+            LOG.warn("No request matching {} found, ignoring response", envelope);
+            return current;
+        }
+
+        lastProgress = ticker.read();
+        final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
+
+        // We have freed up a slot, try to transmit something
+        if (backend != null) {
+            final int toSend = lastTxLimit - currentInflight.size();
+            if (toSend > 0) {
+                runTransmit(toSend);
+            }
+        }
+
+        return ret;
+    }
+
+    private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
+        int toSend = count;
+
+        while (toSend > 0) {
+            final SequencedQueueEntry e = queue.poll();
+            if (e == null) {
+                break;
+            }
+
+            LOG.debug("Transmitting entry {}", e);
+            e.retransmit(backend, nextTxSequence(), lastProgress);
+            toSend--;
+        }
+
+        return toSend;
+    }
+
+    private void runTransmit(final int count) {
+        final int toSend;
+
+        // Process lastInflight first, possibly clearing it
+        if (!lastInflight.isEmpty()) {
+            toSend = transmitEntries(lastInflight, count);
+            if (lastInflight.isEmpty()) {
+                // We won't be needing the queue anymore, change it to specialized implementation
+                lastInflight = EmptyQueue.getInstance();
+            }
+        } else {
+            toSend = count;
+        }
+
+        // Process pending next.
+        transmitEntries(pending, toSend);
     }
 
     Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
     }
 
     Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
+        Preconditions.checkNotNull(backend);
         if (!proof.equals(backendProof)) {
             LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
             return Optional.empty();
         }
 
         if (!proof.equals(backendProof)) {
             LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
             return Optional.empty();
         }
 
-        this.backend = Preconditions.checkNotNull(backend);
-        backendProof = null;
         LOG.debug("Resolved backend {}",  backend);
 
         LOG.debug("Resolved backend {}",  backend);
 
-        if (queue.isEmpty()) {
-            // No pending requests, hence no need for a timer
-            return Optional.empty();
+        // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
+        // and also not to exceed new limits
+        final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
+        newLast.addAll(currentInflight);
+        newLast.addAll(lastInflight);
+        lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
+
+        // Clear currentInflight, possibly compacting it
+        final int txLimit = backend.getMaxMessages();
+        if (lastTxLimit > txLimit) {
+            currentInflight = new ArrayDeque<>();
+        } else {
+            currentInflight.clear();
         }
 
         }
 
-        LOG.debug("Resending requests to backend {}", backend);
-        final long now = ticker.read();
-        for (SequencedQueueEntry e : queue) {
-            e.retransmit(backend, now);
-        }
+        // We are ready to roll
+        this.backend = backend;
+        backendProof = null;
+        txSequence = 0;
+        lastTxLimit = txLimit;
+        lastProgress = ticker.read();
 
 
-        if (expectingTimer != null) {
-            // We already have a timer going, no need to schedule a new one
+        // No pending requests, return
+        if (lastInflight.isEmpty() && pending.isEmpty()) {
             return Optional.empty();
         }
 
             return Optional.empty();
         }
 
-        // Above loop may have cost us some time. Recalculate timeout.
-        final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
-        expectingTimer = nextTicks;
-        return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS));
+        LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
+
+        runTransmit(lastTxLimit);
+
+        // Calculate next timer if necessary
+        if (expectingTimer == null) {
+            // Request transmission may have cost us some time. Recalculate timeout.
+            final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
+            expectingTimer = nextTicks;
+            return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
+        } else {
+            return Optional.empty();
+        }
     }
 
     boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
     }
 
     boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
@@ -189,7 +334,7 @@ final class SequencedQueue {
     }
 
     boolean hasCompleted() {
     }
 
     boolean hasCompleted() {
-        return !notClosed && queue.isEmpty();
+        return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
     }
 
     /**
     }
 
     /**
@@ -203,7 +348,7 @@ final class SequencedQueue {
         expectingTimer = null;
         final long now = ticker.read();
 
         expectingTimer = null;
         final long now = ticker.read();
 
-        if (!queue.isEmpty()) {
+        if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
             final long ticksSinceProgress = now - lastProgress;
             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
             final long ticksSinceProgress = now - lastProgress;
             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
@@ -216,7 +361,7 @@ final class SequencedQueue {
         }
 
         // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
         }
 
         // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
-        final SequencedQueueEntry head = queue.peek();
+        final SequencedQueueEntry head = currentInflight.peek();
         if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
             backend = null;
             LOG.debug("Queue {} invalidated backend info", this);
         if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
             backend = null;
             LOG.debug("Queue {} invalidated backend info", this);
@@ -226,14 +371,17 @@ final class SequencedQueue {
         }
     }
 
         }
     }
 
+    private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
+        queue.forEach(e -> e.poison(cause));
+        queue.clear();
+    }
+
     void poison(final RequestException cause) {
         close();
 
     void poison(final RequestException cause) {
         close();
 
-        SequencedQueueEntry e = queue.poll();
-        while (e != null) {
-            e.poison(cause);
-            e = queue.poll();
-        }
+        poisonQueue(currentInflight, cause);
+        poisonQueue(lastInflight, cause);
+        poisonQueue(pending, cause);
     }
 
     // FIXME: add a caller from ClientSingleTransaction
     }
 
     // FIXME: add a caller from ClientSingleTransaction
index 83e3e079137e7b61ec601640fa21257e6bedf60b..8814d50c54e8be64e7d784ba85d000d937a0729b 100644 (file)
@@ -10,12 +10,11 @@ package org.opendaylight.controller.cluster.access.client;
 import akka.actor.ActorRef;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import akka.actor.ActorRef;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
-import java.util.Optional;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,49 +22,31 @@ import org.slf4j.LoggerFactory;
  * Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information.
  *
  * @author Robert Varga
  * Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information.
  *
  * @author Robert Varga
- *
- * @param <I> Target identifier type
  */
 final class SequencedQueueEntry {
  */
 final class SequencedQueueEntry {
-    private static final class LastTry {
-        final long timeTicks;
-        final long retry;
-
-        LastTry(final long retry, final long timeTicks) {
-            this.retry = retry;
-            this.timeTicks = timeTicks;
-        }
-    }
-
     private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class);
 
     private final Request<?, ?> request;
     private final RequestCallback callback;
     private final long enqueuedTicks;
     private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class);
 
     private final Request<?, ?> request;
     private final RequestCallback callback;
     private final long enqueuedTicks;
-    private final long sequence;
 
 
-    private Optional<LastTry> lastTry = Optional.empty();
+    private TxDetails txDetails;
 
 
-    SequencedQueueEntry(final Request<?, ?> request, final long sequence, final RequestCallback callback,
+    SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback,
         final long now) {
         this.request = Preconditions.checkNotNull(request);
         this.callback = Preconditions.checkNotNull(callback);
         this.enqueuedTicks = now;
         final long now) {
         this.request = Preconditions.checkNotNull(request);
         this.callback = Preconditions.checkNotNull(callback);
         this.enqueuedTicks = now;
-        this.sequence = sequence;
     }
 
     }
 
-    long getSequence() {
-        return sequence;
+    Request<?, ?> getRequest() {
+        return request;
     }
 
     }
 
-    boolean acceptsResponse(final ResponseEnvelope<?> response) {
-        return getSequence() == response.getSequence() && request.getTarget().equals(response.getMessage().getTarget());
+    @Nullable TxDetails getTxDetails() {
+        return txDetails;
     }
 
     }
 
-    long getCurrentTry() {
-        return lastTry.isPresent() ? lastTry.get().retry : 0;
-     }
-
     ClientActorBehavior complete(final Response<?, ?> response) {
         LOG.debug("Completing request {} with {}", request, response);
         return callback.complete(response);
     ClientActorBehavior complete(final Response<?, ?> response) {
         LOG.debug("Completing request {} with {}", request, response);
         return callback.complete(response);
@@ -79,8 +60,8 @@ final class SequencedQueueEntry {
     boolean isTimedOut(final long now, final long timeoutNanos) {
         final long elapsed;
 
     boolean isTimedOut(final long now, final long timeoutNanos) {
         final long elapsed;
 
-        if (lastTry.isPresent()) {
-            elapsed = now - lastTry.get().timeTicks;
+        if (txDetails != null) {
+            elapsed = now - txDetails.getTimeTicks();
         } else {
             elapsed = now - enqueuedTicks;
         }
         } else {
             elapsed = now - enqueuedTicks;
         }
@@ -93,18 +74,19 @@ final class SequencedQueueEntry {
         }
     }
 
         }
     }
 
-    void retransmit(final BackendInfo backend, final long now) {
-        final long retry = lastTry.isPresent() ? lastTry.get().retry + 1 : 0;
-        final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), sequence, retry);
+    void retransmit(final BackendInfo backend, final long txSequence, final long now) {
+        final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()),
+            backend.getSessionId(), txSequence);
 
         final ActorRef actor = backend.getActor();
 
         final ActorRef actor = backend.getActor();
-        LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
+        LOG.trace("Transmitting request {} as {} to {}", request, toSend, actor);
         actor.tell(toSend, ActorRef.noSender());
         actor.tell(toSend, ActorRef.noSender());
-        lastTry = Optional.of(new LastTry(retry, now));
+        txDetails = new TxDetails(backend.getSessionId(), txSequence, now);
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString();
     }
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString();
     }
+
 }
 }
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TxDetails.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TxDetails.java
new file mode 100644 (file)
index 0000000..84d4b62
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+/**
+ * Holder class for transmission details about a particular {@link SequencedQueueEntry}.
+ *
+ * @author Robert Varga
+ */
+final class TxDetails {
+    private final long sessionId;
+    private final long txSequence;
+    private final long timeTicks;
+
+    TxDetails(final long sessionId, final long txSequence, final long timeTicks) {
+        this.sessionId = sessionId;
+        this.txSequence = txSequence;
+        this.timeTicks = timeTicks;
+    }
+
+    long getSessionId() {
+        return sessionId;
+    }
+
+    long getTxSequence() {
+        return txSequence;
+    }
+
+    long getTimeTicks() {
+        return timeTicks;
+    }
+}
\ No newline at end of file
index aecd238c6d10864c82d327d4a4aac850e1fee141..dca396648c599da0cdea7f1ff7a8b7b944090b7e 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.client;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -49,7 +50,7 @@ public class SequencedQueueEntryTest {
         private static final long serialVersionUID = 1L;
 
         MockFailure(final WritableIdentifier target, final RequestException cause) {
         private static final long serialVersionUID = 1L;
 
         MockFailure(final WritableIdentifier target, final RequestException cause) {
-            super(target, cause);
+            super(target, 0, cause);
         }
 
         @Override
         }
 
         @Override
@@ -67,7 +68,7 @@ public class SequencedQueueEntryTest {
         private static final long serialVersionUID = 1L;
 
         MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
         private static final long serialVersionUID = 1L;
 
         MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
-            super(target, replyTo);
+            super(target, 0, replyTo);
         }
 
         @Override
         }
 
         @Override
@@ -127,11 +128,11 @@ public class SequencedQueueEntryTest {
         ticker.increment(ThreadLocalRandom.current().nextLong());
 
         mockActor = TestProbe.apply(actorSystem);
         ticker.increment(ThreadLocalRandom.current().nextLong());
 
         mockActor = TestProbe.apply(actorSystem);
-        mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
+        mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
         mockResponse = mockRequest.toRequestFailure(mockCause);
 
         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
         mockResponse = mockRequest.toRequestFailure(mockCause);
 
-        entry = new SequencedQueueEntry(mockRequest, 0, mockCallback, ticker.read());
+        entry = new SequencedQueueEntry(mockRequest, mockCallback, ticker.read());
     }
 
     @After
     }
 
     @After
@@ -140,19 +141,14 @@ public class SequencedQueueEntryTest {
     }
 
     @Test
     }
 
     @Test
-    public void testGetSequence() {
-        assertEquals(0, entry.getSequence());
-    }
-
-    @Test
-    public void testGetCurrentTry() {
-        assertEquals(0, entry.getCurrentTry());
-        entry.retransmit(mockBackendInfo, ticker.read());
-        assertEquals(0, entry.getCurrentTry());
-        entry.retransmit(mockBackendInfo, ticker.read());
-        assertEquals(1, entry.getCurrentTry());
-        entry.retransmit(mockBackendInfo, ticker.read());
-        assertEquals(2, entry.getCurrentTry());
+    public void testGetTxDetails() {
+        assertNull(entry.getTxDetails());
+        entry.retransmit(mockBackendInfo, 0, ticker.read());
+        assertEquals(0, entry.getTxDetails().getTxSequence());
+        entry.retransmit(mockBackendInfo, 1, ticker.read());
+        assertEquals(1, entry.getTxDetails().getTxSequence());
+        entry.retransmit(mockBackendInfo, 3, ticker.read());
+        assertEquals(3, entry.getTxDetails().getTxSequence());
     }
 
     @Test
     }
 
     @Test
@@ -175,13 +171,13 @@ public class SequencedQueueEntryTest {
         assertTrue(entry.isTimedOut(ticker.read(), 0));
         assertFalse(entry.isTimedOut(ticker.read(), 1));
 
         assertTrue(entry.isTimedOut(ticker.read(), 0));
         assertFalse(entry.isTimedOut(ticker.read(), 1));
 
-        entry.retransmit(mockBackendInfo, ticker.read());
+        entry.retransmit(mockBackendInfo, 0, ticker.read());
         assertTrue(entry.isTimedOut(ticker.read(), 0));
         ticker.increment(10);
         assertTrue(entry.isTimedOut(ticker.read(), 10));
         assertFalse(entry.isTimedOut(ticker.read(), 20));
 
         assertTrue(entry.isTimedOut(ticker.read(), 0));
         ticker.increment(10);
         assertTrue(entry.isTimedOut(ticker.read(), 10));
         assertFalse(entry.isTimedOut(ticker.read(), 20));
 
-        entry.retransmit(mockBackendInfo, ticker.read());
+        entry.retransmit(mockBackendInfo, 1, ticker.read());
         assertTrue(entry.isTimedOut(ticker.read(), 0));
         ticker.increment(10);
         assertTrue(entry.isTimedOut(ticker.read(), 10));
         assertTrue(entry.isTimedOut(ticker.read(), 0));
         ticker.increment(10);
         assertTrue(entry.isTimedOut(ticker.read(), 10));
@@ -191,7 +187,7 @@ public class SequencedQueueEntryTest {
     @Test
     public void testRetransmit() {
         assertFalse(mockActor.msgAvailable());
     @Test
     public void testRetransmit() {
         assertFalse(mockActor.msgAvailable());
-        entry.retransmit(mockBackendInfo, ticker.read());
+        entry.retransmit(mockBackendInfo, 0, ticker.read());
 
         assertTrue(mockActor.msgAvailable());
         assertRequestEquals(mockRequest, mockActor.receiveOne(Duration.apply(5, TimeUnit.SECONDS)));
 
         assertTrue(mockActor.msgAvailable());
         assertRequestEquals(mockRequest, mockActor.receiveOne(Duration.apply(5, TimeUnit.SECONDS)));
@@ -201,8 +197,8 @@ public class SequencedQueueEntryTest {
          assertTrue(o instanceof RequestEnvelope);
 
          final RequestEnvelope actual = (RequestEnvelope) o;
          assertTrue(o instanceof RequestEnvelope);
 
          final RequestEnvelope actual = (RequestEnvelope) o;
-         assertEquals(0, actual.getRetry());
-         assertEquals(0, actual.getSequence());
+         assertEquals(0, actual.getSessionId());
+         assertEquals(0, actual.getTxSequence());
          assertEquals(expected.getTarget(), actual.getMessage().getTarget());
     }
 }
          assertEquals(expected.getTarget(), actual.getMessage().getTarget());
     }
 }
index b05ca3a7e00029da66ef2efc57373cafb1e2ed56..fb92de9351e4bff3d9a529ebc5158ebc327e856d 100644 (file)
@@ -54,7 +54,7 @@ public class SequencedQueueTest {
         private static final long serialVersionUID = 1L;
 
         MockFailure(final WritableIdentifier target, final RequestException cause) {
         private static final long serialVersionUID = 1L;
 
         MockFailure(final WritableIdentifier target, final RequestException cause) {
-            super(target, cause);
+            super(target, 0, cause);
         }
 
         @Override
         }
 
         @Override
@@ -72,7 +72,7 @@ public class SequencedQueueTest {
         private static final long serialVersionUID = 1L;
 
         MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
         private static final long serialVersionUID = 1L;
 
         MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
-            super(target, replyTo);
+            super(target, 0, replyTo);
         }
 
         @Override
         }
 
         @Override
@@ -135,7 +135,7 @@ public class SequencedQueueTest {
         ticker.increment(ThreadLocalRandom.current().nextLong());
 
         mockActor = TestProbe.apply(actorSystem);
         ticker.increment(ThreadLocalRandom.current().nextLong());
 
         mockActor = TestProbe.apply(actorSystem);
-        mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
+        mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
         mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
         mockResponse = mockRequest.toRequestFailure(mockCause);
         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
         mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
         mockResponse = mockRequest.toRequestFailure(mockCause);
@@ -167,7 +167,7 @@ public class SequencedQueueTest {
         queue.close();
 
         // Kaboom
         queue.close();
 
         // Kaboom
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
     }
 
     @Test
     }
 
     @Test
@@ -178,7 +178,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testPoison() {
 
     @Test
     public void testPoison() {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
         queue.poison(mockCause);
 
         final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
         queue.poison(mockCause);
 
         final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
@@ -192,7 +192,7 @@ public class SequencedQueueTest {
         queue.poison(mockCause);
 
         // Kaboom
         queue.poison(mockCause);
 
         // Kaboom
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
     }
 
     @Test
     }
 
     @Test
@@ -203,7 +203,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testEnqueueRequestNeedsBackend() {
 
     @Test
     public void testEnqueueRequestNeedsBackend() {
-        final Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
+        final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
 
         assertNotNull(ret);
         assertFalse(ret.isPresent());
 
         assertNotNull(ret);
         assertFalse(ret.isPresent());
@@ -225,7 +225,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testSetBackendWithNoResolution() {
 
     @Test
     public void testSetBackendWithNoResolution() {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
         final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
 
         final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
         final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
@@ -235,7 +235,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testSetBackendWithWrongProof() {
 
     @Test
     public void testSetBackendWithWrongProof() {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
         assertTrue(queue.expectProof(proof));
 
         final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
         assertTrue(queue.expectProof(proof));
@@ -252,8 +252,8 @@ public class SequencedQueueTest {
     }
 
     @Test
     }
 
     @Test
-    public void testSetbackedWithRequestsNoTimer() {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+    public void testSetBackendWithRequestsNoTimer() {
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
         assertTrue(queue.expectProof(proof));
 
         final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
         assertTrue(queue.expectProof(proof));
@@ -270,7 +270,7 @@ public class SequencedQueueTest {
     public void testEnqueueRequestNeedsTimer() {
         setupBackend();
 
     public void testEnqueueRequestNeedsTimer() {
         setupBackend();
 
-        final Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
+        final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
         assertNotNull(ret);
         assertTrue(ret.isPresent());
         assertTransmit(mockRequest, 0);
         assertNotNull(ret);
         assertTrue(ret.isPresent());
         assertTransmit(mockRequest, 0);
@@ -281,13 +281,13 @@ public class SequencedQueueTest {
         setupBackend();
 
         // First request
         setupBackend();
 
         // First request
-        Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
+        Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
         assertNotNull(ret);
         assertTrue(ret.isPresent());
         assertTransmit(mockRequest, 0);
 
         // Second request, no timer fired
         assertNotNull(ret);
         assertTrue(ret.isPresent());
         assertTransmit(mockRequest, 0);
 
         // Second request, no timer fired
-        ret = queue.enqueueRequest(1, mockRequest2, mockCallback);
+        ret = queue.enqueueRequest(mockRequest2, mockCallback);
         assertNull(ret);
         assertTransmit(mockRequest2, 1);
     }
         assertNull(ret);
         assertTransmit(mockRequest2, 1);
     }
@@ -300,14 +300,14 @@ public class SequencedQueueTest {
 
     @Test
     public void testRunTimeoutWithoutShift() throws NoProgressException {
 
     @Test
     public void testRunTimeoutWithoutShift() throws NoProgressException {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
         final boolean ret = queue.runTimeout();
         assertFalse(ret);
     }
 
     @Test
     public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
         final boolean ret = queue.runTimeout();
         assertFalse(ret);
     }
 
     @Test
     public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1);
 
 
         ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1);
 
@@ -317,7 +317,9 @@ public class SequencedQueueTest {
 
     @Test
     public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
 
     @Test
     public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        setupBackend();
+
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS);
 
 
         ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS);
 
@@ -327,7 +329,9 @@ public class SequencedQueueTest {
 
     @Test
     public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
 
     @Test
     public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        setupBackend();
+
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1);
 
 
         ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1);
 
@@ -337,7 +341,7 @@ public class SequencedQueueTest {
 
     @Test(expected=NoProgressException.class)
     public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
 
     @Test(expected=NoProgressException.class)
     public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
 
 
         ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
 
@@ -347,7 +351,7 @@ public class SequencedQueueTest {
 
     @Test(expected=NoProgressException.class)
     public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
 
     @Test(expected=NoProgressException.class)
     public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
 
 
         ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
 
@@ -382,7 +386,9 @@ public class SequencedQueueTest {
 
     @Test
     public void testCompleteSingle() {
 
     @Test
     public void testCompleteSingle() {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        setupBackend();
+
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
         verify(mockCallback).complete(mockResponse);
 
         ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
         verify(mockCallback).complete(mockResponse);
@@ -395,7 +401,9 @@ public class SequencedQueueTest {
 
     @Test
     public void testCompleteNull() {
 
     @Test
     public void testCompleteNull() {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        setupBackend();
+
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         doReturn(null).when(mockCallback).complete(mockResponse);
 
 
         doReturn(null).when(mockCallback).complete(mockResponse);
 
@@ -408,10 +416,10 @@ public class SequencedQueueTest {
     public void testProgressRecord() throws NoProgressException {
         setupBackend();
 
     public void testProgressRecord() throws NoProgressException {
         setupBackend();
 
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ticker.increment(10);
 
         ticker.increment(10);
-        queue.enqueueRequest(1, mockRequest2, mockCallback);
+        queue.enqueueRequest(mockRequest2, mockCallback);
         queue.complete(mockBehavior, mockResponseEnvelope);
 
         ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11);
         queue.complete(mockBehavior, mockResponseEnvelope);
 
         ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11);
@@ -436,8 +444,8 @@ public class SequencedQueueTest {
         assertTrue(o instanceof RequestEnvelope);
 
         final RequestEnvelope actual = (RequestEnvelope) o;
         assertTrue(o instanceof RequestEnvelope);
 
         final RequestEnvelope actual = (RequestEnvelope) o;
-        assertEquals(0, actual.getRetry());
-        assertEquals(sequence, actual.getSequence());
+        assertEquals(0, actual.getSessionId());
+        assertEquals(sequence, actual.getTxSequence());
         assertSame(expected, actual.getMessage());
     }
 }
         assertSame(expected, actual.getMessage());
     }
 }
index 605de6714084390dbf3f10022ea7862ac2d24655..f59703cc21d89bbf2fac91b6be93084fd222ddcc 100644 (file)
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-slf4j_${scala.version}</artifactId>
     </dependency>
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-slf4j_${scala.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang.modules</groupId>
+      <artifactId>scala-java8-compat_${scala.version}</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-testkit_${scala.version}</artifactId>
     <dependency>
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-testkit_${scala.version}</artifactId>
index 60919c05a42c2ae76b856e14315a0d1bdcac1653..cd104b597b15a8524bd84d07c37b20b5ebe2ad54 100644 (file)
@@ -67,7 +67,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             final LocalHistoryIdentifier historyId, final long transactionId,
             final java.util.Optional<ShardBackendInfo> backend) {
 
             final LocalHistoryIdentifier historyId, final long transactionId,
             final java.util.Optional<ShardBackendInfo> backend) {
 
-        final java.util.Optional<DataTree> dataTree = backend.flatMap(t -> t.getDataTree());
+        final java.util.Optional<DataTree> dataTree = backend.flatMap(ShardBackendInfo::getDataTree);
         final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId);
         if (dataTree.isPresent()) {
             return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot());
         final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId);
         if (dataTree.isPresent()) {
             return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot());
@@ -141,7 +141,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         checkSealed();
 
         final SettableFuture<Boolean> ret = SettableFuture.create();
         checkSealed();
 
         final SettableFuture<Boolean> ret = SettableFuture.create();
-        client().sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(false)), t -> {
+        client().sendRequest(Verify.verifyNotNull(doCommit(false)), t -> {
             if (t instanceof TransactionCommitSuccess) {
                 ret.set(Boolean.TRUE);
             } else if (t instanceof RequestFailure) {
             if (t instanceof TransactionCommitSuccess) {
                 ret.set(Boolean.TRUE);
             } else if (t instanceof RequestFailure) {
@@ -156,7 +156,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void abort(final VotingFuture<Void> ret) {
         checkSealed();
 
     void abort(final VotingFuture<Void> ret) {
         checkSealed();
 
-        client.sendRequest(nextSequence(), new TransactionAbortRequest(getIdentifier(), client().self()), t -> {
+        client.sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), client().self()), t -> {
             if (t instanceof TransactionAbortSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
             if (t instanceof TransactionAbortSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -170,7 +170,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void canCommit(final VotingFuture<?> ret) {
         checkSealed();
 
     void canCommit(final VotingFuture<?> ret) {
         checkSealed();
 
-        client.sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(true)), t -> {
+        client.sendRequest(Verify.verifyNotNull(doCommit(true)), t -> {
             if (t instanceof TransactionCanCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
             if (t instanceof TransactionCanCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -184,7 +184,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void preCommit(final VotingFuture<?> ret) {
         checkSealed();
 
     void preCommit(final VotingFuture<?> ret) {
         checkSealed();
 
-        client.sendRequest(nextSequence(), new TransactionPreCommitRequest(getIdentifier(), client().self()), t-> {
+        client.sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> {
             if (t instanceof TransactionPreCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
             if (t instanceof TransactionPreCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -198,7 +198,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void doCommit(final VotingFuture<?> ret) {
         checkSealed();
 
     void doCommit(final VotingFuture<?> ret) {
         checkSealed();
 
-        client.sendRequest(nextSequence(), new TransactionDoCommitRequest(getIdentifier(), client().self()), t-> {
+        client.sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> {
             if (t instanceof TransactionCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
             if (t instanceof TransactionCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -224,5 +224,4 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     abstract void doAbort();
 
     abstract TransactionRequest<?> doCommit(boolean coordinated);
     abstract void doAbort();
 
     abstract TransactionRequest<?> doCommit(boolean coordinated);
-
 }
 }
index a99ea3dfb42ba8a7b44c52320c3802528fd9c05d..b84008ca39e0ecb9bda50fe2fb895661431fea92 100644 (file)
@@ -174,8 +174,8 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
         transactions.remove(transaction.getIdentifier());
     }
 
         transactions.remove(transaction.getIdentifier());
     }
 
-    void sendRequest(final long sequence, final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
-        sendRequest(sequence, request, response -> {
+    void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
+        sendRequest(request, response -> {
             completer.accept(response);
             return this;
         });
             completer.accept(response);
             return this;
         });
index 1dec84631a2904f6c7b528b737ec14579d7c6010..9e787f12e171e356efc7e5f10c632f49afc06a33 100644 (file)
@@ -87,7 +87,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     void doAbort() {
 
     @Override
     void doAbort() {
-        client().sendRequest(nextSequence(), new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER);
+        client().sendRequest(new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER);
         modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted"));
     }
 
         modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted"));
     }
 
index 0633b68f1f992bff6e8095ca69f2790e54e3a0c4..359b428c99d5060f59baa45ca2414a34eba46f98 100644 (file)
@@ -7,30 +7,33 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.OnComplete;
+import akka.actor.ActorRef;
+import akka.japi.Function;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableBiMap.Builder;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableBiMap.Builder;
 import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.client.BackendInfo;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.client.BackendInfo;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
+import scala.compat.java8.FutureConverters;
 
 /**
  * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
 
 /**
  * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
@@ -40,8 +43,6 @@ import scala.concurrent.ExecutionContext;
  * @author Robert Varga
  */
 final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
  * @author Robert Varga
  */
 final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
-    private static final ExecutionContext DIRECT_EXECUTION_CONTEXT =
-            ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
     private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
     private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
 
     private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
     private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
 
@@ -54,6 +55,8 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
     private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
 
     private final ActorContext actorContext;
     private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
 
     private final ActorContext actorContext;
+    // FIXME: this counter should be in superclass somewhere
+    private final AtomicLong nextSessionId = new AtomicLong();
 
     @GuardedBy("this")
     private long nextShard = 1;
 
     @GuardedBy("this")
     private long nextShard = 1;
@@ -105,39 +108,37 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
             return NULL_FUTURE;
         }
 
             return NULL_FUTURE;
         }
 
-        final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<>();
+        final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<ShardBackendInfo>();
+
+        FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
+            LOG.debug("Looking up primary info for {} from {}", shardName, info);
+            return FutureConverters.toJava(Patterns.ask(info.getPrimaryShardActor(),
+                (Function<ActorRef, Object>) replyTo -> new ConnectClientRequest(null, replyTo,
+                    ABIVersion.BORON, ABIVersion.current()), DEAD_TIMEOUT));
+        }).thenApply(response -> {
+            if (response instanceof RequestFailure) {
+                final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
+                LOG.debug("Connect request failed {}", failure, failure.getCause());
+                throw Throwables.propagate(failure.getCause());
+            }
 
 
-        actorContext.findPrimaryShardAsync(shardName).onComplete(new OnComplete<PrimaryShardInfo>() {
-            @Override
-            public void onComplete(final Throwable t, final PrimaryShardInfo v) {
-                if (t != null) {
-                    ret.completeExceptionally(t);
-                } else {
-                    ret.complete(createBackendInfo(v, shardName, cookie));
-                }
+            LOG.debug("Resolved backend information to {}", response);
+
+            Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response);
+            final ConnectClientSuccess success = (ConnectClientSuccess) response;
+
+            return new ShardBackendInfo(success.getBackend(),
+                nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
+                success.getDataTree(), success.getMaxMessages());
+        }).whenComplete((info, t) -> {
+            if (t != null) {
+                ret.completeExceptionally(t);
+            } else {
+                ret.complete(info);
             }
             }
-        }, DIRECT_EXECUTION_CONTEXT);
+        });
 
         LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
         return ret;
     }
 
         LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
         return ret;
     }
-
-    private static ABIVersion toABIVersion(final short version) {
-        switch (version) {
-            case DataStoreVersions.BORON_VERSION:
-                return ABIVersion.BORON;
-        }
-
-        throw new IllegalArgumentException("Unsupported version " + version);
-    }
-
-    private static ShardBackendInfo createBackendInfo(final Object result, final String shardName, final Long cookie) {
-        Preconditions.checkArgument(result instanceof PrimaryShardInfo);
-        final PrimaryShardInfo info = (PrimaryShardInfo) result;
-
-        LOG.debug("Creating backend information for {}", info);
-        return new ShardBackendInfo(info.getPrimaryShardActor().resolveOne(DEAD_TIMEOUT).value().get().get(),
-            toABIVersion(info.getPrimaryShardVersion()), shardName, UnsignedLong.fromLongBits(cookie),
-            info.getLocalShardDataTree());
-     }
 }
 }
index 13b7fb4f56555d032fea2be8b3f387dd40b24288..9fb1b89580e3557a346f4925e31d39d4d0e86f22 100644 (file)
@@ -95,21 +95,21 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
         // Make sure we send any modifications before issuing a read
         ensureFlushedBuider();
 
         // Make sure we send any modifications before issuing a read
         ensureFlushedBuider();
-        client().sendRequest(nextSequence(), request, completer);
+        client().sendRequest(request, completer);
         return MappingCheckedFuture.create(future, ReadFailedException.MAPPER);
     }
 
     @Override
     CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
         final SettableFuture<Boolean> future = SettableFuture.create();
         return MappingCheckedFuture.create(future, ReadFailedException.MAPPER);
     }
 
     @Override
     CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
         final SettableFuture<Boolean> future = SettableFuture.create();
-        return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), client().self(), path),
+        return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), client().self(), path),
             t -> completeExists(future, t), future);
     }
 
     @Override
     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
         final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
             t -> completeExists(future, t), future);
     }
 
     @Override
     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
         final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
-        return sendReadRequest(new ReadTransactionRequest(getIdentifier(), client().self(), path),
+        return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), client().self(), path),
             t -> completeRead(future, t), future);
     }
 
             t -> completeRead(future, t), future);
     }
 
@@ -122,6 +122,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     private void ensureInitializedBuider() {
         if (!builderBusy) {
 
     private void ensureInitializedBuider() {
         if (!builderBusy) {
+            builder.setSequence(nextSequence());
             builderBusy = true;
         }
     }
             builderBusy = true;
         }
     }
@@ -133,7 +134,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     }
 
     private void flushBuilder() {
     }
 
     private void flushBuilder() {
-        client().sendRequest(nextSequence(), builder.build(), this::completeModify);
+        client().sendRequest(builder.build(), this::completeModify);
         builderBusy = false;
     }
 
         builderBusy = false;
     }
 
index 6bb7072ea3a4d21aa9a7bbe963adee3d207f22f2..92a213d1bfbe7bf9127ded018ea5572cce4d9ad3 100644 (file)
@@ -30,9 +30,9 @@ final class ShardBackendInfo extends BackendInfo {
     private final UnsignedLong cookie;
     private final String shardName;
 
     private final UnsignedLong cookie;
     private final String shardName;
 
-    ShardBackendInfo(final ActorRef actor, final ABIVersion version, final String shardName, final UnsignedLong cookie,
-        final Optional<DataTree> dataTree) {
-        super(actor, version);
+    ShardBackendInfo(final ActorRef actor, final long sessionId, final ABIVersion version, final String shardName,
+        final UnsignedLong cookie, final Optional<DataTree> dataTree, final int maxMessages) {
+        super(actor, sessionId, version, maxMessages);
         this.shardName = Preconditions.checkNotNull(shardName);
         this.cookie = Preconditions.checkNotNull(cookie);
         this.dataTree = Preconditions.checkNotNull(dataTree);
         this.shardName = Preconditions.checkNotNull(shardName);
         this.cookie = Preconditions.checkNotNull(cookie);
         this.dataTree = Preconditions.checkNotNull(dataTree);