We need correct accounting for DONE non-standalone local transactions,
as such transactions do not interact with open/closed semantics.
Propagate DONE via a simple flag, which we check in local ProxyHistory
a create a proxy without a backing modification.
Change-Id: Ie921db8c9e40f30934c119b74c31ca5418b61548
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
* variable. It uses pre-allocated objects for fast paths (i.e. no successor present) and a per-transition object
* for slow paths (when successor is injected/present).
*/
* variable. It uses pre-allocated objects for fast paths (i.e. no successor present) and a per-transition object
* for slow paths (when successor is injected/present).
*/
- private volatile int sealed = 0;
- private volatile State state = OPEN;
+ private volatile int sealed;
+ private volatile State state;
- AbstractProxyTransaction(final ProxyHistory parent) {
+ AbstractProxyTransaction(final ProxyHistory parent, final boolean isDone) {
this.parent = Preconditions.checkNotNull(parent);
this.parent = Preconditions.checkNotNull(parent);
+ if (isDone) {
+ state = DONE;
+ // DONE implies previous seal operation completed
+ sealed = 1;
+ } else {
+ state = OPEN;
+ }
}
final void executeInActor(final Runnable command) {
}
final void executeInActor(final Runnable command) {
final SuccessorState local = getSuccessorState();
final State prevState = local.getPrevState();
final SuccessorState local = getSuccessorState();
final State prevState = local.getPrevState();
+ final boolean isDone = DONE.equals(state)
+ || state instanceof SuccessorState && ((SuccessorState) state).isDone();
final AbstractProxyTransaction successor = successorHistory.createTransactionProxy(getIdentifier(),
final AbstractProxyTransaction successor = successorHistory.createTransactionProxy(getIdentifier(),
- isSnapshotOnly());
- LOG.debug("{} created successor transaction proxy {}", this, successor);
+ isSnapshotOnly(), isDone);
+ LOG.debug("{} created successor {}", this, successor);
local.setSuccessor(successor);
// Replay successful requests first
local.setSuccessor(successor);
// Replay successful requests first
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import java.util.function.Consumer;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import java.util.function.Consumer;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
private final TransactionIdentifier identifier;
private final TransactionIdentifier identifier;
- LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
- super(parent);
+ LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final boolean isDone) {
+ super(parent, isDone);
this.identifier = Preconditions.checkNotNull(identifier);
}
this.identifier = Preconditions.checkNotNull(identifier);
}
- abstract DataTreeSnapshot readOnlyView();
+ abstract @Nonnull DataTreeSnapshot readOnlyView();
abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request,
@Nullable Consumer<Response<?, ?>> callback);
abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request,
@Nullable Consumer<Response<?, ?>> callback);
LocalReadOnlyProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
final DataTreeSnapshot snapshot) {
LocalReadOnlyProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
final DataTreeSnapshot snapshot) {
- super(parent, identifier);
+ super(parent, identifier, false);
this.snapshot = Preconditions.checkNotNull(snapshot);
}
this.snapshot = Preconditions.checkNotNull(snapshot);
}
+ LocalReadOnlyProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
+ super(parent, identifier, true);
+ // It is an error to touch snapshot once we are DONE
+ this.snapshot = null;
+ }
+
@Override
boolean isSnapshotOnly() {
return true;
@Override
boolean isSnapshotOnly() {
return true;
@Override
DataTreeSnapshot readOnlyView() {
@Override
DataTreeSnapshot readOnlyView() {
+ return Preconditions.checkNotNull(snapshot, "Transaction %s is DONE", getIdentifier());
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
final DataTreeSnapshot snapshot) {
LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
final DataTreeSnapshot snapshot) {
- super(parent, identifier);
+ super(parent, identifier, false);
this.modification = (CursorAwareDataTreeModification) snapshot.newModification();
}
this.modification = (CursorAwareDataTreeModification) snapshot.newModification();
}
+ LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
+ super(parent, identifier, true);
+ // This is DONE transaction, this should never be touched
+ this.modification = null;
+ }
+
@Override
boolean isSnapshotOnly() {
return false;
@Override
boolean isSnapshotOnly() {
return false;
closedException = this::abortedException;
}
closedException = this::abortedException;
}
- private CursorAwareDataTreeModification getModification() {
+ private @Nonnull CursorAwareDataTreeModification getModification() {
if (closedException != null) {
throw closedException.get();
}
if (closedException != null) {
throw closedException.get();
}
+ return Preconditions.checkNotNull(modification, "Transaction %s is DONE", getIdentifier());
}
private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
}
private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
+ if (isDone) {
+ // Done transactions do not register on our radar on should not have any state associated.
+ return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
+ : new LocalReadWriteProxyTransaction(this, txId);
+ }
+
// onTransactionCompleted() runs concurrently
final LocalReadWriteProxyTransaction localSealed = lastSealed;
final DataTreeSnapshot baseSnapshot;
// onTransactionCompleted() runs concurrently
final LocalReadWriteProxyTransaction localSealed = lastSealed;
final DataTreeSnapshot baseSnapshot;
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
final DataTreeSnapshot snapshot = takeSnapshot();
return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
new LocalReadWriteProxyTransaction(this, txId, snapshot);
final DataTreeSnapshot snapshot = takeSnapshot();
return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
new LocalReadWriteProxyTransaction(this, txId, snapshot);
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
- return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
- return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
- AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+ final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
final boolean snapshotOnly) {
final boolean snapshotOnly) {
+ return createTransactionProxy(txId, snapshotOnly, false);
+ }
+
+ AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
+ final boolean isDone) {
lock.lock();
try {
if (successor != null) {
lock.lock();
try {
if (successor != null) {
- return successor.createTransactionProxy(txId, snapshotOnly);
+ return successor.createTransactionProxy(txId, snapshotOnly, isDone);
}
final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
}
final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
- final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
+ final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
proxies.put(proxyId, ret);
LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
return ret;
proxies.put(proxyId, ret);
LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
return ret;
@GuardedBy("lock")
abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
@GuardedBy("lock")
abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
- TransactionIdentifier txId, boolean snapshotOnly);
+ TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
private volatile Exception operationFailure;
RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
private volatile Exception operationFailure;
RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
- final boolean snapshotOnly, final boolean sendReadyOnSeal) {
- super(parent);
+ final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) {
+ super(parent, isDone);
this.snapshotOnly = snapshotOnly;
this.sendReadyOnSeal = sendReadyOnSeal;
builder = new ModifyTransactionRequestBuilder(identifier, localActor());
this.snapshotOnly = snapshotOnly;
this.sendReadyOnSeal = sendReadyOnSeal;
builder = new ModifyTransactionRequestBuilder(identifier, localActor());
transaction.startReconnect();
final ProxyHistory mockSuccessor = mock(ProxyHistory.class);
transaction.startReconnect();
final ProxyHistory mockSuccessor = mock(ProxyHistory.class);
- when(mockSuccessor.createTransactionProxy(TRANSACTION_ID, transaction.isSnapshotOnly()))
+ when(mockSuccessor.createTransactionProxy(TRANSACTION_ID, transaction.isSnapshotOnly(), false))
.thenReturn(successor.getTransaction());
transaction.replayMessages(mockSuccessor, entries);
.thenReturn(successor.getTransaction());
transaction.replayMessages(mockSuccessor, entries);
AccessClientUtil.createConnectedConnection(context, 0L, backend);
final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
final RemoteProxyTransaction transaction =
AccessClientUtil.createConnectedConnection(context, 0L, backend);
final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
final RemoteProxyTransaction transaction =
- new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
+ new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
return new TransactionTester<>(transaction, connection, backendProbe);
}
}
return new TransactionTester<>(transaction, connection, backendProbe);
}
}
AccessClientUtil.createConnectedConnection(context, 0L, backend);
final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
final RemoteProxyTransaction transaction =
AccessClientUtil.createConnectedConnection(context, 0L, backend);
final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
final RemoteProxyTransaction transaction =
- new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
+ new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
return new TransactionTester(transaction, connection, backendProbe);
}
return new TransactionTester(transaction, connection, backendProbe);
}
AccessClientUtil.createConnectedConnection(context, 0L, backend);
final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
final RemoteProxyTransaction transaction =
AccessClientUtil.createConnectedConnection(context, 0L, backend);
final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
final RemoteProxyTransaction transaction =
- new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
+ new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
return new TransactionTester<>(transaction, connection, backendProbe);
}
return new TransactionTester<>(transaction, connection, backendProbe);
}
@Override
protected RemoteProxyTransaction createTransaction(final ProxyHistory parent, final TransactionIdentifier id,
final DataTreeSnapshot snapshot) {
@Override
protected RemoteProxyTransaction createTransaction(final ProxyHistory parent, final TransactionIdentifier id,
final DataTreeSnapshot snapshot) {
- return new RemoteProxyTransaction(parent, TRANSACTION_ID, false, false);
+ return new RemoteProxyTransaction(parent, TRANSACTION_ID, false, false, false);