import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
+ if (isDone) {
+ // Done transactions do not register on our radar on should not have any state associated.
+ return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
+ : new LocalReadWriteProxyTransaction(this, txId);
+ }
+
// onTransactionCompleted() runs concurrently
final LocalReadWriteProxyTransaction localSealed = lastSealed;
final DataTreeSnapshot baseSnapshot;
@Override
void onTransactionCompleted(final AbstractProxyTransaction tx) {
Verify.verify(tx instanceof LocalProxyTransaction);
- if (tx instanceof LocalReadWriteProxyTransaction) {
- if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
- LOG.debug("Completed last sealed transaction {}", tx);
- }
+ if (tx instanceof LocalReadWriteProxyTransaction
+ && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
+ LOG.debug("Completed last sealed transaction {}", tx);
}
}
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
final DataTreeSnapshot snapshot = takeSnapshot();
return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
new LocalReadWriteProxyTransaction(this, txId, snapshot);
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
- return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
}
@Override
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
- return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
}
@Override
}
@Override
- void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
- final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
- // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
- // period required to get into the queue.
+ void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
+ throws RequestException {
+ final Request<?, ?> request = entry.getRequest();
if (request instanceof TransactionRequest) {
- forwardTransactionRequest((TransactionRequest<?>) request, callback);
+ lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
+ entry.getEnqueuedTicks());
} else if (request instanceof LocalHistoryRequest) {
- forwardTo.accept(request, callback);
+ replayTo.accept(entry);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
}
- private void forwardTransactionRequest(final TransactionRequest<?> request,
- final Consumer<Response<?, ?>> callback) throws RequestException {
+ @Override
+ void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
+ throws RequestException {
+ final Request<?, ?> request = entry.getRequest();
+ if (request instanceof TransactionRequest) {
+ lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
+ } else if (request instanceof LocalHistoryRequest) {
+ forwardTo.accept(entry);
+ } else {
+ throw new IllegalArgumentException("Unhandled request " + request);
+ }
+ }
+ private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
+ throws RequestReplayException {
final AbstractProxyTransaction proxy;
lock.lock();
try {
} finally {
lock.unlock();
}
- if (proxy == null) {
- throw new RequestReplayException("Failed to find proxy for %s", request);
+ if (proxy != null) {
+ return proxy;
}
- proxy.forwardRequest(request, callback);
+ throw new RequestReplayException("Failed to find proxy for %s", request);
}
}
return parent;
}
- AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+ final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
final boolean snapshotOnly) {
+ return createTransactionProxy(txId, snapshotOnly, false);
+ }
+
+ AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
+ final boolean isDone) {
lock.lock();
try {
if (successor != null) {
- return successor.createTransactionProxy(txId, snapshotOnly);
+ return successor.createTransactionProxy(txId, snapshotOnly, isDone);
}
final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
- final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
+ final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
proxies.put(proxyId, ret);
LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
return ret;
final void completeTransaction(final AbstractProxyTransaction tx) {
lock.lock();
try {
- proxies.remove(tx.getIdentifier());
+ // Removal will be completed once purge completes
LOG.debug("Proxy {} completing transaction {}", this, tx);
onTransactionCompleted(tx);
} finally {
}
}
+ void purgeTransaction(final AbstractProxyTransaction tx) {
+ lock.lock();
+ try {
+ proxies.remove(tx.getIdentifier());
+ LOG.debug("Proxy {} purged transaction {}", this, tx);
+ } finally {
+ lock.unlock();
+ }
+ }
+
final void close() {
lock.lock();
try {
}
@GuardedBy("lock")
+ @SuppressWarnings("checkstyle:hiddenField")
abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
- TransactionIdentifier txId, boolean snapshotOnly);
+ TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
+ @SuppressWarnings("checkstyle:hiddenField")
abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
@SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")