BUG-5280: separate request sequence and transmit sequence 33/43333/28
authorRobert Varga <rovarga@cisco.com>
Mon, 8 Aug 2016 15:04:54 +0000 (17:04 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 24 Aug 2016 01:51:03 +0000 (01:51 +0000)
Clean up the confusion in sequence numbers. There are actually
two sequences:
- logical request sequence, which indicates the order of requests
  in which they should be applied to the target entity. It is
  assigned by logic emitting the request.
- transmit sequence within a connection to the backend, as
  initiated by ConnectClientRequest. It is assigned by SequencedQueue
  as it is transmitting requests and reset when a new connection
  to the backend is made.

This requires establishing the concept of a session, which is
a single conversation between frontend and backend. It is severed
whenever the frontend times out and re-established when the leader
is found and it responds with ConnectClientSuccess.

The sending of ConnectClientRequest is not done via the queue,
as it is part of backend resolution process. Since this is not
a performance-critical path, we use simple Patterns.ask() to
send the request and get completion notification -- which we then
translate to ShardBackendInfo.

ConnectClientSuccess gives us backend-preferred version and
backend-specified cap on the number of outstanding messages then
it can handle concurrently. This maximum is used to limit the
transmit path of SequencedQueue, so that it does not attempt
to send more requests at any given time.

Internal queue for unsent requests is kept unbounded for now,
subject to a Semaphore-driven throttle in a follow-up patch.

Change-Id: I61663073bf6632c1ed8c036dee37f1ac39cf7794
Signed-off-by: Robert Varga <rovarga@cisco.com>
85 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbortLocalTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CommitLocalTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientFailure.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientFailureProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailure.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailure.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailureProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractMessageProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractRequestFailureProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractRequestProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractSuccessProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Envelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Message.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Request.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelopeProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestFailure.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Response.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfo.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TxDetails.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntryTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueTest.java
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ShardBackendInfo.java

index 1045306..5dc630c 100644 (file)
@@ -25,6 +25,6 @@ public final class AbortLocalTransactionRequest extends AbstractLocalTransaction
 
     public AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier,
             final @Nonnull ActorRef replyTo) {
-        super(identifier, replyTo);
+        super(identifier, 0, replyTo);
     }
 }
\ No newline at end of file
index 7730c5d..bc9bb62 100644 (file)
@@ -23,8 +23,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 abstract class AbstractLocalTransactionRequest<T extends AbstractLocalTransactionRequest<T>> extends TransactionRequest<T> {
     private static final long serialVersionUID = 1L;
 
-    AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) {
-        super(identifier, replyTo);
+    AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo) {
+        super(identifier, sequence, replyTo);
     }
 
     @Override
index d23430f..a8fbbcf 100644 (file)
@@ -32,9 +32,9 @@ public abstract class AbstractReadTransactionRequest<T extends AbstractReadTrans
     private static final long serialVersionUID = 1L;
     private final YangInstanceIdentifier path;
 
-    AbstractReadTransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo,
+    AbstractReadTransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo,
         final YangInstanceIdentifier path) {
-        super(identifier, replyTo);
+        super(identifier, sequence, replyTo);
         this.path = Preconditions.checkNotNull(path);
     }
 
index 13b03a2..e2d07f1 100644 (file)
@@ -51,9 +51,10 @@ abstract class AbstractReadTransactionRequestProxyV1<T extends AbstractReadTrans
     }
 
     @Override
-    protected final T createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        return createReadRequest(target, replyTo, path);
+    protected final T createRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+        return createReadRequest(target, sequence, replyTo, path);
     }
 
-    abstract T createReadRequest(TransactionIdentifier target, ActorRef replyTo, YangInstanceIdentifier path);
+    abstract T createReadRequest(TransactionIdentifier target, long sequence, ActorRef replyTo,
+            YangInstanceIdentifier path);
 }
index 1f73951..1cb7bb1 100644 (file)
@@ -29,7 +29,7 @@ public final class CommitLocalTransactionRequest extends AbstractLocalTransactio
 
     public CommitLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier,
             final @Nonnull ActorRef replyTo, final @Nonnull DataTreeModification mod, final boolean coordinated) {
-        super(identifier, replyTo);
+        super(identifier, 0, replyTo);
         this.mod = Preconditions.checkNotNull(mod);
         this.coordinated = coordinated;
     }
