@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
+ if (isDone) {
+ // Done transactions do not register on our radar on should not have any state associated.
+ return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
+ : new LocalReadWriteProxyTransaction(this, txId);
+ }
+
// onTransactionCompleted() runs concurrently
final LocalReadWriteProxyTransaction localSealed = lastSealed;
final DataTreeSnapshot baseSnapshot;
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
final DataTreeSnapshot snapshot = takeSnapshot();
return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
new LocalReadWriteProxyTransaction(this, txId, snapshot);
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
- return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
}
@Override
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
- return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
}
@Override
if (identifier.equals(req.getTarget())) {
Verify.verify(req instanceof LocalHistoryRequest);
if (req instanceof CreateLocalHistoryRequest) {
- successor.connection.sendRequest(req, e.getCallback());
+ successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
it.remove();
break;
}
}
for (AbstractProxyTransaction t : proxies.values()) {
- LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
- final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
- t.isSnapshotOnly());
- LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
- t.replayMessages(newProxy, previousEntries);
+ LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
+ t.replayMessages(successor, previousEntries);
}
// Now look for any finalizing messages
if (identifier.equals(req.getTarget())) {
Verify.verify(req instanceof LocalHistoryRequest);
if (req instanceof DestroyLocalHistoryRequest) {
- successor.connection.sendRequest(req, e.getCallback());
+ successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
it.remove();
break;
}
final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
final boolean snapshotOnly) {
+ return createTransactionProxy(txId, snapshotOnly, false);
+ }
+
+ AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
+ final boolean isDone) {
lock.lock();
try {
if (successor != null) {
- return successor.createTransactionProxy(txId, snapshotOnly);
+ return successor.createTransactionProxy(txId, snapshotOnly, isDone);
}
final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
- final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
+ final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
proxies.put(proxyId, ret);
LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
return ret;
final void completeTransaction(final AbstractProxyTransaction tx) {
lock.lock();
try {
- proxies.remove(tx.getIdentifier());
+ // Removal will be completed once purge completes
LOG.debug("Proxy {} completing transaction {}", this, tx);
onTransactionCompleted(tx);
} finally {
}
}
+ void purgeTransaction(final AbstractProxyTransaction tx) {
+ lock.lock();
+ try {
+ proxies.remove(tx.getIdentifier());
+ LOG.debug("Proxy {} purged transaction {}", this, tx);
+ } finally {
+ lock.unlock();
+ }
+ }
+
final void close() {
lock.lock();
try {
@GuardedBy("lock")
abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
- TransactionIdentifier txId, boolean snapshotOnly);
+ TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);