public AbortLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier,
final @Nonnull ActorRef replyTo) {
- super(identifier, replyTo);
+ super(identifier, 0, replyTo);
}
}
\ No newline at end of file
abstract class AbstractLocalTransactionRequest<T extends AbstractLocalTransactionRequest<T>> extends TransactionRequest<T> {
private static final long serialVersionUID = 1L;
- AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) {
- super(identifier, replyTo);
+ AbstractLocalTransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo) {
+ super(identifier, sequence, replyTo);
}
@Override
private static final long serialVersionUID = 1L;
private final YangInstanceIdentifier path;
- AbstractReadTransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo,
+ AbstractReadTransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo,
final YangInstanceIdentifier path) {
- super(identifier, replyTo);
+ super(identifier, sequence, replyTo);
this.path = Preconditions.checkNotNull(path);
}
}
@Override
- protected final T createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
- return createReadRequest(target, replyTo, path);
+ protected final T createRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+ return createReadRequest(target, sequence, replyTo, path);
}
- abstract T createReadRequest(TransactionIdentifier target, ActorRef replyTo, YangInstanceIdentifier path);
+ abstract T createReadRequest(TransactionIdentifier target, long sequence, ActorRef replyTo,
+ YangInstanceIdentifier path);
}
public CommitLocalTransactionRequest(final @Nonnull TransactionIdentifier identifier,
final @Nonnull ActorRef replyTo, final @Nonnull DataTreeModification mod, final boolean coordinated) {
- super(identifier, replyTo);
+ super(identifier, 0, replyTo);
this.mod = Preconditions.checkNotNull(mod);
this.coordinated = coordinated;
}
public final class ConnectClientFailure extends RequestFailure<ClientIdentifier, ConnectClientFailure> {
private static final long serialVersionUID = 1L;
- ConnectClientFailure(final ClientIdentifier target, final RequestException cause) {
- super(target, cause);
+ ConnectClientFailure(final ClientIdentifier target, final long sequence, final RequestException cause) {
+ super(target, sequence, cause);
}
private ConnectClientFailure(final ConnectClientFailure failure, final ABIVersion version) {
}
@Override
- protected ConnectClientFailure createFailure(final ClientIdentifier target, final RequestException cause) {
- return new ConnectClientFailure(target, cause);
+ protected ConnectClientFailure createFailure(final ClientIdentifier target, final long sequence,
+ final RequestException cause) {
+ return new ConnectClientFailure(target, sequence, cause);
}
@Override
private final ABIVersion minVersion;
private final ABIVersion maxVersion;
- private final long resumeSequence;
- public ConnectClientRequest(final ClientIdentifier identifier, final ActorRef replyTo, final ABIVersion minVersion,
- final ABIVersion maxVersion) {
- this(identifier, replyTo, minVersion, maxVersion, 0);
+ ConnectClientRequest(final ClientIdentifier identifier, final long txSequence, final ActorRef replyTo,
+ final ABIVersion minVersion, final ABIVersion maxVersion) {
+ super(identifier, txSequence, replyTo);
+ this.minVersion = Preconditions.checkNotNull(minVersion);
+ this.maxVersion = Preconditions.checkNotNull(maxVersion);
}
public ConnectClientRequest(final ClientIdentifier identifier, final ActorRef replyTo, final ABIVersion minVersion,
- final ABIVersion maxVersion, final long resumeSequence) {
- super(identifier, replyTo);
- this.minVersion = Preconditions.checkNotNull(minVersion);
- this.maxVersion = Preconditions.checkNotNull(maxVersion);
- this.resumeSequence = resumeSequence;
+ final ABIVersion maxVersion) {
+ this(identifier, 0, replyTo, minVersion, maxVersion);
}
private ConnectClientRequest(final ConnectClientRequest request, final ABIVersion version) {
super(request, version);
this.minVersion = request.minVersion;
this.maxVersion = request.maxVersion;
- this.resumeSequence = request.resumeSequence;
}
public ABIVersion getMinVersion() {
return maxVersion;
}
- public long getResumeSequence() {
- return resumeSequence;
- }
-
@Override
public final ConnectClientFailure toRequestFailure(final RequestException cause) {
- return new ConnectClientFailure(getTarget(), cause);
+ return new ConnectClientFailure(getTarget(), getSequence(), cause);
}
@Override
@Override
protected @Nonnull ToStringHelper addToStringAttributes(final @Nonnull ToStringHelper toStringHelper) {
- return super.addToStringAttributes(toStringHelper).add("minVersion", minVersion).add("maxVersion", maxVersion)
- .add("resumeSequence", resumeSequence);
+ return super.addToStringAttributes(toStringHelper).add("minVersion", minVersion).add("maxVersion", maxVersion);
}
}
import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.yangtools.concepts.WritableObjects;
/**
* Externalizable proxy for use with {@link ConnectClientRequest}. It implements the initial (Boron) serialization
final class ConnectClientRequestProxyV1 extends AbstractRequestProxy<ClientIdentifier, ConnectClientRequest> {
private ABIVersion minVersion;
private ABIVersion maxVersion;
- private long resumeSequence;
public ConnectClientRequestProxyV1() {
// for Externalizable
super(request);
this.minVersion = request.getMinVersion();
this.maxVersion = request.getMaxVersion();
- this.resumeSequence = request.getResumeSequence();
}
@Override
super.writeExternal(out);
minVersion.writeTo(out);
maxVersion.writeTo(out);
- WritableObjects.writeLong(out, resumeSequence);
}
@Override
super.readExternal(in);
minVersion = ABIVersion.inexactReadFrom(in);
maxVersion = ABIVersion.inexactReadFrom(in);
- resumeSequence = WritableObjects.readLong(in);
}
@Override
- protected ConnectClientRequest createRequest(final ClientIdentifier target, final ActorRef replyTo) {
- return new ConnectClientRequest(target, replyTo, minVersion, maxVersion, resumeSequence);
+ protected ConnectClientRequest createRequest(final ClientIdentifier target, final long sequence,
+ final ActorRef replyTo) {
+ return new ConnectClientRequest(target, sequence, replyTo, minVersion, maxVersion);
}
@Override
private final List<ActorSelection> alternates;
private final DataTree dataTree;
private final ActorRef backend;
- private final long maxMessages;
+ private final int maxMessages;
- ConnectClientSuccess(final ClientIdentifier target, final ActorRef backend, final List<ActorSelection> alternates,
- final Optional<DataTree> dataTree, final long maxMessages) {
- super(target);
+ ConnectClientSuccess(final ClientIdentifier target, final long sequence, final ActorRef backend,
+ final List<ActorSelection> alternates, final Optional<DataTree> dataTree, final int maxMessages) {
+ super(target, sequence);
this.backend = Preconditions.checkNotNull(backend);
this.alternates = ImmutableList.copyOf(alternates);
this.dataTree = dataTree.orElse(null);
this.maxMessages = maxMessages;
}
- public ConnectClientSuccess(final @Nonnull ClientIdentifier target, final @Nonnull ActorRef backend,
- final @Nonnull List<ActorSelection> alternates, final @Nonnull DataTree dataTree, final long maxMessages) {
- this(target, backend, alternates, Optional.of(dataTree), maxMessages);
+ public ConnectClientSuccess(final @Nonnull ClientIdentifier target, final long sequence,
+ final @Nonnull ActorRef backend, final @Nonnull List<ActorSelection> alternates,
+ final @Nonnull DataTree dataTree, final int maxMessages) {
+ this(target, sequence, backend, alternates, Optional.of(dataTree), maxMessages);
}
/**
return Optional.ofNullable(dataTree);
}
- public long getMaxMessages() {
+ public int getMaxMessages() {
return maxMessages;
}
@Override
protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
- return super.addToStringAttributes(toStringHelper).add("alternates", alternates).add("dataTree", dataTree);
+ return super.addToStringAttributes(toStringHelper).add("alternates", alternates).add("dataTree", dataTree)
+ .add("maxMessages", maxMessages);
}
}
private List<ActorSelection> alternates;
private ActorRef backend;
- private long maxMessages;
+ private int maxMessages;
public ConnectClientSuccessProxyV1() {
// For Externalizable
super(success);
this.alternates = success.getAlternates();
this.backend = success.getBackend();
+ this.maxMessages = success.getMaxMessages();
// We are ignoring the DataTree, it is not serializable anyway
}
public void writeExternal(final ObjectOutput out) throws IOException {
super.writeExternal(out);
- out.writeUTF(Serialization.serializedActorPath(backend));
+ out.writeObject(Serialization.serializedActorPath(backend));
+ out.writeInt(maxMessages);
out.writeInt(alternates.size());
for (ActorSelection b : alternates) {
out.writeObject(b.toSerializationFormat());
}
-
- out.writeLong(maxMessages);
}
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
- backend = JavaSerializer.currentSystem().value().provider().resolveActorRef(in.readUTF());
-
- final int backendsSize = in.readInt();
- if (backendsSize < 1) {
- throw new IOException("Illegal number of backends " + backendsSize);
- }
+ backend = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
+ maxMessages = in.readInt();
- alternates = new ArrayList<>(backendsSize);
- for (int i = 0; i < backendsSize; ++i) {
+ final int alternatesSize = in.readInt();
+ alternates = new ArrayList<>(alternatesSize);
+ for (int i = 0; i < alternatesSize; ++i) {
alternates.add(ActorSelection.apply(ActorRef.noSender(), (String)in.readObject()));
}
-
- maxMessages = in.readLong();
}
@Override
- protected ConnectClientSuccess createSuccess(final ClientIdentifier target) {
- return new ConnectClientSuccess(target, backend, alternates, Optional.empty(), maxMessages);
+ protected ConnectClientSuccess createSuccess(final ClientIdentifier target, final long sequence) {
+ return new ConnectClientSuccess(target, sequence, backend, alternates, Optional.empty(), maxMessages);
}
@Override
private static final long serialVersionUID = 1L;
public CreateLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
- super(target, replyTo);
+ this(target, 0, replyTo);
+ }
+
+ CreateLocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
+ super(target, sequence, replyTo);
}
private CreateLocalHistoryRequest(final CreateLocalHistoryRequest request, final ABIVersion version) {
}
@Override
- protected CreateLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
- return new CreateLocalHistoryRequest(target, replyTo);
+ protected CreateLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final long sequence,
+ final ActorRef replyTo) {
+ return new CreateLocalHistoryRequest(target, sequence, replyTo);
}
}
public final class DestroyLocalHistoryRequest extends LocalHistoryRequest<DestroyLocalHistoryRequest> {
private static final long serialVersionUID = 1L;
- public DestroyLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
- super(target, replyTo);
+ public DestroyLocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
+ super(target, sequence, replyTo);
}
private DestroyLocalHistoryRequest(final DestroyLocalHistoryRequest request, final ABIVersion version) {
}
@Override
- protected DestroyLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
- return new DestroyLocalHistoryRequest(target, replyTo);
+ protected DestroyLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final long sequence,
+ final ActorRef replyTo) {
+ return new DestroyLocalHistoryRequest(target, sequence, replyTo);
}
}
public final class ExistsTransactionRequest extends AbstractReadTransactionRequest<ExistsTransactionRequest> {
private static final long serialVersionUID = 1L;
- public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo,
- final @Nonnull YangInstanceIdentifier path) {
- super(identifier, replyTo, path);
+ public ExistsTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
+ final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
+ super(identifier, sequence, replyTo, path);
}
private ExistsTransactionRequest(final ExistsTransactionRequest request, final ABIVersion version) {
}
@Override
- ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo,
- final YangInstanceIdentifier path) {
- return new ExistsTransactionRequest(target, replyTo, path);
+ ExistsTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence,
+ final ActorRef replyTo, final YangInstanceIdentifier path) {
+ return new ExistsTransactionRequest(target, sequence, replyTo, path);
}
}
\ No newline at end of file
private static final long serialVersionUID = 1L;
private final boolean exists;
- public ExistsTransactionSuccess(final TransactionIdentifier target, final boolean exists) {
- super(target);
+ public ExistsTransactionSuccess(final TransactionIdentifier target, final long sequence, final boolean exists) {
+ super(target, sequence);
this.exists = exists;
}
}
@Override
- protected ExistsTransactionSuccess createSuccess(final TransactionIdentifier target) {
- return new ExistsTransactionSuccess(target, exists);
+ protected ExistsTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+ return new ExistsTransactionSuccess(target, sequence, exists);
}
}
public final class LocalHistoryFailure extends RequestFailure<LocalHistoryIdentifier, LocalHistoryFailure> {
private static final long serialVersionUID = 1L;
- LocalHistoryFailure(final LocalHistoryIdentifier target, final RequestException cause) {
- super(target, cause);
+ LocalHistoryFailure(final LocalHistoryIdentifier target, final long sequence, final RequestException cause) {
+ super(target, sequence, cause);
}
@Override
}
@Override
- protected LocalHistoryFailure createFailure(final LocalHistoryIdentifier target, final RequestException cause) {
- return new LocalHistoryFailure(target, cause);
+ protected LocalHistoryFailure createFailure(final LocalHistoryIdentifier target, final long sequence,
+ final RequestException cause) {
+ return new LocalHistoryFailure(target, sequence, cause);
}
@Override
public abstract class LocalHistoryRequest<T extends LocalHistoryRequest<T>> extends Request<LocalHistoryIdentifier, T> {
private static final long serialVersionUID = 1L;
- LocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
- super(target, replyTo);
+ LocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
+ super(target, sequence, replyTo);
}
LocalHistoryRequest(final T request, final ABIVersion version) {
@Override
public final LocalHistoryFailure toRequestFailure(final RequestException cause) {
- return new LocalHistoryFailure(getTarget(), cause);
+ return new LocalHistoryFailure(getTarget(), getSequence(), cause);
}
@Override
public final class LocalHistorySuccess extends RequestSuccess<LocalHistoryIdentifier, LocalHistorySuccess> {
private static final long serialVersionUID = 1L;
- public LocalHistorySuccess(final LocalHistoryIdentifier target) {
- super(target);
+ public LocalHistorySuccess(final LocalHistoryIdentifier target, final long sequence) {
+ super(target, sequence);
}
private LocalHistorySuccess(final LocalHistorySuccess success, final ABIVersion version) {
}
@Override
- protected LocalHistorySuccess createSuccess(final LocalHistoryIdentifier target) {
- return new LocalHistorySuccess(target);
+ protected LocalHistorySuccess createSuccess(final LocalHistoryIdentifier target, final long sequence) {
+ return new LocalHistorySuccess(target, sequence);
}
}
private final List<TransactionModification> modifications;
private final PersistenceProtocol protocol;
- ModifyTransactionRequest(final TransactionIdentifier target, final ActorRef replyTo,
+ ModifyTransactionRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo,
final List<TransactionModification> modifications, final PersistenceProtocol protocol) {
- super(target, replyTo);
+ super(target, sequence, replyTo);
this.modifications = ImmutableList.copyOf(modifications);
this.protocol = protocol;
}
private final List<TransactionModification> modifications = new ArrayList<>(1);
private final TransactionIdentifier identifier;
private final ActorRef replyTo;
- private PersistenceProtocol protocol = null;
+ private PersistenceProtocol protocol;
+ private Long sequence;
public ModifyTransactionRequestBuilder(final TransactionIdentifier identifier, final ActorRef replyTo) {
this.identifier = Preconditions.checkNotNull(identifier);
return identifier;
}
- private void checkFinished() {
- Preconditions.checkState(protocol != null, "Batch has already been finished");
+ private void checkNotFinished() {
+ Preconditions.checkState(protocol == null, "Batch has already been finished");
}
public void addModification(final TransactionModification modification) {
- checkFinished();
+ checkNotFinished();
modifications.add(Preconditions.checkNotNull(modification));
}
+ public void setSequence(final long sequence) {
+ this.sequence = sequence;
+ }
+
public void setAbort() {
- checkFinished();
+ checkNotFinished();
// Transaction is being aborted, no need to transmit operations
modifications.clear();
protocol = PersistenceProtocol.ABORT;
}
public void setCommit(final boolean coordinated) {
- checkFinished();
+ checkNotFinished();
protocol = coordinated ? PersistenceProtocol.THREE_PHASE : PersistenceProtocol.SIMPLE;
}
@Override
public ModifyTransactionRequest build() {
- final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, replyTo, modifications, protocol);
+ Preconditions.checkState(sequence != null, "Request sequence has not been set");
+
+ final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, sequence, replyTo, modifications,
+ protocol);
modifications.clear();
protocol = null;
+ sequence = null;
return ret;
}
}
}
@Override
- protected ModifyTransactionRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
- return new ModifyTransactionRequest(target, replyTo, modifications, protocol.orElse(null));
+ protected ModifyTransactionRequest createRequest(final TransactionIdentifier target, final long sequence,
+ final ActorRef replyTo) {
+ return new ModifyTransactionRequest(target, sequence, replyTo, modifications, protocol.orElse(null));
}
}
public final class PurgeLocalHistoryRequest extends LocalHistoryRequest<PurgeLocalHistoryRequest> {
private static final long serialVersionUID = 1L;
- public PurgeLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
- super(target, replyTo);
+ public PurgeLocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
+ super(target, sequence, replyTo);
}
private PurgeLocalHistoryRequest(final PurgeLocalHistoryRequest request, final ABIVersion version) {
}
@Override
- protected PurgeLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
- return new PurgeLocalHistoryRequest(target, replyTo);
+ protected PurgeLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final long sequence,
+ final ActorRef replyTo) {
+ return new PurgeLocalHistoryRequest(target, sequence, replyTo);
}
}
public final class ReadTransactionRequest extends AbstractReadTransactionRequest<ReadTransactionRequest> {
private static final long serialVersionUID = 1L;
- public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final @Nonnull ActorRef replyTo,
- final @Nonnull YangInstanceIdentifier path) {
- super(identifier, replyTo, path);
+ public ReadTransactionRequest(final @Nonnull TransactionIdentifier identifier, final long sequence,
+ final @Nonnull ActorRef replyTo, final @Nonnull YangInstanceIdentifier path) {
+ super(identifier, sequence, replyTo, path);
}
private ReadTransactionRequest(final ReadTransactionRequest request, final ABIVersion version) {
}
@Override
- ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final ActorRef replyTo,
- final YangInstanceIdentifier path) {
- return new ReadTransactionRequest(target, replyTo, path);
+ ReadTransactionRequest createReadRequest(final TransactionIdentifier target, final long sequence,
+ final ActorRef replyTo, final YangInstanceIdentifier path) {
+ return new ReadTransactionRequest(target, sequence, replyTo, path);
}
}
\ No newline at end of file
private static final long serialVersionUID = 1L;
private final Optional<NormalizedNode<?, ?>> data;
- public ReadTransactionSuccess(final TransactionIdentifier identifier, final Optional<NormalizedNode<?, ?>> data) {
- super(identifier);
+ public ReadTransactionSuccess(final TransactionIdentifier identifier, final long sequence,
+ final Optional<NormalizedNode<?, ?>> data) {
+ super(identifier, sequence);
this.data = Preconditions.checkNotNull(data);
}
}
@Override
- protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target) {
- return new ReadTransactionSuccess(target, data);
+ protected ReadTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+ return new ReadTransactionSuccess(target, sequence, data);
}
}
public final class TransactionAbortRequest extends TransactionRequest<TransactionAbortRequest> {
private static final long serialVersionUID = 1L;
- public TransactionAbortRequest(final TransactionIdentifier target, final ActorRef replyTo) {
- super(target, replyTo);
+ public TransactionAbortRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+ super(target, sequence, replyTo);
}
@Override
}
@Override
- protected TransactionAbortRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
- return new TransactionAbortRequest(target, replyTo);
+ protected TransactionAbortRequest createRequest(final TransactionIdentifier target, final long sequence,
+ final ActorRef replyTo) {
+ return new TransactionAbortRequest(target, sequence, replyTo);
}
}
public final class TransactionAbortSuccess extends TransactionSuccess<TransactionAbortSuccess> {
private static final long serialVersionUID = 1L;
- public TransactionAbortSuccess(final TransactionIdentifier identifier) {
- super(identifier);
+ public TransactionAbortSuccess(final TransactionIdentifier identifier, final long sequence) {
+ super(identifier, sequence);
}
@Override
}
@Override
- protected TransactionAbortSuccess createSuccess(final TransactionIdentifier target) {
- return new TransactionAbortSuccess(target);
+ protected TransactionAbortSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+ return new TransactionAbortSuccess(target, sequence);
}
}
public final class TransactionCanCommitSuccess extends TransactionSuccess<TransactionCanCommitSuccess> {
private static final long serialVersionUID = 1L;
- public TransactionCanCommitSuccess(final TransactionIdentifier identifier) {
- super(identifier);
+ public TransactionCanCommitSuccess(final TransactionIdentifier identifier, final long sequence) {
+ super(identifier, sequence);
}
@Override
}
@Override
- protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target) {
- return new TransactionCanCommitSuccess(target);
+ protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+ return new TransactionCanCommitSuccess(target, sequence);
}
}
public final class TransactionCommitSuccess extends TransactionSuccess<TransactionCommitSuccess> {
private static final long serialVersionUID = 1L;
- public TransactionCommitSuccess(final TransactionIdentifier identifier) {
- super(identifier);
+ public TransactionCommitSuccess(final TransactionIdentifier identifier, final long sequence) {
+ super(identifier, sequence);
}
@Override
}
@Override
- protected TransactionCommitSuccess createSuccess(final TransactionIdentifier target) {
- return new TransactionCommitSuccess(target);
+ protected TransactionCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+ return new TransactionCommitSuccess(target, sequence);
}
}
public final class TransactionDoCommitRequest extends TransactionRequest<TransactionDoCommitRequest> {
private static final long serialVersionUID = 1L;
- public TransactionDoCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) {
- super(target, replyTo);
+ public TransactionDoCommitRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+ super(target, sequence, replyTo);
}
@Override
}
@Override
- protected TransactionDoCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
- return new TransactionDoCommitRequest(target, replyTo);
+ protected TransactionDoCommitRequest createRequest(final TransactionIdentifier target, final long sequence,
+ final ActorRef replyTo) {
+ return new TransactionDoCommitRequest(target, sequence, replyTo);
}
}
public final class TransactionFailure extends RequestFailure<TransactionIdentifier, TransactionFailure> {
private static final long serialVersionUID = 1L;
- TransactionFailure(final TransactionIdentifier target, final RequestException cause) {
- super(target, cause);
+ TransactionFailure(final TransactionIdentifier target, final long sequence, final RequestException cause) {
+ super(target, sequence, cause);
}
@Override
}
@Override
- protected TransactionFailure createFailure(final TransactionIdentifier target, final RequestException cause) {
- return new TransactionFailure(target, cause);
+ protected TransactionFailure createFailure(final TransactionIdentifier target, final long sequence,
+ final RequestException cause) {
+ return new TransactionFailure(target, sequence, cause);
}
@Override
public final class TransactionPreCommitRequest extends TransactionRequest<TransactionPreCommitRequest> {
private static final long serialVersionUID = 1L;
- public TransactionPreCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) {
- super(target, replyTo);
+ public TransactionPreCommitRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+ super(target, sequence, replyTo);
}
@Override
}
@Override
- protected TransactionPreCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
- return new TransactionPreCommitRequest(target, replyTo);
+ protected TransactionPreCommitRequest createRequest(final TransactionIdentifier target, final long sequence,
+ final ActorRef replyTo) {
+ return new TransactionPreCommitRequest(target, sequence, replyTo);
}
}
public final class TransactionPreCommitSuccess extends TransactionSuccess<TransactionPreCommitSuccess> {
private static final long serialVersionUID = 1L;
- public TransactionPreCommitSuccess(final TransactionIdentifier identifier) {
- super(identifier);
+ public TransactionPreCommitSuccess(final TransactionIdentifier identifier, final long sequence) {
+ super(identifier, sequence);
}
@Override
}
@Override
- protected TransactionPreCommitSuccess createSuccess(final TransactionIdentifier target) {
- return new TransactionPreCommitSuccess(target);
+ protected TransactionPreCommitSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+ return new TransactionPreCommitSuccess(target, sequence);
}
}
public abstract class TransactionRequest<T extends TransactionRequest<T>> extends Request<TransactionIdentifier, T> {
private static final long serialVersionUID = 1L;
- TransactionRequest(final TransactionIdentifier identifier, final ActorRef replyTo) {
- super(identifier, replyTo);
+ TransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo) {
+ super(identifier, sequence, replyTo);
}
TransactionRequest(final T request, final ABIVersion version) {
@Override
public final TransactionFailure toRequestFailure(final RequestException cause) {
- return new TransactionFailure(getTarget(), cause);
+ return new TransactionFailure(getTarget(), getSequence(), cause);
}
@Override
public abstract class TransactionSuccess<T extends TransactionSuccess<T>> extends RequestSuccess<TransactionIdentifier, T> {
private static final long serialVersionUID = 1L;
- TransactionSuccess(final TransactionIdentifier identifier) {
- super(identifier);
+ TransactionSuccess(final TransactionIdentifier identifier, final long sequence) {
+ super(identifier, sequence);
}
@Override
abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externalizable {
private static final long serialVersionUID = 1L;
+
private T message;
- private long sequence;
- private long retry;
+ private long sessionId;
+ private long txSequence;
public AbstractEnvelopeProxy() {
// for Externalizable
AbstractEnvelopeProxy(final Envelope<T> envelope) {
message = envelope.getMessage();
- sequence = envelope.getSequence();
- retry = envelope.getRetry();
+ txSequence = envelope.getTxSequence();
+ sessionId = envelope.getSessionId();
}
@Override
public final void writeExternal(final ObjectOutput out) throws IOException {
- WritableObjects.writeLongs(out, sequence, retry);
+ WritableObjects.writeLongs(out, sessionId, txSequence);
out.writeObject(message);
}
@Override
public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
final byte header = WritableObjects.readLongHeader(in);
- sequence = WritableObjects.readFirstLong(in, header);
- retry = WritableObjects.readSecondLong(in, header);
+ sessionId = WritableObjects.readFirstLong(in, header);
+ txSequence = WritableObjects.readSecondLong(in, header);
message = (T) in.readObject();
}
- abstract Envelope<T> createEnvelope(T message, long sequence, long retry);
+ abstract Envelope<T> createEnvelope(T message, long sessionId, long txSequence);
final Object readResolve() {
- return createEnvelope(message, sequence, retry);
+ return createEnvelope(message, sessionId, txSequence);
}
}
\ No newline at end of file
import java.io.ObjectOutput;
import javax.annotation.Nonnull;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import org.opendaylight.yangtools.concepts.WritableObjects;
/**
* Abstract Externalizable proxy for use with {@link Message} subclasses.
abstract class AbstractMessageProxy<T extends WritableIdentifier, C extends Message<T, C>> implements Externalizable {
private static final long serialVersionUID = 1L;
private T target;
+ private long sequence;
protected AbstractMessageProxy() {
// For Externalizable
AbstractMessageProxy(final @Nonnull C message) {
this.target = message.getTarget();
+ this.sequence = message.getSequence();
}
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
target.writeTo(out);
+ WritableObjects.writeLong(out, sequence);
}
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
target = Verify.verifyNotNull(readTarget(in));
+ sequence = WritableObjects.readLong(in);
}
protected final Object readResolve() {
- return Verify.verifyNotNull(createMessage(target));
+ return Verify.verifyNotNull(createMessage(target, sequence));
}
protected abstract @Nonnull T readTarget(@Nonnull DataInput in) throws IOException;
- abstract @Nonnull C createMessage(@Nonnull T target);
+ abstract @Nonnull C createMessage(@Nonnull T target, long sequence);
}
\ No newline at end of file
}
@Override
- final C createResponse(final T target) {
- return createFailure(target, cause);
+ final C createResponse(final T target, final long sequence) {
+ return createFailure(target, sequence, cause);
}
- protected abstract @Nonnull C createFailure(@Nonnull T target, @Nonnull RequestException cause);
+ protected abstract @Nonnull C createFailure(@Nonnull T target, long sequence, @Nonnull RequestException cause);
}
\ No newline at end of file
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
super.writeExternal(out);
- out.writeUTF(Serialization.serializedActorPath(replyTo));
+ out.writeObject(Serialization.serializedActorPath(replyTo));
}
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
- replyTo = JavaSerializer.currentSystem().value().provider().resolveActorRef(in.readUTF());
+ replyTo = JavaSerializer.currentSystem().value().provider().resolveActorRef((String) in.readObject());
}
@Override
- final @Nonnull C createMessage(@Nonnull final T target) {
- return createRequest(target, replyTo);
+ final @Nonnull C createMessage(@Nonnull final T target, final long sequence) {
+ return createRequest(target, sequence, replyTo);
}
- protected abstract @Nonnull C createRequest(@Nonnull T target, @Nonnull ActorRef replyTo);
+ protected abstract @Nonnull C createRequest(@Nonnull T target, long sequence, @Nonnull ActorRef replyTo);
}
\ No newline at end of file
}
@Override
- final C createMessage(final T target) {
- return createResponse(target);
+ final C createMessage(final T target, final long sequence) {
+ return createResponse(target, sequence);
}
- abstract @Nonnull C createResponse(@Nonnull T target);
+ abstract @Nonnull C createResponse(@Nonnull T target, long sequence);
}
}
@Override
- final C createResponse(final T target) {
- return createSuccess(target);
+ final C createResponse(final T target, final long sequence) {
+ return createSuccess(target, sequence);
}
- protected abstract @Nonnull C createSuccess(@Nonnull T target);
+ protected abstract @Nonnull C createSuccess(@Nonnull T target, long sequence);
}
\ No newline at end of file
private static final long serialVersionUID = 1L;
private final T message;
- private final long sequence;
- private final long retry;
+ private final long txSequence;
+ private final long sessionId;
- Envelope(final T message, final long sequence, final long retry) {
+ Envelope(final T message, final long sessionId, final long txSequence) {
this.message = Preconditions.checkNotNull(message);
- this.sequence = sequence;
- this.retry = retry;
+ this.sessionId = sessionId;
+ this.txSequence = txSequence;
}
/**
}
/**
- * Get the message sequence of this envelope.
+ * Get the message transmission sequence of this envelope.
*
* @return Message sequence
*/
- public long getSequence() {
- return sequence;
+ public long getTxSequence() {
+ return txSequence;
}
/**
- * Get the message retry counter.
+ * Get the session identifier.
*
- * @return Retry counter
+ * @return Session identifier
*/
- public long getRetry() {
- return retry;
+ public long getSessionId() {
+ return sessionId;
}
@Override
public String toString() {
- return MoreObjects.toStringHelper(Envelope.class).add("sequence", Long.toUnsignedString(sequence, 16)).
- add("retry", retry).add("message", message).toString();
+ return MoreObjects.toStringHelper(Envelope.class).add("sessionId", Long.toHexString(sessionId))
+ .add("txSequence", Long.toHexString(txSequence)).add("message", message).toString();
}
final Object writeReplace() {
public final class FailureEnvelope extends ResponseEnvelope<RequestFailure<?, ?>> {
private static final long serialVersionUID = 1L;
- public FailureEnvelope(final RequestFailure<?, ?> message, final long sequence, final long retry) {
- super(message, sequence, retry);
+ public FailureEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
+ super(message, sessionId, txSequence);
}
@Override
}
@Override
- FailureEnvelope createEnvelope(final RequestFailure<?, ?> message, final long sequence, final long retry) {
- return new FailureEnvelope(message, sequence, retry);
+ FailureEnvelope createEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
+ return new FailureEnvelope(message, sessionId, txSequence);
}
}
public abstract class Message<T extends WritableIdentifier, C extends Message<T, C>> implements Immutable,
Serializable {
private static final long serialVersionUID = 1L;
- private final T target;
+
private final ABIVersion version;
+ private final long sequence;
+ private final T target;
- private Message(final ABIVersion version, final T target) {
+ private Message(final ABIVersion version, final T target, final long sequence) {
this.target = Preconditions.checkNotNull(target);
this.version = Preconditions.checkNotNull(version);
+ this.sequence = sequence;
}
- Message(final T target) {
- this(ABIVersion.current(), target);
+ Message(final T target, final long sequence) {
+ this(ABIVersion.current(), target, sequence);
}
Message(final C msg, final ABIVersion version) {
- this(version, msg.getTarget());
+ this(version, msg.getTarget(), msg.getSequence());
}
/**
return target;
}
+ /**
+ * Get the logical sequence number.
+ *
+ * @return logical sequence number
+ */
+ public final long getSequence() {
+ return sequence;
+ }
+
@VisibleForTesting
- public final ABIVersion getVersion() {
+ public final @Nonnull ABIVersion getVersion() {
return version;
}
* @throws NullPointerException if toStringHelper is null
*/
protected @Nonnull ToStringHelper addToStringAttributes(final @Nonnull ToStringHelper toStringHelper) {
- return toStringHelper.add("target", target);
+ return toStringHelper.add("target", target).add("sequence", Long.toUnsignedString(sequence));
}
/**
private static final long serialVersionUID = 1L;
private final ActorRef replyTo;
- protected Request(final @Nonnull T target, final @Nonnull ActorRef replyTo) {
- super(target);
+ protected Request(final @Nonnull T target, final long sequence, final @Nonnull ActorRef replyTo) {
+ super(target, sequence);
this.replyTo = Preconditions.checkNotNull(replyTo);
}
public final class RequestEnvelope extends Envelope<Request<?, ?>> {
private static final long serialVersionUID = 1L;
- public RequestEnvelope(final Request<?, ?> message, final long sequence, final long retry) {
- super(message, sequence, retry);
+ public RequestEnvelope(final Request<?, ?> message, final long sessionId, final long txSequence) {
+ super(message, sessionId, txSequence);
}
@Override
* @throws NullPointerException if cause is null
*/
public void sendFailure(final RequestException cause) {
- sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSequence(), getRetry()));
+ sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence()));
}
/**
* @throws NullPointerException if success is null
*/
public void sendSuccess(final RequestSuccess<?, ?> success) {
- sendResponse(new SuccessEnvelope(success, getSequence(), getRetry()));
+ sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence()));
}
private void sendResponse(final ResponseEnvelope<?> envelope) {
}
@Override
- RequestEnvelope createEnvelope(final Request<?, ?> message, final long sequence, final long retry) {
- return new RequestEnvelope(message, sequence, retry);
+ RequestEnvelope createEnvelope(final Request<?, ?> message, final long sessionId, final long txSequence) {
+ return new RequestEnvelope(message, sessionId, txSequence);
}
}
this.cause = Preconditions.checkNotNull(failure.getCause());
}
- protected RequestFailure(final @Nonnull T target, final @Nonnull RequestException cause) {
- super(target);
+ protected RequestFailure(final @Nonnull T target, final long sequence, final @Nonnull RequestException cause) {
+ super(target, sequence);
this.cause = Preconditions.checkNotNull(cause);
}
super(success, version);
}
- protected RequestSuccess(final @Nonnull T target) {
- super(target);
+ protected RequestSuccess(final @Nonnull T target, final long sequence) {
+ super(target, sequence);
}
@Override
public abstract class Response<T extends WritableIdentifier, C extends Response<T, C>> extends Message<T, C> {
private static final long serialVersionUID = 1L;
- Response(final @Nonnull T target) {
- super(target);
+ Response(final @Nonnull T target, final long sequence) {
+ super(target, sequence);
}
Response(final @Nonnull C response, final @Nonnull ABIVersion version) {
public abstract class ResponseEnvelope<T extends Response<?, ?>> extends Envelope<T> {
private static final long serialVersionUID = 1L;
- ResponseEnvelope(final T message, final long sequence, final long retry) {
- super(message, sequence, retry);
+ ResponseEnvelope(final T message, final long sessionId, final long txSequence) {
+ super(message, sessionId, txSequence);
}
}
public final class SuccessEnvelope extends ResponseEnvelope<RequestSuccess<?, ?>> {
private static final long serialVersionUID = 1L;
- public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sequence, final long retry) {
- super(message, sequence, retry);
+ public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
+ super(message, sessionId, txSequence);
}
@Override
}
@Override
- SuccessEnvelope createEnvelope(final RequestSuccess<?, ?> message, final long sequence, final long retry) {
- return new SuccessEnvelope(message, sequence, retry);
+ SuccessEnvelope createEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
+ return new SuccessEnvelope(message, sessionId, txSequence);
}
}
public class BackendInfo {
private final ABIVersion version;
private final ActorRef actor;
+ private final int maxMessages;
+ private final long sessionId;
- protected BackendInfo(final ActorRef actor, final ABIVersion version) {
+ protected BackendInfo(final ActorRef actor, final long sessionId, final ABIVersion version, final int maxMessages) {
this.version = Preconditions.checkNotNull(version);
this.actor = Preconditions.checkNotNull(actor);
+ Preconditions.checkArgument(maxMessages > 0, "Maximum messages has to be positive, not %s", maxMessages);
+ this.maxMessages = maxMessages;
+ this.sessionId = sessionId;
}
public final ActorRef getActor() {
return version;
}
+ public final int getMaxMessages() {
+ return maxMessages;
+ }
+
+ public final long getSessionId() {
+ return sessionId;
+ }
+
@Override
public final int hashCode() {
return super.hashCode();
}
protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
- return toStringHelper.add("actor", actor).add("version", version);
+ return toStringHelper.add("actor", actor).add("sessionId", sessionId).add("version", version)
+ .add("maxMessages", maxMessages);
}
}
}
// This method is executing in the actor context, hence we can safely interact with the queue
- private ClientActorBehavior doSendRequest(final long sequence, final TransactionRequest<?> request,
- final RequestCallback callback) {
+ private ClientActorBehavior doSendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
// Get or allocate queue for the request
final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie());
// Note this is a tri-state return and can be null
- final Optional<FiniteDuration> result = queue.enqueueRequest(sequence, request, callback);
+ final Optional<FiniteDuration> result = queue.enqueueRequest(request, callback);
if (result == null) {
// Happy path: we are done here
return this;
* @param request Request to send
* @param callback Callback to invoke
*/
- public final void sendRequest(final long sequence, final TransactionRequest<?> request, final RequestCallback callback) {
- context().executeInActor(cb -> cb.doSendRequest(sequence, request, callback));
+ public final void sendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
+ context().executeInActor(cb -> cb.doSendRequest(request, callback));
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import java.util.AbstractQueue;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ * A specialized always-empty implementation of {@link java.util.Queue}. This implementation will always refuse new
+ * elements in its {@link #offer(Object)} method.
+
+ * @author Robert Varga
+ *
+ * @param <E> the type of elements held in this collection
+ */
+// TODO: move this class into yangtools.util
+final class EmptyQueue<T> extends AbstractQueue<T> {
+ private static final EmptyQueue<?> INSTANCE = new EmptyQueue<>();
+
+ private EmptyQueue() {
+ // No instances
+ }
+
+ @SuppressWarnings("unchecked")
+ static <T> Queue<T> getInstance() {
+ return (Queue<T>) INSTANCE;
+ }
+
+ @Override
+ public boolean offer(final T e) {
+ return false;
+ }
+
+ @Override
+ public T poll() {
+ return null;
+ }
+
+ @Override
+ public T peek() {
+ return null;
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return Collections.emptyIterator();
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+}
\ No newline at end of file
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
-import java.util.Deque;
+import com.google.common.base.Verify;
+import java.util.ArrayDeque;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Optional;
+import java.util.Queue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
TimeUnit.NANOSECONDS);
/**
- * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can
- * progress at different speeds, these may be completed out of order.
- *
- * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to
- * figure out a more efficient lookup mechanism to deal with responses which do not match the queue
- * order.
+ * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
+ * we will use its advertized {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
+ * resolution.
*/
- private final Deque<SequencedQueueEntry> queue = new LinkedList<>();
+ private static final int DEFAULT_TX_LIMIT = 1000;
+
private final Ticker ticker;
private final Long cookie;
- // Updated/consulted from actor context only
+ /*
+ * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
+ * with the limit changing between reconnects (which imply retransmission).
+ *
+ * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
+ * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
+ * and one for requests which have not been transmitted at all.
+ *
+ * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
+ * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
+ * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
+ * to retransmit -- hence the second queue.
+ */
+ private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
+ private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
+ private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
+
/**
* Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
* resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
private CompletionStage<? extends BackendInfo> backendProof;
private BackendInfo backend;
+ // This is not final because we need to be able to replace it.
+ private long txSequence;
+
+ private int lastTxLimit = DEFAULT_TX_LIMIT;
+
/**
* Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
*/
Preconditions.checkState(notClosed, "Queue %s is closed", this);
}
+ private long nextTxSequence() {
+ return txSequence++;
+ }
+
/**
* Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
* the following scenarios:
* 1) The request has been enqueued and transmitted. No further actions are necessary
* 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
- * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that
+ * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
* process needs to complete before transmission occurs
*
* These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
* @param callback Callback to be invoked
* @return Optional duration with semantics described above.
*/
- @Nullable Optional<FiniteDuration> enqueueRequest(final long sequence, final Request<?, ?> request,
- final RequestCallback callback) {
+ @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
checkNotClosed();
final long now = ticker.read();
- final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now);
-
- queue.add(e);
- LOG.debug("Enqueued request {} to queue {}", request, this);
-
+ final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
if (backend == null) {
+ LOG.debug("No backend available, request resolution");
+ pending.add(e);
return Optional.empty();
}
+ if (!lastInflight.isEmpty()) {
+ LOG.debug("Retransmit not yet complete, delaying request {}", request);
+ pending.add(e);
+ return null;
+ }
+ if (currentInflight.size() >= lastTxLimit) {
+ LOG.debug("Queue is at capacity, delayed sending of request {}", request);
+ pending.add(e);
+ return null;
+ }
- e.retransmit(backend, now);
+ // Ready to transmit
+ currentInflight.offer(e);
+ LOG.debug("Enqueued request {} to queue {}", request, this);
+
+ e.retransmit(backend, nextTxSequence(), now);
if (expectingTimer == null) {
expectingTimer = now + REQUEST_TIMEOUT_NANOS;
return Optional.of(INITIAL_REQUEST_TIMEOUT);
}
}
- ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
- // Responses to different targets may arrive out of order, hence we use an iterator
+ /*
+ * We are using tri-state return here to indicate one of three conditions:
+ * - if a matching entry is found, return an Optional containing it
+ * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
+ * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
+ */
+ private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
+ final ResponseEnvelope<?> envelope) {
+ // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
+ // to use an iterator
final Iterator<SequencedQueueEntry> it = queue.iterator();
while (it.hasNext()) {
final SequencedQueueEntry e = it.next();
- if (e.acceptsResponse(response)) {
- lastProgress = ticker.read();
- it.remove();
- LOG.debug("Completing request {} with {}", e, response);
- return e.complete(response.getMessage());
+ final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
+
+ final Request<?, ?> request = e.getRequest();
+ final Response<?, ?> response = envelope.getMessage();
+
+ // First check for matching target, or move to next entry
+ if (!request.getTarget().equals(response.getTarget())) {
+ continue;
+ }
+
+ // Sanity-check logical sequence, ignore any out-of-order messages
+ if (request.getSequence() != response.getSequence()) {
+ LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
+ return Optional.empty();
+ }
+
+ // Now check session match
+ if (envelope.getSessionId() != txDetails.getSessionId()) {
+ LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
+ return Optional.empty();
+ }
+ if (envelope.getTxSequence() != txDetails.getTxSequence()) {
+ LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
+ return Optional.empty();
}
+
+ LOG.debug("Completing request {} with {}", request, envelope);
+ it.remove();
+ return Optional.of(e);
}
- LOG.debug("No request matching {} found", response);
- return current;
+ return null;
+ }
+
+ ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
+ Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
+ if (maybeEntry == null) {
+ maybeEntry = findMatchingEntry(lastInflight, envelope);
+ }
+
+ if (maybeEntry == null || !maybeEntry.isPresent()) {
+ LOG.warn("No request matching {} found, ignoring response", envelope);
+ return current;
+ }
+
+ lastProgress = ticker.read();
+ final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
+
+ // We have freed up a slot, try to transmit something
+ if (backend != null) {
+ final int toSend = lastTxLimit - currentInflight.size();
+ if (toSend > 0) {
+ runTransmit(toSend);
+ }
+ }
+
+ return ret;
+ }
+
+ private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
+ int toSend = count;
+
+ while (toSend > 0) {
+ final SequencedQueueEntry e = queue.poll();
+ if (e == null) {
+ break;
+ }
+
+ LOG.debug("Transmitting entry {}", e);
+ e.retransmit(backend, nextTxSequence(), lastProgress);
+ toSend--;
+ }
+
+ return toSend;
+ }
+
+ private void runTransmit(final int count) {
+ final int toSend;
+
+ // Process lastInflight first, possibly clearing it
+ if (!lastInflight.isEmpty()) {
+ toSend = transmitEntries(lastInflight, count);
+ if (lastInflight.isEmpty()) {
+ // We won't be needing the queue anymore, change it to specialized implementation
+ lastInflight = EmptyQueue.getInstance();
+ }
+ } else {
+ toSend = count;
+ }
+
+ // Process pending next.
+ transmitEntries(pending, toSend);
}
Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
+ Preconditions.checkNotNull(backend);
if (!proof.equals(backendProof)) {
LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
return Optional.empty();
}
- this.backend = Preconditions.checkNotNull(backend);
- backendProof = null;
LOG.debug("Resolved backend {}", backend);
- if (queue.isEmpty()) {
- // No pending requests, hence no need for a timer
- return Optional.empty();
+ // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
+ // and also not to exceed new limits
+ final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
+ newLast.addAll(currentInflight);
+ newLast.addAll(lastInflight);
+ lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
+
+ // Clear currentInflight, possibly compacting it
+ final int txLimit = backend.getMaxMessages();
+ if (lastTxLimit > txLimit) {
+ currentInflight = new ArrayDeque<>();
+ } else {
+ currentInflight.clear();
}
- LOG.debug("Resending requests to backend {}", backend);
- final long now = ticker.read();
- for (SequencedQueueEntry e : queue) {
- e.retransmit(backend, now);
- }
+ // We are ready to roll
+ this.backend = backend;
+ backendProof = null;
+ txSequence = 0;
+ lastTxLimit = txLimit;
+ lastProgress = ticker.read();
- if (expectingTimer != null) {
- // We already have a timer going, no need to schedule a new one
+ // No pending requests, return
+ if (lastInflight.isEmpty() && pending.isEmpty()) {
return Optional.empty();
}
- // Above loop may have cost us some time. Recalculate timeout.
- final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
- expectingTimer = nextTicks;
- return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS));
+ LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
+
+ runTransmit(lastTxLimit);
+
+ // Calculate next timer if necessary
+ if (expectingTimer == null) {
+ // Request transmission may have cost us some time. Recalculate timeout.
+ final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
+ expectingTimer = nextTicks;
+ return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
+ } else {
+ return Optional.empty();
+ }
}
boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
}
boolean hasCompleted() {
- return !notClosed && queue.isEmpty();
+ return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
}
/**
expectingTimer = null;
final long now = ticker.read();
- if (!queue.isEmpty()) {
+ if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
final long ticksSinceProgress = now - lastProgress;
if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
}
// We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
- final SequencedQueueEntry head = queue.peek();
+ final SequencedQueueEntry head = currentInflight.peek();
if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
backend = null;
LOG.debug("Queue {} invalidated backend info", this);
}
}
+ private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
+ queue.forEach(e -> e.poison(cause));
+ queue.clear();
+ }
+
void poison(final RequestException cause) {
close();
- SequencedQueueEntry e = queue.poll();
- while (e != null) {
- e.poison(cause);
- e = queue.poll();
- }
+ poisonQueue(currentInflight, cause);
+ poisonQueue(lastInflight, cause);
+ poisonQueue(pending, cause);
}
// FIXME: add a caller from ClientSingleTransaction
import akka.actor.ActorRef;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
-import java.util.Optional;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information.
*
* @author Robert Varga
- *
- * @param <I> Target identifier type
*/
final class SequencedQueueEntry {
- private static final class LastTry {
- final long timeTicks;
- final long retry;
-
- LastTry(final long retry, final long timeTicks) {
- this.retry = retry;
- this.timeTicks = timeTicks;
- }
- }
-
private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class);
private final Request<?, ?> request;
private final RequestCallback callback;
private final long enqueuedTicks;
- private final long sequence;
- private Optional<LastTry> lastTry = Optional.empty();
+ private TxDetails txDetails;
- SequencedQueueEntry(final Request<?, ?> request, final long sequence, final RequestCallback callback,
+ SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback,
final long now) {
this.request = Preconditions.checkNotNull(request);
this.callback = Preconditions.checkNotNull(callback);
this.enqueuedTicks = now;
- this.sequence = sequence;
}
- long getSequence() {
- return sequence;
+ Request<?, ?> getRequest() {
+ return request;
}
- boolean acceptsResponse(final ResponseEnvelope<?> response) {
- return getSequence() == response.getSequence() && request.getTarget().equals(response.getMessage().getTarget());
+ @Nullable TxDetails getTxDetails() {
+ return txDetails;
}
- long getCurrentTry() {
- return lastTry.isPresent() ? lastTry.get().retry : 0;
- }
-
ClientActorBehavior complete(final Response<?, ?> response) {
LOG.debug("Completing request {} with {}", request, response);
return callback.complete(response);
boolean isTimedOut(final long now, final long timeoutNanos) {
final long elapsed;
- if (lastTry.isPresent()) {
- elapsed = now - lastTry.get().timeTicks;
+ if (txDetails != null) {
+ elapsed = now - txDetails.getTimeTicks();
} else {
elapsed = now - enqueuedTicks;
}
}
}
- void retransmit(final BackendInfo backend, final long now) {
- final long retry = lastTry.isPresent() ? lastTry.get().retry + 1 : 0;
- final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), sequence, retry);
+ void retransmit(final BackendInfo backend, final long txSequence, final long now) {
+ final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()),
+ backend.getSessionId(), txSequence);
final ActorRef actor = backend.getActor();
- LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
+ LOG.trace("Transmitting request {} as {} to {}", request, toSend, actor);
actor.tell(toSend, ActorRef.noSender());
- lastTry = Optional.of(new LastTry(retry, now));
+ txDetails = new TxDetails(backend.getSessionId(), txSequence, now);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString();
}
+
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+/**
+ * Holder class for transmission details about a particular {@link SequencedQueueEntry}.
+ *
+ * @author Robert Varga
+ */
+final class TxDetails {
+ private final long sessionId;
+ private final long txSequence;
+ private final long timeTicks;
+
+ TxDetails(final long sessionId, final long txSequence, final long timeTicks) {
+ this.sessionId = sessionId;
+ this.txSequence = txSequence;
+ this.timeTicks = timeTicks;
+ }
+
+ long getSessionId() {
+ return sessionId;
+ }
+
+ long getTxSequence() {
+ return txSequence;
+ }
+
+ long getTimeTicks() {
+ return timeTicks;
+ }
+}
\ No newline at end of file
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
private static final long serialVersionUID = 1L;
MockFailure(final WritableIdentifier target, final RequestException cause) {
- super(target, cause);
+ super(target, 0, cause);
}
@Override
private static final long serialVersionUID = 1L;
MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
- super(target, replyTo);
+ super(target, 0, replyTo);
}
@Override
ticker.increment(ThreadLocalRandom.current().nextLong());
mockActor = TestProbe.apply(actorSystem);
- mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
+ mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
mockResponse = mockRequest.toRequestFailure(mockCause);
- entry = new SequencedQueueEntry(mockRequest, 0, mockCallback, ticker.read());
+ entry = new SequencedQueueEntry(mockRequest, mockCallback, ticker.read());
}
@After
}
@Test
- public void testGetSequence() {
- assertEquals(0, entry.getSequence());
- }
-
- @Test
- public void testGetCurrentTry() {
- assertEquals(0, entry.getCurrentTry());
- entry.retransmit(mockBackendInfo, ticker.read());
- assertEquals(0, entry.getCurrentTry());
- entry.retransmit(mockBackendInfo, ticker.read());
- assertEquals(1, entry.getCurrentTry());
- entry.retransmit(mockBackendInfo, ticker.read());
- assertEquals(2, entry.getCurrentTry());
+ public void testGetTxDetails() {
+ assertNull(entry.getTxDetails());
+ entry.retransmit(mockBackendInfo, 0, ticker.read());
+ assertEquals(0, entry.getTxDetails().getTxSequence());
+ entry.retransmit(mockBackendInfo, 1, ticker.read());
+ assertEquals(1, entry.getTxDetails().getTxSequence());
+ entry.retransmit(mockBackendInfo, 3, ticker.read());
+ assertEquals(3, entry.getTxDetails().getTxSequence());
}
@Test
assertTrue(entry.isTimedOut(ticker.read(), 0));
assertFalse(entry.isTimedOut(ticker.read(), 1));
- entry.retransmit(mockBackendInfo, ticker.read());
+ entry.retransmit(mockBackendInfo, 0, ticker.read());
assertTrue(entry.isTimedOut(ticker.read(), 0));
ticker.increment(10);
assertTrue(entry.isTimedOut(ticker.read(), 10));
assertFalse(entry.isTimedOut(ticker.read(), 20));
- entry.retransmit(mockBackendInfo, ticker.read());
+ entry.retransmit(mockBackendInfo, 1, ticker.read());
assertTrue(entry.isTimedOut(ticker.read(), 0));
ticker.increment(10);
assertTrue(entry.isTimedOut(ticker.read(), 10));
@Test
public void testRetransmit() {
assertFalse(mockActor.msgAvailable());
- entry.retransmit(mockBackendInfo, ticker.read());
+ entry.retransmit(mockBackendInfo, 0, ticker.read());
assertTrue(mockActor.msgAvailable());
assertRequestEquals(mockRequest, mockActor.receiveOne(Duration.apply(5, TimeUnit.SECONDS)));
assertTrue(o instanceof RequestEnvelope);
final RequestEnvelope actual = (RequestEnvelope) o;
- assertEquals(0, actual.getRetry());
- assertEquals(0, actual.getSequence());
+ assertEquals(0, actual.getSessionId());
+ assertEquals(0, actual.getTxSequence());
assertEquals(expected.getTarget(), actual.getMessage().getTarget());
}
}
private static final long serialVersionUID = 1L;
MockFailure(final WritableIdentifier target, final RequestException cause) {
- super(target, cause);
+ super(target, 0, cause);
}
@Override
private static final long serialVersionUID = 1L;
MockRequest(final WritableIdentifier target, final ActorRef replyTo) {
- super(target, replyTo);
+ super(target, 0, replyTo);
}
@Override
ticker.increment(ThreadLocalRandom.current().nextLong());
mockActor = TestProbe.apply(actorSystem);
- mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
+ mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
mockResponse = mockRequest.toRequestFailure(mockCause);
queue.close();
// Kaboom
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ queue.enqueueRequest(mockRequest, mockCallback);
}
@Test
@Test
public void testPoison() {
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ queue.enqueueRequest(mockRequest, mockCallback);
queue.poison(mockCause);
final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
queue.poison(mockCause);
// Kaboom
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ queue.enqueueRequest(mockRequest, mockCallback);
}
@Test
@Test
public void testEnqueueRequestNeedsBackend() {
- final Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
+ final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
assertNotNull(ret);
assertFalse(ret.isPresent());
@Test
public void testSetBackendWithNoResolution() {
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ queue.enqueueRequest(mockRequest, mockCallback);
final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
@Test
public void testSetBackendWithWrongProof() {
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ queue.enqueueRequest(mockRequest, mockCallback);
final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
assertTrue(queue.expectProof(proof));
}
@Test
- public void testSetbackedWithRequestsNoTimer() {
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ public void testSetBackendWithRequestsNoTimer() {
+ queue.enqueueRequest(mockRequest, mockCallback);
final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
assertTrue(queue.expectProof(proof));
public void testEnqueueRequestNeedsTimer() {
setupBackend();
- final Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
+ final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
assertNotNull(ret);
assertTrue(ret.isPresent());
assertTransmit(mockRequest, 0);
setupBackend();
// First request
- Optional<FiniteDuration> ret = queue.enqueueRequest(0, mockRequest, mockCallback);
+ Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
assertNotNull(ret);
assertTrue(ret.isPresent());
assertTransmit(mockRequest, 0);
// Second request, no timer fired
- ret = queue.enqueueRequest(1, mockRequest2, mockCallback);
+ ret = queue.enqueueRequest(mockRequest2, mockCallback);
assertNull(ret);
assertTransmit(mockRequest2, 1);
}
@Test
public void testRunTimeoutWithoutShift() throws NoProgressException {
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ queue.enqueueRequest(mockRequest, mockCallback);
final boolean ret = queue.runTimeout();
assertFalse(ret);
}
@Test
public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ queue.enqueueRequest(mockRequest, mockCallback);
ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1);
@Test
public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ setupBackend();
+
+ queue.enqueueRequest(mockRequest, mockCallback);
ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS);
@Test
public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ setupBackend();
+
+ queue.enqueueRequest(mockRequest, mockCallback);
ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1);
@Test(expected=NoProgressException.class)
public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ queue.enqueueRequest(mockRequest, mockCallback);
ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
@Test(expected=NoProgressException.class)
public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ queue.enqueueRequest(mockRequest, mockCallback);
ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
@Test
public void testCompleteSingle() {
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ setupBackend();
+
+ queue.enqueueRequest(mockRequest, mockCallback);
ClientActorBehavior ret = queue.complete(mockBehavior, mockResponseEnvelope);
verify(mockCallback).complete(mockResponse);
@Test
public void testCompleteNull() {
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ setupBackend();
+
+ queue.enqueueRequest(mockRequest, mockCallback);
doReturn(null).when(mockCallback).complete(mockResponse);
public void testProgressRecord() throws NoProgressException {
setupBackend();
- queue.enqueueRequest(0, mockRequest, mockCallback);
+ queue.enqueueRequest(mockRequest, mockCallback);
ticker.increment(10);
- queue.enqueueRequest(1, mockRequest2, mockCallback);
+ queue.enqueueRequest(mockRequest2, mockCallback);
queue.complete(mockBehavior, mockResponseEnvelope);
ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11);
assertTrue(o instanceof RequestEnvelope);
final RequestEnvelope actual = (RequestEnvelope) o;
- assertEquals(0, actual.getRetry());
- assertEquals(sequence, actual.getSequence());
+ assertEquals(0, actual.getSessionId());
+ assertEquals(sequence, actual.getTxSequence());
assertSame(expected, actual.getMessage());
}
}
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_${scala.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.scala-lang.modules</groupId>
+ <artifactId>scala-java8-compat_${scala.version}</artifactId>
+ </dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.version}</artifactId>
final LocalHistoryIdentifier historyId, final long transactionId,
final java.util.Optional<ShardBackendInfo> backend) {
- final java.util.Optional<DataTree> dataTree = backend.flatMap(t -> t.getDataTree());
+ final java.util.Optional<DataTree> dataTree = backend.flatMap(ShardBackendInfo::getDataTree);
final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId);
if (dataTree.isPresent()) {
return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot());
checkSealed();
final SettableFuture<Boolean> ret = SettableFuture.create();
- client().sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(false)), t -> {
+ client().sendRequest(Verify.verifyNotNull(doCommit(false)), t -> {
if (t instanceof TransactionCommitSuccess) {
ret.set(Boolean.TRUE);
} else if (t instanceof RequestFailure) {
void abort(final VotingFuture<Void> ret) {
checkSealed();
- client.sendRequest(nextSequence(), new TransactionAbortRequest(getIdentifier(), client().self()), t -> {
+ client.sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), client().self()), t -> {
if (t instanceof TransactionAbortSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
void canCommit(final VotingFuture<?> ret) {
checkSealed();
- client.sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(true)), t -> {
+ client.sendRequest(Verify.verifyNotNull(doCommit(true)), t -> {
if (t instanceof TransactionCanCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
void preCommit(final VotingFuture<?> ret) {
checkSealed();
- client.sendRequest(nextSequence(), new TransactionPreCommitRequest(getIdentifier(), client().self()), t-> {
+ client.sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> {
if (t instanceof TransactionPreCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
void doCommit(final VotingFuture<?> ret) {
checkSealed();
- client.sendRequest(nextSequence(), new TransactionDoCommitRequest(getIdentifier(), client().self()), t-> {
+ client.sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> {
if (t instanceof TransactionCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
abstract void doAbort();
abstract TransactionRequest<?> doCommit(boolean coordinated);
-
}
transactions.remove(transaction.getIdentifier());
}
- void sendRequest(final long sequence, final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
- sendRequest(sequence, request, response -> {
+ void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
+ sendRequest(request, response -> {
completer.accept(response);
return this;
});
@Override
void doAbort() {
- client().sendRequest(nextSequence(), new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER);
+ client().sendRequest(new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER);
modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted"));
}
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.OnComplete;
+import akka.actor.ActorRef;
+import akka.japi.Function;
+import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableBiMap.Builder;
import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.access.client.BackendInfo;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
+import scala.compat.java8.FutureConverters;
/**
* {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
* @author Robert Varga
*/
final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
- private static final ExecutionContext DIRECT_EXECUTION_CONTEXT =
- ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
private final ActorContext actorContext;
+ // FIXME: this counter should be in superclass somewhere
+ private final AtomicLong nextSessionId = new AtomicLong();
@GuardedBy("this")
private long nextShard = 1;
return NULL_FUTURE;
}
- final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<>();
+ final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<ShardBackendInfo>();
+
+ FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
+ LOG.debug("Looking up primary info for {} from {}", shardName, info);
+ return FutureConverters.toJava(Patterns.ask(info.getPrimaryShardActor(),
+ (Function<ActorRef, Object>) replyTo -> new ConnectClientRequest(null, replyTo,
+ ABIVersion.BORON, ABIVersion.current()), DEAD_TIMEOUT));
+ }).thenApply(response -> {
+ if (response instanceof RequestFailure) {
+ final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
+ LOG.debug("Connect request failed {}", failure, failure.getCause());
+ throw Throwables.propagate(failure.getCause());
+ }
- actorContext.findPrimaryShardAsync(shardName).onComplete(new OnComplete<PrimaryShardInfo>() {
- @Override
- public void onComplete(final Throwable t, final PrimaryShardInfo v) {
- if (t != null) {
- ret.completeExceptionally(t);
- } else {
- ret.complete(createBackendInfo(v, shardName, cookie));
- }
+ LOG.debug("Resolved backend information to {}", response);
+
+ Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response);
+ final ConnectClientSuccess success = (ConnectClientSuccess) response;
+
+ return new ShardBackendInfo(success.getBackend(),
+ nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
+ success.getDataTree(), success.getMaxMessages());
+ }).whenComplete((info, t) -> {
+ if (t != null) {
+ ret.completeExceptionally(t);
+ } else {
+ ret.complete(info);
}
- }, DIRECT_EXECUTION_CONTEXT);
+ });
LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
return ret;
}
-
- private static ABIVersion toABIVersion(final short version) {
- switch (version) {
- case DataStoreVersions.BORON_VERSION:
- return ABIVersion.BORON;
- }
-
- throw new IllegalArgumentException("Unsupported version " + version);
- }
-
- private static ShardBackendInfo createBackendInfo(final Object result, final String shardName, final Long cookie) {
- Preconditions.checkArgument(result instanceof PrimaryShardInfo);
- final PrimaryShardInfo info = (PrimaryShardInfo) result;
-
- LOG.debug("Creating backend information for {}", info);
- return new ShardBackendInfo(info.getPrimaryShardActor().resolveOne(DEAD_TIMEOUT).value().get().get(),
- toABIVersion(info.getPrimaryShardVersion()), shardName, UnsignedLong.fromLongBits(cookie),
- info.getLocalShardDataTree());
- }
}
// Make sure we send any modifications before issuing a read
ensureFlushedBuider();
- client().sendRequest(nextSequence(), request, completer);
+ client().sendRequest(request, completer);
return MappingCheckedFuture.create(future, ReadFailedException.MAPPER);
}
@Override
CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
final SettableFuture<Boolean> future = SettableFuture.create();
- return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), client().self(), path),
+ return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), client().self(), path),
t -> completeExists(future, t), future);
}
@Override
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
- return sendReadRequest(new ReadTransactionRequest(getIdentifier(), client().self(), path),
+ return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), client().self(), path),
t -> completeRead(future, t), future);
}
private void ensureInitializedBuider() {
if (!builderBusy) {
+ builder.setSequence(nextSequence());
builderBusy = true;
}
}
}
private void flushBuilder() {
- client().sendRequest(nextSequence(), builder.build(), this::completeModify);
+ client().sendRequest(builder.build(), this::completeModify);
builderBusy = false;
}
private final UnsignedLong cookie;
private final String shardName;
- ShardBackendInfo(final ActorRef actor, final ABIVersion version, final String shardName, final UnsignedLong cookie,
- final Optional<DataTree> dataTree) {
- super(actor, version);
+ ShardBackendInfo(final ActorRef actor, final long sessionId, final ABIVersion version, final String shardName,
+ final UnsignedLong cookie, final Optional<DataTree> dataTree, final int maxMessages) {
+ super(actor, sessionId, version, maxMessages);
this.shardName = Preconditions.checkNotNull(shardName);
this.cookie = Preconditions.checkNotNull(cookie);
this.dataTree = Preconditions.checkNotNull(dataTree);