--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A blank transaction request. This is used to provide backfill requests in converted retransmit scenarios, such as
+ * when a initial request to a transaction (such as a {@link ReadTransactionRequest}) is satisfied by the backend
+ * before the need to replay the transaction to a different remote backend.
+ *
+ * @author Robert Varga
+ */
+public final class IncrementTransactionSequenceRequest extends TransactionRequest<IncrementTransactionSequenceRequest> {
+ private static final long serialVersionUID = 1L;
+
+ private final long increment;
+
+ public IncrementTransactionSequenceRequest(final TransactionIdentifier identifier, final long sequence,
+ final ActorRef replyTo, final long increment) {
+ super(identifier, sequence, replyTo);
+ Preconditions.checkArgument(increment >= 0);
+ this.increment = increment;
+ }
+
+ @Override
+ protected IncrementTransactionSequenceRequestProxyV1 externalizableProxy(final ABIVersion version) {
+ return new IncrementTransactionSequenceRequestProxyV1(this);
+ }
+
+ @Override
+ protected IncrementTransactionSequenceRequest cloneAsVersion(final ABIVersion targetVersion) {
+ return this;
+ }
+
+ /**
+ * Return the sequence increment beyond this request's sequence.
+ *
+ * @return Sequence increment, guaranteed to be non-negative.
+ */
+ public long getIncrement() {
+ return increment;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.WritableObjects;
+
+final class IncrementTransactionSequenceRequestProxyV1
+ extends AbstractTransactionRequestProxy<IncrementTransactionSequenceRequest> {
+ private long increment;
+
+ // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+ // be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public IncrementTransactionSequenceRequestProxyV1() {
+ // For Externalizable
+ }
+
+ IncrementTransactionSequenceRequestProxyV1(final IncrementTransactionSequenceRequest request) {
+ super(request);
+ this.increment = request.getIncrement();
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ WritableObjects.writeLong(out, increment);
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws ClassNotFoundException, IOException {
+ super.readExternal(in);
+ increment = WritableObjects.readLong(in);
+ }
+
+ @Override
+ protected IncrementTransactionSequenceRequest createRequest(final TransactionIdentifier target, final long sequence,
+ final ActorRef replyToActor) {
+ return new IncrementTransactionSequenceRequest(target, sequence, replyToActor, increment);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to an {@link IncrementTransactionSequenceRequest}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class IncrementTransactionSequenceSuccess extends TransactionSuccess<IncrementTransactionSequenceSuccess> {
+ private static final long serialVersionUID = 1L;
+
+ public IncrementTransactionSequenceSuccess(final TransactionIdentifier target, final long sequence) {
+ super(target, sequence);
+ }
+
+ @Override
+ protected IncrementTransactionSequenceSuccessProxyV1 externalizableProxy(final ABIVersion version) {
+ return new IncrementTransactionSequenceSuccessProxyV1(this);
+ }
+
+ @Override
+ protected IncrementTransactionSequenceSuccess cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link IncrementTransactionSequenceSuccess}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class IncrementTransactionSequenceSuccessProxyV1
+ extends AbstractTransactionSuccessProxy<IncrementTransactionSequenceSuccess> {
+ private static final long serialVersionUID = 1L;
+
+ // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+ // be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public IncrementTransactionSequenceSuccessProxyV1() {
+ // For Externalizable
+ }
+
+ IncrementTransactionSequenceSuccessProxyV1(final IncrementTransactionSequenceSuccess request) {
+ super(request);
+ }
+
+ @Override
+ protected IncrementTransactionSequenceSuccess createSuccess(final TransactionIdentifier target,
+ final long sequence) {
+ return new IncrementTransactionSequenceSuccess(target, sequence);
+ }
+}
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
*/
@NotThreadSafe
private static final class IncrementSequence {
- private long delta = 1;
+ private final long sequence;
+ private long delta = 0;
+
+ IncrementSequence(final long sequence) {
+ this.sequence = sequence;
+ }
long getDelta() {
return delta;
}
+ long getSequence() {
+ return sequence;
+ }
+
void incrementDelta() {
delta++;
}
return parent.localActor();
}
- private void incrementSequence(final long delta) {
+ final void incrementSequence(final long delta) {
sequence += delta;
LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
}
successfulRequests.add(Verify.verifyNotNull(req));
}
- final void recordFinishedRequest() {
+ final void recordFinishedRequest(final Response<?, ?> response) {
final Object last = successfulRequests.peekLast();
if (last instanceof IncrementSequence) {
((IncrementSequence) last).incrementDelta();
} else {
- successfulRequests.addLast(new IncrementSequence());
+ successfulRequests.addLast(new IncrementSequence(response.getSequence()));
}
}
successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
} else {
Verify.verify(obj instanceof IncrementSequence);
- successor.incrementSequence(((IncrementSequence) obj).getDelta());
+ final IncrementSequence increment = (IncrementSequence) obj;
+ successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
+ increment.getSequence(), localActor(), increment.getDelta()), resp -> { }, now);
+ LOG.debug("Incrementing sequence {} to successor {}", obj, successor);
}
}
LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
// No-op
} else if (request instanceof TransactionPurgeRequest) {
enqueuePurge(enqueuedTicks);
+ } else if (request instanceof IncrementTransactionSequenceRequest) {
+ // Local transactions do not have non-replayable requests which would be visible to the backend,
+ // hence we can skip sequence increments.
+ LOG.debug("Not replaying {}", request);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
failFuture(future, response);
}
- recordFinishedRequest();
+ recordFinishedRequest(response);
}
private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
failFuture(future, response);
}
- recordFinishedRequest();
+ recordFinishedRequest(response);
}
private ModifyTransactionRequest abortRequest() {
ensureFlushedBuider();
sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
- recordFinishedRequest();
+ recordFinishedRequest(resp);
callback.accept(resp);
});
} else if (request instanceof ExistsTransactionRequest) {
ensureFlushedBuider();
sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
- recordFinishedRequest();
+ recordFinishedRequest(resp);
callback.accept(resp);
});
} else if (request instanceof TransactionPreCommitRequest) {
ensureFlushedBuider(optTicks);
enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
- recordFinishedRequest();
+ recordFinishedRequest(resp);
cb.accept(resp);
}, enqueuedTicks);
} else if (request instanceof ExistsTransactionRequest) {
ensureFlushedBuider(optTicks);
enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
- recordFinishedRequest();
+ recordFinishedRequest(resp);
cb.accept(resp);
}, enqueuedTicks);
} else if (request instanceof TransactionPreCommitRequest) {
enqueueAbort(callback, enqueuedTicks);
} else if (request instanceof TransactionPurgeRequest) {
enqueuePurge(enqueuedTicks);
+ } else if (request instanceof IncrementTransactionSequenceRequest) {
+ final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
+ ensureFlushedBuider(optTicks);
+ enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
+ req.getIncrement()), callback, enqueuedTicks);
+ incrementSequence(req.getIncrement());
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}
import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
tx = createTransaction(request, id);
transactions.put(id, tx);
- } else {
+ } else if (!(request instanceof IncrementTransactionSequenceRequest)) {
final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
if (maybeReplay.isPresent()) {
final TransactionSuccess<?> replay = maybeReplay.get();
import java.util.Queue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceSuccess;
import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
@SuppressWarnings("checkstyle:IllegalCatch")
final @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request,
final RequestEnvelope envelope, final long now) throws RequestException {
+ if (request instanceof IncrementTransactionSequenceRequest) {
+ final IncrementTransactionSequenceRequest incr = (IncrementTransactionSequenceRequest) request;
+ expectedSequence += incr.getIncrement();
+
+ return recordSuccess(incr.getSequence(), new IncrementTransactionSequenceSuccess(incr.getTarget(),
+ incr.getSequence()));
+ }
+
if (previousFailure != null) {
LOG.debug("{}: Rejecting request {} due to previous failure", persistenceId(), request, previousFailure);
throw previousFailure;