AbstractReadTransactionRequestProxyV1(final T request) {
super(request);
+ path = request.getPath();
}
@Override
private final DataTreeModification mod;
private final boolean coordinated;
- public CommitLocalTransactionRequest(@Nonnull final TransactionIdentifier identifier,
+ public CommitLocalTransactionRequest(@Nonnull final TransactionIdentifier identifier, final long sequence,
@Nonnull final ActorRef replyTo, @Nonnull final DataTreeModification mod, final boolean coordinated) {
- super(identifier, 0, replyTo);
+ super(identifier, sequence, replyTo);
this.mod = Preconditions.checkNotNull(mod);
this.coordinated = coordinated;
}
} else {
out.writeBoolean(false);
}
-
- out.writeObject(data);
}
@Override
String histStr = historyId.getHistoryId() == 0 ? "" : "-chn-" + historyId.getHistoryId();
shortString = historyId.getClientId().getFrontendId().getMemberName().getName() + "-"
+ historyId.getClientId().getFrontendId().getClientType().getName() + "-fe-"
- + historyId.getClientId().getGeneration() + histStr + "-txn-" + transactionId;
+ + historyId.getClientId().getGeneration() + histStr + "-txn-" + transactionId
+ + "-" + historyId.getCookie();
}
return shortString;
return;
}
+ LOG.debug("{}: resolved shard {} to {}", persistenceId(), shard, backend);
final long stamp = connectionsLock.writeLock();
try {
// Bring the connection up
super(context, cookie, backend);
}
- private TransmittedConnectionEntry transmit(final ConnectionEntry entry) {
+ private void transmit(final ConnectionEntry entry) {
final long txSequence = nextTxSequence++;
final RequestEnvelope toSend = new RequestEnvelope(entry.getRequest().toVersion(remoteVersion()), sessionId(),
txSequence);
+ // We need to enqueue the request before we send it to the actor, as we may be executing on a different thread
+ // than the client actor thread, in which case the round-trip could be made faster than we can enqueue --
+ // in which case the receive routine would not find the entry.
+ final TransmittedConnectionEntry txEntry = new TransmittedConnectionEntry(entry, sessionId(), txSequence,
+ readTime());
+ appendToInflight(txEntry);
+
final ActorRef actor = remoteActor();
LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), toSend, actor);
actor.tell(toSend, ActorRef.noSender());
-
- return new TransmittedConnectionEntry(entry, sessionId(), txSequence, readTime());
}
@Override
void enqueueEntry(final ConnectionEntry entry) {
if (inflightSize() < remoteMaxMessages()) {
- appendToInflight(transmit(entry));
+ transmit(entry);
LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this);
} else {
LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest());
}
LOG.debug("Transmitting entry {}", e);
- appendToInflight(transmit(e));
+ transmit(e);
toSend--;
}
}
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
*/
private ProxyHistory createHistoryProxy(final Long shard) {
final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
- final ProxyHistory ret = createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
- identifier.getHistoryId(), shard), connection);
+ final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
+ identifier.getHistoryId(), shard);
+ LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
- // Request creation of the history.
- connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
- this::createHistoryCallback);
+ final ProxyHistory ret = createHistoryProxy(proxyId, connection);
+
+ // Request creation of the history, if it is not the single history
+ if (ret.getIdentifier().getHistoryId() != 0) {
+ connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
+ this::createHistoryCallback);
+ }
return ret;
}
}
}
+ /**
+ * Allocate a {@link ClientTransaction}.
+ *
+ * @return A new {@link ClientTransaction}
+ * @throws TransactionChainClosedException if this history is closed
+ */
public final ClientTransaction createTransaction() {
- Preconditions.checkState(state != State.CLOSED);
+ if (state == State.CLOSED) {
+ throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
+ }
synchronized (this) {
final ClientTransaction ret = doCreateTransaction();
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.function.Consumer;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
* @author Robert Varga
*/
abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
+ private static final class IncrementSequence {
+ private long delta = 1;
+
+ long getDelta() {
+ return delta;
+ }
+
+ void incrementDelta() {
+ delta++;
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
+ private final Deque<Object> successfulRequests = new ArrayDeque<>();
private final ProxyHistory parent;
private AbstractProxyTransaction successor;
return parent.localActor();
}
+ private void incrementSequence(final long delta) {
+ sequence += delta;
+ LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
+ }
+
final long nextSequence() {
- return sequence++;
+ final long ret = sequence++;
+ LOG.debug("Transaction {} allocated sequence {}", this, ret);
+ return ret;
}
final void delete(final YangInstanceIdentifier path) {
Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier());
}
+ final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
+ successfulRequests.add(Verify.verifyNotNull(req));
+ }
+
+ final void recordFinishedRequest() {
+ final Object last = successfulRequests.peekLast();
+ if (last instanceof IncrementSequence) {
+ ((IncrementSequence) last).incrementDelta();
+ } else {
+ successfulRequests.addLast(new IncrementSequence());
+ }
+ }
+
/**
* Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
* being sent to the backend.
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+ // This is a terminal request, hence we do not need to record it
+ LOG.debug("Transaction {} abort completed", this);
parent.completeTransaction(this);
});
}
ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
}
+ // This is a terminal request, hence we do not need to record it
+ LOG.debug("Transaction {} directCommit completed", this);
parent.completeTransaction(this);
});
return ret;
void canCommit(final VotingFuture<?> ret) {
checkSealed();
- sendRequest(Verify.verifyNotNull(commitRequest(true)), t -> {
+ final TransactionRequest<?> req = Verify.verifyNotNull(commitRequest(true));
+ sendRequest(req, t -> {
if (t instanceof TransactionCanCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
} else {
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+
+ recordSuccessfulRequest(req);
+ LOG.debug("Transaction {} canCommit completed", this);
});
}
void preCommit(final VotingFuture<?> ret) {
checkSealed();
- sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
+ final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
+ localActor());
+ sendRequest(req, t -> {
if (t instanceof TransactionPreCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
} else {
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+
+ recordSuccessfulRequest(req);
+ LOG.debug("Transaction {} preCommit completed", this);
});
}
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+ LOG.debug("Transaction {} doCommit completed", this);
parent.completeTransaction(this);
});
}
- void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
+ final void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
this.successor = Preconditions.checkNotNull(successor);
+
+ for (Object obj : successfulRequests) {
+ if (obj instanceof TransactionRequest) {
+ LOG.debug("Forwarding request {} to successor {}", obj, successor);
+ successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
+ } else {
+ Verify.verify(obj instanceof IncrementSequence);
+ successor.incrementSequence(((IncrementSequence) obj).getDelta());
+ }
+ }
+ LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
+ successfulRequests.clear();
}
/**
return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx()));
}
+ @Override
+ void onTransactionAbort(final TransactionIdentifier txId) {
+ final State local = state();
+ if (local == State.TX_OPEN) {
+ updateState(local, State.IDLE);
+ }
+
+ super.onTransactionAbort(txId);
+ }
+
@Override
AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
final AbstractTransactionCommitCohort cohort) {
* Release all state associated with this transaction.
*/
public void abort() {
- if (ensureClosed()) {
- for (AbstractProxyTransaction proxy : proxies.values()) {
- proxy.abort();
- }
- proxies.clear();
-
+ if (commonAbort()) {
parent.onTransactionAbort(transactionId);
}
}
+ private boolean commonAbort() {
+ if (!ensureClosed()) {
+ return false;
+ }
+
+ for (AbstractProxyTransaction proxy : proxies.values()) {
+ proxy.abort();
+ }
+ proxies.clear();
+ return true;
+ }
+
@Override
void localAbort(final Throwable cause) {
- LOG.debug("Aborting transaction {}", getIdentifier(), cause);
- abort();
+ LOG.debug("Local abort of transaction {}", getIdentifier(), cause);
+ commonAbort();
}
Map<Long, AbstractProxyTransaction> getProxies() {
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
@Override
CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
- final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(),
- modification, coordinated);
+ final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, nextSequence(),
+ localActor(), modification, coordinated);
modification = new FailedDataTreeModification(this::submittedException);
return ret;
}
@Override
void handleForwardedRemoteRequest(final TransactionRequest<?> request,
final @Nullable Consumer<Response<?, ?>> callback) {
- LOG.debug("Applying forwaded request {}", request);
+ LOG.debug("Applying forwarded request {}", request);
if (request instanceof ModifyTransactionRequest) {
applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+ } else if (request instanceof ReadTransactionRequest) {
+ final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
+ final Optional<NormalizedNode<?, ?>> result = modification.readNode(path);
+ callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ } else if (request instanceof ExistsTransactionRequest) {
+ final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
+ final boolean result = modification.readNode(path).isPresent();
+ callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ } else if (request instanceof TransactionPreCommitRequest) {
+ sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ } else if (request instanceof TransactionDoCommitRequest) {
+ sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ } else if (request instanceof TransactionAbortRequest) {
+ sendAbort(callback);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
final TransactionIdentifier txId) {
- Preconditions.checkState(lastOpen == null, "Proxy {} is currently open", lastOpen);
+ Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
// onTransactionCompleted() runs concurrently
final LocalProxyTransaction localSealed = lastSealed;
lastOpen = new LocalProxyTransaction(this, txId,
(CursorAwareDataTreeModification) baseSnapshot.newModification());
+ LOG.debug("Proxy {} open transaction {}", this, lastOpen);
return lastOpen;
}
@Override
void replaySuccessfulRequests() {
for (AbstractProxyTransaction t : proxies.values()) {
+ LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
- LOG.debug("{} created successor transaction proxy {} for {}", identifier, newProxy, t);
+ LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
t.replaySuccessfulRequests(newProxy);
}
}
}
final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
- final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
-
lock.lock();
try {
+ if (successor != null) {
+ return successor.createTransactionProxy(txId);
+ }
+
+ final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
proxies.put(proxyId, ret);
LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
lock.lock();
try {
proxies.remove(tx.getIdentifier());
+ LOG.debug("Proxy {} aborting transaction {}", this, tx);
+ onTransactionAborted(tx);
} finally {
lock.unlock();
}
lock.lock();
try {
proxies.remove(tx.getIdentifier());
+ LOG.debug("Proxy {} completing transaction {}", this, tx);
+ onTransactionCompleted(tx);
} finally {
lock.unlock();
}
}
successor = createSuccessor(newConnection);
+ LOG.debug("History {} instantiated successor {}", this, successor);
return new ReconnectCohort();
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
// FIXME: make this tuneable
private static final int REQUEST_MAX_MODIFICATIONS = 1000;
- private final Collection<TransactionRequest<?>> successfulRequests = new ArrayList<>();
private final ModifyTransactionRequestBuilder builder;
private boolean builderBusy;
if (response instanceof TransactionSuccess) {
// Happy path
- successfulRequests.add(request);
+ recordSuccessfulRequest(request);
} else {
recordFailedResponse(response);
}
} else {
failFuture(future, response);
}
+
+ recordFinishedRequest();
}
private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
} else {
failFuture(future, response);
}
+
+ recordFinishedRequest();
}
@Override
// No-op
}
- @Override
- void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
- super.replaySuccessfulRequests(successor);
-
- for (TransactionRequest<?> req : successfulRequests) {
- LOG.debug("Forwarding request {} to successor {}", req, successor);
- successor.handleForwardedRemoteRequest(req, null);
- }
- successfulRequests.clear();
- }
-
@Override
void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
final Consumer<Response<?, ?>> callback) throws RequestException {
throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
}
}
+ } else if (request instanceof ReadTransactionRequest) {
+ ensureFlushedBuider();
+ sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+ ((ReadTransactionRequest) request).getPath()), callback);
+ } else if (request instanceof ExistsTransactionRequest) {
+ ensureFlushedBuider();
+ sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+ ((ExistsTransactionRequest) request).getPath()), callback);
+ } else if (request instanceof TransactionPreCommitRequest) {
+ ensureFlushedBuider();
+ sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ } else if (request instanceof TransactionDoCommitRequest) {
+ ensureFlushedBuider();
+ sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ } else if (request instanceof TransactionAbortRequest) {
+ ensureFlushedBuider();
+ sendAbort(callback);
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}
transactions.put(id, tx);
} else {
- final Optional<TransactionSuccess<?>> replay = tx.replaySequence(request.getSequence());
- if (replay.isPresent()) {
- return replay.get();
+ final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
+ if (maybeReplay.isPresent()) {
+ final TransactionSuccess<?> replay = maybeReplay.get();
+ LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
+ return replay;
}
}
Preconditions.checkState(openTransaction == null, "Transaction %s is open", openTransaction);
if (previousTx == null) {
+ LOG.debug("Opening an unchained snapshot in {}", chainId);
return dataTree.takeSnapshot();
- } else {
- return previousTx.getSnapshot();
}
+
+ LOG.debug("Reusing a chained snapshot in {}", chainId);
+ return previousTx.getSnapshot();
}
ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
org.slf4j.simpleLogger.logFile=System.out
org.slf4j.simpleLogger.showShortLogName=true
org.slf4j.simpleLogger.levelInBrackets=true
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.access=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker=debug
org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
-org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off
\ No newline at end of file
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker.actors.dds=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off