public final class AbortLocalTransactionRequest extends AbstractLocalTransactionRequest<AbortLocalTransactionRequest> {
private static final long serialVersionUID = 1L;
- public AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
+ public AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier,
final @Nonnull ActorRef replyTo) {
- this(identifier, sequence, 0, replyTo);
- }
-
- AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
- final long retry, final @Nonnull ActorRef replyTo) {
- super(identifier, sequence, retry, replyTo);
- }
-
- private AbortLocalTransactionRequest(final @Nonnull AbortLocalTransactionRequest request, final long retry) {
- super(request, retry);
- }
-
- @Override
- protected AbortLocalTransactionRequest cloneAsRetry(final long retry) {
- return new AbortLocalTransactionRequest(this, retry);
+ super(identifier, replyTo);
}
}
\ No newline at end of file
abstract class AbstractLocalTransactionRequest<T extends AbstractLocalTransactionRequest<T>> extends TransactionRequest<T> {
private static final long serialVersionUID = 1L;
- AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final long sequence, final long retry,
- final ActorRef replyTo) {
- super(identifier, sequence, retry, replyTo);
- }
-
- AbstractLocalTransactionRequest(final T request, final long retry) {
- super(request, retry);
+ AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) {
+ super(identifier, replyTo);
}
@Override
private static final long serialVersionUID = 1L;
private final YangInstanceIdentifier path;
- AbstractReadTransactionRequest(final TransactionIdentifier identifier, final long sequence, final long retry,
- final ActorRef replyTo, final YangInstanceIdentifier path) {
- super(identifier, sequence, retry, replyTo);
+ AbstractReadTransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo,
+ final YangInstanceIdentifier path) {
+ super(identifier, replyTo);
this.path = Preconditions.checkNotNull(path);
}
this.path = request.getPath();
}
- AbstractReadTransactionRequest(final T request, final long retry) {
- super(request, retry);
- this.path = request.getPath();
- }
-
public final @Nonnull YangInstanceIdentifier getPath() {
return path;
}
}
@Override
- protected final T createRequest(final TransactionIdentifier target, final long sequence, final long retry,
- final ActorRef replyTo) {
- return createReadRequest(target, sequence, retry, replyTo, path);
+ protected final T createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+ return createReadRequest(target, replyTo, path);
}
- abstract T createReadRequest(TransactionIdentifier target, long sequence, long retry, ActorRef replyTo,
- YangInstanceIdentifier path);
+ abstract T createReadRequest(TransactionIdentifier target, ActorRef replyTo, YangInstanceIdentifier path);
}
private final DataTreeModification mod;
private final boolean coordinated;
- public CommitLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
+ public CommitLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier,
final @Nonnull ActorRef replyTo, final @Nonnull DataTreeModification mod, final boolean coordinated) {
- this(identifier, sequence, 0, replyTo, mod, coordinated);
- }
-
- CommitLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
- final long retry, final @Nonnull ActorRef replyTo, final @Nonnull DataTreeModification mod,
- final boolean coordinated) {
- super(identifier, sequence, retry, replyTo);
+ super(identifier, replyTo);
this.mod = Preconditions.checkNotNull(mod);
this.coordinated = coordinated;
}
-
- private CommitLocalTransactionRequest(final CommitLocalTransactionRequest request, final long retry) {
- super(request, retry);
- this.mod = request.mod;
- this.coordinated = request.coordinated;
- }
-
public DataTreeModification getModification() {
return mod;
}
protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
return super.addToStringAttributes(toStringHelper).add("coordinated", coordinated);
}
-
- @Override
- protected CommitLocalTransactionRequest cloneAsRetry(final long retry) {
- return new CommitLocalTransactionRequest(this, retry);
- }
}
\ No newline at end of file
public final class ExistsTransactionRequest extends AbstractReadTransactionRequest<ExistsTransactionRequest> {
private static final long serialVersionUID = 1L;
- public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
- final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
- this(identifier, sequence, 0, replyTo, path);
- }
-
- ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
- final long retry, final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
- super(identifier, sequence, retry, replyTo, path);
+ public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo,
+ final @Nonnull YangInstanceIdentifier path) {
+ super(identifier, replyTo, path);
}
private ExistsTransactionRequest(final ExistsTransactionRequest request, final ABIVersion version) {
super(request, version);
}
- private ExistsTransactionRequest(final ExistsTransactionRequest request, final long retry) {
- super(request, retry);
- }
-
@Override
protected ExistsTransactionRequest cloneAsVersion(final ABIVersion version) {
return new ExistsTransactionRequest(this, version);
protected ExistsTransactionRequestProxyV1 externalizableProxy(final ABIVersion version) {
return new ExistsTransactionRequestProxyV1(this);
}
-
- @Override
- protected ExistsTransactionRequest cloneAsRetry(final long retry) {
- return new ExistsTransactionRequest(this, retry);
- }
}
}
@Override
- ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence,
- final long retry, final ActorRef replyTo, final YangInstanceIdentifier path) {
- return new ExistsTransactionRequest(target, sequence, retry, replyTo, path);
+ ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo,
+ final YangInstanceIdentifier path) {
+ return new ExistsTransactionRequest(target, replyTo, path);
}
}
\ No newline at end of file
private static final long serialVersionUID = 1L;
private final boolean exists;
- public ExistsTransactionSuccess(final TransactionIdentifier target, final long sequence, final boolean exists) {
- this(target, sequence, 0, exists);
- }
-
- ExistsTransactionSuccess(final TransactionIdentifier target, final long sequence, final long retry,
- final boolean exists) {
- super(target, sequence, retry);
+ public ExistsTransactionSuccess(final TransactionIdentifier target, final boolean exists) {
+ super(target);
this.exists = exists;
}
}
@Override
- protected ExistsTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence,
- final long retry) {
- return new ExistsTransactionSuccess(target, sequence, retry, exists);
+ protected ExistsTransactionSuccess createSuccess(final TransactionIdentifier target) {
+ return new ExistsTransactionSuccess(target, exists);
}
}
private final List<TransactionModification> modifications;
private final PersistenceProtocol protocol;
- private ModifyTransactionRequest(final ModifyTransactionRequest request, final long retry) {
- super(request, retry);
- this.modifications = request.modifications;
- this.protocol = request.protocol;
- }
-
- ModifyTransactionRequest(final TransactionIdentifier target, final long sequence, final long retry,
- final ActorRef replyTo, final List<TransactionModification> modifications, final PersistenceProtocol protocol) {
- super(target, sequence, retry, replyTo);
+ ModifyTransactionRequest(final TransactionIdentifier target, final ActorRef replyTo,
+ final List<TransactionModification> modifications, final PersistenceProtocol protocol) {
+ super(target, replyTo);
this.modifications = ImmutableList.copyOf(modifications);
this.protocol = protocol;
}
return new ModifyTransactionRequestProxyV1(this);
}
- @Override
- protected ModifyTransactionRequest cloneAsRetry(final long retry) {
- return new ModifyTransactionRequest(this, retry);
- }
-
@Override
protected ModifyTransactionRequest cloneAsVersion(final ABIVersion version) {
return this;
}
-
}
private final TransactionIdentifier identifier;
private final ActorRef replyTo;
private PersistenceProtocol protocol = null;
- private long sequence;
public ModifyTransactionRequestBuilder(final TransactionIdentifier identifier, final ActorRef replyTo) {
this.identifier = Preconditions.checkNotNull(identifier);
Preconditions.checkState(protocol != null, "Batch has already been finished");
}
- public void setSequence(final long sequence) {
- checkFinished();
- Preconditions.checkState(modifications.isEmpty(), "Sequence must be set first");
- this.sequence = sequence;
- }
-
public void addModification(final TransactionModification modification) {
checkFinished();
modifications.add(Preconditions.checkNotNull(modification));
@Override
public ModifyTransactionRequest build() {
- final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, sequence, 0, replyTo,
- modifications, protocol);
+ final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, replyTo, modifications, protocol);
modifications.clear();
protocol = null;
- sequence = 0;
return ret;
}
-
}
}
@Override
- protected ModifyTransactionRequest createRequest(final TransactionIdentifier target, final long sequence,
- final long retry, final ActorRef replyTo) {
- return new ModifyTransactionRequest(target, sequence, retry, replyTo, modifications, protocol.orElse(null));
+ protected ModifyTransactionRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+ return new ModifyTransactionRequest(target, replyTo, modifications, protocol.orElse(null));
}
}
public final class ReadTransactionRequest extends AbstractReadTransactionRequest<ReadTransactionRequest> {
private static final long serialVersionUID = 1L;
- public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
- final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
- this(identifier, sequence, 0, replyTo, path);
- }
-
- ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
- final long retry, final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
- super(identifier, sequence, retry, replyTo, path);
+ public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo,
+ final @Nonnull YangInstanceIdentifier path) {
+ super(identifier, replyTo, path);
}
private ReadTransactionRequest(final ReadTransactionRequest request, final ABIVersion version) {
super(request, version);
}
- private ReadTransactionRequest(final ReadTransactionRequest request, final long retry) {
- super(request, retry);
- }
-
- @Override
- protected ReadTransactionRequest cloneAsRetry(final long retry) {
- return new ReadTransactionRequest(this, retry);
- }
-
@Override
protected ReadTransactionRequest cloneAsVersion(final ABIVersion version) {
return new ReadTransactionRequest(this, version);
}
@Override
- ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence, final long retry,
- final ActorRef replyTo, final YangInstanceIdentifier path) {
- return new ReadTransactionRequest(target, sequence, retry, replyTo, path);
+ ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo,
+ final YangInstanceIdentifier path) {
+ return new ReadTransactionRequest(target, replyTo, path);
}
}
\ No newline at end of file
private static final long serialVersionUID = 1L;
private final Optional<NormalizedNode<?, ?>> data;
- public ReadTransactionSuccess(final TransactionIdentifier identifier, final long sequence,
- final Optional<NormalizedNode<?, ?>> data) {
- this(identifier, sequence, 0, data);
- }
-
- ReadTransactionSuccess(final TransactionIdentifier identifier, final long sequence, final long retry,
- final Optional<NormalizedNode<?, ?>> data) {
- super(identifier, sequence, retry);
+ public ReadTransactionSuccess(final TransactionIdentifier identifier, final Optional<NormalizedNode<?, ?>> data) {
+ super(identifier);
this.data = Preconditions.checkNotNull(data);
}
}
@Override
- protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence,
- final long retry) {
- return new ReadTransactionSuccess(target, sequence, retry, data);
+ protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target) {
+ return new ReadTransactionSuccess(target, data);
}
}
private static final long serialVersionUID = 1L;
private final ActorRef cohort;
- public TransactionCanCommitSuccess(final TransactionIdentifier identifier, final long sequence, final long retry,
- final ActorRef cohort) {
- super(identifier, sequence, retry);
+ public TransactionCanCommitSuccess(final TransactionIdentifier identifier, final ActorRef cohort) {
+ super(identifier);
this.cohort = Preconditions.checkNotNull(cohort);
}
}
@Override
- protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence,
- final long retry) {
- return new TransactionCanCommitSuccess(target, sequence, retry, cohort);
+ protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target) {
+ return new TransactionCanCommitSuccess(target, cohort);
}
}
public final class TransactionFailure extends RequestFailure<TransactionIdentifier, TransactionFailure> {
private static final long serialVersionUID = 1L;
- TransactionFailure(final TransactionIdentifier target, final long sequence, final long retry,
- final RequestException cause) {
- super(target, sequence, retry, cause);
+ TransactionFailure(final TransactionIdentifier target, final RequestException cause) {
+ super(target, cause);
}
@Override
}
@Override
- protected TransactionFailure createFailure(final TransactionIdentifier target, final long sequence,
- final long retry, final RequestException cause) {
- return new TransactionFailure(target, sequence, retry, cause);
+ protected TransactionFailure createFailure(final TransactionIdentifier target, final RequestException cause) {
+ return new TransactionFailure(target, cause);
}
@Override
public abstract class TransactionRequest<T extends TransactionRequest<T>> extends Request<TransactionIdentifier, T> {
private static final long serialVersionUID = 1L;
- TransactionRequest(final TransactionIdentifier identifier, final long sequence, final long retry,
- final ActorRef replyTo) {
- super(identifier, sequence, retry, replyTo);
+ TransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) {
+ super(identifier, replyTo);
}
TransactionRequest(final T request, final ABIVersion version) {
super(request, version);
}
- TransactionRequest(final T request, final long retry) {
- super(request, retry);
- }
-
@Override
public final TransactionFailure toRequestFailure(final RequestException cause) {
- return new TransactionFailure(getTarget(), getSequence(), getRetry(), cause);
+ return new TransactionFailure(getTarget(), cause);
}
@Override
public abstract class TransactionSuccess<T extends TransactionSuccess<T>> extends RequestSuccess<TransactionIdentifier, T> {
private static final long serialVersionUID = 1L;
- TransactionSuccess(final TransactionIdentifier identifier, final long sequence, final long retry) {
- super(identifier, sequence, retry);
+ TransactionSuccess(final TransactionIdentifier identifier) {
+ super(identifier);
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.yangtools.concepts.WritableObjects;
+
+abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externalizable {
+ private static final long serialVersionUID = 1L;
+ private T message;
+ private long sequence;
+ private long retry;
+
+ public AbstractEnvelopeProxy() {
+ // for Externalizable
+ }
+
+ AbstractEnvelopeProxy(final Envelope<T> envelope) {
+ message = envelope.getMessage();
+ sequence = envelope.getSequence();
+ retry = envelope.getRetry();
+ }
+
+ @Override
+ public final void writeExternal(final ObjectOutput out) throws IOException {
+ WritableObjects.writeLongs(out, sequence, retry);
+ out.writeObject(message);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ final byte header = WritableObjects.readLongHeader(in);
+ sequence = WritableObjects.readFirstLong(in, header);
+ retry = WritableObjects.readSecondLong(in, header);
+ message = (T) in.readObject();
+ }
+
+ abstract Envelope<T> createEnvelope(T message, long sequence, long retry);
+
+ final Object readResolve() {
+ return createEnvelope(message, sequence, retry);
+ }
+}
\ No newline at end of file
import java.io.ObjectOutput;
import javax.annotation.Nonnull;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
-import org.opendaylight.yangtools.concepts.WritableObjects;
/**
* Abstract Externalizable proxy for use with {@link Message} subclasses.
abstract class AbstractMessageProxy<T extends WritableIdentifier, C extends Message<T, C>> implements Externalizable {
private static final long serialVersionUID = 1L;
private T target;
- private long sequence;
- private long retry;
protected AbstractMessageProxy() {
// For Externalizable
AbstractMessageProxy(final @Nonnull C message) {
this.target = message.getTarget();
- this.sequence = message.getSequence();
- this.retry = message.getRetry();
}
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
target.writeTo(out);
- WritableObjects.writeLongs(out, sequence, retry);
}
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
target = Verify.verifyNotNull(readTarget(in));
-
- final byte header = WritableObjects.readLongHeader(in);
- sequence = WritableObjects.readFirstLong(in, header);
- retry = WritableObjects.readSecondLong(in, header);
}
protected final Object readResolve() {
- return Verify.verifyNotNull(createMessage(target, sequence, retry));
+ return Verify.verifyNotNull(createMessage(target));
}
protected abstract @Nonnull T readTarget(@Nonnull DataInput in) throws IOException;
- abstract @Nonnull C createMessage(@Nonnull T target, long sequence, long retry);
+ abstract @Nonnull C createMessage(@Nonnull T target);
}
\ No newline at end of file
}
@Override
- final C createResponse(final T target, final long sequence, final long retry) {
- return createFailure(target, sequence, retry, cause);
+ final C createResponse(final T target) {
+ return createFailure(target, cause);
}
- protected abstract @Nonnull C createFailure(@Nonnull T target, long sequence, long retry,
- @Nonnull RequestException cause);
+ protected abstract @Nonnull C createFailure(@Nonnull T target, @Nonnull RequestException cause);
}
\ No newline at end of file
}
@Override
- protected final C createMessage(final T target, final long sequence, final long retry) {
- return createRequest(target, sequence, retry, replyTo);
+ final @Nonnull C createMessage(@Nonnull final T target) {
+ return createRequest(target, replyTo);
}
- protected abstract @Nonnull C createRequest(@Nonnull T target, long sequence, long retry, @Nonnull ActorRef replyTo);
+ protected abstract @Nonnull C createRequest(@Nonnull T target, @Nonnull ActorRef replyTo);
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+abstract class AbstractResponseEnvelopeProxy<T extends Response<?, ?>> extends AbstractEnvelopeProxy<T> {
+ private static final long serialVersionUID = 1L;
+
+ public AbstractResponseEnvelopeProxy() {
+ // for Externalizable
+ }
+
+ AbstractResponseEnvelopeProxy(final ResponseEnvelope<T> envelope) {
+ super(envelope);
+ }
+
+ @Override
+ abstract ResponseEnvelope<T> createEnvelope(T message, long sequence, long retry);
+}
\ No newline at end of file
}
@Override
- final C createMessage(final T target, final long sequence, final long retry) {
- return createResponse(target, sequence, retry);
+ final C createMessage(final T target) {
+ return createResponse(target);
}
- abstract @Nonnull C createResponse(@Nonnull T target, long sequence, long retry);
+ abstract @Nonnull C createResponse(@Nonnull T target);
}
}
@Override
- final C createResponse(final T target, final long sequence, final long retry) {
- return createSuccess(target, sequence, retry);
+ final C createResponse(final T target) {
+ return createSuccess(target);
}
- protected abstract @Nonnull C createSuccess(@Nonnull T target, long sequence, long retry);
+ protected abstract @Nonnull C createSuccess(@Nonnull T target);
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import org.opendaylight.yangtools.concepts.Immutable;
+
+public abstract class Envelope<T extends Message<?, ?>> implements Immutable, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final T message;
+ private final long sequence;
+ private final long retry;
+
+ Envelope(final T message, final long sequence, final long retry) {
+ this.message = Preconditions.checkNotNull(message);
+ this.sequence = sequence;
+ this.retry = retry;
+ }
+
+ /**
+ * Get the enclosed message
+ *
+ * @return enclose message
+ */
+ public T getMessage() {
+ return message;
+ }
+
+ /**
+ * Get the message sequence of this envelope.
+ *
+ * @return Message sequence
+ */
+ public long getSequence() {
+ return sequence;
+ }
+
+ /**
+ * Get the message retry counter.
+ *
+ * @return Retry counter
+ */
+ public long getRetry() {
+ return retry;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(Envelope.class).add("sequence", Long.toUnsignedString(sequence, 16)).
+ add("retry", retry).add("message", message).toString();
+ }
+
+ final Object writeReplace() {
+ return createProxy();
+ }
+
+ abstract AbstractEnvelopeProxy<T> createProxy();
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+public final class FailureEnvelope extends ResponseEnvelope<RequestFailure<?, ?>> {
+ private static final long serialVersionUID = 1L;
+
+ public FailureEnvelope(final RequestFailure<?, ?> message, final long sequence, final long retry) {
+ super(message, sequence, retry);
+ }
+
+ @Override
+ FailureEnvelopeProxy createProxy() {
+ return new FailureEnvelopeProxy(this);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+final class FailureEnvelopeProxy extends AbstractResponseEnvelopeProxy<RequestFailure<?, ?>> {
+ private static final long serialVersionUID = 1L;
+
+ public FailureEnvelopeProxy() {
+ // for Externalizable
+ }
+
+ FailureEnvelopeProxy(final FailureEnvelope envelope) {
+ super(envelope);
+ }
+
+ @Override
+ FailureEnvelope createEnvelope(final RequestFailure<?, ?> message, final long sequence, final long retry) {
+ return new FailureEnvelope(message, sequence, retry);
+ }
+}
Serializable {
private static final long serialVersionUID = 1L;
private final T target;
- private final long sequence;
private final ABIVersion version;
- private final long retry;
- private Message(final ABIVersion version, final T target, final long sequence, final long retry) {
+ private Message(final ABIVersion version, final T target) {
this.target = Preconditions.checkNotNull(target);
this.version = Preconditions.checkNotNull(version);
- this.sequence = sequence;
- this.retry = retry;
}
- Message(final T target, final long sequence, final long retry) {
- this(ABIVersion.current(), target, sequence, retry);
+ Message(final T target) {
+ this(ABIVersion.current(), target);
}
Message(final C msg, final ABIVersion version) {
- this(version, msg.getTarget(), msg.getSequence(), msg.getRetry());
- }
-
- Message(final C msg, final long retry) {
- this(msg.getVersion(), msg.getTarget(), msg.getSequence(), retry);
+ this(version, msg.getTarget());
}
/**
return target;
}
- /**
- * Get the message sequence of this message.
- *
- * @return Message sequence
- */
- public final long getSequence() {
- return sequence;
- }
-
@VisibleForTesting
public final ABIVersion getVersion() {
return version;
}
- /**
- * Get the message retry counter.
- *
- * @return Retry counter
- */
- public final long getRetry() {
- return retry;
- }
-
/**
* Return a message which will end up being serialized in the specified {@link ABIVersion}.
*
*/
protected abstract @Nonnull C cloneAsVersion(@Nonnull ABIVersion version);
- /**
- * Return a message which will have the retry counter incremented by one.
- *
- * @return A message with the specified retry counter
- */
- public final @Nonnull C incrementRetry() {
- return Verify.verifyNotNull(cloneAsRetry(retry +1));
- }
-
- /**
- * Create a copy of this message which will have its retry count bumped. This method should be implemented by
- * the concrete final message class and should invoked the equivalent of {@link #Message(Message, long)}.
- *
- * @param retry new retry count
- * @return A message with the specified retry counter
- */
- protected abstract @Nonnull C cloneAsRetry(long retry);
-
@Override
public final String toString() {
return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues()).toString();
* @throws NullPointerException if toStringHelper is null
*/
protected @Nonnull ToStringHelper addToStringAttributes(final @Nonnull ToStringHelper toStringHelper) {
- return toStringHelper.add("target", target).add("sequence", Long.toUnsignedString(sequence, 16));
+ return toStringHelper.add("target", target);
}
/**
private static final long serialVersionUID = 1L;
private final ActorRef replyTo;
- protected Request(final @Nonnull T target, final long sequence, final long retry, final @Nonnull ActorRef replyTo) {
- super(target, sequence, retry);
+ protected Request(final @Nonnull T target, final @Nonnull ActorRef replyTo) {
+ super(target);
this.replyTo = Preconditions.checkNotNull(replyTo);
}
this.replyTo = Preconditions.checkNotNull(request.getReplyTo());
}
- protected Request(final C request, final long retry) {
- super(request, retry);
- this.replyTo = Preconditions.checkNotNull(request.getReplyTo());
- }
-
/**
* Return the return address where responses to this request should be directed to.
*
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+import akka.actor.ActorRef;
+
+public final class RequestEnvelope extends Envelope<Request<?, ?>> {
+ private static final long serialVersionUID = 1L;
+
+ public RequestEnvelope(final Request<?, ?> message, final long sequence, final long retry) {
+ super(message, sequence, retry);
+ }
+
+ @Override
+ RequestEnvelopeProxy createProxy() {
+ return new RequestEnvelopeProxy(this);
+ }
+
+ /**
+ * Respond to this envelope with a {@link RequestFailure} caused by specified {@link RequestException}.
+ *
+ * @param cause Cause of this {@link RequestFailure}
+ * @throws NullPointerException if cause is null
+ */
+ public void sendFailure(final RequestException cause) {
+ sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSequence(), getRetry()));
+ }
+
+ /**
+ * Respond to this envelope with a {@link RequestSuccess}.
+ *
+ * @param success Successful response
+ * @throws NullPointerException if success is null
+ */
+ public void sendSuccess(final RequestSuccess<?, ?> success) {
+ sendResponse(new SuccessEnvelope(success, getSequence(), getRetry()));
+ }
+
+ private void sendResponse(final ResponseEnvelope<?> envelope) {
+ getMessage().getReplyTo().tell(envelope, ActorRef.noSender());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+final class RequestEnvelopeProxy extends AbstractEnvelopeProxy<Request<?, ?>> {
+ private static final long serialVersionUID = 1L;
+
+ public RequestEnvelopeProxy() {
+ // for Externalizable
+ }
+
+ RequestEnvelopeProxy(final RequestEnvelope envelope) {
+ super(envelope);
+ }
+
+ @Override
+ RequestEnvelope createEnvelope(final Request<?, ?> message, final long sequence, final long retry) {
+ return new RequestEnvelope(message, sequence, retry);
+ }
+}
this.cause = Preconditions.checkNotNull(failure.getCause());
}
- protected RequestFailure(final @Nonnull T target, final long sequence, final long retry,
- final @Nonnull RequestException cause) {
- super(target, sequence, retry);
+ protected RequestFailure(final @Nonnull T target, final @Nonnull RequestException cause) {
+ super(target);
this.cause = Preconditions.checkNotNull(cause);
}
super(success, version);
}
- protected RequestSuccess(final @Nonnull T target, final long sequence, final long retry) {
- super(target, sequence, retry);
+ protected RequestSuccess(final @Nonnull T target) {
+ super(target);
}
@Override
public abstract class Response<T extends WritableIdentifier, C extends Response<T, C>> extends Message<T, C> {
private static final long serialVersionUID = 1L;
- Response(final @Nonnull C response, final @Nonnull ABIVersion version) {
- super(response, version);
- }
-
- Response(final @Nonnull T target, final long sequence, final long retry) {
- super(target, sequence, retry);
+ Response(final @Nonnull T target) {
+ super(target);
}
- @Override
- protected final C cloneAsRetry(final long retry) {
- throw new UnsupportedOperationException("Responses do not support retries");
+ Response(final @Nonnull C response, final @Nonnull ABIVersion version) {
+ super(response, version);
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+public abstract class ResponseEnvelope<T extends Response<?, ?>> extends Envelope<T> {
+ private static final long serialVersionUID = 1L;
+
+ ResponseEnvelope(final T message, final long sequence, final long retry) {
+ super(message, sequence, retry);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+public final class SuccessEnvelope extends ResponseEnvelope<RequestSuccess<?, ?>> {
+ private static final long serialVersionUID = 1L;
+
+ public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sequence, final long retry) {
+ super(message, sequence, retry);
+ }
+
+ @Override
+ SuccessEnvelopeProxy createProxy() {
+ return new SuccessEnvelopeProxy(this);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+final class SuccessEnvelopeProxy extends AbstractResponseEnvelopeProxy<RequestSuccess<?, ?>> {
+ private static final long serialVersionUID = 1L;
+
+ public SuccessEnvelopeProxy() {
+ // for Externalizable
+ }
+
+ SuccessEnvelopeProxy(final SuccessEnvelope envelope) {
+ super(envelope);
+ }
+
+ @Override
+ SuccessEnvelope createEnvelope(final RequestSuccess<?, ?> message, final long sequence, final long retry) {
+ return new SuccessEnvelope(message, sequence, retry);
+ }
+}
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
-import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
+import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
if (command instanceof InternalCommand) {
return ((InternalCommand) command).execute(this);
}
- if (command instanceof RequestSuccess) {
- return onRequestSuccess((RequestSuccess<?, ?>) command);
+ if (command instanceof SuccessEnvelope) {
+ return onRequestSuccess((SuccessEnvelope) command);
}
- if (command instanceof RequestFailure) {
- return onRequestFailure((RequestFailure<?, ?>) command);
+ if (command instanceof FailureEnvelope) {
+ return onRequestFailure((FailureEnvelope) command);
}
return onCommand(command);
}
- private ClientActorBehavior onRequestSuccess(final RequestSuccess<?, ?> success) {
- return context().completeRequest(this, success);
+ private ClientActorBehavior onRequestSuccess(final SuccessEnvelope command) {
+ return context().completeRequest(this, command);
}
- private ClientActorBehavior onRequestFailure(final RequestFailure<?, ?> failure) {
+ private ClientActorBehavior onRequestFailure(final FailureEnvelope command) {
+ final RequestFailure<?, ?> failure = command.getMessage();
final RequestException cause = failure.getCause();
if (cause instanceof RetiredGenerationException) {
LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
}
if (failure.isHardFailure()) {
- return context().completeRequest(this, failure);
+ return context().completeRequest(this, command);
}
// TODO: add instanceof checks on cause to detect more problems
- LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), failure);
- return context().completeRequest(this, failure);
+ LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), command);
+ return context().completeRequest(this, command);
}
// This method is executing in the actor context, hence we can safely interact with the queue
- private ClientActorBehavior doSendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
+ private ClientActorBehavior doSendRequest(final long sequence, final TransactionRequest<?> request,
+ final RequestCallback callback) {
// Get or allocate queue for the request
final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie());
// Note this is a tri-state return and can be null
- final Optional<FiniteDuration> result = queue.enqueueRequest(request, callback);
+ final Optional<FiniteDuration> result = queue.enqueueRequest(sequence, request, callback);
if (result == null) {
// Happy path: we are done here
return this;
* @param request Request to send
* @param callback Callback to invoke
*/
- public final void sendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
- context().executeInActor(cb -> cb.doSendRequest(request, callback));
+ public final void sendRequest(final long sequence, final TransactionRequest<?> request, final RequestCallback callback) {
+ context().executeInActor(cb -> cb.doSendRequest(sequence, request, callback));
}
}
import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
queues.remove(queue.getCookie(), queue);
}
- ClientActorBehavior completeRequest(final ClientActorBehavior current, final Response<?, ?> response) {
- final WritableIdentifier id = response.getTarget();
+ ClientActorBehavior completeRequest(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
+ final WritableIdentifier id = response.getMessage().getTarget();
// FIXME: this will need to be updated for other Request/Response types to extract cookie
Preconditions.checkArgument(id instanceof TransactionIdentifier);
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
* @param callback Callback to be invoked
* @return Optional duration with semantics described above.
*/
- @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
+ @Nullable Optional<FiniteDuration> enqueueRequest(final long sequence, final Request<?, ?> request,
+ final RequestCallback callback) {
+ checkNotClosed();
+
final long now = ticker.read();
- final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
+ final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now);
- // We could have check first, but argument checking needs to happen first
- checkNotClosed();
queue.add(e);
LOG.debug("Enqueued request {} to queue {}", request, this);
}
}
- ClientActorBehavior complete(final ClientActorBehavior current, final Response<?, ?> response) {
+ ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
// Responses to different targets may arrive out of order, hence we use an iterator
final Iterator<SequencedQueueEntry> it = queue.iterator();
while (it.hasNext()) {
lastProgress = ticker.read();
it.remove();
LOG.debug("Completing request {} with {}", e, response);
- return e.complete(response);
+ return e.complete(response.getMessage());
}
}
import com.google.common.base.Preconditions;
import java.util.Optional;
import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*/
final class SequencedQueueEntry {
private static final class LastTry {
- final Request<?, ?> request;
final long timeTicks;
+ final long retry;
- LastTry(final Request<?, ?> request, final long when) {
- this.request = Preconditions.checkNotNull(request);
- this.timeTicks = when;
+ LastTry(final long retry, final long timeTicks) {
+ this.retry = retry;
+ this.timeTicks = timeTicks;
}
}
private final Request<?, ?> request;
private final RequestCallback callback;
private final long enqueuedTicks;
+ private final long sequence;
private Optional<LastTry> lastTry = Optional.empty();
- SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback, final long now) {
+ SequencedQueueEntry(final Request<?, ?> request, final long sequence, final RequestCallback callback,
+ final long now) {
this.request = Preconditions.checkNotNull(request);
this.callback = Preconditions.checkNotNull(callback);
this.enqueuedTicks = now;
+ this.sequence = sequence;
}
long getSequence() {
- return request.getSequence();
+ return sequence;
}
- boolean acceptsResponse(final Response<?, ?> response) {
- return getSequence() == response.getSequence() && request.getTarget().equals(response.getTarget());
+ boolean acceptsResponse(final ResponseEnvelope<?> response) {
+ return getSequence() == response.getSequence() && request.getTarget().equals(response.getMessage().getTarget());
}
long getCurrentTry() {
- final Request<?, ?> req = lastTry.isPresent() ? lastTry.get().request : request;
- return req.getRetry();
+ return lastTry.isPresent() ? lastTry.get().retry : 0;
}
ClientActorBehavior complete(final Response<?, ?> response) {
}
boolean isTimedOut(final long now, final long timeoutNanos) {
- final Request<?, ?> req;
final long elapsed;
if (lastTry.isPresent()) {
- final LastTry t = lastTry.get();
- elapsed = now - t.timeTicks;
- req = t.request;
+ elapsed = now - lastTry.get().timeTicks;
} else {
elapsed = now - enqueuedTicks;
- req = request;
}
if (elapsed >= timeoutNanos) {
- LOG.debug("Request {} timed out after {}ns", req, elapsed);
+ LOG.debug("Request {} timed out after {}ns", request, elapsed);
return true;
} else {
return false;
}
void retransmit(final BackendInfo backend, final long now) {
- final Request<?, ?> nextTry = lastTry.isPresent() ? lastTry.get().request.incrementRetry() : request;
- final Request<?, ?> toSend = nextTry.toVersion(backend.getVersion());
- final ActorRef actor = backend.getActor();
+ final long retry = lastTry.isPresent() ? lastTry.get().retry + 1 : 0;
+ final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), sequence, retry);
+ final ActorRef actor = backend.getActor();
LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
actor.tell(toSend, ActorRef.noSender());
- lastTry = Optional.of(new LastTry(toSend, now));
+ lastTry = Optional.of(new LastTry(retry, now));
}
@Override
import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.access.concepts.Response;
private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
private static final long serialVersionUID = 1L;
- MockFailure(final WritableIdentifier target, final long sequence, final long retry, final RequestException cause) {
- super(target, sequence, retry, cause);
+ MockFailure(final WritableIdentifier target, final RequestException cause) {
+ super(target, cause);
}
@Override
private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
private static final long serialVersionUID = 1L;
- MockRequest(final WritableIdentifier target, final long sequence, final ActorRef replyTo) {
- super(target, sequence, 0, replyTo);
- }
-
-
- MockRequest(final MockRequest request, final long retry) {
- super(request, retry);
+ MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
+ super(target, replyTo);
}
@Override
public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
- return new MockFailure(getTarget(), getSequence(), getRetry(), cause);
+ return new MockFailure(getTarget(), cause);
}
@Override
protected MockRequest cloneAsVersion(final ABIVersion version) {
return this;
}
-
- @Override
- protected MockRequest cloneAsRetry(final long retry) {
- return new MockRequest(this, retry);
- }
};
@Mock
mockActor = TestProbe.apply(actorSystem);
mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
- mockRequest = new MockRequest(mockIdentifier, ThreadLocalRandom.current().nextLong(), mockReplyTo);
+ mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
mockResponse = mockRequest.toRequestFailure(mockCause);
- entry = new SequencedQueueEntry(mockRequest, mockCallback, ticker.read());
+ entry = new SequencedQueueEntry(mockRequest, 0, mockCallback, ticker.read());
}
@After
@Test
public void testGetSequence() {
- assertEquals(mockRequest.getSequence(), entry.getSequence());
+ assertEquals(0, entry.getSequence());
}
@Test
}
private static void assertRequestEquals(final Request<?, ?> expected, final Object o) {
- final Request<?, ?> actual = (Request<?, ?>) o;
- assertEquals(expected.getRetry(), actual.getRetry());
- assertEquals(expected.getSequence(), actual.getSequence());
- assertEquals(expected.getTarget(), actual.getTarget());
+ assertTrue(o instanceof RequestEnvelope);
+
+ final RequestEnvelope actual = (RequestEnvelope) o;
+ assertEquals(0, actual.getRetry());
+ assertEquals(0, actual.getSequence());
+ assertEquals(expected.getTarget(), actual.getMessage().getTarget());
}
}
import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
+import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
-import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.common.actor.TestTicker;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
import scala.concurrent.duration.FiniteDuration;
private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
private static final long serialVersionUID = 1L;
- MockFailure(final WritableIdentifier target, final long sequence, final long retry, final RequestException cause) {
- super(target, sequence, retry, cause);
+ MockFailure(final WritableIdentifier target, final RequestException cause) {
+ super(target, cause);
}
@Override
private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
private static final long serialVersionUID = 1L;
- MockRequest(final WritableIdentifier target, final long sequence, final ActorRef replyTo) {
- super(target, sequence, 0, replyTo);
- }
-
-
- MockRequest(final MockRequest request, final long retry) {
- super(request, retry);
+ MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
+ super(target, replyTo);
}
@Override
public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
- return new MockFailure(getTarget(), getSequence(), getRetry(), cause);
+ return new MockFailure(getTarget(), cause);
}
@Override
protected MockRequest cloneAsVersion(final ABIVersion version) {
return this;
}
-
- @Override
- protected MockRequest cloneAsRetry(final long retry) {
- return new MockRequest(this, retry);
- }
};
@Mock
private BackendInfo mockBackendInfo;
private MockRequest mockRequest;
private MockRequest mockRequest2;
- private Response<WritableIdentifier, ?> mockResponse;
+ private RequestFailure<WritableIdentifier, ?> mockResponse;
+ private FailureEnvelope mockResponseEnvelope;
private Long mockCookie;
private static ActorSystem actorSystem;
mockActor = TestProbe.apply(actorSystem);
mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
- mockRequest = new MockRequest(mockIdentifier, ThreadLocalRandom.current().nextLong(), mockReplyTo);
- mockRequest2 = new MockRequest(mockIdentifier, mockRequest.getSequence() + 1, mockReplyTo);
+ mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
+ mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
mockResponse = mockRequest.toRequestFailure(mockCause);
+ mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0);
mockCookie = ThreadLocalRandom.current().nextLong();
queue = new SequencedQueue(mockCookie, ticker);
queue.close();
// Kaboom
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
}
@Test
@Test
public void testPoison() {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
queue.poison(mockCause);
final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
queue.poison(mockCause);
// Kaboom
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
}
@Test
@Test
public void testEnqueueRequestNeedsBackend() {
- final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+ final Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
assertNotNull(ret);
assertFalse(ret.isPresent());
@Test
public void testSetBackendWithNoResolution() {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
@Test
public void testSetBackendWithWrongProof() {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
assertTrue(queue.expectProof(proof));
@Test
public void testSetbackedWithRequestsNoTimer() {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
assertTrue(queue.expectProof(proof));
assertNotNull(ret);
assertTrue(ret.isPresent());
- assertTransmit(mockRequest);
+ assertTransmit(mockRequest, 0);
}
@Test
public void testEnqueueRequestNeedsTimer() {
setupBackend();
- final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+ final Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
assertNotNull(ret);
assertTrue(ret.isPresent());
- assertTransmit(mockRequest);
+ assertTransmit(mockRequest, 0);
}
@Test
setupBackend();
// First request
- Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+ Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
assertNotNull(ret);
assertTrue(ret.isPresent());
- assertTransmit(mockRequest);
+ assertTransmit(mockRequest, 0);
// Second request, no timer fired
- ret = queue.enqueueRequest(mockRequest2, mockCallback);
+ ret = queue.enqueueRequest(1, mockRequest2, mockCallback);
assertNull(ret);
- assertTransmit(mockRequest2);
+ assertTransmit(mockRequest2, 1);
}
@Test
@Test
public void testRunTimeoutWithoutShift() throws NoProgressException {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
final boolean ret = queue.runTimeout();
assertFalse(ret);
}
@Test
public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1);
@Test
public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS);
@Test
public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1);
@Test(expected=NoProgressException.class)
public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
@Test(expected=NoProgressException.class)
public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
@Test
public void testCompleteEmpty() {
- final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse);
+ final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
assertSame(mockBehavior, ret);
verifyNoMoreInteractions(mockCallback);
}
@Test
public void testCompleteSingle() {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
- ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse);
+ ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
verify(mockCallback).complete(mockResponse);
assertSame(mockBehavior, ret);
- ret = queue.complete(mockBehavior, mockResponse);
+ ret = queue.complete(mockBehavior, mockResponseEnvelope);
assertSame(mockBehavior, ret);
verifyNoMoreInteractions(mockCallback);
}
@Test
public void testCompleteNull() {
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
doReturn(null).when(mockCallback).complete(mockResponse);
- ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse);
+ ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
verify(mockCallback).complete(mockResponse);
assertNull(ret);
}
public void testProgressRecord() throws NoProgressException {
setupBackend();
- queue.enqueueRequest(mockRequest, mockCallback);
+ queue.enqueueRequest(0, mockRequest, mockCallback);
ticker.increment(10);
- queue.enqueueRequest(mockRequest2, mockCallback);
- queue.complete(mockBehavior, mockResponse);
+ queue.enqueueRequest(1, mockRequest2, mockCallback);
+ queue.complete(mockBehavior, mockResponseEnvelope);
ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11);
assertTrue(queue.runTimeout());
assertFalse(mockActor.msgAvailable());
}
- private void assertTransmit(final Request<?, ?> expected) {
+ private void assertTransmit(final Request<?, ?> expected, final long sequence) {
assertTrue(mockActor.msgAvailable());
- assertRequestEquals(expected, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
+ assertRequestEquals(expected, sequence, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
}
- private static void assertRequestEquals(final Request<?, ?> expected, final Object o) {
- final Request<?, ?> actual = (Request<?, ?>) o;
- assertEquals(expected.getRetry(), actual.getRetry());
- assertEquals(expected.getSequence(), actual.getSequence());
- assertEquals(expected.getTarget(), actual.getTarget());
+ private static void assertRequestEquals(final Request<?, ?> expected, final long sequence, final Object o) {
+ assertTrue(o instanceof RequestEnvelope);
+
+ final RequestEnvelope actual = (RequestEnvelope) o;
+ assertEquals(0, actual.getRetry());
+ assertEquals(sequence, actual.getSequence());
+ assertSame(expected, actual.getMessage());
}
}