From: Robert Varga Date: Thu, 30 Jun 2016 09:57:38 +0000 (+0200) Subject: BUG-5280: introduce request/response Envelope X-Git-Tag: release/boron~79 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c9d61ee66367d819319bb8ccfa9f9b0555264d86 BUG-5280: introduce request/response Envelope This is a follow-up patch to move sequence information from request/response structure and making it part of an Envelope, which is allocated by the SequencedQueue. Change-Id: I341118850d9c5835bab0b491f59b95264f31e5ef Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbortLocalTransactionRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbortLocalTransactionRequest.java index 46351fae66..104530651d 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbortLocalTransactionRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbortLocalTransactionRequest.java @@ -23,22 +23,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier public final class AbortLocalTransactionRequest extends AbstractLocalTransactionRequest { private static final long serialVersionUID = 1L; - public AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence, + public AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo) { - this(identifier, sequence, 0, replyTo); - } - - AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence, - final long retry, final @Nonnull ActorRef replyTo) { - super(identifier, sequence, retry, replyTo); - } - - private AbortLocalTransactionRequest(final @Nonnull AbortLocalTransactionRequest request, final long retry) { - super(request, retry); - } - - @Override - protected AbortLocalTransactionRequest cloneAsRetry(final long retry) { - return new AbortLocalTransactionRequest(this, retry); + super(identifier, replyTo); } } \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java index 9579ba497d..7730c5d615 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalTransactionRequest.java @@ -23,13 +23,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier abstract class AbstractLocalTransactionRequest> extends TransactionRequest { private static final long serialVersionUID = 1L; - AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final long sequence, final long retry, - final ActorRef replyTo) { - super(identifier, sequence, retry, replyTo); - } - - AbstractLocalTransactionRequest(final T request, final long retry) { - super(request, retry); + AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) { + super(identifier, replyTo); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java index ddc5fef25f..d23430fa03 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java @@ -32,9 +32,9 @@ public abstract class AbstractReadTransactionRequest { private static final long serialVersionUID = 1L; - public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence, - final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) { - this(identifier, sequence, 0, replyTo, path); - } - - ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence, - final long retry, final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) { - super(identifier, sequence, retry, replyTo, path); + public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo, + final @Nonnull YangInstanceIdentifier path) { + super(identifier, replyTo, path); } private ExistsTransactionRequest(final ExistsTransactionRequest request, final ABIVersion version) { super(request, version); } - private ExistsTransactionRequest(final ExistsTransactionRequest request, final long retry) { - super(request, retry); - } - @Override protected ExistsTransactionRequest cloneAsVersion(final ABIVersion version) { return new ExistsTransactionRequest(this, version); @@ -50,9 +41,4 @@ public final class ExistsTransactionRequest extends AbstractReadTransactionReque protected ExistsTransactionRequestProxyV1 externalizableProxy(final ABIVersion version) { return new ExistsTransactionRequestProxyV1(this); } - - @Override - protected ExistsTransactionRequest cloneAsRetry(final long retry) { - return new ExistsTransactionRequest(this, retry); - } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequestProxyV1.java index ee6d3fd73a..4f5bd1c42f 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequestProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionRequestProxyV1.java @@ -29,8 +29,8 @@ final class ExistsTransactionRequestProxyV1 extends AbstractReadTransactionReque } @Override - ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence, - final long retry, final ActorRef replyTo, final YangInstanceIdentifier path) { - return new ExistsTransactionRequest(target, sequence, retry, replyTo, path); + ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo, + final YangInstanceIdentifier path) { + return new ExistsTransactionRequest(target, replyTo, path); } } \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccess.java index 5f081c77cd..33167921a3 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ExistsTransactionSuccess.java @@ -23,13 +23,8 @@ public final class ExistsTransactionSuccess extends TransactionSuccess modifications; private final PersistenceProtocol protocol; - private ModifyTransactionRequest(final ModifyTransactionRequest request, final long retry) { - super(request, retry); - this.modifications = request.modifications; - this.protocol = request.protocol; - } - - ModifyTransactionRequest(final TransactionIdentifier target, final long sequence, final long retry, - final ActorRef replyTo, final List modifications, final PersistenceProtocol protocol) { - super(target, sequence, retry, replyTo); + ModifyTransactionRequest(final TransactionIdentifier target, final ActorRef replyTo, + final List modifications, final PersistenceProtocol protocol) { + super(target, replyTo); this.modifications = ImmutableList.copyOf(modifications); this.protocol = protocol; } @@ -59,14 +53,8 @@ public final class ModifyTransactionRequest extends TransactionRequest { private static final long serialVersionUID = 1L; - public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence, - final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) { - this(identifier, sequence, 0, replyTo, path); - } - - ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence, - final long retry, final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) { - super(identifier, sequence, retry, replyTo, path); + public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo, + final @Nonnull YangInstanceIdentifier path) { + super(identifier, replyTo, path); } private ReadTransactionRequest(final ReadTransactionRequest request, final ABIVersion version) { super(request, version); } - private ReadTransactionRequest(final ReadTransactionRequest request, final long retry) { - super(request, retry); - } - - @Override - protected ReadTransactionRequest cloneAsRetry(final long retry) { - return new ReadTransactionRequest(this, retry); - } - @Override protected ReadTransactionRequest cloneAsVersion(final ABIVersion version) { return new ReadTransactionRequest(this, version); diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequestProxyV1.java index 461b1c2c07..ae0f6f6470 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequestProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionRequestProxyV1.java @@ -29,8 +29,8 @@ final class ReadTransactionRequestProxyV1 extends AbstractReadTransactionRequest } @Override - ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence, final long retry, - final ActorRef replyTo, final YangInstanceIdentifier path) { - return new ReadTransactionRequest(target, sequence, retry, replyTo, path); + ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo, + final YangInstanceIdentifier path) { + return new ReadTransactionRequest(target, replyTo, path); } } \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java index e83adee10e..45e7631580 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccess.java @@ -24,14 +24,8 @@ public final class ReadTransactionSuccess extends TransactionSuccess> data; - public ReadTransactionSuccess(final TransactionIdentifier identifier, final long sequence, - final Optional> data) { - this(identifier, sequence, 0, data); - } - - ReadTransactionSuccess(final TransactionIdentifier identifier, final long sequence, final long retry, - final Optional> data) { - super(identifier, sequence, retry); + public ReadTransactionSuccess(final TransactionIdentifier identifier, final Optional> data) { + super(identifier); this.data = Preconditions.checkNotNull(data); } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccessProxyV1.java index 13e1cf0d5a..923284278f 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccessProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccessProxyV1.java @@ -63,8 +63,7 @@ final class ReadTransactionSuccessProxyV1 extends AbstractTransactionSuccessProx } @Override - protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence, - final long retry) { - return new ReadTransactionSuccess(target, sequence, retry, data); + protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target) { + return new ReadTransactionSuccess(target, data); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java index 7dc4b19039..e31d75f077 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java @@ -22,9 +22,8 @@ public final class TransactionCanCommitSuccess extends TransactionSuccess { private static final long serialVersionUID = 1L; - TransactionFailure(final TransactionIdentifier target, final long sequence, final long retry, - final RequestException cause) { - super(target, sequence, retry, cause); + TransactionFailure(final TransactionIdentifier target, final RequestException cause) { + super(target, cause); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailureProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailureProxyV1.java index 1a35ec1165..fd99749bed 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailureProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionFailureProxyV1.java @@ -31,9 +31,8 @@ final class TransactionFailureProxyV1 extends AbstractRequestFailureProxy> extends Request { private static final long serialVersionUID = 1L; - TransactionRequest(final TransactionIdentifier identifier, final long sequence, final long retry, - final ActorRef replyTo) { - super(identifier, sequence, retry, replyTo); + TransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) { + super(identifier, replyTo); } TransactionRequest(final T request, final ABIVersion version) { super(request, version); } - TransactionRequest(final T request, final long retry) { - super(request, retry); - } - @Override public final TransactionFailure toRequestFailure(final RequestException cause) { - return new TransactionFailure(getTarget(), getSequence(), getRetry(), cause); + return new TransactionFailure(getTarget(), cause); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java index ff56340a9e..daa4ba1b8b 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java @@ -24,8 +24,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier public abstract class TransactionSuccess> extends RequestSuccess { private static final long serialVersionUID = 1L; - TransactionSuccess(final TransactionIdentifier identifier, final long sequence, final long retry) { - super(identifier, sequence, retry); + TransactionSuccess(final TransactionIdentifier identifier) { + super(identifier); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java new file mode 100644 index 0000000000..89458b8a07 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.concepts; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.opendaylight.yangtools.concepts.WritableObjects; + +abstract class AbstractEnvelopeProxy> implements Externalizable { + private static final long serialVersionUID = 1L; + private T message; + private long sequence; + private long retry; + + public AbstractEnvelopeProxy() { + // for Externalizable + } + + AbstractEnvelopeProxy(final Envelope envelope) { + message = envelope.getMessage(); + sequence = envelope.getSequence(); + retry = envelope.getRetry(); + } + + @Override + public final void writeExternal(final ObjectOutput out) throws IOException { + WritableObjects.writeLongs(out, sequence, retry); + out.writeObject(message); + } + + @SuppressWarnings("unchecked") + @Override + public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + final byte header = WritableObjects.readLongHeader(in); + sequence = WritableObjects.readFirstLong(in, header); + retry = WritableObjects.readSecondLong(in, header); + message = (T) in.readObject(); + } + + abstract Envelope createEnvelope(T message, long sequence, long retry); + + final Object readResolve() { + return createEnvelope(message, sequence, retry); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractMessageProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractMessageProxy.java index 04e70379a1..4b60aecefa 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractMessageProxy.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractMessageProxy.java @@ -15,7 +15,6 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import javax.annotation.Nonnull; import org.opendaylight.yangtools.concepts.WritableIdentifier; -import org.opendaylight.yangtools.concepts.WritableObjects; /** * Abstract Externalizable proxy for use with {@link Message} subclasses. @@ -28,8 +27,6 @@ import org.opendaylight.yangtools.concepts.WritableObjects; abstract class AbstractMessageProxy> implements Externalizable { private static final long serialVersionUID = 1L; private T target; - private long sequence; - private long retry; protected AbstractMessageProxy() { // For Externalizable @@ -37,29 +34,22 @@ abstract class AbstractMessageProxy> extends AbstractEnvelopeProxy { + private static final long serialVersionUID = 1L; + + public AbstractResponseEnvelopeProxy() { + // for Externalizable + } + + AbstractResponseEnvelopeProxy(final ResponseEnvelope envelope) { + super(envelope); + } + + @Override + abstract ResponseEnvelope createEnvelope(T message, long sequence, long retry); +} \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseProxy.java index 08c31356ac..8262bb29f9 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseProxy.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseProxy.java @@ -31,9 +31,9 @@ abstract class AbstractResponseProxy> implements Immutable, Serializable { + private static final long serialVersionUID = 1L; + + private final T message; + private final long sequence; + private final long retry; + + Envelope(final T message, final long sequence, final long retry) { + this.message = Preconditions.checkNotNull(message); + this.sequence = sequence; + this.retry = retry; + } + + /** + * Get the enclosed message + * + * @return enclose message + */ + public T getMessage() { + return message; + } + + /** + * Get the message sequence of this envelope. + * + * @return Message sequence + */ + public long getSequence() { + return sequence; + } + + /** + * Get the message retry counter. + * + * @return Retry counter + */ + public long getRetry() { + return retry; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(Envelope.class).add("sequence", Long.toUnsignedString(sequence, 16)). + add("retry", retry).add("message", message).toString(); + } + + final Object writeReplace() { + return createProxy(); + } + + abstract AbstractEnvelopeProxy createProxy(); +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java new file mode 100644 index 0000000000..8da54f5954 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.concepts; + +public final class FailureEnvelope extends ResponseEnvelope> { + private static final long serialVersionUID = 1L; + + public FailureEnvelope(final RequestFailure message, final long sequence, final long retry) { + super(message, sequence, retry); + } + + @Override + FailureEnvelopeProxy createProxy() { + return new FailureEnvelopeProxy(this); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java new file mode 100644 index 0000000000..a14e69875f --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.concepts; + +final class FailureEnvelopeProxy extends AbstractResponseEnvelopeProxy> { + private static final long serialVersionUID = 1L; + + public FailureEnvelopeProxy() { + // for Externalizable + } + + FailureEnvelopeProxy(final FailureEnvelope envelope) { + super(envelope); + } + + @Override + FailureEnvelope createEnvelope(final RequestFailure message, final long sequence, final long retry) { + return new FailureEnvelope(message, sequence, retry); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Message.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Message.java index 87b0e6ef34..5070b7cf71 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Message.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/Message.java @@ -51,27 +51,19 @@ public abstract class Message> { + private static final long serialVersionUID = 1L; + + public RequestEnvelope(final Request message, final long sequence, final long retry) { + super(message, sequence, retry); + } + + @Override + RequestEnvelopeProxy createProxy() { + return new RequestEnvelopeProxy(this); + } + + /** + * Respond to this envelope with a {@link RequestFailure} caused by specified {@link RequestException}. + * + * @param cause Cause of this {@link RequestFailure} + * @throws NullPointerException if cause is null + */ + public void sendFailure(final RequestException cause) { + sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSequence(), getRetry())); + } + + /** + * Respond to this envelope with a {@link RequestSuccess}. + * + * @param success Successful response + * @throws NullPointerException if success is null + */ + public void sendSuccess(final RequestSuccess success) { + sendResponse(new SuccessEnvelope(success, getSequence(), getRetry())); + } + + private void sendResponse(final ResponseEnvelope envelope) { + getMessage().getReplyTo().tell(envelope, ActorRef.noSender()); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelopeProxy.java new file mode 100644 index 0000000000..1b499d003f --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelopeProxy.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.concepts; + +final class RequestEnvelopeProxy extends AbstractEnvelopeProxy> { + private static final long serialVersionUID = 1L; + + public RequestEnvelopeProxy() { + // for Externalizable + } + + RequestEnvelopeProxy(final RequestEnvelope envelope) { + super(envelope); + } + + @Override + RequestEnvelope createEnvelope(final Request message, final long sequence, final long retry) { + return new RequestEnvelope(message, sequence, retry); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestFailure.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestFailure.java index 7fd9009517..301b54ee1f 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestFailure.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestFailure.java @@ -32,9 +32,8 @@ public abstract class RequestFailure> extends Message { private static final long serialVersionUID = 1L; - Response(final @Nonnull C response, final @Nonnull ABIVersion version) { - super(response, version); - } - - Response(final @Nonnull T target, final long sequence, final long retry) { - super(target, sequence, retry); + Response(final @Nonnull T target) { + super(target); } - @Override - protected final C cloneAsRetry(final long retry) { - throw new UnsupportedOperationException("Responses do not support retries"); + Response(final @Nonnull C response, final @Nonnull ABIVersion version) { + super(response, version); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java new file mode 100644 index 0000000000..a3625f665a --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.concepts; + +public abstract class ResponseEnvelope> extends Envelope { + private static final long serialVersionUID = 1L; + + ResponseEnvelope(final T message, final long sequence, final long retry) { + super(message, sequence, retry); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java new file mode 100644 index 0000000000..be37dae110 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.concepts; + +public final class SuccessEnvelope extends ResponseEnvelope> { + private static final long serialVersionUID = 1L; + + public SuccessEnvelope(final RequestSuccess message, final long sequence, final long retry) { + super(message, sequence, retry); + } + + @Override + SuccessEnvelopeProxy createProxy() { + return new SuccessEnvelopeProxy(this); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java new file mode 100644 index 0000000000..9f0fc2f325 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.concepts; + +final class SuccessEnvelopeProxy extends AbstractResponseEnvelopeProxy> { + private static final long serialVersionUID = 1L; + + public SuccessEnvelopeProxy() { + // for Externalizable + } + + SuccessEnvelopeProxy(final SuccessEnvelope envelope) { + super(envelope); + } + + @Override + SuccessEnvelope createEnvelope(final RequestSuccess message, final long sequence, final long retry) { + return new SuccessEnvelope(message, sequence, retry); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java index c3df604ef2..110a414939 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java @@ -14,10 +14,11 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; -import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; +import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; import org.opendaylight.yangtools.concepts.Identifiable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,21 +50,22 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior) command); + if (command instanceof SuccessEnvelope) { + return onRequestSuccess((SuccessEnvelope) command); } - if (command instanceof RequestFailure) { - return onRequestFailure((RequestFailure) command); + if (command instanceof FailureEnvelope) { + return onRequestFailure((FailureEnvelope) command); } return onCommand(command); } - private ClientActorBehavior onRequestSuccess(final RequestSuccess success) { - return context().completeRequest(this, success); + private ClientActorBehavior onRequestSuccess(final SuccessEnvelope command) { + return context().completeRequest(this, command); } - private ClientActorBehavior onRequestFailure(final RequestFailure failure) { + private ClientActorBehavior onRequestFailure(final FailureEnvelope command) { + final RequestFailure failure = command.getMessage(); final RequestException cause = failure.getCause(); if (cause instanceof RetiredGenerationException) { LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause); @@ -73,22 +75,23 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior request, final RequestCallback callback) { + private ClientActorBehavior doSendRequest(final long sequence, final TransactionRequest request, + final RequestCallback callback) { // Get or allocate queue for the request final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie()); // Note this is a tri-state return and can be null - final Optional result = queue.enqueueRequest(request, callback); + final Optional result = queue.enqueueRequest(sequence, request, callback); if (result == null) { // Happy path: we are done here return this; @@ -188,7 +191,7 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior request, final RequestCallback callback) { - context().executeInActor(cb -> cb.doSendRequest(request, callback)); + public final void sendRequest(final long sequence, final TransactionRequest request, final RequestCallback callback) { + context().executeInActor(cb -> cb.doSendRequest(sequence, request, callback)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java index 9f4fd137a4..530d7e277f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java @@ -19,7 +19,7 @@ import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestException; -import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.WritableIdentifier; @@ -94,8 +94,8 @@ public class ClientActorContext extends AbstractClientActorContext implements Id queues.remove(queue.getCookie(), queue); } - ClientActorBehavior completeRequest(final ClientActorBehavior current, final Response response) { - final WritableIdentifier id = response.getTarget(); + ClientActorBehavior completeRequest(final ClientActorBehavior current, final ResponseEnvelope response) { + final WritableIdentifier id = response.getMessage().getTarget(); // FIXME: this will need to be updated for other Request/Response types to extract cookie Preconditions.checkArgument(id instanceof TransactionIdentifier); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueue.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueue.java index 8cae0e133a..50c600f52d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueue.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueue.java @@ -20,7 +20,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestException; -import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; @@ -105,12 +105,13 @@ final class SequencedQueue { * @param callback Callback to be invoked * @return Optional duration with semantics described above. */ - @Nullable Optional enqueueRequest(final Request request, final RequestCallback callback) { + @Nullable Optional enqueueRequest(final long sequence, final Request request, + final RequestCallback callback) { + checkNotClosed(); + final long now = ticker.read(); - final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now); + final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now); - // We could have check first, but argument checking needs to happen first - checkNotClosed(); queue.add(e); LOG.debug("Enqueued request {} to queue {}", request, this); @@ -127,7 +128,7 @@ final class SequencedQueue { } } - ClientActorBehavior complete(final ClientActorBehavior current, final Response response) { + ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope response) { // Responses to different targets may arrive out of order, hence we use an iterator final Iterator it = queue.iterator(); while (it.hasNext()) { @@ -136,7 +137,7 @@ final class SequencedQueue { lastProgress = ticker.read(); it.remove(); LOG.debug("Completing request {} with {}", e, response); - return e.complete(response); + return e.complete(response.getMessage()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntry.java index 54940cd8ff..b99b1a3ceb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntry.java @@ -12,8 +12,10 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import java.util.Optional; import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,12 +28,12 @@ import org.slf4j.LoggerFactory; */ final class SequencedQueueEntry { private static final class LastTry { - final Request request; final long timeTicks; + final long retry; - LastTry(final Request request, final long when) { - this.request = Preconditions.checkNotNull(request); - this.timeTicks = when; + LastTry(final long retry, final long timeTicks) { + this.retry = retry; + this.timeTicks = timeTicks; } } @@ -40,26 +42,28 @@ final class SequencedQueueEntry { private final Request request; private final RequestCallback callback; private final long enqueuedTicks; + private final long sequence; private Optional lastTry = Optional.empty(); - SequencedQueueEntry(final Request request, final RequestCallback callback, final long now) { + SequencedQueueEntry(final Request request, final long sequence, final RequestCallback callback, + final long now) { this.request = Preconditions.checkNotNull(request); this.callback = Preconditions.checkNotNull(callback); this.enqueuedTicks = now; + this.sequence = sequence; } long getSequence() { - return request.getSequence(); + return sequence; } - boolean acceptsResponse(final Response response) { - return getSequence() == response.getSequence() && request.getTarget().equals(response.getTarget()); + boolean acceptsResponse(final ResponseEnvelope response) { + return getSequence() == response.getSequence() && request.getTarget().equals(response.getMessage().getTarget()); } long getCurrentTry() { - final Request req = lastTry.isPresent() ? lastTry.get().request : request; - return req.getRetry(); + return lastTry.isPresent() ? lastTry.get().retry : 0; } ClientActorBehavior complete(final Response response) { @@ -73,20 +77,16 @@ final class SequencedQueueEntry { } boolean isTimedOut(final long now, final long timeoutNanos) { - final Request req; final long elapsed; if (lastTry.isPresent()) { - final LastTry t = lastTry.get(); - elapsed = now - t.timeTicks; - req = t.request; + elapsed = now - lastTry.get().timeTicks; } else { elapsed = now - enqueuedTicks; - req = request; } if (elapsed >= timeoutNanos) { - LOG.debug("Request {} timed out after {}ns", req, elapsed); + LOG.debug("Request {} timed out after {}ns", request, elapsed); return true; } else { return false; @@ -94,13 +94,13 @@ final class SequencedQueueEntry { } void retransmit(final BackendInfo backend, final long now) { - final Request nextTry = lastTry.isPresent() ? lastTry.get().request.incrementRetry() : request; - final Request toSend = nextTry.toVersion(backend.getVersion()); - final ActorRef actor = backend.getActor(); + final long retry = lastTry.isPresent() ? lastTry.get().retry + 1 : 0; + final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), sequence, retry); + final ActorRef actor = backend.getActor(); LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor); actor.tell(toSend, ActorRef.noSender()); - lastTry = Optional.of(new LastTry(toSend, now)); + lastTry = Optional.of(new LastTry(retry, now)); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntryTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntryTest.java index 68a665e445..86e053155c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntryTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntryTest.java @@ -31,6 +31,7 @@ import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy; import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy; import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; @@ -47,8 +48,8 @@ public class SequencedQueueEntryTest { private static class MockFailure extends RequestFailure { private static final long serialVersionUID = 1L; - MockFailure(final WritableIdentifier target, final long sequence, final long retry, final RequestException cause) { - super(target, sequence, retry, cause); + MockFailure(final WritableIdentifier target, final RequestException cause) { + super(target, cause); } @Override @@ -65,18 +66,13 @@ public class SequencedQueueEntryTest { private static class MockRequest extends Request { private static final long serialVersionUID = 1L; - MockRequest(final WritableIdentifier target, final long sequence, final ActorRef replyTo) { - super(target, sequence, 0, replyTo); - } - - - MockRequest(final MockRequest request, final long retry) { - super(request, retry); + MockRequest(final WritableIdentifier target, final ActorRef replyTo) { + super(target, replyTo); } @Override public RequestFailure toRequestFailure(final RequestException cause) { - return new MockFailure(getTarget(), getSequence(), getRetry(), cause); + return new MockFailure(getTarget(), cause); } @Override @@ -88,11 +84,6 @@ public class SequencedQueueEntryTest { protected MockRequest cloneAsVersion(final ABIVersion version) { return this; } - - @Override - protected MockRequest cloneAsRetry(final long retry) { - return new MockRequest(this, retry); - } }; @Mock @@ -137,10 +128,10 @@ public class SequencedQueueEntryTest { mockActor = TestProbe.apply(actorSystem); mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current()); - mockRequest = new MockRequest(mockIdentifier, ThreadLocalRandom.current().nextLong(), mockReplyTo); + mockRequest = new MockRequest(mockIdentifier, mockReplyTo); mockResponse = mockRequest.toRequestFailure(mockCause); - entry = new SequencedQueueEntry(mockRequest, mockCallback, ticker.read()); + entry = new SequencedQueueEntry(mockRequest, 0, mockCallback, ticker.read()); } @After @@ -150,7 +141,7 @@ public class SequencedQueueEntryTest { @Test public void testGetSequence() { - assertEquals(mockRequest.getSequence(), entry.getSequence()); + assertEquals(0, entry.getSequence()); } @Test @@ -207,9 +198,11 @@ public class SequencedQueueEntryTest { } private static void assertRequestEquals(final Request expected, final Object o) { - final Request actual = (Request) o; - assertEquals(expected.getRetry(), actual.getRetry()); - assertEquals(expected.getSequence(), actual.getSequence()); - assertEquals(expected.getTarget(), actual.getTarget()); + assertTrue(o instanceof RequestEnvelope); + + final RequestEnvelope actual = (RequestEnvelope) o; + assertEquals(0, actual.getRetry()); + assertEquals(0, actual.getSequence()); + assertEquals(expected.getTarget(), actual.getMessage().getTarget()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueTest.java index f1caeb57fd..176265036a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueTest.java @@ -35,10 +35,11 @@ import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy; import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy; +import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; -import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.common.actor.TestTicker; import org.opendaylight.yangtools.concepts.WritableIdentifier; import scala.concurrent.duration.FiniteDuration; @@ -52,8 +53,8 @@ public class SequencedQueueTest { private static class MockFailure extends RequestFailure { private static final long serialVersionUID = 1L; - MockFailure(final WritableIdentifier target, final long sequence, final long retry, final RequestException cause) { - super(target, sequence, retry, cause); + MockFailure(final WritableIdentifier target, final RequestException cause) { + super(target, cause); } @Override @@ -70,18 +71,13 @@ public class SequencedQueueTest { private static class MockRequest extends Request { private static final long serialVersionUID = 1L; - MockRequest(final WritableIdentifier target, final long sequence, final ActorRef replyTo) { - super(target, sequence, 0, replyTo); - } - - - MockRequest(final MockRequest request, final long retry) { - super(request, retry); + MockRequest(final WritableIdentifier target, final ActorRef replyTo) { + super(target, replyTo); } @Override public RequestFailure toRequestFailure(final RequestException cause) { - return new MockFailure(getTarget(), getSequence(), getRetry(), cause); + return new MockFailure(getTarget(), cause); } @Override @@ -93,11 +89,6 @@ public class SequencedQueueTest { protected MockRequest cloneAsVersion(final ABIVersion version) { return this; } - - @Override - protected MockRequest cloneAsRetry(final long retry) { - return new MockRequest(this, retry); - } }; @Mock @@ -115,7 +106,8 @@ public class SequencedQueueTest { private BackendInfo mockBackendInfo; private MockRequest mockRequest; private MockRequest mockRequest2; - private Response mockResponse; + private RequestFailure mockResponse; + private FailureEnvelope mockResponseEnvelope; private Long mockCookie; private static ActorSystem actorSystem; @@ -144,9 +136,10 @@ public class SequencedQueueTest { mockActor = TestProbe.apply(actorSystem); mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current()); - mockRequest = new MockRequest(mockIdentifier, ThreadLocalRandom.current().nextLong(), mockReplyTo); - mockRequest2 = new MockRequest(mockIdentifier, mockRequest.getSequence() + 1, mockReplyTo); + mockRequest = new MockRequest(mockIdentifier, mockReplyTo); + mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo); mockResponse = mockRequest.toRequestFailure(mockCause); + mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0); mockCookie = ThreadLocalRandom.current().nextLong(); queue = new SequencedQueue(mockCookie, ticker); @@ -174,7 +167,7 @@ public class SequencedQueueTest { queue.close(); // Kaboom - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); } @Test @@ -185,7 +178,7 @@ public class SequencedQueueTest { @Test public void testPoison() { - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); queue.poison(mockCause); final ArgumentCaptor captor = ArgumentCaptor.forClass(MockFailure.class); @@ -199,7 +192,7 @@ public class SequencedQueueTest { queue.poison(mockCause); // Kaboom - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); } @Test @@ -210,7 +203,7 @@ public class SequencedQueueTest { @Test public void testEnqueueRequestNeedsBackend() { - final Optional ret = queue.enqueueRequest(mockRequest, mockCallback); + final Optional ret = queue.enqueueRequest(0, mockRequest, mockCallback); assertNotNull(ret); assertFalse(ret.isPresent()); @@ -232,7 +225,7 @@ public class SequencedQueueTest { @Test public void testSetBackendWithNoResolution() { - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); final CompletableFuture proof = new CompletableFuture<>(); final Optional ret = queue.setBackendInfo(proof, mockBackendInfo); @@ -242,7 +235,7 @@ public class SequencedQueueTest { @Test public void testSetBackendWithWrongProof() { - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); final CompletableFuture proof = new CompletableFuture<>(); assertTrue(queue.expectProof(proof)); @@ -260,7 +253,7 @@ public class SequencedQueueTest { @Test public void testSetbackedWithRequestsNoTimer() { - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); final CompletableFuture proof = new CompletableFuture<>(); assertTrue(queue.expectProof(proof)); @@ -270,17 +263,17 @@ public class SequencedQueueTest { assertNotNull(ret); assertTrue(ret.isPresent()); - assertTransmit(mockRequest); + assertTransmit(mockRequest, 0); } @Test public void testEnqueueRequestNeedsTimer() { setupBackend(); - final Optional ret = queue.enqueueRequest(mockRequest, mockCallback); + final Optional ret = queue.enqueueRequest(0, mockRequest, mockCallback); assertNotNull(ret); assertTrue(ret.isPresent()); - assertTransmit(mockRequest); + assertTransmit(mockRequest, 0); } @Test @@ -288,15 +281,15 @@ public class SequencedQueueTest { setupBackend(); // First request - Optional ret = queue.enqueueRequest(mockRequest, mockCallback); + Optional ret = queue.enqueueRequest(0, mockRequest, mockCallback); assertNotNull(ret); assertTrue(ret.isPresent()); - assertTransmit(mockRequest); + assertTransmit(mockRequest, 0); // Second request, no timer fired - ret = queue.enqueueRequest(mockRequest2, mockCallback); + ret = queue.enqueueRequest(1, mockRequest2, mockCallback); assertNull(ret); - assertTransmit(mockRequest2); + assertTransmit(mockRequest2, 1); } @Test @@ -307,14 +300,14 @@ public class SequencedQueueTest { @Test public void testRunTimeoutWithoutShift() throws NoProgressException { - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); final boolean ret = queue.runTimeout(); assertFalse(ret); } @Test public void testRunTimeoutWithTimeoutLess() throws NoProgressException { - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1); @@ -324,7 +317,7 @@ public class SequencedQueueTest { @Test public void testRunTimeoutWithTimeoutExact() throws NoProgressException { - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS); @@ -334,7 +327,7 @@ public class SequencedQueueTest { @Test public void testRunTimeoutWithTimeoutMore() throws NoProgressException { - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1); @@ -344,7 +337,7 @@ public class SequencedQueueTest { @Test(expected=NoProgressException.class) public void testRunTimeoutWithoutProgressExact() throws NoProgressException { - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS); @@ -354,7 +347,7 @@ public class SequencedQueueTest { @Test(expected=NoProgressException.class) public void testRunTimeoutWithoutProgressMore() throws NoProgressException { - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1); @@ -382,31 +375,31 @@ public class SequencedQueueTest { @Test public void testCompleteEmpty() { - final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse); + final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope); assertSame(mockBehavior, ret); verifyNoMoreInteractions(mockCallback); } @Test public void testCompleteSingle() { - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); - ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse); + ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope); verify(mockCallback).complete(mockResponse); assertSame(mockBehavior, ret); - ret = queue.complete(mockBehavior, mockResponse); + ret = queue.complete(mockBehavior, mockResponseEnvelope); assertSame(mockBehavior, ret); verifyNoMoreInteractions(mockCallback); } @Test public void testCompleteNull() { - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); doReturn(null).when(mockCallback).complete(mockResponse); - ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse); + ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope); verify(mockCallback).complete(mockResponse); assertNull(ret); } @@ -415,11 +408,11 @@ public class SequencedQueueTest { public void testProgressRecord() throws NoProgressException { setupBackend(); - queue.enqueueRequest(mockRequest, mockCallback); + queue.enqueueRequest(0, mockRequest, mockCallback); ticker.increment(10); - queue.enqueueRequest(mockRequest2, mockCallback); - queue.complete(mockBehavior, mockResponse); + queue.enqueueRequest(1, mockRequest2, mockCallback); + queue.complete(mockBehavior, mockResponseEnvelope); ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11); assertTrue(queue.runTimeout()); @@ -434,15 +427,17 @@ public class SequencedQueueTest { assertFalse(mockActor.msgAvailable()); } - private void assertTransmit(final Request expected) { + private void assertTransmit(final Request expected, final long sequence) { assertTrue(mockActor.msgAvailable()); - assertRequestEquals(expected, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS))); + assertRequestEquals(expected, sequence, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS))); } - private static void assertRequestEquals(final Request expected, final Object o) { - final Request actual = (Request) o; - assertEquals(expected.getRetry(), actual.getRetry()); - assertEquals(expected.getSequence(), actual.getSequence()); - assertEquals(expected.getTarget(), actual.getTarget()); + private static void assertRequestEquals(final Request expected, final long sequence, final Object o) { + assertTrue(o instanceof RequestEnvelope); + + final RequestEnvelope actual = (RequestEnvelope) o; + assertEquals(0, actual.getRetry()); + assertEquals(sequence, actual.getSequence()); + assertSame(expected, actual.getMessage()); } }