index c1380cc..46b460a 100644 (file)
@@ -23,8 +23,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 public final class ConnectClientFailure extends RequestFailure<ClientIdentifier, ConnectClientFailure> {
     private static final long serialVersionUID = 1L;
 
-    ConnectClientFailure(final ClientIdentifier target, final RequestException cause) {
-        super(target, cause);
+    ConnectClientFailure(final ClientIdentifier target, final long sequence, final RequestException cause) {
+        super(target, sequence, cause);
     }
 
     private ConnectClientFailure(final ConnectClientFailure failure, final ABIVersion version) {
index 5635754..7059fe7 100644 (file)
@@ -29,8 +29,9 @@ final class ConnectClientFailureProxyV1 extends AbstractRequestFailureProxy<Clie
     }
 
     @Override
-    protected ConnectClientFailure createFailure(final ClientIdentifier target, final RequestException cause) {
-        return new ConnectClientFailure(target, cause);
+    protected ConnectClientFailure createFailure(final ClientIdentifier target, final long sequence,
+            final RequestException cause) {
+        return new ConnectClientFailure(target, sequence, cause);
     }
 
     @Override
index 4800cea..45e7ba8 100644 (file)
@@ -34,26 +34,23 @@ public final class ConnectClientRequest extends Request<ClientIdentifier, Connec
 
     private final ABIVersion minVersion;
     private final ABIVersion maxVersion;
-    private final long resumeSequence;
 
-    public ConnectClientRequest(final ClientIdentifier identifier, final ActorRef replyTo, final ABIVersion minVersion,
-            final ABIVersion maxVersion) {
-        this(identifier, replyTo, minVersion, maxVersion, 0);
+    ConnectClientRequest(final ClientIdentifier identifier, final long txSequence, final ActorRef replyTo,
+            final ABIVersion minVersion, final ABIVersion maxVersion) {
+        super(identifier, txSequence, replyTo);
+        this.minVersion = Preconditions.checkNotNull(minVersion);
+        this.maxVersion = Preconditions.checkNotNull(maxVersion);
     }
 
     public ConnectClientRequest(final ClientIdentifier identifier, final ActorRef replyTo, final ABIVersion minVersion,
-            final ABIVersion maxVersion, final long resumeSequence) {
-        super(identifier, replyTo);
-        this.minVersion = Preconditions.checkNotNull(minVersion);
-        this.maxVersion = Preconditions.checkNotNull(maxVersion);
-        this.resumeSequence = resumeSequence;
+            final ABIVersion maxVersion) {
+        this(identifier, 0, replyTo, minVersion, maxVersion);
     }
 
     private ConnectClientRequest(final ConnectClientRequest request, final ABIVersion version) {
         super(request, version);
         this.minVersion = request.minVersion;
         this.maxVersion = request.maxVersion;
-        this.resumeSequence = request.resumeSequence;
     }
 
     public ABIVersion getMinVersion() {
@@ -64,13 +61,9 @@ public final class ConnectClientRequest extends Request<ClientIdentifier, Connec
         return maxVersion;
     }
 
-    public long getResumeSequence() {
-        return resumeSequence;
-    }
-
     @Override
     public final ConnectClientFailure toRequestFailure(final RequestException cause) {
-        return new ConnectClientFailure(getTarget(), cause);
+        return new ConnectClientFailure(getTarget(), getSequence(), cause);
     }
 
     @Override
@@ -85,7 +78,6 @@ public final class ConnectClientRequest extends Request<ClientIdentifier, Connec
 
     @Override
     protected @Nonnull ToStringHelper addToStringAttributes(final @Nonnull ToStringHelper toStringHelper) {
-        return super.addToStringAttributes(toStringHelper).add("minVersion", minVersion).add("maxVersion", maxVersion)
-                .add("resumeSequence", resumeSequence);
+        return super.addToStringAttributes(toStringHelper).add("minVersion", minVersion).add("maxVersion", maxVersion);
     }
 }
index 5fb26d0..d4c8b11 100644 (file)
@@ -15,7 +15,6 @@ import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.yangtools.concepts.WritableObjects;
 
 /**
  * Externalizable proxy for use with {@link ConnectClientRequest}. It implements the initial (Boron) serialization
@@ -26,7 +25,6 @@ import org.opendaylight.yangtools.concepts.WritableObjects;
 final class ConnectClientRequestProxyV1 extends AbstractRequestProxy<ClientIdentifier, ConnectClientRequest> {
     private ABIVersion minVersion;
     private ABIVersion maxVersion;
-    private long resumeSequence;
 
     public ConnectClientRequestProxyV1() {
         // for Externalizable
@@ -36,7 +34,6 @@ final class ConnectClientRequestProxyV1 extends AbstractRequestProxy<ClientIdent
         super(request);
         this.minVersion = request.getMinVersion();
         this.maxVersion = request.getMaxVersion();
-        this.resumeSequence = request.getResumeSequence();
     }
 
     @Override
@@ -44,7 +41,6 @@ final class ConnectClientRequestProxyV1 extends AbstractRequestProxy<ClientIdent
         super.writeExternal(out);
         minVersion.writeTo(out);
         maxVersion.writeTo(out);
-        WritableObjects.writeLong(out, resumeSequence);
     }
 
     @Override
@@ -52,12 +48,12 @@ final class ConnectClientRequestProxyV1 extends AbstractRequestProxy<ClientIdent
         super.readExternal(in);
         minVersion = ABIVersion.inexactReadFrom(in);
         maxVersion = ABIVersion.inexactReadFrom(in);
-        resumeSequence = WritableObjects.readLong(in);
     }
 
     @Override
-    protected ConnectClientRequest createRequest(final ClientIdentifier target, final ActorRef replyTo) {
-        return new ConnectClientRequest(target, replyTo, minVersion, maxVersion, resumeSequence);
+    protected ConnectClientRequest createRequest(final ClientIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new ConnectClientRequest(target, sequence, replyTo, minVersion, maxVersion);
     }
 
     @Override
index 7b2ea07..65389ec 100644 (file)
@@ -35,11 +35,11 @@ public final class ConnectClientSuccess extends RequestSuccess<ClientIdentifier,
     private final List<ActorSelection> alternates;
     private final DataTree dataTree;
     private final ActorRef backend;
-    private final long maxMessages;
+    private final int maxMessages;
 
-    ConnectClientSuccess(final ClientIdentifier target, final ActorRef backend, final List<ActorSelection> alternates,
-        final Optional<DataTree> dataTree, final long maxMessages) {
-        super(target);
+    ConnectClientSuccess(final ClientIdentifier target, final long sequence, final ActorRef backend,
+        final List<ActorSelection> alternates, final Optional<DataTree> dataTree, final int maxMessages) {
+        super(target, sequence);
         this.backend = Preconditions.checkNotNull(backend);
         this.alternates = ImmutableList.copyOf(alternates);
         this.dataTree = dataTree.orElse(null);
@@ -47,9 +47,10 @@ public final class ConnectClientSuccess extends RequestSuccess<ClientIdentifier,
         this.maxMessages = maxMessages;
     }
 
-    public ConnectClientSuccess(final @Nonnull ClientIdentifier target, final @Nonnull ActorRef backend,
-            final @Nonnull List<ActorSelection> alternates, final @Nonnull DataTree dataTree, final long maxMessages) {
-        this(target, backend, alternates, Optional.of(dataTree), maxMessages);
+    public ConnectClientSuccess(final @Nonnull ClientIdentifier target, final long sequence,
+            final @Nonnull ActorRef backend, final @Nonnull List<ActorSelection> alternates,
+            final @Nonnull DataTree dataTree, final int maxMessages) {
+        this(target, sequence, backend, alternates, Optional.of(dataTree), maxMessages);
     }
 
     /**
@@ -69,7 +70,7 @@ public final class ConnectClientSuccess extends RequestSuccess<ClientIdentifier,
         return Optional.ofNullable(dataTree);
     }
 
-    public long getMaxMessages() {
+    public int getMaxMessages() {
         return maxMessages;
     }
 
@@ -85,6 +86,7 @@ public final class ConnectClientSuccess extends RequestSuccess<ClientIdentifier,
 
     @Override
     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-        return super.addToStringAttributes(toStringHelper).add("alternates", alternates).add("dataTree", dataTree);
+        return super.addToStringAttributes(toStringHelper).add("alternates", alternates).add("dataTree", dataTree)
+                .add("maxMessages", maxMessages);
     }
 }
index 6c9ae2a..f9fddb7 100644 (file)
@@ -32,7 +32,7 @@ final class ConnectClientSuccessProxyV1 extends AbstractSuccessProxy<ClientIdent
 
     private List<ActorSelection> alternates;
     private ActorRef backend;
-    private long maxMessages;
+    private int maxMessages;
 
     public ConnectClientSuccessProxyV1() {
         // For Externalizable
@@ -42,6 +42,7 @@ final class ConnectClientSuccessProxyV1 extends AbstractSuccessProxy<ClientIdent
         super(success);
         this.alternates = success.getAlternates();
         this.backend = success.getBackend();
+        this.maxMessages = success.getMaxMessages();
         // We are ignoring the DataTree, it is not serializable anyway
     }
 
@@ -49,38 +50,32 @@ final class ConnectClientSuccessProxyV1 extends AbstractSuccessProxy<ClientIdent
     public void writeExternal(final ObjectOutput out) throws IOException {
         super.writeExternal(out);
 
-        out.writeUTF(Serialization.serializedActorPath(backend));
+        out.writeObject(Serialization.serializedActorPath(backend));
+        out.writeInt(maxMessages);
 
         out.writeInt(alternates.size());
         for (ActorSelection b : alternates) {
             out.writeObject(b.toSerializationFormat());
         }
-
-        out.writeLong(maxMessages);
     }
 
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
 
-        backend = JavaSerializer.currentSystem().value().provider().resolveActorRef(in.readUTF());
-
-        final int backendsSize = in.readInt();
-        if (backendsSize < 1) {
-            throw new IOException("Illegal number of backends " + backendsSize);
-        }
+        backend = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
+        maxMessages = in.readInt();
 
-        alternates = new ArrayList<>(backendsSize);
-        for (int i = 0; i < backendsSize; ++i) {
+        final int alternatesSize = in.readInt();
+        alternates = new ArrayList<>(alternatesSize);
+        for (int i = 0; i < alternatesSize; ++i) {
             alternates.add(ActorSelection.apply(ActorRef.noSender(), (String)in.readObject()));
         }
-
-        maxMessages = in.readLong();
     }
 
     @Override
-    protected ConnectClientSuccess createSuccess(final ClientIdentifier target) {
-        return new ConnectClientSuccess(target, backend, alternates, Optional.empty(), maxMessages);
+    protected ConnectClientSuccess createSuccess(final ClientIdentifier target, final long sequence) {
+        return new ConnectClientSuccess(target, sequence, backend, alternates, Optional.empty(), maxMessages);
     }
 
     @Override
index 28a77c4..eebcf36 100644 (file)
@@ -22,7 +22,11 @@ public final class CreateLocalHistoryRequest extends LocalHistoryRequest<CreateL
     private static final long serialVersionUID = 1L;
 
     public CreateLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+        this(target, 0, replyTo);
+    }
+
+    CreateLocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     private CreateLocalHistoryRequest(final CreateLocalHistoryRequest request, final ABIVersion version) {
index 234e477..8a17e90 100644 (file)
@@ -28,7 +28,8 @@ final class CreateLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequest
     }
 
     @Override
-    protected CreateLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        return new CreateLocalHistoryRequest(target, replyTo);
+    protected CreateLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new CreateLocalHistoryRequest(target, sequence, replyTo);
     }
 }
index 1997ddd..505ad37 100644 (file)
@@ -21,8 +21,8 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 public final class DestroyLocalHistoryRequest extends LocalHistoryRequest<DestroyLocalHistoryRequest> {
     private static final long serialVersionUID = 1L;
 
-    public DestroyLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+    public DestroyLocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     private DestroyLocalHistoryRequest(final DestroyLocalHistoryRequest request, final ABIVersion version) {
index 9e953ce..845d019 100644 (file)
@@ -28,7 +28,8 @@ final class DestroyLocalHistoryRequestProxyV1 extends AbstractLocalHistoryReques
     }
 
     @Override
-    protected DestroyLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        return new DestroyLocalHistoryRequest(target, replyTo);
+    protected DestroyLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new DestroyLocalHistoryRequest(target, sequence, replyTo);
     }
 }
index cbb612f..f689e25 100644 (file)
@@ -23,9 +23,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public final class ExistsTransactionRequest extends AbstractReadTransactionRequest<ExistsTransactionRequest> {
     private static final long serialVersionUID = 1L;
 
-    public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo,
-        final @Nonnull YangInstanceIdentifier path) {
-        super(identifier, replyTo, path);
+    public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
+            final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
+        super(identifier, sequence, replyTo, path);
     }
 
     private ExistsTransactionRequest(final ExistsTransactionRequest request, final ABIVersion version) {
index 4f5bd1c..84f7400 100644 (file)
@@ -29,8 +29,8 @@ final class ExistsTransactionRequestProxyV1 extends AbstractReadTransactionReque
     }
 
     @Override
-    ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo,
-            final YangInstanceIdentifier path) {
-        return new ExistsTransactionRequest(target, replyTo, path);
+    ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo, final YangInstanceIdentifier path) {
+        return new ExistsTransactionRequest(target, sequence, replyTo, path);
     }
 }
\ No newline at end of file
index 5481d0d..8a1704d 100644 (file)
@@ -23,8 +23,8 @@ public final class ExistsTransactionSuccess extends TransactionSuccess<ExistsTra
     private static final long serialVersionUID = 1L;
     private final boolean exists;
 
-    public ExistsTransactionSuccess(final TransactionIdentifier target, final boolean exists) {
-        super(target);
+    public ExistsTransactionSuccess(final TransactionIdentifier target, final long sequence, final boolean exists) {
+        super(target, sequence);
         this.exists = exists;
     }
 
index 6932d24..538c1ff 100644 (file)
@@ -44,7 +44,7 @@ final class ExistsTransactionSuccessProxyV1 extends AbstractTransactionSuccessPr
     }
 
     @Override
-    protected ExistsTransactionSuccess createSuccess(final TransactionIdentifier target) {
-        return new ExistsTransactionSuccess(target, exists);
+    protected ExistsTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new ExistsTransactionSuccess(target, sequence, exists);
     }
 }
index 795468c..4fd69c2 100644 (file)
@@ -22,8 +22,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 public final class LocalHistoryFailure extends RequestFailure<LocalHistoryIdentifier, LocalHistoryFailure> {
     private static final long serialVersionUID = 1L;
 
-    LocalHistoryFailure(final LocalHistoryIdentifier target, final RequestException cause) {
-        super(target, cause);
+    LocalHistoryFailure(final LocalHistoryIdentifier target, final long sequence, final RequestException cause) {
+        super(target, sequence, cause);
     }
 
     @Override
index e8cdf6d..0f3e533 100644 (file)
@@ -31,8 +31,9 @@ final class LocalHistoryFailureProxyV1 extends AbstractRequestFailureProxy<Local
     }
 
     @Override
-    protected LocalHistoryFailure createFailure(final LocalHistoryIdentifier target, final RequestException cause) {
-        return new LocalHistoryFailure(target, cause);
+    protected LocalHistoryFailure createFailure(final LocalHistoryIdentifier target, final long sequence,
+            final RequestException cause) {
+        return new LocalHistoryFailure(target, sequence, cause);
     }
 
     @Override
index badb763..c747116 100644 (file)
@@ -26,8 +26,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 public abstract class LocalHistoryRequest<T extends LocalHistoryRequest<T>> extends Request<LocalHistoryIdentifier, T> {
     private static final long serialVersionUID = 1L;
 
-    LocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+    LocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     LocalHistoryRequest(final T request, final ABIVersion version) {
@@ -36,7 +36,7 @@ public abstract class LocalHistoryRequest<T extends LocalHistoryRequest<T>> exte
 
     @Override
     public final LocalHistoryFailure toRequestFailure(final RequestException cause) {
-        return new LocalHistoryFailure(getTarget(), cause);
+        return new LocalHistoryFailure(getTarget(), getSequence(), cause);
     }
 
     @Override
index 4e588cc..3b8ed35 100644 (file)
@@ -22,8 +22,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 public final class LocalHistorySuccess extends RequestSuccess<LocalHistoryIdentifier, LocalHistorySuccess> {
     private static final long serialVersionUID = 1L;
 
-    public LocalHistorySuccess(final LocalHistoryIdentifier target) {
-        super(target);
+    public LocalHistorySuccess(final LocalHistoryIdentifier target, final long sequence) {
+        super(target, sequence);
     }
 
     private LocalHistorySuccess(final LocalHistorySuccess success, final ABIVersion version) {
index 7806c33..23858b2 100644 (file)
@@ -34,7 +34,7 @@ final class LocalHistorySuccessProxyV1 extends AbstractSuccessProxy<LocalHistory
     }
 
     @Override
-    protected LocalHistorySuccess createSuccess(final LocalHistoryIdentifier target) {
-        return new LocalHistorySuccess(target);
+    protected LocalHistorySuccess createSuccess(final LocalHistoryIdentifier target, final long sequence) {
+        return new LocalHistorySuccess(target, sequence);
     }
 }
index fd1b1b8..416297d 100644 (file)
@@ -28,9 +28,9 @@ public final class ModifyTransactionRequest extends TransactionRequest<ModifyTra
     private final List<TransactionModification> modifications;
     private final PersistenceProtocol protocol;
 
-    ModifyTransactionRequest(final TransactionIdentifier target, final ActorRef replyTo,
+    ModifyTransactionRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo,
         final List<TransactionModification> modifications, final PersistenceProtocol protocol) {
-        super(target, replyTo);
+        super(target, sequence, replyTo);
         this.modifications = ImmutableList.copyOf(modifications);
         this.protocol = protocol;
     }
index 336902e..4e2a8ce 100644 (file)
@@ -30,7 +30,8 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
     private final List<TransactionModification> modifications = new ArrayList<>(1);
     private final TransactionIdentifier identifier;
     private final ActorRef replyTo;
-    private PersistenceProtocol protocol = null;
+    private PersistenceProtocol protocol;
+    private Long sequence;
 
     public ModifyTransactionRequestBuilder(final TransactionIdentifier identifier, final ActorRef replyTo) {
         this.identifier = Preconditions.checkNotNull(identifier);
@@ -42,24 +43,28 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
         return identifier;
     }
 
-    private void checkFinished() {
-        Preconditions.checkState(protocol != null, "Batch has already been finished");
+    private void checkNotFinished() {
+        Preconditions.checkState(protocol == null, "Batch has already been finished");
     }
 
     public void addModification(final TransactionModification modification) {
-        checkFinished();
+        checkNotFinished();
         modifications.add(Preconditions.checkNotNull(modification));
     }
 
+    public void setSequence(final long sequence) {
+        this.sequence = sequence;
+    }
+
     public void setAbort() {
-        checkFinished();
+        checkNotFinished();
         // Transaction is being aborted, no need to transmit operations
         modifications.clear();
         protocol = PersistenceProtocol.ABORT;
     }
 
     public void setCommit(final boolean coordinated) {
-        checkFinished();
+        checkNotFinished();
         protocol = coordinated ? PersistenceProtocol.THREE_PHASE : PersistenceProtocol.SIMPLE;
     }
 
@@ -69,9 +74,13 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
 
     @Override
     public ModifyTransactionRequest build() {
-        final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, replyTo, modifications, protocol);
+        Preconditions.checkState(sequence != null, "Request sequence has not been set");
+
+        final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, sequence, replyTo, modifications,
+            protocol);
         modifications.clear();
         protocol = null;
+        sequence = null;
         return ret;
     }
 }
index 57b5050..d9de3af 100644 (file)
@@ -76,7 +76,8 @@ final class ModifyTransactionRequestProxyV1 extends AbstractTransactionRequestPr
     }
 
     @Override
-    protected ModifyTransactionRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        return new ModifyTransactionRequest(target, replyTo, modifications, protocol.orElse(null));
+    protected ModifyTransactionRequest createRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new ModifyTransactionRequest(target, sequence, replyTo, modifications, protocol.orElse(null));
     }
 }
index 4390d17..ecbd749 100644 (file)
@@ -22,8 +22,8 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 public final class PurgeLocalHistoryRequest extends LocalHistoryRequest<PurgeLocalHistoryRequest> {
     private static final long serialVersionUID = 1L;
 
-    public PurgeLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+    public PurgeLocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     private PurgeLocalHistoryRequest(final PurgeLocalHistoryRequest request, final ABIVersion version) {
index 0aaac6e..1ac90da 100644 (file)
@@ -28,7 +28,8 @@ final class PurgeLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequestP
     }
 
     @Override
-    protected PurgeLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
-        return new PurgeLocalHistoryRequest(target, replyTo);
+    protected PurgeLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new PurgeLocalHistoryRequest(target, sequence, replyTo);
     }
 }
index 20b679f..2d754cb 100644 (file)
@@ -23,9 +23,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public final class ReadTransactionRequest extends AbstractReadTransactionRequest<ReadTransactionRequest> {
     private static final long serialVersionUID = 1L;
 
-    public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo,
-            final @Nonnull YangInstanceIdentifier path) {
-        super(identifier, replyTo, path);
+    public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
+            final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
+        super(identifier, sequence, replyTo, path);
     }
 
     private ReadTransactionRequest(final ReadTransactionRequest request, final ABIVersion version) {
index ae0f6f6..0c60b2b 100644 (file)
@@ -29,8 +29,8 @@ final class ReadTransactionRequestProxyV1 extends AbstractReadTransactionRequest
     }
 
     @Override
-    ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo,
-            final YangInstanceIdentifier path) {
-        return new ReadTransactionRequest(target, replyTo, path);
+    ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo, final YangInstanceIdentifier path) {
+        return new ReadTransactionRequest(target, sequence, replyTo, path);
     }
 }
\ No newline at end of file
index 45e7631..f3d8395 100644 (file)
@@ -24,8 +24,9 @@ public final class ReadTransactionSuccess extends TransactionSuccess<ReadTransac
     private static final long serialVersionUID = 1L;
     private final Optional<NormalizedNode<?, ?>> data;
 
-    public ReadTransactionSuccess(final TransactionIdentifier identifier, final Optional<NormalizedNode<?, ?>> data) {
-        super(identifier);
+    public ReadTransactionSuccess(final TransactionIdentifier identifier, final long sequence,
+            final Optional<NormalizedNode<?, ?>> data) {
+        super(identifier, sequence);
         this.data = Preconditions.checkNotNull(data);
     }
 
index 9232842..ed45695 100644 (file)
@@ -63,7 +63,7 @@ final class ReadTransactionSuccessProxyV1 extends AbstractTransactionSuccessProx
     }
 
     @Override
-    protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target) {
-        return new ReadTransactionSuccess(target, data);
+    protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new ReadTransactionSuccess(target, sequence, data);
     }
 }
index a7f132a..b8499cc 100644 (file)
@@ -21,8 +21,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionAbortRequest extends TransactionRequest<TransactionAbortRequest> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionAbortRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+    public TransactionAbortRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     @Override
index bc1fc58..0743e3e 100644 (file)
@@ -28,7 +28,8 @@ final class TransactionAbortRequestProxyV1 extends AbstractTransactionRequestPro
     }
 
     @Override
-    protected TransactionAbortRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        return new TransactionAbortRequest(target, replyTo);
+    protected TransactionAbortRequest createRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new TransactionAbortRequest(target, sequence, replyTo);
     }
 }
index c9625af..69c6ddd 100644 (file)
@@ -19,8 +19,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionAbortSuccess extends TransactionSuccess<TransactionAbortSuccess> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionAbortSuccess(final TransactionIdentifier identifier) {
-        super(identifier);
+    public TransactionAbortSuccess(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
     }
 
     @Override
index 2c34737..3cf513a 100644 (file)
@@ -27,7 +27,7 @@ final class TransactionAbortSuccessProxyV1 extends AbstractTransactionSuccessPro
     }
 
     @Override
-    protected TransactionAbortSuccess createSuccess(final TransactionIdentifier target) {
-        return new TransactionAbortSuccess(target);
+    protected TransactionAbortSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new TransactionAbortSuccess(target, sequence);
     }
 }
index c7d4176..4e689b2 100644 (file)
@@ -19,8 +19,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionCanCommitSuccess extends TransactionSuccess<TransactionCanCommitSuccess> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionCanCommitSuccess(final TransactionIdentifier identifier) {
-        super(identifier);
+    public TransactionCanCommitSuccess(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
     }
 
     @Override
index a8af4af..b645d68 100644 (file)
@@ -40,7 +40,7 @@ final class TransactionCanCommitSuccessProxyV1 extends AbstractTransactionSucces
     }
 
     @Override
-    protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target) {
-        return new TransactionCanCommitSuccess(target);
+    protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new TransactionCanCommitSuccess(target, sequence);
     }
 }
index 275b5cf..6b28244 100644 (file)
@@ -19,8 +19,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionCommitSuccess extends TransactionSuccess<TransactionCommitSuccess> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionCommitSuccess(final TransactionIdentifier identifier) {
-        super(identifier);
+    public TransactionCommitSuccess(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
     }
 
     @Override
index 4628a9d..aaf07c2 100644 (file)
@@ -27,7 +27,7 @@ final class TransactionCommitSuccessProxyV1 extends AbstractTransactionSuccessPr
     }
 
     @Override
-    protected TransactionCommitSuccess createSuccess(final TransactionIdentifier target) {
-        return new TransactionCommitSuccess(target);
+    protected TransactionCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new TransactionCommitSuccess(target, sequence);
     }
 }
index 6707aa1..955c268 100644 (file)
@@ -21,8 +21,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionDoCommitRequest extends TransactionRequest<TransactionDoCommitRequest> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionDoCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+    public TransactionDoCommitRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     @Override
index f771844..ce9ca9b 100644 (file)
@@ -28,7 +28,8 @@ final class TransactionDoCommitRequestProxyV1 extends AbstractTransactionRequest
     }
 
     @Override
-    protected TransactionDoCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        return new TransactionDoCommitRequest(target, replyTo);
+    protected TransactionDoCommitRequest createRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new TransactionDoCommitRequest(target, sequence, replyTo);
     }
 }
index 750d327..e0b6a59 100644 (file)
@@ -22,8 +22,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionFailure extends RequestFailure<TransactionIdentifier, TransactionFailure> {
     private static final long serialVersionUID = 1L;
 
-    TransactionFailure(final TransactionIdentifier target, final RequestException cause) {
-        super(target, cause);
+    TransactionFailure(final TransactionIdentifier target, final long sequence, final RequestException cause) {
+        super(target, sequence, cause);
     }
 
     @Override
index fd99749..15cf09c 100644 (file)
@@ -31,8 +31,9 @@ final class TransactionFailureProxyV1 extends AbstractRequestFailureProxy<Transa
     }
 
     @Override
-    protected TransactionFailure createFailure(final TransactionIdentifier target, final RequestException cause) {
-        return new TransactionFailure(target, cause);
+    protected TransactionFailure createFailure(final TransactionIdentifier target, final long sequence,
+            final RequestException cause) {
+        return new TransactionFailure(target, sequence, cause);
     }
 
     @Override
index 2e73f47..d79dbea 100644 (file)
@@ -21,8 +21,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionPreCommitRequest extends TransactionRequest<TransactionPreCommitRequest> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionPreCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        super(target, replyTo);
+    public TransactionPreCommitRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+        super(target, sequence, replyTo);
     }
 
     @Override
index dd41e6a..649db56 100644 (file)
@@ -28,7 +28,8 @@ final class TransactionPreCommitRequestProxyV1 extends AbstractTransactionReques
     }
 
     @Override
-    protected TransactionPreCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
-        return new TransactionPreCommitRequest(target, replyTo);
+    protected TransactionPreCommitRequest createRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo) {
+        return new TransactionPreCommitRequest(target, sequence, replyTo);
     }
 }
index 8a7da4e..1cf00e6 100644 (file)
@@ -18,8 +18,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public final class TransactionPreCommitSuccess extends TransactionSuccess<TransactionPreCommitSuccess> {
     private static final long serialVersionUID = 1L;
 
-    public TransactionPreCommitSuccess(final TransactionIdentifier identifier) {
-        super(identifier);
+    public TransactionPreCommitSuccess(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
     }
 
     @Override
index 2c0cdea..387a60c 100644 (file)
@@ -27,7 +27,7 @@ final class TransactionPreCommitSuccessProxyV1 extends AbstractTransactionSucces
     }
 
     @Override
-    protected TransactionPreCommitSuccess createSuccess(final TransactionIdentifier target) {
-        return new TransactionPreCommitSuccess(target);
+    protected TransactionPreCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new TransactionPreCommitSuccess(target, sequence);
     }
 }
index 154d4b3..7ae6b81 100644 (file)
@@ -26,8 +26,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public abstract class TransactionRequest<T extends TransactionRequest<T>> extends Request<TransactionIdentifier, T> {
     private static final long serialVersionUID = 1L;
 
-    TransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) {
-        super(identifier, replyTo);
+    TransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo) {
+        super(identifier, sequence, replyTo);
     }
 
     TransactionRequest(final T request, final ABIVersion version) {
@@ -36,7 +36,7 @@ public abstract class TransactionRequest<T extends TransactionRequest<T>> extend
 
     @Override
     public final TransactionFailure toRequestFailure(final RequestException cause) {
-        return new TransactionFailure(getTarget(), cause);
+        return new TransactionFailure(getTarget(), getSequence(), cause);
     }
 
     @Override
index daa4ba1..77a6b56 100644 (file)
@@ -24,8 +24,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 public abstract class TransactionSuccess<T extends TransactionSuccess<T>> extends RequestSuccess<TransactionIdentifier, T> {
     private static final long serialVersionUID = 1L;
 
-    TransactionSuccess(final TransactionIdentifier identifier) {
-        super(identifier);
+    TransactionSuccess(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
     }
 
     @Override
index 89458b8..69fc293 100644 (file)
@@ -15,9 +15,10 @@ import org.opendaylight.yangtools.concepts.WritableObjects;
 
 abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externalizable {
     private static final long serialVersionUID = 1L;
+
     private T message;
-    private long sequence;
-    private long retry;
+    private long sessionId;
+    private long txSequence;
 
     public AbstractEnvelopeProxy() {
         // for Externalizable
@@ -25,13 +26,13 @@ abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externa
 
     AbstractEnvelopeProxy(final Envelope<T> envelope) {
         message = envelope.getMessage();
-        sequence = envelope.getSequence();
-        retry = envelope.getRetry();
+        txSequence = envelope.getTxSequence();
+        sessionId = envelope.getSessionId();
     }
 
     @Override
     public final void writeExternal(final ObjectOutput out) throws IOException {
-        WritableObjects.writeLongs(out, sequence, retry);
+        WritableObjects.writeLongs(out, sessionId, txSequence);
         out.writeObject(message);
     }
 
@@ -39,14 +40,14 @@ abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externa
     @Override
     public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         final byte header = WritableObjects.readLongHeader(in);
-        sequence = WritableObjects.readFirstLong(in, header);
-        retry = WritableObjects.readSecondLong(in, header);
+        sessionId = WritableObjects.readFirstLong(in, header);
+        txSequence = WritableObjects.readSecondLong(in, header);
         message = (T) in.readObject();
     }
 
-    abstract Envelope<T> createEnvelope(T message, long sequence, long retry);
+    abstract Envelope<T> createEnvelope(T message, long sessionId, long txSequence);
 
     final Object readResolve() {
-        return createEnvelope(message, sequence, retry);
+        return createEnvelope(message, sessionId, txSequence);
     }
 }
\ No newline at end of file
index 4b60aec..48ad0d3 100644 (file)
@@ -15,6 +15,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import javax.annotation.Nonnull;
 import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import org.opendaylight.yangtools.concepts.WritableObjects;
 
 /**
  * Abstract Externalizable proxy for use with {@link Message} subclasses.
@@ -27,6 +28,7 @@ import org.opendaylight.yangtools.concepts.WritableIdentifier;
 abstract class AbstractMessageProxy<T extends WritableIdentifier, C extends Message<T, C>> implements Externalizable {
     private static final long serialVersionUID = 1L;
     private T target;
+    private long sequence;
 
     protected AbstractMessageProxy() {
         // For Externalizable
@@ -34,22 +36,25 @@ abstract class AbstractMessageProxy<T extends WritableIdentifier, C extends Mess
 
     AbstractMessageProxy(final @Nonnull C message) {
         this.target = message.getTarget();
+        this.sequence = message.getSequence();
     }
 
     @Override
     public void writeExternal(final ObjectOutput out) throws IOException {
         target.writeTo(out);
+        WritableObjects.writeLong(out, sequence);
     }
 
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         target = Verify.verifyNotNull(readTarget(in));
+        sequence = WritableObjects.readLong(in);
     }
 
     protected final Object readResolve() {
-        return Verify.verifyNotNull(createMessage(target));
+        return Verify.verifyNotNull(createMessage(target, sequence));
     }
 
     protected abstract @Nonnull T readTarget(@Nonnull DataInput in) throws IOException;
-    abstract @Nonnull C createMessage(@Nonnull T target);
+    abstract @Nonnull C createMessage(@Nonnull T target, long sequence);
 }
\ No newline at end of file
index 97a5f05..a7624a4 100644 (file)
@@ -49,9 +49,9 @@ public abstract class AbstractRequestFailureProxy<T extends WritableIdentifier,
     }
 
     @Override
-    final C createResponse(final T target) {
-        return createFailure(target, cause);
+    final C createResponse(final T target, final long sequence) {
+        return createFailure(target, sequence, cause);
     }
 
-    protected abstract @Nonnull C createFailure(@Nonnull T target, @Nonnull RequestException cause);
+    protected abstract @Nonnull C createFailure(@Nonnull T target, long sequence, @Nonnull RequestException cause);
 }
\ No newline at end of file
index c0bf959..f55b85b 100644 (file)
@@ -42,19 +42,19 @@ public abstract class AbstractRequestProxy<T extends WritableIdentifier, C exten
     @Override
     public void writeExternal(final ObjectOutput out) throws IOException {
         super.writeExternal(out);
-        out.writeUTF(Serialization.serializedActorPath(replyTo));
+        out.writeObject(Serialization.serializedActorPath(replyTo));
     }
 
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
-        replyTo = JavaSerializer.currentSystem().value().provider().resolveActorRef(in.readUTF());
+        replyTo = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
     }
 
     @Override
-    final @Nonnull C createMessage(@Nonnull final T target) {
-        return createRequest(target, replyTo);
+    final @Nonnull C createMessage(@Nonnull final T target, final long sequence) {
+        return createRequest(target, sequence, replyTo);
     }
 
-    protected abstract @Nonnull C createRequest(@Nonnull T target, @Nonnull ActorRef replyTo);
+    protected abstract @Nonnull C createRequest(@Nonnull T target, long sequence, @Nonnull ActorRef replyTo);
 }
\ No newline at end of file
index 8262bb2..351a6f8 100644 (file)
@@ -31,9 +31,9 @@ abstract class AbstractResponseProxy<T extends WritableIdentifier, C extends Res
     }
 
     @Override
-    final C createMessage(final T target) {
-        return createResponse(target);
+    final C createMessage(final T target, final long sequence) {
+        return createResponse(target, sequence);
     }
 
-    abstract @Nonnull C createResponse(@Nonnull T target);
+    abstract @Nonnull C createResponse(@Nonnull T target, long sequence);
 }
index 639b20c..f1786e0 100644 (file)
@@ -33,9 +33,9 @@ public abstract class AbstractSuccessProxy<T extends WritableIdentifier, C exten
     }
 
     @Override
-    final C createResponse(final T target) {
-        return createSuccess(target);
+    final C createResponse(final T target, final long sequence) {
+        return createSuccess(target, sequence);
     }
 
-    protected abstract @Nonnull C createSuccess(@Nonnull T target);
+    protected abstract @Nonnull C createSuccess(@Nonnull T target, long sequence);
 }
\ No newline at end of file
index cd674b4..5f2d15d 100644 (file)
@@ -16,13 +16,13 @@ public abstract class Envelope<T extends Message<?, ?>> implements Immutable, Se
     private static final long serialVersionUID = 1L;
 
     private final T message;
-    private final long sequence;
-    private final long retry;
+    private final long txSequence;
+    private final long sessionId;
 
-    Envelope(final T message, final long sequence, final long retry) {
+    Envelope(final T message, final long sessionId, final long txSequence) {
         this.message = Preconditions.checkNotNull(message);
-        this.sequence = sequence;
-        this.retry = retry;
+        this.sessionId = sessionId;
+        this.txSequence = txSequence;
     }
 
     /**
@@ -35,27 +35,27 @@ public abstract class Envelope<T extends Message<?, ?>> implements Immutable, Se
     }
 
     /**
-     * Get the message sequence of this envelope.
+     * Get the message transmission sequence of this envelope.
      *
      * @return Message sequence
      */
-    public long getSequence() {
-        return sequence;
+    public long getTxSequence() {
+        return txSequence;
     }
 
     /**
-     * Get the message retry counter.
+     * Get the session identifier.
      *
-     * @return Retry counter
+     * @return Session identifier
      */
-    public long getRetry() {
-        return retry;
+    public long getSessionId() {
+        return sessionId;
     }
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(Envelope.class).add("sequence", Long.toUnsignedString(sequence, 16)).
-                add("retry", retry).add("message", message).toString();
+        return MoreObjects.toStringHelper(Envelope.class).add("sessionId", Long.toHexString(sessionId))
+                .add("txSequence", Long.toHexString(txSequence)).add("message", message).toString();
     }
 
     final Object writeReplace() {
index 8da54f5..6c32ae2 100644 (file)
@@ -10,8 +10,8 @@ package org.opendaylight.controller.cluster.access.concepts;
 public final class FailureEnvelope extends ResponseEnvelope<RequestFailure<?, ?>> {
     private static final long serialVersionUID = 1L;
 
-    public FailureEnvelope(final RequestFailure<?, ?> message, final long sequence, final long retry) {
-        super(message, sequence, retry);
+    public FailureEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
+        super(message, sessionId, txSequence);
     }
 
     @Override
index a14e698..892b44d 100644 (file)
@@ -19,7 +19,7 @@ final class FailureEnvelopeProxy extends AbstractResponseEnvelopeProxy<RequestFa
     }
 
     @Override
-    FailureEnvelope createEnvelope(final RequestFailure<?, ?> message, final long sequence, final long retry) {
-        return new FailureEnvelope(message, sequence, retry);
+    FailureEnvelope createEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
+        return new FailureEnvelope(message, sessionId, txSequence);
     }
 }
index 5070b7c..30e631e 100644 (file)
@@ -50,20 +50,23 @@ import org.opendaylight.yangtools.concepts.WritableIdentifier;
 public abstract class Message<T extends WritableIdentifier, C extends Message<T, C>> implements Immutable,
         Serializable {
     private static final long serialVersionUID = 1L;
-    private final T target;
+
     private final ABIVersion version;
+    private final long sequence;
+    private final T target;
 
-    private Message(final ABIVersion version, final T target) {
+    private Message(final ABIVersion version, final T target, final long sequence) {
         this.target = Preconditions.checkNotNull(target);
         this.version = Preconditions.checkNotNull(version);
+        this.sequence = sequence;
     }
 
-    Message(final T target) {
-        this(ABIVersion.current(), target);
+    Message(final T target, final long sequence) {
+        this(ABIVersion.current(), target, sequence);
     }
 
     Message(final C msg, final ABIVersion version) {
-        this(version, msg.getTarget());
+        this(version, msg.getTarget(), msg.getSequence());
     }
 
     /**
@@ -75,8 +78,17 @@ public abstract class Message<T extends WritableIdentifier, C extends Message<T,
         return target;
     }
 
+    /**
+     * Get the logical sequence number.
+     *
+     * @return logical sequence number
+     */
+    public final long getSequence() {
+        return sequence;
+    }
+
     @VisibleForTesting
-    public final ABIVersion getVersion() {
+    public final @Nonnull ABIVersion getVersion() {
         return version;
     }
 
@@ -129,7 +141,7 @@ public abstract class Message<T extends WritableIdentifier, C extends Message<T,
      * @throws NullPointerException if toStringHelper is null
      */
     protected @Nonnull ToStringHelper addToStringAttributes(final @Nonnull ToStringHelper toStringHelper) {
-        return toStringHelper.add("target", target);
+        return toStringHelper.add("target", target).add("sequence", Long.toUnsignedString(sequence));
     }
 
     /**
index 0892368..2f5cb4f 100644 (file)
@@ -29,8 +29,8 @@ public abstract class Request<T extends WritableIdentifier, C extends Request<T,
     private static final long serialVersionUID = 1L;
     private final ActorRef replyTo;
 
-    protected Request(final @Nonnull T target, final @Nonnull ActorRef replyTo) {
-        super(target);
+    protected Request(final @Nonnull T target, final long sequence, final @Nonnull ActorRef replyTo) {
+        super(target, sequence);
         this.replyTo = Preconditions.checkNotNull(replyTo);
     }
 
index 263664b..1c6e72c 100644 (file)
@@ -12,8 +12,8 @@ import akka.actor.ActorRef;
 public final class RequestEnvelope extends Envelope<Request<?, ?>> {
     private static final long serialVersionUID = 1L;
 
-    public RequestEnvelope(final Request<?, ?> message, final long sequence, final long retry) {
-        super(message, sequence, retry);
+    public RequestEnvelope(final Request<?, ?> message, final long sessionId, final long txSequence) {
+        super(message, sessionId, txSequence);
     }
 
     @Override
@@ -28,7 +28,7 @@ public final class RequestEnvelope extends Envelope<Request<?, ?>> {
      * @throws NullPointerException if cause is null
      */
     public void sendFailure(final RequestException cause) {
-        sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSequence(), getRetry()));
+        sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence()));
     }
 
     /**
@@ -38,7 +38,7 @@ public final class RequestEnvelope extends Envelope<Request<?, ?>> {
      * @throws NullPointerException if success is null
      */
     public void sendSuccess(final RequestSuccess<?, ?> success) {
-        sendResponse(new SuccessEnvelope(success, getSequence(), getRetry()));
+        sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence()));
     }
 
     private void sendResponse(final ResponseEnvelope<?> envelope) {
index 1b499d0..8ab450d 100644 (file)
@@ -19,7 +19,7 @@ final class RequestEnvelopeProxy extends AbstractEnvelopeProxy<Request<?, ?>> {
     }
 
     @Override
-    RequestEnvelope createEnvelope(final Request<?, ?> message, final long sequence, final long retry) {
-        return new RequestEnvelope(message, sequence, retry);
+    RequestEnvelope createEnvelope(final Request<?, ?> message, final long sessionId, final long txSequence) {
+        return new RequestEnvelope(message, sessionId, txSequence);
     }
 }
index 301b54e..ecd2ff4 100644 (file)
@@ -32,8 +32,8 @@ public abstract class RequestFailure<T extends WritableIdentifier, C extends Req
         this.cause = Preconditions.checkNotNull(failure.getCause());
     }
 
-    protected RequestFailure(final @Nonnull T target, final @Nonnull RequestException cause) {
-        super(target);
+    protected RequestFailure(final @Nonnull T target, final long sequence, final @Nonnull RequestException cause) {
+        super(target, sequence);
         this.cause = Preconditions.checkNotNull(cause);
     }
 
index 6ed4598..ad9402f 100644 (file)
@@ -28,8 +28,8 @@ public abstract class RequestSuccess<T extends WritableIdentifier, C extends Req
         super(success, version);
     }
 
-    protected RequestSuccess(final @Nonnull T target) {
-        super(target);
+    protected RequestSuccess(final @Nonnull T target, final long sequence) {
+        super(target, sequence);
     }
 
     @Override
index 1284525..c520da1 100644 (file)
@@ -26,8 +26,8 @@ import org.opendaylight.yangtools.concepts.WritableIdentifier;
 public abstract class Response<T extends WritableIdentifier, C extends Response<T, C>> extends Message<T, C> {
     private static final long serialVersionUID = 1L;
 
-    Response(final @Nonnull T target) {
-        super(target);
+    Response(final @Nonnull T target, final long sequence) {
+        super(target, sequence);
     }
 
     Response(final @Nonnull C response, final @Nonnull ABIVersion version) {
index a3625f6..9f998e7 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.access.concepts;
 public abstract class ResponseEnvelope<T extends Response<?, ?>> extends Envelope<T> {
     private static final long serialVersionUID = 1L;
 
-    ResponseEnvelope(final T message, final long sequence, final long retry) {
-        super(message, sequence, retry);
+    ResponseEnvelope(final T message, final long sessionId, final long txSequence) {
+        super(message, sessionId, txSequence);
     }
 }
index be37dae..d98e257 100644 (file)
@@ -10,8 +10,8 @@ package org.opendaylight.controller.cluster.access.concepts;
 public final class SuccessEnvelope extends ResponseEnvelope<RequestSuccess<?, ?>> {
     private static final long serialVersionUID = 1L;
 
-    public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sequence, final long retry) {
-        super(message, sequence, retry);
+    public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
+        super(message, sessionId, txSequence);
     }
 
     @Override
index 9f0fc2f..50df247 100644 (file)
@@ -19,7 +19,7 @@ final class SuccessEnvelopeProxy extends AbstractResponseEnvelopeProxy<RequestSu
     }
 
     @Override
-    SuccessEnvelope createEnvelope(final RequestSuccess<?, ?> message, final long sequence, final long retry) {
-        return new SuccessEnvelope(message, sequence, retry);
+    SuccessEnvelope createEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
+        return new SuccessEnvelope(message, sessionId, txSequence);
     }
 }
index efdfa04..bdffb08 100644 (file)
@@ -26,10 +26,15 @@ import org.opendaylight.controller.cluster.access.ABIVersion;
 public class BackendInfo {
     private final ABIVersion version;
     private final ActorRef actor;
+    private final int maxMessages;
+    private final long sessionId;
 
-    protected BackendInfo(final ActorRef actor, final ABIVersion version) {
+    protected BackendInfo(final ActorRef actor, final long sessionId, final ABIVersion version, final int maxMessages) {
         this.version = Preconditions.checkNotNull(version);
         this.actor = Preconditions.checkNotNull(actor);
+        Preconditions.checkArgument(maxMessages > 0, "Maximum messages has to be positive, not %s", maxMessages);
+        this.maxMessages = maxMessages;
+        this.sessionId = sessionId;
     }
 
     public final ActorRef getActor() {
@@ -40,6 +45,14 @@ public class BackendInfo {
         return version;
     }
 
+    public final int getMaxMessages() {
+        return maxMessages;
+    }
+
+    public final long getSessionId() {
+        return sessionId;
+    }
+
     @Override
     public final int hashCode() {
         return super.hashCode();
@@ -56,6 +69,7 @@ public class BackendInfo {
     }
 
     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-        return toStringHelper.add("actor", actor).add("version", version);
+        return toStringHelper.add("actor", actor).add("sessionId", sessionId).add("version", version)
+                .add("maxMessages", maxMessages);
     }
 }
index 3cdda59..8939ec9 100644 (file)
@@ -83,13 +83,12 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
     }
 
     // This method is executing in the actor context, hence we can safely interact with the queue
-    private ClientActorBehavior doSendRequest(final long sequence, final TransactionRequest<?> request,
-            final RequestCallback callback) {
+    private ClientActorBehavior doSendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
         // Get or allocate queue for the request
         final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie());
 
         // Note this is a tri-state return and can be null
-        final Optional<FiniteDuration> result = queue.enqueueRequest(sequence, request, callback);
+        final Optional<FiniteDuration> result = queue.enqueueRequest(request, callback);
         if (result == null) {
             // Happy path: we are done here
             return this;
@@ -189,7 +188,7 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
      * @param request Request to send
      * @param callback Callback to invoke
      */
-    public final void sendRequest(final long sequence, final TransactionRequest<?> request, final RequestCallback callback) {
-        context().executeInActor(cb -> cb.doSendRequest(sequence, request, callback));
+    public final void sendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
+        context().executeInActor(cb -> cb.doSendRequest(request, callback));
     }
 }
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java
new file mode 100644 (file)
index 0000000..31d863a
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import java.util.AbstractQueue;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ * A specialized always-empty implementation of {@link java.util.Queue}. This implementation will always refuse new
+ * elements in its {@link #offer(Object)} method.
+
+ * @author Robert Varga
+ *
+ * @param <E> the type of elements held in this collection
+ */
+// TODO: move this class into yangtools.util
+final class EmptyQueue<T> extends AbstractQueue<T> {
+    private static final EmptyQueue<?> INSTANCE = new EmptyQueue<>();
+
+    private EmptyQueue() {
+        // No instances
+    }
+
+    @SuppressWarnings("unchecked")
+    static <T> Queue<T> getInstance() {
+        return (Queue<T>) INSTANCE;
+    }
+
+    @Override
+    public boolean offer(final T e) {
+        return false;
+    }
+
+    @Override
+    public T poll() {
+        return null;
+    }
+
+    @Override
+    public T peek() {
+        return null;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        return Collections.emptyIterator();
+    }
+
+    @Override
+    public int size() {
+        return 0;
+    }
+}
\ No newline at end of file
index 513a3f9..5cf7873 100644 (file)
@@ -10,16 +10,18 @@ package org.opendaylight.controller.cluster.access.client;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
-import java.util.Deque;
+import com.google.common.base.Verify;
+import java.util.ArrayDeque;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,18 +44,32 @@ final class SequencedQueue {
         TimeUnit.NANOSECONDS);
 
     /**
-     * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can
-     * progress at different speeds, these may be completed out of order.
-     *
-     * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to
-     *       figure out a more efficient lookup mechanism to deal with responses which do not match the queue
-     *       order.
+     * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
+     * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
+     * resolution.
      */
-    private final Deque<SequencedQueueEntry> queue = new LinkedList<>();
+    private static final int DEFAULT_TX_LIMIT = 1000;
+
     private final Ticker ticker;
     private final Long cookie;
 
-    // Updated/consulted from actor context only
+    /*
+     * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
+     * with the limit changing between reconnects (which imply retransmission).
+     *
+     * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
+     * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
+     * and one for requests which have not been transmitted at all.
+     *
+     * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
+     * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
+     * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
+     * to retransmit -- hence the second queue.
+     */
+    private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
+    private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
+    private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
+
     /**
      * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
      * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
@@ -62,6 +78,11 @@ final class SequencedQueue {
     private CompletionStage<? extends BackendInfo> backendProof;
     private BackendInfo backend;
 
+    // This is not final because we need to be able to replace it.
+    private long txSequence;
+
+    private int lastTxLimit = DEFAULT_TX_LIMIT;
+
     /**
      * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
      */
@@ -86,12 +107,16 @@ final class SequencedQueue {
         Preconditions.checkState(notClosed, "Queue %s is closed", this);
     }
 
+    private long nextTxSequence() {
+        return txSequence++;
+    }
+
     /**
      * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
      * the following scenarios:
      * 1) The request has been enqueued and transmitted. No further actions are necessary
      * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
-     * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that
+     * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
      *    process needs to complete before transmission occurs
      *
      * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
@@ -105,21 +130,32 @@ final class SequencedQueue {
      * @param callback Callback to be invoked
      * @return Optional duration with semantics described above.
      */
-    @Nullable Optional<FiniteDuration> enqueueRequest(final long sequence, final Request<?, ?> request,
-            final RequestCallback callback) {
+    @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
         checkNotClosed();
 
         final long now = ticker.read();
-        final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now);
-
-        queue.add(e);
-        LOG.debug("Enqueued request {} to queue {}", request, this);
-
+        final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
         if (backend == null) {
+            LOG.debug("No backend available, request resolution");
+            pending.add(e);
             return Optional.empty();
         }
+        if (!lastInflight.isEmpty()) {
+            LOG.debug("Retransmit not yet complete, delaying request {}", request);
+            pending.add(e);
+            return null;
+        }
+        if (currentInflight.size() >= lastTxLimit) {
+            LOG.debug("Queue is at capacity, delayed sending of request {}", request);
+            pending.add(e);
+            return null;
+        }
 
-        e.retransmit(backend, now);
+        // Ready to transmit
+        currentInflight.offer(e);
+        LOG.debug("Enqueued request {} to queue {}", request, this);
+
+        e.retransmit(backend, nextTxSequence(), now);
         if (expectingTimer == null) {
             expectingTimer = now + REQUEST_TIMEOUT_NANOS;
             return Optional.of(INITIAL_REQUEST_TIMEOUT);
@@ -128,53 +164,162 @@ final class SequencedQueue {
         }
     }
 
-    ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
-        // Responses to different targets may arrive out of order, hence we use an iterator
+    /*
+     * We are using tri-state return here to indicate one of three conditions:
+     * - if a matching entry is found, return an Optional containing it
+     * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
+     * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
+     */
+    private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
+            final ResponseEnvelope<?> envelope) {
+        // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
+        // to use an iterator
         final Iterator<SequencedQueueEntry> it = queue.iterator();
         while (it.hasNext()) {
             final SequencedQueueEntry e = it.next();
-            if (e.acceptsResponse(response)) {
-                lastProgress = ticker.read();
-                it.remove();
-                LOG.debug("Completing request {} with {}", e, response);
-                return e.complete(response.getMessage());
+            final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
+
+            final Request<?, ?> request = e.getRequest();
+            final Response<?, ?> response = envelope.getMessage();
+
+            // First check for matching target, or move to next entry
+            if (!request.getTarget().equals(response.getTarget())) {
+                continue;
+            }
+
+            // Sanity-check logical sequence, ignore any out-of-order messages
+            if (request.getSequence() != response.getSequence()) {
+                LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
+                return Optional.empty();
+            }
+
+            // Now check session match
+            if (envelope.getSessionId() != txDetails.getSessionId()) {
+                LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
+                return Optional.empty();
+            }
+            if (envelope.getTxSequence() != txDetails.getTxSequence()) {
+                LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
+                return Optional.empty();
             }
+
+            LOG.debug("Completing request {} with {}", request, envelope);
+            it.remove();
+            return Optional.of(e);
         }
 
-        LOG.debug("No request matching {} found", response);
-        return current;
+        return null;
+    }
+
+    ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
+        Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
+        if (maybeEntry == null) {
+            maybeEntry = findMatchingEntry(lastInflight, envelope);
+        }
+
+        if (maybeEntry == null || !maybeEntry.isPresent()) {
+            LOG.warn("No request matching {} found, ignoring response", envelope);
+            return current;
+        }
+
+        lastProgress = ticker.read();
+        final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
+
+        // We have freed up a slot, try to transmit something
+        if (backend != null) {
+            final int toSend = lastTxLimit - currentInflight.size();
+            if (toSend > 0) {
+                runTransmit(toSend);
+            }
+        }
+
+        return ret;
+    }
+
+    private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
+        int toSend = count;
+
+        while (toSend > 0) {
+            final SequencedQueueEntry e = queue.poll();
+            if (e == null) {
+                break;
+            }
+
+            LOG.debug("Transmitting entry {}", e);
+            e.retransmit(backend, nextTxSequence(), lastProgress);
+            toSend--;
+        }
+
+        return toSend;
+    }
+
+    private void runTransmit(final int count) {
+        final int toSend;
+
+        // Process lastInflight first, possibly clearing it
+        if (!lastInflight.isEmpty()) {
+            toSend = transmitEntries(lastInflight, count);
+            if (lastInflight.isEmpty()) {
+                // We won't be needing the queue anymore, change it to specialized implementation
+                lastInflight = EmptyQueue.getInstance();
+            }
+        } else {
+            toSend = count;
+        }
+
+        // Process pending next.
+        transmitEntries(pending, toSend);
     }
 
     Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
+        Preconditions.checkNotNull(backend);
         if (!proof.equals(backendProof)) {
             LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
             return Optional.empty();
         }
 
-        this.backend = Preconditions.checkNotNull(backend);
-        backendProof = null;
         LOG.debug("Resolved backend {}",  backend);
 
-        if (queue.isEmpty()) {
-            // No pending requests, hence no need for a timer
-            return Optional.empty();
+        // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
+        // and also not to exceed new limits
+        final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
+        newLast.addAll(currentInflight);
+        newLast.addAll(lastInflight);
+        lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
+
+        // Clear currentInflight, possibly compacting it
+        final int txLimit = backend.getMaxMessages();
+        if (lastTxLimit > txLimit) {
+            currentInflight = new ArrayDeque<>();
+        } else {
+            currentInflight.clear();
         }
 
-        LOG.debug("Resending requests to backend {}", backend);
-        final long now = ticker.read();
-        for (SequencedQueueEntry e : queue) {
-            e.retransmit(backend, now);
-        }
+        // We are ready to roll
+        this.backend = backend;
+        backendProof = null;
+        txSequence = 0;
+        lastTxLimit = txLimit;
+        lastProgress = ticker.read();
 
-        if (expectingTimer != null) {
-            // We already have a timer going, no need to schedule a new one
+        // No pending requests, return
+        if (lastInflight.isEmpty() && pending.isEmpty()) {
             return Optional.empty();
         }
 
-        // Above loop may have cost us some time. Recalculate timeout.
-        final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
-        expectingTimer = nextTicks;
-        return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS));
+        LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
+
+        runTransmit(lastTxLimit);
+
+        // Calculate next timer if necessary
+        if (expectingTimer == null) {
+            // Request transmission may have cost us some time. Recalculate timeout.
+            final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
+            expectingTimer = nextTicks;
+            return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
+        } else {
+            return Optional.empty();
+        }
     }
 
     boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
@@ -189,7 +334,7 @@ final class SequencedQueue {
     }
 
     boolean hasCompleted() {
-        return !notClosed && queue.isEmpty();
+        return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
     }
 
     /**
@@ -203,7 +348,7 @@ final class SequencedQueue {
         expectingTimer = null;
         final long now = ticker.read();
 
-        if (!queue.isEmpty()) {
+        if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
             final long ticksSinceProgress = now - lastProgress;
             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
@@ -216,7 +361,7 @@ final class SequencedQueue {
         }
 
         // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
-        final SequencedQueueEntry head = queue.peek();
+        final SequencedQueueEntry head = currentInflight.peek();
         if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
             backend = null;
             LOG.debug("Queue {} invalidated backend info", this);
@@ -226,14 +371,17 @@ final class SequencedQueue {
         }
     }
 
+    private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
+        queue.forEach(e -> e.poison(cause));
+        queue.clear();
+    }
+
     void poison(final RequestException cause) {
         close();
 
-        SequencedQueueEntry e = queue.poll();
-        while (e != null) {
-            e.poison(cause);
-            e = queue.poll();
-        }
+        poisonQueue(currentInflight, cause);
+        poisonQueue(lastInflight, cause);
+        poisonQueue(pending, cause);
     }
 
     // FIXME: add a caller from ClientSingleTransaction
index 83e3e07..8814d50 100644 (file)
@@ -10,12 +10,11 @@ package org.opendaylight.controller.cluster.access.client;
 import akka.actor.ActorRef;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
-import java.util.Optional;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,49 +22,31 @@ import org.slf4j.LoggerFactory;
  * Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information.
  *
  * @author Robert Varga
- *
- * @param <I> Target identifier type
  */
 final class SequencedQueueEntry {
-    private static final class LastTry {
-        final long timeTicks;
-        final long retry;
-
-        LastTry(final long retry, final long timeTicks) {
-            this.retry = retry;
-            this.timeTicks = timeTicks;
-        }
-    }
-
     private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class);
 
     private final Request<?, ?> request;
     private final RequestCallback callback;
     private final long enqueuedTicks;
-    private final long sequence;
 
-    private Optional<LastTry> lastTry = Optional.empty();
+    private TxDetails txDetails;
 
-    SequencedQueueEntry(final Request<?, ?> request, final long sequence, final RequestCallback callback,
+    SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback,
         final long now) {
         this.request = Preconditions.checkNotNull(request);
         this.callback = Preconditions.checkNotNull(callback);
         this.enqueuedTicks = now;
-        this.sequence = sequence;
     }
 
-    long getSequence() {
-        return sequence;
+    Request<?, ?> getRequest() {
+        return request;
     }
 
-    boolean acceptsResponse(final ResponseEnvelope<?> response) {
-        return getSequence() == response.getSequence() && request.getTarget().equals(response.getMessage().getTarget());
+    @Nullable TxDetails getTxDetails() {
+        return txDetails;
     }
 
-    long getCurrentTry() {
-        return lastTry.isPresent() ? lastTry.get().retry : 0;
-     }
-
     ClientActorBehavior complete(final Response<?, ?> response) {
         LOG.debug("Completing request {} with {}", request, response);
         return callback.complete(response);
@@ -79,8 +60,8 @@ final class SequencedQueueEntry {
     boolean isTimedOut(final long now, final long timeoutNanos) {
         final long elapsed;
 
-        if (lastTry.isPresent()) {
-            elapsed = now - lastTry.get().timeTicks;
+        if (txDetails != null) {
+            elapsed = now - txDetails.getTimeTicks();
         } else {
             elapsed = now - enqueuedTicks;
         }
@@ -93,18 +74,19 @@ final class SequencedQueueEntry {
         }
     }
 
-    void retransmit(final BackendInfo backend, final long now) {
-        final long retry = lastTry.isPresent() ? lastTry.get().retry + 1 : 0;
-        final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), sequence, retry);
+    void retransmit(final BackendInfo backend, final long txSequence, final long now) {
+        final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()),
+            backend.getSessionId(), txSequence);
 
         final ActorRef actor = backend.getActor();
-        LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
+        LOG.trace("Transmitting request {} as {} to {}", request, toSend, actor);
         actor.tell(toSend, ActorRef.noSender());
-        lastTry = Optional.of(new LastTry(retry, now));
+        txDetails = new TxDetails(backend.getSessionId(), txSequence, now);
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString();
     }
+
 }
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TxDetails.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TxDetails.java
new file mode 100644 (file)
index 0000000..84d4b62
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+/**
+ * Holder class for transmission details about a particular {@link SequencedQueueEntry}.
+ *
+ * @author Robert Varga
+ */
+final class TxDetails {
+    private final long sessionId;
+    private final long txSequence;
+    private final long timeTicks;
+
+    TxDetails(final long sessionId, final long txSequence, final long timeTicks) {
+        this.sessionId = sessionId;
+        this.txSequence = txSequence;
+        this.timeTicks = timeTicks;
+    }
+
+    long getSessionId() {
+        return sessionId;
+    }
+
+    long getTxSequence() {
+        return txSequence;
+    }
+
+    long getTimeTicks() {
+        return timeTicks;
+    }
+}
\ No newline at end of file
index aecd238..dca3966 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.client;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -49,7 +50,7 @@ public class SequencedQueueEntryTest {
         private static final long serialVersionUID = 1L;
 
         MockFailure(final WritableIdentifier target, final RequestException cause) {
-            super(target, cause);
+            super(target, 0, cause);
         }
 
         @Override
@@ -67,7 +68,7 @@ public class SequencedQueueEntryTest {
         private static final long serialVersionUID = 1L;
 
         MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
-            super(target, replyTo);
+            super(target, 0, replyTo);
         }
 
         @Override
@@ -127,11 +128,11 @@ public class SequencedQueueEntryTest {
         ticker.increment(ThreadLocalRandom.current().nextLong());
 
         mockActor = TestProbe.apply(actorSystem);
-        mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
+        mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
         mockResponse = mockRequest.toRequestFailure(mockCause);
 
-        entry = new SequencedQueueEntry(mockRequest, 0, mockCallback, ticker.read());
+        entry = new SequencedQueueEntry(mockRequest, mockCallback, ticker.read());
     }
 
     @After
@@ -140,19 +141,14 @@ public class SequencedQueueEntryTest {
     }
 
     @Test
-    public void testGetSequence() {
-        assertEquals(0, entry.getSequence());
-    }
-
-    @Test
-    public void testGetCurrentTry() {
-        assertEquals(0, entry.getCurrentTry());
-        entry.retransmit(mockBackendInfo, ticker.read());
-        assertEquals(0, entry.getCurrentTry());
-        entry.retransmit(mockBackendInfo, ticker.read());
-        assertEquals(1, entry.getCurrentTry());
-        entry.retransmit(mockBackendInfo, ticker.read());
-        assertEquals(2, entry.getCurrentTry());
+    public void testGetTxDetails() {
+        assertNull(entry.getTxDetails());
+        entry.retransmit(mockBackendInfo, 0, ticker.read());
+        assertEquals(0, entry.getTxDetails().getTxSequence());
+        entry.retransmit(mockBackendInfo, 1, ticker.read());
+        assertEquals(1, entry.getTxDetails().getTxSequence());
+        entry.retransmit(mockBackendInfo, 3, ticker.read());
+        assertEquals(3, entry.getTxDetails().getTxSequence());
     }
 
     @Test
@@ -175,13 +171,13 @@ public class SequencedQueueEntryTest {
         assertTrue(entry.isTimedOut(ticker.read(), 0));
         assertFalse(entry.isTimedOut(ticker.read(), 1));
 
-        entry.retransmit(mockBackendInfo, ticker.read());
+        entry.retransmit(mockBackendInfo, 0, ticker.read());
         assertTrue(entry.isTimedOut(ticker.read(), 0));
         ticker.increment(10);
         assertTrue(entry.isTimedOut(ticker.read(), 10));
         assertFalse(entry.isTimedOut(ticker.read(), 20));
 
-        entry.retransmit(mockBackendInfo, ticker.read());
+        entry.retransmit(mockBackendInfo, 1, ticker.read());
         assertTrue(entry.isTimedOut(ticker.read(), 0));
         ticker.increment(10);
         assertTrue(entry.isTimedOut(ticker.read(), 10));
@@ -191,7 +187,7 @@ public class SequencedQueueEntryTest {
     @Test
     public void testRetransmit() {
         assertFalse(mockActor.msgAvailable());
-        entry.retransmit(mockBackendInfo, ticker.read());
+        entry.retransmit(mockBackendInfo, 0, ticker.read());
 
         assertTrue(mockActor.msgAvailable());
         assertRequestEquals(mockRequest, mockActor.receiveOne(Duration.apply(5, TimeUnit.SECONDS)));
@@ -201,8 +197,8 @@ public class SequencedQueueEntryTest {
          assertTrue(o instanceof RequestEnvelope);
 
          final RequestEnvelope actual = (RequestEnvelope) o;
-         assertEquals(0, actual.getRetry());
-         assertEquals(0, actual.getSequence());
+         assertEquals(0, actual.getSessionId());
+         assertEquals(0, actual.getTxSequence());
          assertEquals(expected.getTarget(), actual.getMessage().getTarget());
     }
 }
index b05ca3a..fb92de9 100644 (file)
@@ -54,7 +54,7 @@ public class SequencedQueueTest {
         private static final long serialVersionUID = 1L;
 
         MockFailure(final WritableIdentifier target, final RequestException cause) {
-            super(target, cause);
+            super(target, 0, cause);
         }
 
         @Override
@@ -72,7 +72,7 @@ public class SequencedQueueTest {
         private static final long serialVersionUID = 1L;
 
         MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
-            super(target, replyTo);
+            super(target, 0, replyTo);
         }
 
         @Override
@@ -135,7 +135,7 @@ public class SequencedQueueTest {
         ticker.increment(ThreadLocalRandom.current().nextLong());
 
         mockActor = TestProbe.apply(actorSystem);
-        mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
+        mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
         mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
         mockResponse = mockRequest.toRequestFailure(mockCause);
@@ -167,7 +167,7 @@ public class SequencedQueueTest {
         queue.close();
 
         // Kaboom
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
     }
 
     @Test
@@ -178,7 +178,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testPoison() {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
         queue.poison(mockCause);
 
         final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
@@ -192,7 +192,7 @@ public class SequencedQueueTest {
         queue.poison(mockCause);
 
         // Kaboom
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
     }
 
     @Test
@@ -203,7 +203,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testEnqueueRequestNeedsBackend() {
-        final Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
+        final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
 
         assertNotNull(ret);
         assertFalse(ret.isPresent());
@@ -225,7 +225,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testSetBackendWithNoResolution() {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
         final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
@@ -235,7 +235,7 @@ public class SequencedQueueTest {
 
     @Test
     public void testSetBackendWithWrongProof() {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
         assertTrue(queue.expectProof(proof));
@@ -252,8 +252,8 @@ public class SequencedQueueTest {
     }
 
     @Test
-    public void testSetbackedWithRequestsNoTimer() {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+    public void testSetBackendWithRequestsNoTimer() {
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
         assertTrue(queue.expectProof(proof));
@@ -270,7 +270,7 @@ public class SequencedQueueTest {
     public void testEnqueueRequestNeedsTimer() {
         setupBackend();
 
-        final Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
+        final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
         assertNotNull(ret);
         assertTrue(ret.isPresent());
         assertTransmit(mockRequest, 0);
@@ -281,13 +281,13 @@ public class SequencedQueueTest {
         setupBackend();
 
         // First request
-        Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
+        Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
         assertNotNull(ret);
         assertTrue(ret.isPresent());
         assertTransmit(mockRequest, 0);
 
         // Second request, no timer fired
-        ret = queue.enqueueRequest(1, mockRequest2, mockCallback);
+        ret = queue.enqueueRequest(mockRequest2, mockCallback);
         assertNull(ret);
         assertTransmit(mockRequest2, 1);
     }
@@ -300,14 +300,14 @@ public class SequencedQueueTest {
 
     @Test
     public void testRunTimeoutWithoutShift() throws NoProgressException {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
         final boolean ret = queue.runTimeout();
         assertFalse(ret);
     }
 
     @Test
     public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1);
 
@@ -317,7 +317,9 @@ public class SequencedQueueTest {
 
     @Test
     public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        setupBackend();
+
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS);
 
@@ -327,7 +329,9 @@ public class SequencedQueueTest {
 
     @Test
     public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        setupBackend();
+
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1);
 
@@ -337,7 +341,7 @@ public class SequencedQueueTest {
 
     @Test(expected=NoProgressException.class)
     public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
 
@@ -347,7 +351,7 @@ public class SequencedQueueTest {
 
     @Test(expected=NoProgressException.class)
     public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
 
@@ -382,7 +386,9 @@ public class SequencedQueueTest {
 
     @Test
     public void testCompleteSingle() {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        setupBackend();
+
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
         verify(mockCallback).complete(mockResponse);
@@ -395,7 +401,9 @@ public class SequencedQueueTest {
 
     @Test
     public void testCompleteNull() {
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        setupBackend();
+
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         doReturn(null).when(mockCallback).complete(mockResponse);
 
@@ -408,10 +416,10 @@ public class SequencedQueueTest {
     public void testProgressRecord() throws NoProgressException {
         setupBackend();
 
-        queue.enqueueRequest(0, mockRequest, mockCallback);
+        queue.enqueueRequest(mockRequest, mockCallback);
 
         ticker.increment(10);
-        queue.enqueueRequest(1, mockRequest2, mockCallback);
+        queue.enqueueRequest(mockRequest2, mockCallback);
         queue.complete(mockBehavior, mockResponseEnvelope);
 
         ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11);
@@ -436,8 +444,8 @@ public class SequencedQueueTest {
         assertTrue(o instanceof RequestEnvelope);
 
         final RequestEnvelope actual = (RequestEnvelope) o;
-        assertEquals(0, actual.getRetry());
-        assertEquals(sequence, actual.getSequence());
+        assertEquals(0, actual.getSessionId());
+        assertEquals(sequence, actual.getTxSequence());
         assertSame(expected, actual.getMessage());
     }
 }
index 605de67..f59703c 100644 (file)
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-slf4j_${scala.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang.modules</groupId>
+      <artifactId>scala-java8-compat_${scala.version}</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-testkit_${scala.version}</artifactId>
index 60919c0..cd104b5 100644 (file)
@@ -67,7 +67,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             final LocalHistoryIdentifier historyId, final long transactionId,
             final java.util.Optional<ShardBackendInfo> backend) {
 
-        final java.util.Optional<DataTree> dataTree = backend.flatMap(t -> t.getDataTree());
+        final java.util.Optional<DataTree> dataTree = backend.flatMap(ShardBackendInfo::getDataTree);
         final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId);
         if (dataTree.isPresent()) {
             return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot());
@@ -141,7 +141,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         checkSealed();
 
         final SettableFuture<Boolean> ret = SettableFuture.create();
-        client().sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(false)), t -> {
+        client().sendRequest(Verify.verifyNotNull(doCommit(false)), t -> {
             if (t instanceof TransactionCommitSuccess) {
                 ret.set(Boolean.TRUE);
             } else if (t instanceof RequestFailure) {
@@ -156,7 +156,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void abort(final VotingFuture<Void> ret) {
         checkSealed();
 
-        client.sendRequest(nextSequence(), new TransactionAbortRequest(getIdentifier(), client().self()), t -> {
+        client.sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), client().self()), t -> {
             if (t instanceof TransactionAbortSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -170,7 +170,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void canCommit(final VotingFuture<?> ret) {
         checkSealed();
 
-        client.sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(true)), t -> {
+        client.sendRequest(Verify.verifyNotNull(doCommit(true)), t -> {
             if (t instanceof TransactionCanCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -184,7 +184,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void preCommit(final VotingFuture<?> ret) {
         checkSealed();
 
-        client.sendRequest(nextSequence(), new TransactionPreCommitRequest(getIdentifier(), client().self()), t-> {
+        client.sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> {
             if (t instanceof TransactionPreCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -198,7 +198,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void doCommit(final VotingFuture<?> ret) {
         checkSealed();
 
-        client.sendRequest(nextSequence(), new TransactionDoCommitRequest(getIdentifier(), client().self()), t-> {
+        client.sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> {
             if (t instanceof TransactionCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -224,5 +224,4 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     abstract void doAbort();
 
     abstract TransactionRequest<?> doCommit(boolean coordinated);
-
 }
index a99ea3d..b84008c 100644 (file)
@@ -174,8 +174,8 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
         transactions.remove(transaction.getIdentifier());
     }
 
-    void sendRequest(final long sequence, final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
-        sendRequest(sequence, request, response -> {
+    void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
+        sendRequest(request, response -> {
             completer.accept(response);
             return this;
         });
index 1dec846..9e787f1 100644 (file)
@@ -87,7 +87,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     void doAbort() {
-        client().sendRequest(nextSequence(), new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER);
+        client().sendRequest(new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER);
         modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted"));
     }
 
index 0633b68..359b428 100644 (file)
@@ -7,30 +7,33 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.OnComplete;
+import akka.actor.ActorRef;
+import akka.japi.Function;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableBiMap.Builder;
 import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.client.BackendInfo;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
+import scala.compat.java8.FutureConverters;
 
 /**
  * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
@@ -40,8 +43,6 @@ import scala.concurrent.ExecutionContext;
  * @author Robert Varga
  */
 final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
-    private static final ExecutionContext DIRECT_EXECUTION_CONTEXT =
-            ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
     private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
     private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
 
@@ -54,6 +55,8 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
     private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
 
     private final ActorContext actorContext;
+    // FIXME: this counter should be in superclass somewhere
+    private final AtomicLong nextSessionId = new AtomicLong();
 
     @GuardedBy("this")
     private long nextShard = 1;
@@ -105,39 +108,37 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
             return NULL_FUTURE;
         }
 
-        final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<>();
+        final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<ShardBackendInfo>();
+
+        FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
+            LOG.debug("Looking up primary info for {} from {}", shardName, info);
+            return FutureConverters.toJava(Patterns.ask(info.getPrimaryShardActor(),
+                (Function<ActorRef, Object>) replyTo -> new ConnectClientRequest(null, replyTo,
+                    ABIVersion.BORON, ABIVersion.current()), DEAD_TIMEOUT));
+        }).thenApply(response -> {
+            if (response instanceof RequestFailure) {
+                final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
+                LOG.debug("Connect request failed {}", failure, failure.getCause());
+                throw Throwables.propagate(failure.getCause());
+            }
 
-        actorContext.findPrimaryShardAsync(shardName).onComplete(new OnComplete<PrimaryShardInfo>() {
-            @Override
-            public void onComplete(final Throwable t, final PrimaryShardInfo v) {
-                if (t != null) {
-                    ret.completeExceptionally(t);
-                } else {
-                    ret.complete(createBackendInfo(v, shardName, cookie));
-                }
+            LOG.debug("Resolved backend information to {}", response);
+
+            Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response);
+            final ConnectClientSuccess success = (ConnectClientSuccess) response;
+
+            return new ShardBackendInfo(success.getBackend(),
+                nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
+                success.getDataTree(), success.getMaxMessages());
+        }).whenComplete((info, t) -> {
+            if (t != null) {
+                ret.completeExceptionally(t);
+            } else {
+                ret.complete(info);
             }
-        }, DIRECT_EXECUTION_CONTEXT);
+        });
 
         LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
         return ret;
     }
-
-    private static ABIVersion toABIVersion(final short version) {
-        switch (version) {
-            case DataStoreVersions.BORON_VERSION:
-                return ABIVersion.BORON;
-        }
-
-        throw new IllegalArgumentException("Unsupported version " + version);
-    }
-
-    private static ShardBackendInfo createBackendInfo(final Object result, final String shardName, final Long cookie) {
-        Preconditions.checkArgument(result instanceof PrimaryShardInfo);
-        final PrimaryShardInfo info = (PrimaryShardInfo) result;
-
-        LOG.debug("Creating backend information for {}", info);
-        return new ShardBackendInfo(info.getPrimaryShardActor().resolveOne(DEAD_TIMEOUT).value().get().get(),
-            toABIVersion(info.getPrimaryShardVersion()), shardName, UnsignedLong.fromLongBits(cookie),
-            info.getLocalShardDataTree());
-     }
 }
index 13b7fb4..9fb1b89 100644 (file)
@@ -95,21 +95,21 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
         // Make sure we send any modifications before issuing a read
         ensureFlushedBuider();
-        client().sendRequest(nextSequence(), request, completer);
+        client().sendRequest(request, completer);
         return MappingCheckedFuture.create(future, ReadFailedException.MAPPER);
     }
 
     @Override
     CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
         final SettableFuture<Boolean> future = SettableFuture.create();
-        return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), client().self(), path),
+        return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), client().self(), path),
             t -> completeExists(future, t), future);
     }
 
     @Override
     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
         final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
-        return sendReadRequest(new ReadTransactionRequest(getIdentifier(), client().self(), path),
+        return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), client().self(), path),
             t -> completeRead(future, t), future);
     }
 
@@ -122,6 +122,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     private void ensureInitializedBuider() {
         if (!builderBusy) {
+            builder.setSequence(nextSequence());
             builderBusy = true;
         }
     }
@@ -133,7 +134,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     }
 
     private void flushBuilder() {
-        client().sendRequest(nextSequence(), builder.build(), this::completeModify);
+        client().sendRequest(builder.build(), this::completeModify);
         builderBusy = false;
     }
 
index 6bb7072..92a213d 100644 (file)
@@ -30,9 +30,9 @@ final class ShardBackendInfo extends BackendInfo {
     private final UnsignedLong cookie;
     private final String shardName;
 
-    ShardBackendInfo(final ActorRef actor, final ABIVersion version, final String shardName, final UnsignedLong cookie,
-        final Optional<DataTree> dataTree) {
-        super(actor, version);
+    ShardBackendInfo(final ActorRef actor, final long sessionId, final ABIVersion version, final String shardName,
+        final UnsignedLong cookie, final Optional<DataTree> dataTree, final int maxMessages) {
+        super(actor, sessionId, version, maxMessages);
         this.shardName = Preconditions.checkNotNull(shardName);
         this.cookie = Preconditions.checkNotNull(cookie);
         this.dataTree = Preconditions.checkNotNull(dataTree);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.