private final List<TransactionModification> modifications = new ArrayList<>(1);
private final TransactionIdentifier identifier;
private final ActorRef replyTo;
+
private PersistenceProtocol protocol;
- private Long sequence;
+ private boolean haveSequence;
+ private long sequence;
public ModifyTransactionRequestBuilder(final TransactionIdentifier identifier, final ActorRef replyTo) {
this.identifier = Preconditions.checkNotNull(identifier);
}
public void setSequence(final long sequence) {
+ Preconditions.checkState(!haveSequence, "Sequence has already been set");
this.sequence = sequence;
+ haveSequence = true;
}
public void setAbort() {
protocol = coordinated ? PersistenceProtocol.THREE_PHASE : PersistenceProtocol.SIMPLE;
}
+ public void setReady() {
+ checkNotFinished();
+ protocol = PersistenceProtocol.READY;
+ }
+
public int size() {
return modifications.size();
}
@Override
public ModifyTransactionRequest build() {
- Preconditions.checkState(sequence != null, "Request sequence has not been set");
+ Preconditions.checkState(haveSequence, "Request sequence has not been set");
final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, sequence, replyTo, modifications,
protocol);
modifications.clear();
protocol = null;
- sequence = null;
+ haveSequence = false;
return ret;
}
}
byte byteValue() {
return 3;
}
-
+ },
+ /**
+ * Transaction is ready. This is not a really a persistence protocol, but an indication that that frontend has
+ * completed modifications on the transaction and considers it ready, without deciding the actual commit protocol.
+ */
+ READY {
+ @Override
+ byte byteValue() {
+ return 4;
+ }
};
@Override
return SIMPLE;
case 3:
return THREE_PHASE;
+ case 4:
+ return READY;
default:
throw new IllegalArgumentException("Unhandled byte value " + value);
}
// Transition user-visible state first
final boolean success = SEALED_UPDATER.compareAndSet(this, 0, 1);
Preconditions.checkState(success, "Proxy %s was already sealed", getIdentifier());
+ internalSeal();
+ }
+
+ final void ensureSealed() {
+ if (SEALED_UPDATER.compareAndSet(this, 0, 1)) {
+ internalSeal();
+ }
+ }
+
+ private void internalSeal() {
doSeal();
parent.onTransactionSealed(this);
// still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
// Propagate state and seal the successor.
flushState(successor);
- successor.seal();
+ successor.ensureSealed();
}
}
if (SEALED.equals(prevState)) {
LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
flushState(successor);
- successor.seal();
+ successor.ensureSealed();
}
}
}
});
- successor.seal();
+ successor.ensureSealed();
final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
successor.sendRequest(successorReq, callback);
}
});
- successor.seal();
+ successor.ensureSealed();
final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
successor.sendRequest(successorReq, callback);
final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
if (maybeProtocol.isPresent()) {
- seal();
Verify.verify(callback != null, "Request {} has null callback", request);
+ ensureSealed();
switch (maybeProtocol.get()) {
case ABORT:
sendAbort(callback);
break;
+ case READY:
+ // No-op, as we have already issued a seal()
+ break;
case SIMPLE:
sendRequest(commitRequest(false), callback);
break;
}
});
- successor.seal();
+ successor.ensureSealed();
final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
successor.sendRequest(successorReq, callback);
request.getModification().applyToCursor(cursor);
}
- seal();
+ ensureSealed();
sendRequest(commitRequest(request.isCoordinated()), callback);
}
}
final LocalHistoryIdentifier identifier) {
super(connection, identifier);
}
-
- @Override
- final AbstractProxyTransaction doCreateTransactionProxy(
- final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId,
- final boolean snapshotOnly) {
- return new RemoteProxyTransaction(this, txId, snapshotOnly);
- }
}
private static final class Local extends AbstractLocal {
super(connection, identifier);
}
+ @Override
+ AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+ final TransactionIdentifier txId, final boolean snapshotOnly) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
+ }
+
@Override
ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
return createClient(connection, getIdentifier());
super(connection, identifier);
}
+ @Override
+ AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+ final TransactionIdentifier txId, final boolean snapshotOnly) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
+ }
+
@Override
ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
return createSingle(connection, getIdentifier());
private static final int REQUEST_MAX_MODIFICATIONS = 1000;
private final ModifyTransactionRequestBuilder builder;
+ private final boolean sendReadyOnSeal;
private final boolean snapshotOnly;
private boolean builderBusy;
private volatile Exception operationFailure;
+
RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
- final boolean snapshotOnly) {
+ final boolean snapshotOnly, final boolean sendReadyOnSeal) {
super(parent);
this.snapshotOnly = snapshotOnly;
+ this.sendReadyOnSeal = sendReadyOnSeal;
builder = new ModifyTransactionRequestBuilder(identifier, localActor());
}
@Override
void doAbort() {
- ensureInitializedBuider();
+ ensureInitializedBuilder();
builder.setAbort();
flushBuilder();
}
- private void ensureInitializedBuider() {
+ private void ensureInitializedBuilder() {
if (!builderBusy) {
builder.setSequence(nextSequence());
builderBusy = true;
private void appendModification(final TransactionModification modification) {
if (operationFailure == null) {
- ensureInitializedBuider();
+ ensureInitializedBuilder();
builder.addModification(modification);
if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
@Override
ModifyTransactionRequest commitRequest(final boolean coordinated) {
- ensureInitializedBuider();
+ ensureInitializedBuilder();
builder.setCommit(coordinated);
final ModifyTransactionRequest ret = builder.build();
@Override
void doSeal() {
- // No-op
+ if (sendReadyOnSeal) {
+ ensureInitializedBuilder();
+ builder.setReady();
+ flushBuilder();
+ }
}
@Override
final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
if (maybeProto.isPresent()) {
- seal();
+ ensureSealed();
switch (maybeProto.get()) {
case ABORT:
import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
+import java.util.Collection;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
readyCohort = null;
}
});
-
}
private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
- final DataTreeModification modification = openTransaction.getSnapshot();
- for (TransactionModification m : request.getModifications()) {
- if (m instanceof TransactionDelete) {
- modification.delete(m.getPath());
- } else if (m instanceof TransactionWrite) {
- modification.write(m.getPath(), ((TransactionWrite) m).getData());
- } else if (m instanceof TransactionMerge) {
- modification.merge(m.getPath(), ((TransactionMerge) m).getData());
- } else {
- LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+ final Collection<TransactionModification> mods = request.getModifications();
+ if (!mods.isEmpty()) {
+ final DataTreeModification modification = openTransaction.getSnapshot();
+ for (TransactionModification m : mods) {
+ if (m instanceof TransactionDelete) {
+ modification.delete(m.getPath());
+ } else if (m instanceof TransactionWrite) {
+ modification.write(m.getPath(), ((TransactionWrite) m).getData());
+ } else if (m instanceof TransactionMerge) {
+ modification.merge(m.getPath(), ((TransactionMerge) m).getData());
+ } else {
+ LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+ }
}
}
openTransaction.abort();
openTransaction = null;
return replyModifySuccess(request.getSequence());
+ case READY:
+ ensureReady();
+ return replyModifySuccess(request.getSequence());
case SIMPLE:
- readyCohort = openTransaction.ready();
- openTransaction = null;
+ ensureReady();
directCommit(envelope, now);
return null;
case THREE_PHASE:
- readyCohort = openTransaction.ready();
- openTransaction = null;
+ ensureReady();
coordinatedCommit(envelope, now);
return null;
default:
throw new UnsupportedRequestException(request);
}
}
+
+ private void ensureReady() {
+ // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
+ // only once.
+ if (readyCohort == null) {
+ readyCohort = openTransaction.ready();
+ LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier());
+ openTransaction = null;
+ }
+ }
}