}
/**
- * Seal this transaction before it is either committed or aborted.
+ * Seal this transaction before it is either committed or aborted. This method should only be invoked from
+ * application thread.
*/
final void seal() {
// Transition user-visible state first
- final boolean success = SEALED_UPDATER.compareAndSet(this, 0, 1);
+ final boolean success = markSealed();
Preconditions.checkState(success, "Proxy %s was already sealed", getIdentifier());
- internalSeal();
+
+ if (!sealAndSend(Optional.absent())) {
+ sealSuccessor();
+ }
}
- final void ensureSealed() {
- if (SEALED_UPDATER.compareAndSet(this, 0, 1)) {
- internalSeal();
+ /**
+ * Internal seal propagation method, invoked when we have raced with reconnection thread. Note that there may have
+ * been multiple reconnects, so we have to make sure the action is propagate through all intermediate instances.
+ */
+ private void sealSuccessor() {
+ // Slow path: wait for the successor to complete
+ final AbstractProxyTransaction successor = awaitSuccessor();
+
+ // At this point the successor has completed transition and is possibly visible by the user thread, which is
+ // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
+ // Propagate state and seal the successor.
+ flushState(successor);
+ successor.predecessorSealed();
+ }
+
+ private void predecessorSealed() {
+ if (markSealed() && !sealAndSend(Optional.absent())) {
+ sealSuccessor();
}
}
- private void internalSeal() {
- doSeal();
+ void sealOnly() {
parent.onTransactionSealed(this);
+ final boolean success = STATE_UPDATER.compareAndSet(this, OPEN, SEALED);
+ Verify.verify(success, "Attempted to replay seal on {}", this);
+ }
- // Now deal with state transfer, which can occur via successor or a follow-up canCommit() or directCommit().
- if (!STATE_UPDATER.compareAndSet(this, OPEN, SEALED)) {
- // Slow path: wait for the successor to complete
- final AbstractProxyTransaction successor = awaitSuccessor();
+ /**
+ * Seal this transaction and potentially send it out towards the backend. If this method reports false, the caller
+ * needs to deal with propagating the seal operation towards the successor.
+ *
+ * @param enqueuedTicks Enqueue ticks when this is invoked from replay path.
+ * @return True if seal operation was successful, false if this proxy has a successor.
+ */
+ boolean sealAndSend(final Optional<Long> enqueuedTicks) {
+ parent.onTransactionSealed(this);
- // At this point the successor has completed transition and is possibly visible by the user thread, which is
- // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
- // Propagate state and seal the successor.
- flushState(successor);
- successor.ensureSealed();
- }
+ // Transition internal state to sealed and detect presence of a successor
+ return STATE_UPDATER.compareAndSet(this, OPEN, SEALED);
+ }
+
+ /**
+ * Mark this proxy as having been sealed.
+ *
+ * @return True if this call has transitioned to sealed state.
+ */
+ final boolean markSealed() {
+ return SEALED_UPDATER.compareAndSet(this, 0, 1);
}
private void checkNotSealed() {
if (SEALED.equals(prevState)) {
LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
flushState(successor);
- successor.ensureSealed();
+ if (successor.markSealed()) {
+ successor.sealAndSend(Optional.of(parent.currentTime()));
+ }
}
}
abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
- abstract void doSeal();
-
@GuardedBy("this")
abstract void flushState(AbstractProxyTransaction successor);
}
});
- successor.ensureSealed();
-
+ successor.sealOnly();
final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
successor.sendRequest(successorReq, callback);
} else if (request instanceof AbortLocalTransactionRequest) {
throw new UnsupportedOperationException("Read-only snapshot");
}
- @Override
- void doSeal() {
- // No-op
- }
-
@Override
void flushState(final AbstractProxyTransaction successor) {
// No-op
return ret;
}
- @Override
- void doSeal() {
- Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", getIdentifier());
+ private void sealModification() {
+ Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", this);
final CursorAwareDataTreeModification mod = getModification();
mod.ready();
sealedModification = mod;
}
+ @Override
+ void sealOnly() {
+ sealModification();
+ super.sealOnly();
+ }
+
+ @Override
+ boolean sealAndSend(final com.google.common.base.Optional<Long> enqueuedTicks) {
+ sealModification();
+ return super.sealAndSend(enqueuedTicks);
+ }
+
@Override
void flushState(final AbstractProxyTransaction successor) {
sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
final Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
if (maybeProtocol.isPresent()) {
Verify.verify(callback != null, "Request {} has null callback", request);
- ensureSealed();
+ if (markSealed()) {
+ sealOnly();
+ }
switch (maybeProtocol.get()) {
case ABORT:
sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback);
break;
case READY:
- // No-op, as we have already issued a seal()
+ // No-op, as we have already issued a sealOnly() and we are not transmitting anything
break;
case SIMPLE:
sendMethod.accept(commitRequest(false), callback);
void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
final Consumer<Response<?, ?>> callback, final long now) {
if (request instanceof CommitLocalTransactionRequest) {
- sendCommit((CommitLocalTransactionRequest) request, callback);
+ enqueueRequest(rebaseCommit((CommitLocalTransactionRequest)request), callback, now);
} else {
super.handleReplayedLocalRequest(request, callback, now);
}
final Consumer<Response<?, ?>> callback) {
if (request instanceof CommitLocalTransactionRequest) {
Verify.verify(successor instanceof LocalReadWriteProxyTransaction);
- ((LocalReadWriteProxyTransaction) successor).sendCommit((CommitLocalTransactionRequest)request, callback);
+ ((LocalReadWriteProxyTransaction) successor).sendRebased((CommitLocalTransactionRequest)request, callback);
LOG.debug("Forwarded request {} to successor {}", request, successor);
} else {
super.forwardToLocal(successor, request, callback);
return Preconditions.checkNotNull(modification, "Transaction %s is DONE", getIdentifier());
}
- private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
+ private void sendRebased(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
+ sendRequest(rebaseCommit(request), callback);
+ }
+
+ private CommitLocalTransactionRequest rebaseCommit(final CommitLocalTransactionRequest request) {
// Rebase old modification on new data tree.
final CursorAwareDataTreeModification mod = getModification();
request.getModification().applyToCursor(cursor);
}
- ensureSealed();
- sendRequest(commitRequest(request.isCoordinated()), callback);
+ if (markSealed()) {
+ sealOnly();
+ }
+
+ return commitRequest(request.isCoordinated());
}
}
}
private void ensureFlushedBuider() {
- if (builderBusy) {
- flushBuilder();
- }
+ ensureFlushedBuider(Optional.absent());
}
private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
}
}
- private void flushBuilder() {
- flushBuilder(Optional.absent());
- }
-
private void flushBuilder(final Optional<Long> enqueuedTicks) {
final ModifyTransactionRequest request = builder.build();
builderBusy = false;
ModifyTransactionRequest abortRequest() {
ensureInitializedBuilder();
builder.setAbort();
- final ModifyTransactionRequest ret = builder.build();
builderBusy = false;
- return ret;
+ return builder.build();
}
@Override
ModifyTransactionRequest commitRequest(final boolean coordinated) {
ensureInitializedBuilder();
builder.setCommit(coordinated);
+ builderBusy = false;
+ return builder.build();
+ }
- final ModifyTransactionRequest ret = builder.build();
+ private ModifyTransactionRequest readyRequest() {
+ ensureInitializedBuilder();
+ builder.setReady();
builderBusy = false;
- return ret;
+ return builder.build();
}
@Override
- void doSeal() {
+ boolean sealAndSend(final Optional<Long> enqueuedTicks) {
if (sendReadyOnSeal) {
ensureInitializedBuilder();
builder.setReady();
- flushBuilder();
+ flushBuilder(enqueuedTicks);
}
+ return super.sealAndSend(enqueuedTicks);
}
@Override
final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
if (maybeProto.isPresent()) {
- ensureSealed();
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
final TransactionRequest<?> tmp;
switch (maybeProto.get()) {
});
break;
case READY:
- //no op
+ tmp = readyRequest();
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
break;
default:
throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
if (maybeProto.isPresent()) {
- ensureSealed();
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
final TransactionRequest<?> tmp;
switch (maybeProto.get()) {
}, enqueuedTicks);
break;
case READY:
- //no op
+ tmp = readyRequest();
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
break;
default:
throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
}
@Test
- public void testDoSeal() throws Exception {
+ public void testSealOnly() throws Exception {
assertOperationThrowsException(() -> transaction.getSnapshot(), IllegalStateException.class);
- transaction.doSeal();
+ transaction.sealOnly();
Assert.assertEquals(modification, transaction.getSnapshot());
}
final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
final RemoteProxyTransaction successor = transactionTester.getTransaction();
doAnswer(this::applyToCursorAnswer).when(modification).applyToCursor(any());
- transaction.doSeal();
+ transaction.sealOnly();
transaction.flushState(successor);
verify(modification).applyToCursor(any());
transactionTester.getTransaction().seal();