From: Robert Varga Date: Mon, 8 Aug 2016 15:04:54 +0000 (+0200) Subject: BUG-5280: separate request sequence and transmit sequence X-Git-Tag: release/carbon~509 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=9b4f21460c6dcb10c381df631d064d05de16546c BUG-5280: separate request sequence and transmit sequence Clean up the confusion in sequence numbers. There are actually two sequences: - logical request sequence, which indicates the order of requests in which they should be applied to the target entity. It is assigned by logic emitting the request. - transmit sequence within a connection to the backend, as initiated by ConnectClientRequest. It is assigned by SequencedQueue as it is transmitting requests and reset when a new connection to the backend is made. This requires establishing the concept of a session, which is a single conversation between frontend and backend. It is severed whenever the frontend times out and re-established when the leader is found and it responds with ConnectClientSuccess. The sending of ConnectClientRequest is not done via the queue, as it is part of backend resolution process. Since this is not a performance-critical path, we use simple Patterns.ask() to send the request and get completion notification -- which we then translate to ShardBackendInfo. ConnectClientSuccess gives us backend-preferred version and backend-specified cap on the number of outstanding messages then it can handle concurrently. This maximum is used to limit the transmit path of SequencedQueue, so that it does not attempt to send more requests at any given time. Internal queue for unsent requests is kept unbounded for now, subject to a Semaphore-driven throttle in a follow-up patch. Change-Id: I61663073bf6632c1ed8c036dee37f1ac39cf7794 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbortLocalTransactionRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbortLocalTransactionRequest.java index 104530651d..5dc630cac5 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbortLocalTransactionRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbortLocalTransactionRequest.java @@ -25,6 +25,6 @@ public final class AbortLocalTransactionRequest extends AbstractLocalTransaction public AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo) { - super(identifier, replyTo); + super(identifier, 0, replyTo); } } \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java index 7730c5d615..bc9bb6225b 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java @@ -23,8 +23,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier abstract class AbstractLocalTransactionRequest> extends TransactionRequest { private static final long serialVersionUID = 1L; - AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) { - super(identifier, replyTo); + AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo) { + super(identifier, sequence, replyTo); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java index d23430fa03..a8fbbcfb33 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java @@ -32,9 +32,9 @@ public abstract class AbstractReadTransactionRequest { private static final long serialVersionUID = 1L; - ConnectClientFailure(final ClientIdentifier target, final RequestException cause) { - super(target, cause); + ConnectClientFailure(final ClientIdentifier target, final long sequence, final RequestException cause) { + super(target, sequence, cause); } private ConnectClientFailure(final ConnectClientFailure failure, final ABIVersion version) { diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientFailureProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientFailureProxyV1.java index 5635754aee..7059fe7ec1 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientFailureProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ConnectClientFailureProxyV1.java @@ -29,8 +29,9 @@ final class ConnectClientFailureProxyV1 extends AbstractRequestFailureProxy { private ABIVersion minVersion; private ABIVersion maxVersion; - private long resumeSequence; public ConnectClientRequestProxyV1() { // for Externalizable @@ -36,7 +34,6 @@ final class ConnectClientRequestProxyV1 extends AbstractRequestProxy alternates; private final DataTree dataTree; private final ActorRef backend; - private final long maxMessages; + private final int maxMessages; - ConnectClientSuccess(final ClientIdentifier target, final ActorRef backend, final List alternates, - final Optional dataTree, final long maxMessages) { - super(target); + ConnectClientSuccess(final ClientIdentifier target, final long sequence, final ActorRef backend, + final List alternates, final Optional dataTree, final int maxMessages) { + super(target, sequence); this.backend = Preconditions.checkNotNull(backend); this.alternates = ImmutableList.copyOf(alternates); this.dataTree = dataTree.orElse(null); @@ -47,9 +47,10 @@ public final class ConnectClientSuccess extends RequestSuccess alternates, final @Nonnull DataTree dataTree, final long maxMessages) { - this(target, backend, alternates, Optional.of(dataTree), maxMessages); + public ConnectClientSuccess(final @Nonnull ClientIdentifier target, final long sequence, + final @Nonnull ActorRef backend, final @Nonnull List alternates, + final @Nonnull DataTree dataTree, final int maxMessages) { + this(target, sequence, backend, alternates, Optional.of(dataTree), maxMessages); } /** @@ -69,7 +70,7 @@ public final class ConnectClientSuccess extends RequestSuccess alternates; private ActorRef backend; - private long maxMessages; + private int maxMessages; public ConnectClientSuccessProxyV1() { // For Externalizable @@ -42,6 +42,7 @@ final class ConnectClientSuccessProxyV1 extends AbstractSuccessProxy(backendsSize); - for (int i = 0; i < backendsSize; ++i) { + final int alternatesSize = in.readInt(); + alternates = new ArrayList<>(alternatesSize); + for (int i = 0; i < alternatesSize; ++i) { alternates.add(ActorSelection.apply(ActorRef.noSender(), (String)in.readObject())); } - - maxMessages = in.readLong(); } @Override - protected ConnectClientSuccess createSuccess(final ClientIdentifier target) { - return new ConnectClientSuccess(target, backend, alternates, Optional.empty(), maxMessages); + protected ConnectClientSuccess createSuccess(final ClientIdentifier target, final long sequence) { + return new ConnectClientSuccess(target, sequence, backend, alternates, Optional.empty(), maxMessages); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java index 28a77c4426..eebcf36c4d 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java @@ -22,7 +22,11 @@ public final class CreateLocalHistoryRequest extends LocalHistoryRequest { private static final long serialVersionUID = 1L; - public DestroyLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) { - super(target, replyTo); + public DestroyLocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) { + super(target, sequence, replyTo); } private DestroyLocalHistoryRequest(final DestroyLocalHistoryRequest request, final ABIVersion version) { diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java index 9e953ceb21..845d0193e2 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java @@ -28,7 +28,8 @@ final class DestroyLocalHistoryRequestProxyV1 extends AbstractLocalHistoryReques } @Override - protected DestroyLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) { - return new DestroyLocalHistoryRequest(target, replyTo); + protected DestroyLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final long sequence, + final ActorRef replyTo) { + return new DestroyLocalHistoryRequest(target, sequence, replyTo); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequest.java index cbb612f9ec..f689e2552f 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequest.java @@ -23,9 +23,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; public final class ExistsTransactionRequest extends AbstractReadTransactionRequest { private static final long serialVersionUID = 1L; - public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo, - final @Nonnull YangInstanceIdentifier path) { - super(identifier, replyTo, path); + public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence, + final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) { + super(identifier, sequence, replyTo, path); } private ExistsTransactionRequest(final ExistsTransactionRequest request, final ABIVersion version) { diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequestProxyV1.java index 4f5bd1c42f..84f74004fe 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequestProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequestProxyV1.java @@ -29,8 +29,8 @@ final class ExistsTransactionRequestProxyV1 extends AbstractReadTransactionReque } @Override - ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo, - final YangInstanceIdentifier path) { - return new ExistsTransactionRequest(target, replyTo, path); + ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence, + final ActorRef replyTo, final YangInstanceIdentifier path) { + return new ExistsTransactionRequest(target, sequence, replyTo, path); } } \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccess.java index 5481d0de40..8a1704de76 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccess.java @@ -23,8 +23,8 @@ public final class ExistsTransactionSuccess extends TransactionSuccess { private static final long serialVersionUID = 1L; - LocalHistoryFailure(final LocalHistoryIdentifier target, final RequestException cause) { - super(target, cause); + LocalHistoryFailure(final LocalHistoryIdentifier target, final long sequence, final RequestException cause) { + super(target, sequence, cause); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java index e8cdf6d19f..0f3e533ba2 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java @@ -31,8 +31,9 @@ final class LocalHistoryFailureProxyV1 extends AbstractRequestFailureProxy> extends Request { private static final long serialVersionUID = 1L; - LocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) { - super(target, replyTo); + LocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) { + super(target, sequence, replyTo); } LocalHistoryRequest(final T request, final ABIVersion version) { @@ -36,7 +36,7 @@ public abstract class LocalHistoryRequest> exte @Override public final LocalHistoryFailure toRequestFailure(final RequestException cause) { - return new LocalHistoryFailure(getTarget(), cause); + return new LocalHistoryFailure(getTarget(), getSequence(), cause); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java index 4e588cc200..3b8ed35816 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java @@ -22,8 +22,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; public final class LocalHistorySuccess extends RequestSuccess { private static final long serialVersionUID = 1L; - public LocalHistorySuccess(final LocalHistoryIdentifier target) { - super(target); + public LocalHistorySuccess(final LocalHistoryIdentifier target, final long sequence) { + super(target, sequence); } private LocalHistorySuccess(final LocalHistorySuccess success, final ABIVersion version) { diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java index 7806c33354..23858b20b2 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java @@ -34,7 +34,7 @@ final class LocalHistorySuccessProxyV1 extends AbstractSuccessProxy modifications; private final PersistenceProtocol protocol; - ModifyTransactionRequest(final TransactionIdentifier target, final ActorRef replyTo, + ModifyTransactionRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo, final List modifications, final PersistenceProtocol protocol) { - super(target, replyTo); + super(target, sequence, replyTo); this.modifications = ImmutableList.copyOf(modifications); this.protocol = protocol; } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java index 336902e192..4e2a8cec2a 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java @@ -30,7 +30,8 @@ public final class ModifyTransactionRequestBuilder implements Builder modifications = new ArrayList<>(1); private final TransactionIdentifier identifier; private final ActorRef replyTo; - private PersistenceProtocol protocol = null; + private PersistenceProtocol protocol; + private Long sequence; public ModifyTransactionRequestBuilder(final TransactionIdentifier identifier, final ActorRef replyTo) { this.identifier = Preconditions.checkNotNull(identifier); @@ -42,24 +43,28 @@ public final class ModifyTransactionRequestBuilder implements Builder { private static final long serialVersionUID = 1L; - public PurgeLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) { - super(target, replyTo); + public PurgeLocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) { + super(target, sequence, replyTo); } private PurgeLocalHistoryRequest(final PurgeLocalHistoryRequest request, final ABIVersion version) { diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java index 0aaac6e32a..1ac90daa21 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java @@ -28,7 +28,8 @@ final class PurgeLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequestP } @Override - protected PurgeLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) { - return new PurgeLocalHistoryRequest(target, replyTo); + protected PurgeLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final long sequence, + final ActorRef replyTo) { + return new PurgeLocalHistoryRequest(target, sequence, replyTo); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequest.java index 20b679f420..2d754cbb18 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequest.java @@ -23,9 +23,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; public final class ReadTransactionRequest extends AbstractReadTransactionRequest { private static final long serialVersionUID = 1L; - public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo, - final @Nonnull YangInstanceIdentifier path) { - super(identifier, replyTo, path); + public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence, + final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) { + super(identifier, sequence, replyTo, path); } private ReadTransactionRequest(final ReadTransactionRequest request, final ABIVersion version) { diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequestProxyV1.java index ae0f6f6470..0c60b2b015 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequestProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequestProxyV1.java @@ -29,8 +29,8 @@ final class ReadTransactionRequestProxyV1 extends AbstractReadTransactionRequest } @Override - ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo, - final YangInstanceIdentifier path) { - return new ReadTransactionRequest(target, replyTo, path); + ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence, + final ActorRef replyTo, final YangInstanceIdentifier path) { + return new ReadTransactionRequest(target, sequence, replyTo, path); } } \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java index 45e7631580..f3d8395d6f 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java @@ -24,8 +24,9 @@ public final class ReadTransactionSuccess extends TransactionSuccess> data; - public ReadTransactionSuccess(final TransactionIdentifier identifier, final Optional> data) { - super(identifier); + public ReadTransactionSuccess(final TransactionIdentifier identifier, final long sequence, + final Optional> data) { + super(identifier, sequence); this.data = Preconditions.checkNotNull(data); } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccessProxyV1.java index 923284278f..ed45695d10 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccessProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccessProxyV1.java @@ -63,7 +63,7 @@ final class ReadTransactionSuccessProxyV1 extends AbstractTransactionSuccessProx } @Override - protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target) { - return new ReadTransactionSuccess(target, data); + protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence) { + return new ReadTransactionSuccess(target, sequence, data); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java index a7f132a609..b8499cc2a2 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java @@ -21,8 +21,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier public final class TransactionAbortRequest extends TransactionRequest { private static final long serialVersionUID = 1L; - public TransactionAbortRequest(final TransactionIdentifier target, final ActorRef replyTo) { - super(target, replyTo); + public TransactionAbortRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) { + super(target, sequence, replyTo); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java index bc1fc582f4..0743e3e24e 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java @@ -28,7 +28,8 @@ final class TransactionAbortRequestProxyV1 extends AbstractTransactionRequestPro } @Override - protected TransactionAbortRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) { - return new TransactionAbortRequest(target, replyTo); + protected TransactionAbortRequest createRequest(final TransactionIdentifier target, final long sequence, + final ActorRef replyTo) { + return new TransactionAbortRequest(target, sequence, replyTo); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java index c9625afa68..69c6dddd8f 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java @@ -19,8 +19,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier public final class TransactionAbortSuccess extends TransactionSuccess { private static final long serialVersionUID = 1L; - public TransactionAbortSuccess(final TransactionIdentifier identifier) { - super(identifier); + public TransactionAbortSuccess(final TransactionIdentifier identifier, final long sequence) { + super(identifier, sequence); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java index 2c347371e3..3cf513ae3c 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java @@ -27,7 +27,7 @@ final class TransactionAbortSuccessProxyV1 extends AbstractTransactionSuccessPro } @Override - protected TransactionAbortSuccess createSuccess(final TransactionIdentifier target) { - return new TransactionAbortSuccess(target); + protected TransactionAbortSuccess createSuccess(final TransactionIdentifier target, final long sequence) { + return new TransactionAbortSuccess(target, sequence); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java index c7d417626a..4e689b2ace 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java @@ -19,8 +19,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier public final class TransactionCanCommitSuccess extends TransactionSuccess { private static final long serialVersionUID = 1L; - public TransactionCanCommitSuccess(final TransactionIdentifier identifier) { - super(identifier); + public TransactionCanCommitSuccess(final TransactionIdentifier identifier, final long sequence) { + super(identifier, sequence); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccessProxyV1.java index a8af4af2c1..b645d68885 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccessProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccessProxyV1.java @@ -40,7 +40,7 @@ final class TransactionCanCommitSuccessProxyV1 extends AbstractTransactionSucces } @Override - protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target) { - return new TransactionCanCommitSuccess(target); + protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence) { + return new TransactionCanCommitSuccess(target, sequence); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java index 275b5cf1e8..6b28244484 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java @@ -19,8 +19,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier public final class TransactionCommitSuccess extends TransactionSuccess { private static final long serialVersionUID = 1L; - public TransactionCommitSuccess(final TransactionIdentifier identifier) { - super(identifier); + public TransactionCommitSuccess(final TransactionIdentifier identifier, final long sequence) { + super(identifier, sequence); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java index 4628a9d7b6..aaf07c26e1 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java @@ -27,7 +27,7 @@ final class TransactionCommitSuccessProxyV1 extends AbstractTransactionSuccessPr } @Override - protected TransactionCommitSuccess createSuccess(final TransactionIdentifier target) { - return new TransactionCommitSuccess(target); + protected TransactionCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence) { + return new TransactionCommitSuccess(target, sequence); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java index 6707aa199d..955c268008 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java @@ -21,8 +21,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier public final class TransactionDoCommitRequest extends TransactionRequest { private static final long serialVersionUID = 1L; - public TransactionDoCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) { - super(target, replyTo); + public TransactionDoCommitRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) { + super(target, sequence, replyTo); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java index f7718446dc..ce9ca9b004 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java @@ -28,7 +28,8 @@ final class TransactionDoCommitRequestProxyV1 extends AbstractTransactionRequest } @Override - protected TransactionDoCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) { - return new TransactionDoCommitRequest(target, replyTo); + protected TransactionDoCommitRequest createRequest(final TransactionIdentifier target, final long sequence, + final ActorRef replyTo) { + return new TransactionDoCommitRequest(target, sequence, replyTo); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailure.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailure.java index 750d327e41..e0b6a59987 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailure.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailure.java @@ -22,8 +22,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier public final class TransactionFailure extends RequestFailure { private static final long serialVersionUID = 1L; - TransactionFailure(final TransactionIdentifier target, final RequestException cause) { - super(target, cause); + TransactionFailure(final TransactionIdentifier target, final long sequence, final RequestException cause) { + super(target, sequence, cause); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailureProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailureProxyV1.java index fd99749bed..15cf09c5b7 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailureProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailureProxyV1.java @@ -31,8 +31,9 @@ final class TransactionFailureProxyV1 extends AbstractRequestFailureProxy { private static final long serialVersionUID = 1L; - public TransactionPreCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) { - super(target, replyTo); + public TransactionPreCommitRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) { + super(target, sequence, replyTo); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java index dd41e6ab3a..649db56a57 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java @@ -28,7 +28,8 @@ final class TransactionPreCommitRequestProxyV1 extends AbstractTransactionReques } @Override - protected TransactionPreCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) { - return new TransactionPreCommitRequest(target, replyTo); + protected TransactionPreCommitRequest createRequest(final TransactionIdentifier target, final long sequence, + final ActorRef replyTo) { + return new TransactionPreCommitRequest(target, sequence, replyTo); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java index 8a7da4e61b..1cf00e668f 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java @@ -18,8 +18,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier public final class TransactionPreCommitSuccess extends TransactionSuccess { private static final long serialVersionUID = 1L; - public TransactionPreCommitSuccess(final TransactionIdentifier identifier) { - super(identifier); + public TransactionPreCommitSuccess(final TransactionIdentifier identifier, final long sequence) { + super(identifier, sequence); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java index 2c0cdea17e..387a60c8e9 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java @@ -27,7 +27,7 @@ final class TransactionPreCommitSuccessProxyV1 extends AbstractTransactionSucces } @Override - protected TransactionPreCommitSuccess createSuccess(final TransactionIdentifier target) { - return new TransactionPreCommitSuccess(target); + protected TransactionPreCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence) { + return new TransactionPreCommitSuccess(target, sequence); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionRequest.java index 154d4b39e0..7ae6b81e3b 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionRequest.java @@ -26,8 +26,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier public abstract class TransactionRequest> extends Request { private static final long serialVersionUID = 1L; - TransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) { - super(identifier, replyTo); + TransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo) { + super(identifier, sequence, replyTo); } TransactionRequest(final T request, final ABIVersion version) { @@ -36,7 +36,7 @@ public abstract class TransactionRequest> extend @Override public final TransactionFailure toRequestFailure(final RequestException cause) { - return new TransactionFailure(getTarget(), cause); + return new TransactionFailure(getTarget(), getSequence(), cause); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java index daa4ba1b8b..77a6b56d1e 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java @@ -24,8 +24,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier public abstract class TransactionSuccess> extends RequestSuccess { private static final long serialVersionUID = 1L; - TransactionSuccess(final TransactionIdentifier identifier) { - super(identifier); + TransactionSuccess(final TransactionIdentifier identifier, final long sequence) { + super(identifier, sequence); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java index 89458b8a07..69fc29305c 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java @@ -15,9 +15,10 @@ import org.opendaylight.yangtools.concepts.WritableObjects; abstract class AbstractEnvelopeProxy> implements Externalizable { private static final long serialVersionUID = 1L; + private T message; - private long sequence; - private long retry; + private long sessionId; + private long txSequence; public AbstractEnvelopeProxy() { // for Externalizable @@ -25,13 +26,13 @@ abstract class AbstractEnvelopeProxy> implements Externa AbstractEnvelopeProxy(final Envelope envelope) { message = envelope.getMessage(); - sequence = envelope.getSequence(); - retry = envelope.getRetry(); + txSequence = envelope.getTxSequence(); + sessionId = envelope.getSessionId(); } @Override public final void writeExternal(final ObjectOutput out) throws IOException { - WritableObjects.writeLongs(out, sequence, retry); + WritableObjects.writeLongs(out, sessionId, txSequence); out.writeObject(message); } @@ -39,14 +40,14 @@ abstract class AbstractEnvelopeProxy> implements Externa @Override public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { final byte header = WritableObjects.readLongHeader(in); - sequence = WritableObjects.readFirstLong(in, header); - retry = WritableObjects.readSecondLong(in, header); + sessionId = WritableObjects.readFirstLong(in, header); + txSequence = WritableObjects.readSecondLong(in, header); message = (T) in.readObject(); } - abstract Envelope createEnvelope(T message, long sequence, long retry); + abstract Envelope createEnvelope(T message, long sessionId, long txSequence); final Object readResolve() { - return createEnvelope(message, sequence, retry); + return createEnvelope(message, sessionId, txSequence); } } \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractMessageProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractMessageProxy.java index 4b60aecefa..48ad0d39bb 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractMessageProxy.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractMessageProxy.java @@ -15,6 +15,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import javax.annotation.Nonnull; import org.opendaylight.yangtools.concepts.WritableIdentifier; +import org.opendaylight.yangtools.concepts.WritableObjects; /** * Abstract Externalizable proxy for use with {@link Message} subclasses. @@ -27,6 +28,7 @@ import org.opendaylight.yangtools.concepts.WritableIdentifier; abstract class AbstractMessageProxy> implements Externalizable { private static final long serialVersionUID = 1L; private T target; + private long sequence; protected AbstractMessageProxy() { // For Externalizable @@ -34,22 +36,25 @@ abstract class AbstractMessageProxy> implements Immutable, Se private static final long serialVersionUID = 1L; private final T message; - private final long sequence; - private final long retry; + private final long txSequence; + private final long sessionId; - Envelope(final T message, final long sequence, final long retry) { + Envelope(final T message, final long sessionId, final long txSequence) { this.message = Preconditions.checkNotNull(message); - this.sequence = sequence; - this.retry = retry; + this.sessionId = sessionId; + this.txSequence = txSequence; } /** @@ -35,27 +35,27 @@ public abstract class Envelope> implements Immutable, Se } /** - * Get the message sequence of this envelope. + * Get the message transmission sequence of this envelope. * * @return Message sequence */ - public long getSequence() { - return sequence; + public long getTxSequence() { + return txSequence; } /** - * Get the message retry counter. + * Get the session identifier. * - * @return Retry counter + * @return Session identifier */ - public long getRetry() { - return retry; + public long getSessionId() { + return sessionId; } @Override public String toString() { - return MoreObjects.toStringHelper(Envelope.class).add("sequence", Long.toUnsignedString(sequence, 16)). - add("retry", retry).add("message", message).toString(); + return MoreObjects.toStringHelper(Envelope.class).add("sessionId", Long.toHexString(sessionId)) + .add("txSequence", Long.toHexString(txSequence)).add("message", message).toString(); } final Object writeReplace() { diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java index 8da54f5954..6c32ae2554 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java @@ -10,8 +10,8 @@ package org.opendaylight.controller.cluster.access.concepts; public final class FailureEnvelope extends ResponseEnvelope> { private static final long serialVersionUID = 1L; - public FailureEnvelope(final RequestFailure message, final long sequence, final long retry) { - super(message, sequence, retry); + public FailureEnvelope(final RequestFailure message, final long sessionId, final long txSequence) { + super(message, sessionId, txSequence); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java index a14e69875f..892b44d33b 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java @@ -19,7 +19,7 @@ final class FailureEnvelopeProxy extends AbstractResponseEnvelopeProxy message, final long sequence, final long retry) { - return new FailureEnvelope(message, sequence, retry); + FailureEnvelope createEnvelope(final RequestFailure message, final long sessionId, final long txSequence) { + return new FailureEnvelope(message, sessionId, txSequence); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Message.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Message.java index 5070b7cf71..30e631eed7 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Message.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Message.java @@ -50,20 +50,23 @@ import org.opendaylight.yangtools.concepts.WritableIdentifier; public abstract class Message> implements Immutable, Serializable { private static final long serialVersionUID = 1L; - private final T target; + private final ABIVersion version; + private final long sequence; + private final T target; - private Message(final ABIVersion version, final T target) { + private Message(final ABIVersion version, final T target, final long sequence) { this.target = Preconditions.checkNotNull(target); this.version = Preconditions.checkNotNull(version); + this.sequence = sequence; } - Message(final T target) { - this(ABIVersion.current(), target); + Message(final T target, final long sequence) { + this(ABIVersion.current(), target, sequence); } Message(final C msg, final ABIVersion version) { - this(version, msg.getTarget()); + this(version, msg.getTarget(), msg.getSequence()); } /** @@ -75,8 +78,17 @@ public abstract class Message> { private static final long serialVersionUID = 1L; - public RequestEnvelope(final Request message, final long sequence, final long retry) { - super(message, sequence, retry); + public RequestEnvelope(final Request message, final long sessionId, final long txSequence) { + super(message, sessionId, txSequence); } @Override @@ -28,7 +28,7 @@ public final class RequestEnvelope extends Envelope> { * @throws NullPointerException if cause is null */ public void sendFailure(final RequestException cause) { - sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSequence(), getRetry())); + sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence())); } /** @@ -38,7 +38,7 @@ public final class RequestEnvelope extends Envelope> { * @throws NullPointerException if success is null */ public void sendSuccess(final RequestSuccess success) { - sendResponse(new SuccessEnvelope(success, getSequence(), getRetry())); + sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence())); } private void sendResponse(final ResponseEnvelope envelope) { diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelopeProxy.java index 1b499d003f..8ab450d8d3 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelopeProxy.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelopeProxy.java @@ -19,7 +19,7 @@ final class RequestEnvelopeProxy extends AbstractEnvelopeProxy> { } @Override - RequestEnvelope createEnvelope(final Request message, final long sequence, final long retry) { - return new RequestEnvelope(message, sequence, retry); + RequestEnvelope createEnvelope(final Request message, final long sessionId, final long txSequence) { + return new RequestEnvelope(message, sessionId, txSequence); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestFailure.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestFailure.java index 301b54ee1f..ecd2ff4955 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestFailure.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestFailure.java @@ -32,8 +32,8 @@ public abstract class RequestFailure> extends Message { private static final long serialVersionUID = 1L; - Response(final @Nonnull T target) { - super(target); + Response(final @Nonnull T target, final long sequence) { + super(target, sequence); } Response(final @Nonnull C response, final @Nonnull ABIVersion version) { diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java index a3625f665a..9f998e7fac 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java @@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.access.concepts; public abstract class ResponseEnvelope> extends Envelope { private static final long serialVersionUID = 1L; - ResponseEnvelope(final T message, final long sequence, final long retry) { - super(message, sequence, retry); + ResponseEnvelope(final T message, final long sessionId, final long txSequence) { + super(message, sessionId, txSequence); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java index be37dae110..d98e257ce0 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java @@ -10,8 +10,8 @@ package org.opendaylight.controller.cluster.access.concepts; public final class SuccessEnvelope extends ResponseEnvelope> { private static final long serialVersionUID = 1L; - public SuccessEnvelope(final RequestSuccess message, final long sequence, final long retry) { - super(message, sequence, retry); + public SuccessEnvelope(final RequestSuccess message, final long sessionId, final long txSequence) { + super(message, sessionId, txSequence); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java index 9f0fc2f325..50df24771f 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java @@ -19,7 +19,7 @@ final class SuccessEnvelopeProxy extends AbstractResponseEnvelopeProxy message, final long sequence, final long retry) { - return new SuccessEnvelope(message, sequence, retry); + SuccessEnvelope createEnvelope(final RequestSuccess message, final long sessionId, final long txSequence) { + return new SuccessEnvelope(message, sessionId, txSequence); } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfo.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfo.java index efdfa04b16..bdffb08c3e 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfo.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/BackendInfo.java @@ -26,10 +26,15 @@ import org.opendaylight.controller.cluster.access.ABIVersion; public class BackendInfo { private final ABIVersion version; private final ActorRef actor; + private final int maxMessages; + private final long sessionId; - protected BackendInfo(final ActorRef actor, final ABIVersion version) { + protected BackendInfo(final ActorRef actor, final long sessionId, final ABIVersion version, final int maxMessages) { this.version = Preconditions.checkNotNull(version); this.actor = Preconditions.checkNotNull(actor); + Preconditions.checkArgument(maxMessages > 0, "Maximum messages has to be positive, not %s", maxMessages); + this.maxMessages = maxMessages; + this.sessionId = sessionId; } public final ActorRef getActor() { @@ -40,6 +45,14 @@ public class BackendInfo { return version; } + public final int getMaxMessages() { + return maxMessages; + } + + public final long getSessionId() { + return sessionId; + } + @Override public final int hashCode() { return super.hashCode(); @@ -56,6 +69,7 @@ public class BackendInfo { } protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { - return toStringHelper.add("actor", actor).add("version", version); + return toStringHelper.add("actor", actor).add("sessionId", sessionId).add("version", version) + .add("maxMessages", maxMessages); } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index 3cdda596ad..8939ec977e 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -83,13 +83,12 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior request, - final RequestCallback callback) { + private ClientActorBehavior doSendRequest(final TransactionRequest request, final RequestCallback callback) { // Get or allocate queue for the request final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie()); // Note this is a tri-state return and can be null - final Optional result = queue.enqueueRequest(sequence, request, callback); + final Optional result = queue.enqueueRequest(request, callback); if (result == null) { // Happy path: we are done here return this; @@ -189,7 +188,7 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior request, final RequestCallback callback) { - context().executeInActor(cb -> cb.doSendRequest(sequence, request, callback)); + public final void sendRequest(final TransactionRequest request, final RequestCallback callback) { + context().executeInActor(cb -> cb.doSendRequest(request, callback)); } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java new file mode 100644 index 0000000000..31d863a36c --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/EmptyQueue.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import java.util.AbstractQueue; +import java.util.Collections; +import java.util.Iterator; +import java.util.Queue; + +/** + * A specialized always-empty implementation of {@link java.util.Queue}. This implementation will always refuse new + * elements in its {@link #offer(Object)} method. + + * @author Robert Varga + * + * @param the type of elements held in this collection + */ +// TODO: move this class into yangtools.util +final class EmptyQueue extends AbstractQueue { + private static final EmptyQueue INSTANCE = new EmptyQueue<>(); + + private EmptyQueue() { + // No instances + } + + @SuppressWarnings("unchecked") + static Queue getInstance() { + return (Queue) INSTANCE; + } + + @Override + public boolean offer(final T e) { + return false; + } + + @Override + public T poll() { + return null; + } + + @Override + public T peek() { + return null; + } + + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } + + @Override + public int size() { + return 0; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java index 513a3f936b..5cf7873a4b 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueue.java @@ -10,16 +10,18 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; -import java.util.Deque; +import com.google.common.base.Verify; +import java.util.ArrayDeque; import java.util.Iterator; -import java.util.LinkedList; import java.util.Optional; +import java.util.Queue; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,18 +44,32 @@ final class SequencedQueue { TimeUnit.NANOSECONDS); /** - * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can - * progress at different speeds, these may be completed out of order. - * - * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to - * figure out a more efficient lookup mechanism to deal with responses which do not match the queue - * order. + * Default number of permits we start with. This value is used when we start up only, once we resolve a backend + * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful + * resolution. */ - private final Deque queue = new LinkedList<>(); + private static final int DEFAULT_TX_LIMIT = 1000; + private final Ticker ticker; private final Long cookie; - // Updated/consulted from actor context only + /* + * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing + * with the limit changing between reconnects (which imply retransmission). + * + * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one), + * one for requests that have been sent to the previous backend (and have not been transmitted to the current one), + * and one for requests which have not been transmitted at all. + * + * When transmitting we first try to drain the second queue and service the third one only when that becomes empty. + * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses + * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance + * to retransmit -- hence the second queue. + */ + private Queue currentInflight = new ArrayDeque<>(); + private Queue lastInflight = new ArrayDeque<>(); + private final Queue pending = new ArrayDeque<>(); + /** * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested @@ -62,6 +78,11 @@ final class SequencedQueue { private CompletionStage backendProof; private BackendInfo backend; + // This is not final because we need to be able to replace it. + private long txSequence; + + private int lastTxLimit = DEFAULT_TX_LIMIT; + /** * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue. */ @@ -86,12 +107,16 @@ final class SequencedQueue { Preconditions.checkState(notClosed, "Queue %s is closed", this); } + private long nextTxSequence() { + return txSequence++; + } + /** * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller * the following scenarios: * 1) The request has been enqueued and transmitted. No further actions are necessary * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer - * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that + * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that * process needs to complete before transmission occurs * * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode @@ -105,21 +130,32 @@ final class SequencedQueue { * @param callback Callback to be invoked * @return Optional duration with semantics described above. */ - @Nullable Optional enqueueRequest(final long sequence, final Request request, - final RequestCallback callback) { + @Nullable Optional enqueueRequest(final Request request, final RequestCallback callback) { checkNotClosed(); final long now = ticker.read(); - final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now); - - queue.add(e); - LOG.debug("Enqueued request {} to queue {}", request, this); - + final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now); if (backend == null) { + LOG.debug("No backend available, request resolution"); + pending.add(e); return Optional.empty(); } + if (!lastInflight.isEmpty()) { + LOG.debug("Retransmit not yet complete, delaying request {}", request); + pending.add(e); + return null; + } + if (currentInflight.size() >= lastTxLimit) { + LOG.debug("Queue is at capacity, delayed sending of request {}", request); + pending.add(e); + return null; + } - e.retransmit(backend, now); + // Ready to transmit + currentInflight.offer(e); + LOG.debug("Enqueued request {} to queue {}", request, this); + + e.retransmit(backend, nextTxSequence(), now); if (expectingTimer == null) { expectingTimer = now + REQUEST_TIMEOUT_NANOS; return Optional.of(INITIAL_REQUEST_TIMEOUT); @@ -128,53 +164,162 @@ final class SequencedQueue { } } - ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope response) { - // Responses to different targets may arrive out of order, hence we use an iterator + /* + * We are using tri-state return here to indicate one of three conditions: + * - if a matching entry is found, return an Optional containing it + * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null + * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional + */ + private static Optional findMatchingEntry(final Queue queue, + final ResponseEnvelope envelope) { + // Try to find the request in a queue. Responses may legally come back in a different order, hence we need + // to use an iterator final Iterator it = queue.iterator(); while (it.hasNext()) { final SequencedQueueEntry e = it.next(); - if (e.acceptsResponse(response)) { - lastProgress = ticker.read(); - it.remove(); - LOG.debug("Completing request {} with {}", e, response); - return e.complete(response.getMessage()); + final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails()); + + final Request request = e.getRequest(); + final Response response = envelope.getMessage(); + + // First check for matching target, or move to next entry + if (!request.getTarget().equals(response.getTarget())) { + continue; + } + + // Sanity-check logical sequence, ignore any out-of-order messages + if (request.getSequence() != response.getSequence()) { + LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope); + return Optional.empty(); + } + + // Now check session match + if (envelope.getSessionId() != txDetails.getSessionId()) { + LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope); + return Optional.empty(); + } + if (envelope.getTxSequence() != txDetails.getTxSequence()) { + LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope); + return Optional.empty(); } + + LOG.debug("Completing request {} with {}", request, envelope); + it.remove(); + return Optional.of(e); } - LOG.debug("No request matching {} found", response); - return current; + return null; + } + + ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope envelope) { + Optional maybeEntry = findMatchingEntry(currentInflight, envelope); + if (maybeEntry == null) { + maybeEntry = findMatchingEntry(lastInflight, envelope); + } + + if (maybeEntry == null || !maybeEntry.isPresent()) { + LOG.warn("No request matching {} found, ignoring response", envelope); + return current; + } + + lastProgress = ticker.read(); + final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage()); + + // We have freed up a slot, try to transmit something + if (backend != null) { + final int toSend = lastTxLimit - currentInflight.size(); + if (toSend > 0) { + runTransmit(toSend); + } + } + + return ret; + } + + private int transmitEntries(final Queue queue, final int count) { + int toSend = count; + + while (toSend > 0) { + final SequencedQueueEntry e = queue.poll(); + if (e == null) { + break; + } + + LOG.debug("Transmitting entry {}", e); + e.retransmit(backend, nextTxSequence(), lastProgress); + toSend--; + } + + return toSend; + } + + private void runTransmit(final int count) { + final int toSend; + + // Process lastInflight first, possibly clearing it + if (!lastInflight.isEmpty()) { + toSend = transmitEntries(lastInflight, count); + if (lastInflight.isEmpty()) { + // We won't be needing the queue anymore, change it to specialized implementation + lastInflight = EmptyQueue.getInstance(); + } + } else { + toSend = count; + } + + // Process pending next. + transmitEntries(pending, toSend); } Optional setBackendInfo(final CompletionStage proof, final BackendInfo backend) { + Preconditions.checkNotNull(backend); if (!proof.equals(backendProof)) { LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof); return Optional.empty(); } - this.backend = Preconditions.checkNotNull(backend); - backendProof = null; LOG.debug("Resolved backend {}", backend); - if (queue.isEmpty()) { - // No pending requests, hence no need for a timer - return Optional.empty(); + // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right + // and also not to exceed new limits + final Queue newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size()); + newLast.addAll(currentInflight); + newLast.addAll(lastInflight); + lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast; + + // Clear currentInflight, possibly compacting it + final int txLimit = backend.getMaxMessages(); + if (lastTxLimit > txLimit) { + currentInflight = new ArrayDeque<>(); + } else { + currentInflight.clear(); } - LOG.debug("Resending requests to backend {}", backend); - final long now = ticker.read(); - for (SequencedQueueEntry e : queue) { - e.retransmit(backend, now); - } + // We are ready to roll + this.backend = backend; + backendProof = null; + txSequence = 0; + lastTxLimit = txLimit; + lastProgress = ticker.read(); - if (expectingTimer != null) { - // We already have a timer going, no need to schedule a new one + // No pending requests, return + if (lastInflight.isEmpty() && pending.isEmpty()) { return Optional.empty(); } - // Above loop may have cost us some time. Recalculate timeout. - final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS; - expectingTimer = nextTicks; - return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS)); + LOG.debug("Sending up to {} requests to backend {}", txLimit, backend); + + runTransmit(lastTxLimit); + + // Calculate next timer if necessary + if (expectingTimer == null) { + // Request transmission may have cost us some time. Recalculate timeout. + final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS; + expectingTimer = nextTicks; + return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS)); + } else { + return Optional.empty(); + } } boolean expectProof(final CompletionStage proof) { @@ -189,7 +334,7 @@ final class SequencedQueue { } boolean hasCompleted() { - return !notClosed && queue.isEmpty(); + return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty(); } /** @@ -203,7 +348,7 @@ final class SequencedQueue { expectingTimer = null; final long now = ticker.read(); - if (!queue.isEmpty()) { + if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) { final long ticksSinceProgress = now - lastProgress; if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, @@ -216,7 +361,7 @@ final class SequencedQueue { } // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue - final SequencedQueueEntry head = queue.peek(); + final SequencedQueueEntry head = currentInflight.peek(); if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) { backend = null; LOG.debug("Queue {} invalidated backend info", this); @@ -226,14 +371,17 @@ final class SequencedQueue { } } + private static void poisonQueue(final Queue queue, final RequestException cause) { + queue.forEach(e -> e.poison(cause)); + queue.clear(); + } + void poison(final RequestException cause) { close(); - SequencedQueueEntry e = queue.poll(); - while (e != null) { - e.poison(cause); - e = queue.poll(); - } + poisonQueue(currentInflight, cause); + poisonQueue(lastInflight, cause); + poisonQueue(pending, cause); } // FIXME: add a caller from ClientSingleTransaction diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java index 83e3e07913..8814d50c54 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntry.java @@ -10,12 +10,11 @@ package org.opendaylight.controller.cluster.access.client; import akka.actor.ActorRef; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import java.util.Optional; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; -import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,49 +22,31 @@ import org.slf4j.LoggerFactory; * Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information. * * @author Robert Varga - * - * @param Target identifier type */ final class SequencedQueueEntry { - private static final class LastTry { - final long timeTicks; - final long retry; - - LastTry(final long retry, final long timeTicks) { - this.retry = retry; - this.timeTicks = timeTicks; - } - } - private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class); private final Request request; private final RequestCallback callback; private final long enqueuedTicks; - private final long sequence; - private Optional lastTry = Optional.empty(); + private TxDetails txDetails; - SequencedQueueEntry(final Request request, final long sequence, final RequestCallback callback, + SequencedQueueEntry(final Request request, final RequestCallback callback, final long now) { this.request = Preconditions.checkNotNull(request); this.callback = Preconditions.checkNotNull(callback); this.enqueuedTicks = now; - this.sequence = sequence; } - long getSequence() { - return sequence; + Request getRequest() { + return request; } - boolean acceptsResponse(final ResponseEnvelope response) { - return getSequence() == response.getSequence() && request.getTarget().equals(response.getMessage().getTarget()); + @Nullable TxDetails getTxDetails() { + return txDetails; } - long getCurrentTry() { - return lastTry.isPresent() ? lastTry.get().retry : 0; - } - ClientActorBehavior complete(final Response response) { LOG.debug("Completing request {} with {}", request, response); return callback.complete(response); @@ -79,8 +60,8 @@ final class SequencedQueueEntry { boolean isTimedOut(final long now, final long timeoutNanos) { final long elapsed; - if (lastTry.isPresent()) { - elapsed = now - lastTry.get().timeTicks; + if (txDetails != null) { + elapsed = now - txDetails.getTimeTicks(); } else { elapsed = now - enqueuedTicks; } @@ -93,18 +74,19 @@ final class SequencedQueueEntry { } } - void retransmit(final BackendInfo backend, final long now) { - final long retry = lastTry.isPresent() ? lastTry.get().retry + 1 : 0; - final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), sequence, retry); + void retransmit(final BackendInfo backend, final long txSequence, final long now) { + final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), + backend.getSessionId(), txSequence); final ActorRef actor = backend.getActor(); - LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor); + LOG.trace("Transmitting request {} as {} to {}", request, toSend, actor); actor.tell(toSend, ActorRef.noSender()); - lastTry = Optional.of(new LastTry(retry, now)); + txDetails = new TxDetails(backend.getSessionId(), txSequence, now); } @Override public String toString() { return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString(); } + } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TxDetails.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TxDetails.java new file mode 100644 index 0000000000..84d4b62699 --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TxDetails.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +/** + * Holder class for transmission details about a particular {@link SequencedQueueEntry}. + * + * @author Robert Varga + */ +final class TxDetails { + private final long sessionId; + private final long txSequence; + private final long timeTicks; + + TxDetails(final long sessionId, final long txSequence, final long timeTicks) { + this.sessionId = sessionId; + this.txSequence = txSequence; + this.timeTicks = timeTicks; + } + + long getSessionId() { + return sessionId; + } + + long getTxSequence() { + return txSequence; + } + + long getTimeTicks() { + return timeTicks; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntryTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntryTest.java index aecd238c6d..dca396648c 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntryTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueEntryTest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -49,7 +50,7 @@ public class SequencedQueueEntryTest { private static final long serialVersionUID = 1L; MockFailure(final WritableIdentifier target, final RequestException cause) { - super(target, cause); + super(target, 0, cause); } @Override @@ -67,7 +68,7 @@ public class SequencedQueueEntryTest { private static final long serialVersionUID = 1L; MockRequest(final WritableIdentifier target, final ActorRef replyTo) { - super(target, replyTo); + super(target, 0, replyTo); } @Override @@ -127,11 +128,11 @@ public class SequencedQueueEntryTest { ticker.increment(ThreadLocalRandom.current().nextLong()); mockActor = TestProbe.apply(actorSystem); - mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current()); + mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5); mockRequest = new MockRequest(mockIdentifier, mockReplyTo); mockResponse = mockRequest.toRequestFailure(mockCause); - entry = new SequencedQueueEntry(mockRequest, 0, mockCallback, ticker.read()); + entry = new SequencedQueueEntry(mockRequest, mockCallback, ticker.read()); } @After @@ -140,19 +141,14 @@ public class SequencedQueueEntryTest { } @Test - public void testGetSequence() { - assertEquals(0, entry.getSequence()); - } - - @Test - public void testGetCurrentTry() { - assertEquals(0, entry.getCurrentTry()); - entry.retransmit(mockBackendInfo, ticker.read()); - assertEquals(0, entry.getCurrentTry()); - entry.retransmit(mockBackendInfo, ticker.read()); - assertEquals(1, entry.getCurrentTry()); - entry.retransmit(mockBackendInfo, ticker.read()); - assertEquals(2, entry.getCurrentTry()); + public void testGetTxDetails() { + assertNull(entry.getTxDetails()); + entry.retransmit(mockBackendInfo, 0, ticker.read()); + assertEquals(0, entry.getTxDetails().getTxSequence()); + entry.retransmit(mockBackendInfo, 1, ticker.read()); + assertEquals(1, entry.getTxDetails().getTxSequence()); + entry.retransmit(mockBackendInfo, 3, ticker.read()); + assertEquals(3, entry.getTxDetails().getTxSequence()); } @Test @@ -175,13 +171,13 @@ public class SequencedQueueEntryTest { assertTrue(entry.isTimedOut(ticker.read(), 0)); assertFalse(entry.isTimedOut(ticker.read(), 1)); - entry.retransmit(mockBackendInfo, ticker.read()); + entry.retransmit(mockBackendInfo, 0, ticker.read()); assertTrue(entry.isTimedOut(ticker.read(), 0)); ticker.increment(10); assertTrue(entry.isTimedOut(ticker.read(), 10)); assertFalse(entry.isTimedOut(ticker.read(), 20)); - entry.retransmit(mockBackendInfo, ticker.read()); + entry.retransmit(mockBackendInfo, 1, ticker.read()); assertTrue(entry.isTimedOut(ticker.read(), 0)); ticker.increment(10); assertTrue(entry.isTimedOut(ticker.read(), 10)); @@ -191,7 +187,7 @@ public class SequencedQueueEntryTest { @Test public void testRetransmit() { assertFalse(mockActor.msgAvailable()); - entry.retransmit(mockBackendInfo, ticker.read()); + entry.retransmit(mockBackendInfo, 0, ticker.read()); assertTrue(mockActor.msgAvailable()); assertRequestEquals(mockRequest, mockActor.receiveOne(Duration.apply(5, TimeUnit.SECONDS))); @@ -201,8 +197,8 @@ public class SequencedQueueEntryTest { assertTrue(o instanceof RequestEnvelope); final RequestEnvelope actual = (RequestEnvelope) o; - assertEquals(0, actual.getRetry()); - assertEquals(0, actual.getSequence()); + assertEquals(0, actual.getSessionId()); + assertEquals(0, actual.getTxSequence()); assertEquals(expected.getTarget(), actual.getMessage().getTarget()); } } diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueTest.java index b05ca3a7e0..fb92de9351 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/SequencedQueueTest.java @@ -54,7 +54,7 @@ public class SequencedQueueTest { private static final long serialVersionUID = 1L; MockFailure(final WritableIdentifier target, final RequestException cause) { - super(target, cause); + super(target, 0, cause); } @Override @@ -72,7 +72,7 @@ public class SequencedQueueTest { private static final long serialVersionUID = 1L; MockRequest(final WritableIdentifier target, final ActorRef replyTo) { - super(target, replyTo); + super(target, 0, replyTo); } @Override @@ -135,7 +135,7 @@ public class SequencedQueueTest { ticker.increment(ThreadLocalRandom.current().nextLong()); mockActor = TestProbe.apply(actorSystem); - mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current()); + mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5); mockRequest = new MockRequest(mockIdentifier, mockReplyTo); mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo); mockResponse = mockRequest.toRequestFailure(mockCause); @@ -167,7 +167,7 @@ public class SequencedQueueTest { queue.close(); // Kaboom - queue.enqueueRequest(0, mockRequest, mockCallback); + queue.enqueueRequest(mockRequest, mockCallback); } @Test @@ -178,7 +178,7 @@ public class SequencedQueueTest { @Test public void testPoison() { - queue.enqueueRequest(0, mockRequest, mockCallback); + queue.enqueueRequest(mockRequest, mockCallback); queue.poison(mockCause); final ArgumentCaptor captor = ArgumentCaptor.forClass(MockFailure.class); @@ -192,7 +192,7 @@ public class SequencedQueueTest { queue.poison(mockCause); // Kaboom - queue.enqueueRequest(0, mockRequest, mockCallback); + queue.enqueueRequest(mockRequest, mockCallback); } @Test @@ -203,7 +203,7 @@ public class SequencedQueueTest { @Test public void testEnqueueRequestNeedsBackend() { - final Optional ret = queue.enqueueRequest(0, mockRequest, mockCallback); + final Optional ret = queue.enqueueRequest(mockRequest, mockCallback); assertNotNull(ret); assertFalse(ret.isPresent()); @@ -225,7 +225,7 @@ public class SequencedQueueTest { @Test public void testSetBackendWithNoResolution() { - queue.enqueueRequest(0, mockRequest, mockCallback); + queue.enqueueRequest(mockRequest, mockCallback); final CompletableFuture proof = new CompletableFuture<>(); final Optional ret = queue.setBackendInfo(proof, mockBackendInfo); @@ -235,7 +235,7 @@ public class SequencedQueueTest { @Test public void testSetBackendWithWrongProof() { - queue.enqueueRequest(0, mockRequest, mockCallback); + queue.enqueueRequest(mockRequest, mockCallback); final CompletableFuture proof = new CompletableFuture<>(); assertTrue(queue.expectProof(proof)); @@ -252,8 +252,8 @@ public class SequencedQueueTest { } @Test - public void testSetbackedWithRequestsNoTimer() { - queue.enqueueRequest(0, mockRequest, mockCallback); + public void testSetBackendWithRequestsNoTimer() { + queue.enqueueRequest(mockRequest, mockCallback); final CompletableFuture proof = new CompletableFuture<>(); assertTrue(queue.expectProof(proof)); @@ -270,7 +270,7 @@ public class SequencedQueueTest { public void testEnqueueRequestNeedsTimer() { setupBackend(); - final Optional ret = queue.enqueueRequest(0, mockRequest, mockCallback); + final Optional ret = queue.enqueueRequest(mockRequest, mockCallback); assertNotNull(ret); assertTrue(ret.isPresent()); assertTransmit(mockRequest, 0); @@ -281,13 +281,13 @@ public class SequencedQueueTest { setupBackend(); // First request - Optional ret = queue.enqueueRequest(0, mockRequest, mockCallback); + Optional ret = queue.enqueueRequest(mockRequest, mockCallback); assertNotNull(ret); assertTrue(ret.isPresent()); assertTransmit(mockRequest, 0); // Second request, no timer fired - ret = queue.enqueueRequest(1, mockRequest2, mockCallback); + ret = queue.enqueueRequest(mockRequest2, mockCallback); assertNull(ret); assertTransmit(mockRequest2, 1); } @@ -300,14 +300,14 @@ public class SequencedQueueTest { @Test public void testRunTimeoutWithoutShift() throws NoProgressException { - queue.enqueueRequest(0, mockRequest, mockCallback); + queue.enqueueRequest(mockRequest, mockCallback); final boolean ret = queue.runTimeout(); assertFalse(ret); } @Test public void testRunTimeoutWithTimeoutLess() throws NoProgressException { - queue.enqueueRequest(0, mockRequest, mockCallback); + queue.enqueueRequest(mockRequest, mockCallback); ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1); @@ -317,7 +317,9 @@ public class SequencedQueueTest { @Test public void testRunTimeoutWithTimeoutExact() throws NoProgressException { - queue.enqueueRequest(0, mockRequest, mockCallback); + setupBackend(); + + queue.enqueueRequest(mockRequest, mockCallback); ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS); @@ -327,7 +329,9 @@ public class SequencedQueueTest { @Test public void testRunTimeoutWithTimeoutMore() throws NoProgressException { - queue.enqueueRequest(0, mockRequest, mockCallback); + setupBackend(); + + queue.enqueueRequest(mockRequest, mockCallback); ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1); @@ -337,7 +341,7 @@ public class SequencedQueueTest { @Test(expected=NoProgressException.class) public void testRunTimeoutWithoutProgressExact() throws NoProgressException { - queue.enqueueRequest(0, mockRequest, mockCallback); + queue.enqueueRequest(mockRequest, mockCallback); ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS); @@ -347,7 +351,7 @@ public class SequencedQueueTest { @Test(expected=NoProgressException.class) public void testRunTimeoutWithoutProgressMore() throws NoProgressException { - queue.enqueueRequest(0, mockRequest, mockCallback); + queue.enqueueRequest(mockRequest, mockCallback); ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1); @@ -382,7 +386,9 @@ public class SequencedQueueTest { @Test public void testCompleteSingle() { - queue.enqueueRequest(0, mockRequest, mockCallback); + setupBackend(); + + queue.enqueueRequest(mockRequest, mockCallback); ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope); verify(mockCallback).complete(mockResponse); @@ -395,7 +401,9 @@ public class SequencedQueueTest { @Test public void testCompleteNull() { - queue.enqueueRequest(0, mockRequest, mockCallback); + setupBackend(); + + queue.enqueueRequest(mockRequest, mockCallback); doReturn(null).when(mockCallback).complete(mockResponse); @@ -408,10 +416,10 @@ public class SequencedQueueTest { public void testProgressRecord() throws NoProgressException { setupBackend(); - queue.enqueueRequest(0, mockRequest, mockCallback); + queue.enqueueRequest(mockRequest, mockCallback); ticker.increment(10); - queue.enqueueRequest(1, mockRequest2, mockCallback); + queue.enqueueRequest(mockRequest2, mockCallback); queue.complete(mockBehavior, mockResponseEnvelope); ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11); @@ -436,8 +444,8 @@ public class SequencedQueueTest { assertTrue(o instanceof RequestEnvelope); final RequestEnvelope actual = (RequestEnvelope) o; - assertEquals(0, actual.getRetry()); - assertEquals(sequence, actual.getSequence()); + assertEquals(0, actual.getSessionId()); + assertEquals(sequence, actual.getTxSequence()); assertSame(expected, actual.getMessage()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index 605de67140..f59703cc21 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -57,6 +57,10 @@ com.typesafe.akka akka-slf4j_${scala.version} + + org.scala-lang.modules + scala-java8-compat_${scala.version} + com.typesafe.akka akka-testkit_${scala.version} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 60919c05a4..cd104b597b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -67,7 +67,7 @@ abstract class AbstractProxyTransaction implements Identifiable backend) { - final java.util.Optional dataTree = backend.flatMap(t -> t.getDataTree()); + final java.util.Optional dataTree = backend.flatMap(ShardBackendInfo::getDataTree); final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId); if (dataTree.isPresent()) { return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot()); @@ -141,7 +141,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret = SettableFuture.create(); - client().sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(false)), t -> { + client().sendRequest(Verify.verifyNotNull(doCommit(false)), t -> { if (t instanceof TransactionCommitSuccess) { ret.set(Boolean.TRUE); } else if (t instanceof RequestFailure) { @@ -156,7 +156,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { checkSealed(); - client.sendRequest(nextSequence(), new TransactionAbortRequest(getIdentifier(), client().self()), t -> { + client.sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), client().self()), t -> { if (t instanceof TransactionAbortSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -170,7 +170,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { checkSealed(); - client.sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(true)), t -> { + client.sendRequest(Verify.verifyNotNull(doCommit(true)), t -> { if (t instanceof TransactionCanCommitSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -184,7 +184,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { checkSealed(); - client.sendRequest(nextSequence(), new TransactionPreCommitRequest(getIdentifier(), client().self()), t-> { + client.sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> { if (t instanceof TransactionPreCommitSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -198,7 +198,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { checkSealed(); - client.sendRequest(nextSequence(), new TransactionDoCommitRequest(getIdentifier(), client().self()), t-> { + client.sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> { if (t instanceof TransactionCommitSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -224,5 +224,4 @@ abstract class AbstractProxyTransaction implements Identifiable doCommit(boolean coordinated); - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java index a99ea3dfb4..b84008ca39 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java @@ -174,8 +174,8 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple transactions.remove(transaction.getIdentifier()); } - void sendRequest(final long sequence, final TransactionRequest request, final Consumer> completer) { - sendRequest(sequence, request, response -> { + void sendRequest(final TransactionRequest request, final Consumer> completer) { + sendRequest(request, response -> { completer.accept(response); return this; }); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 1dec84631a..9e787f12e1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -87,7 +87,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { @Override void doAbort() { - client().sendRequest(nextSequence(), new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER); + client().sendRequest(new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER); modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted")); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index 0633b68f1f..359b428c99 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -7,30 +7,33 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import akka.dispatch.ExecutionContexts; -import akka.dispatch.OnComplete; +import akka.actor.ActorRef; +import akka.japi.Function; +import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableBiMap.Builder; import com.google.common.primitives.UnsignedLong; -import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.client.BackendInfo; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; -import org.opendaylight.controller.cluster.datastore.DataStoreVersions; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; +import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; +import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.ExecutionContext; +import scala.compat.java8.FutureConverters; /** * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named @@ -40,8 +43,6 @@ import scala.concurrent.ExecutionContext; * @author Robert Varga */ final class ModuleShardBackendResolver extends BackendInfoResolver { - private static final ExecutionContext DIRECT_EXECUTION_CONTEXT = - ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()); private static final CompletableFuture NULL_FUTURE = CompletableFuture.completedFuture(null); private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class); @@ -54,6 +55,8 @@ final class ModuleShardBackendResolver extends BackendInfoResolver ret = new CompletableFuture<>(); + final CompletableFuture ret = new CompletableFuture(); + + FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> { + LOG.debug("Looking up primary info for {} from {}", shardName, info); + return FutureConverters.toJava(Patterns.ask(info.getPrimaryShardActor(), + (Function) replyTo -> new ConnectClientRequest(null, replyTo, + ABIVersion.BORON, ABIVersion.current()), DEAD_TIMEOUT)); + }).thenApply(response -> { + if (response instanceof RequestFailure) { + final RequestFailure failure = (RequestFailure) response; + LOG.debug("Connect request failed {}", failure, failure.getCause()); + throw Throwables.propagate(failure.getCause()); + } - actorContext.findPrimaryShardAsync(shardName).onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable t, final PrimaryShardInfo v) { - if (t != null) { - ret.completeExceptionally(t); - } else { - ret.complete(createBackendInfo(v, shardName, cookie)); - } + LOG.debug("Resolved backend information to {}", response); + + Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response); + final ConnectClientSuccess success = (ConnectClientSuccess) response; + + return new ShardBackendInfo(success.getBackend(), + nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), + success.getDataTree(), success.getMaxMessages()); + }).whenComplete((info, t) -> { + if (t != null) { + ret.completeExceptionally(t); + } else { + ret.complete(info); } - }, DIRECT_EXECUTION_CONTEXT); + }); LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); return ret; } - - private static ABIVersion toABIVersion(final short version) { - switch (version) { - case DataStoreVersions.BORON_VERSION: - return ABIVersion.BORON; - } - - throw new IllegalArgumentException("Unsupported version " + version); - } - - private static ShardBackendInfo createBackendInfo(final Object result, final String shardName, final Long cookie) { - Preconditions.checkArgument(result instanceof PrimaryShardInfo); - final PrimaryShardInfo info = (PrimaryShardInfo) result; - - LOG.debug("Creating backend information for {}", info); - return new ShardBackendInfo(info.getPrimaryShardActor().resolveOne(DEAD_TIMEOUT).value().get().get(), - toABIVersion(info.getPrimaryShardVersion()), shardName, UnsignedLong.fromLongBits(cookie), - info.getLocalShardDataTree()); - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 13b7fb4f56..9fb1b89580 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -95,21 +95,21 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { // Make sure we send any modifications before issuing a read ensureFlushedBuider(); - client().sendRequest(nextSequence(), request, completer); + client().sendRequest(request, completer); return MappingCheckedFuture.create(future, ReadFailedException.MAPPER); } @Override CheckedFuture doExists(final YangInstanceIdentifier path) { final SettableFuture future = SettableFuture.create(); - return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), client().self(), path), + return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), client().self(), path), t -> completeExists(future, t), future); } @Override CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { final SettableFuture>> future = SettableFuture.create(); - return sendReadRequest(new ReadTransactionRequest(getIdentifier(), client().self(), path), + return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), client().self(), path), t -> completeRead(future, t), future); } @@ -122,6 +122,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private void ensureInitializedBuider() { if (!builderBusy) { + builder.setSequence(nextSequence()); builderBusy = true; } } @@ -133,7 +134,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void flushBuilder() { - client().sendRequest(nextSequence(), builder.build(), this::completeModify); + client().sendRequest(builder.build(), this::completeModify); builderBusy = false; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ShardBackendInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ShardBackendInfo.java index 6bb7072ea3..92a213d1bf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ShardBackendInfo.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ShardBackendInfo.java @@ -30,9 +30,9 @@ final class ShardBackendInfo extends BackendInfo { private final UnsignedLong cookie; private final String shardName; - ShardBackendInfo(final ActorRef actor, final ABIVersion version, final String shardName, final UnsignedLong cookie, - final Optional dataTree) { - super(actor, version); + ShardBackendInfo(final ActorRef actor, final long sessionId, final ABIVersion version, final String shardName, + final UnsignedLong cookie, final Optional dataTree, final int maxMessages) { + super(actor, sessionId, version, maxMessages); this.shardName = Preconditions.checkNotNull(shardName); this.cookie = Preconditions.checkNotNull(cookie); this.dataTree = Preconditions.checkNotNull(dataTree);