import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
private volatile Exception operationFailure;
RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
- final boolean snapshotOnly, final boolean sendReadyOnSeal) {
- super(parent);
+ final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) {
+ super(parent, isDone);
this.snapshotOnly = snapshotOnly;
this.sendReadyOnSeal = sendReadyOnSeal;
builder = new ModifyTransactionRequestBuilder(identifier, localActor());
isSnapshotOnly()), t -> completeRead(future, t), future);
}
- @Override
- void doAbort() {
- ensureInitializedBuilder();
- builder.setAbort();
- flushBuilder();
- }
-
private void ensureInitializedBuilder() {
if (!builderBusy) {
builder.setSequence(nextSequence());
}
private void ensureFlushedBuider() {
- if (builderBusy) {
- flushBuilder();
- }
+ ensureFlushedBuider(Optional.absent());
}
private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
}
}
- private void flushBuilder() {
- flushBuilder(Optional.absent());
- }
-
private void flushBuilder(final Optional<Long> enqueuedTicks) {
final ModifyTransactionRequest request = builder.build();
builderBusy = false;
failFuture(future, response);
}
- recordFinishedRequest();
+ recordFinishedRequest(response);
}
private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
failFuture(future, response);
}
- recordFinishedRequest();
+ recordFinishedRequest(response);
}
- private ModifyTransactionRequest abortRequest() {
+ @Override
+ ModifyTransactionRequest abortRequest() {
ensureInitializedBuilder();
builder.setAbort();
- final ModifyTransactionRequest ret = builder.build();
builderBusy = false;
- return ret;
+ return builder.build();
}
@Override
ModifyTransactionRequest commitRequest(final boolean coordinated) {
ensureInitializedBuilder();
builder.setCommit(coordinated);
+ builderBusy = false;
+ return builder.build();
+ }
- final ModifyTransactionRequest ret = builder.build();
+ private ModifyTransactionRequest readyRequest() {
+ ensureInitializedBuilder();
+ builder.setReady();
builderBusy = false;
- return ret;
+ return builder.build();
}
@Override
- void doSeal() {
+ boolean sealAndSend(final Optional<Long> enqueuedTicks) {
if (sendReadyOnSeal) {
ensureInitializedBuilder();
builder.setReady();
- flushBuilder();
+ flushBuilder(enqueuedTicks);
}
+ return super.sealAndSend(enqueuedTicks);
}
@Override
final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
if (maybeProto.isPresent()) {
- ensureSealed();
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
final TransactionRequest<?> tmp;
switch (maybeProto.get()) {
});
break;
case READY:
- //no op
+ tmp = readyRequest();
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
break;
default:
throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
ensureFlushedBuider();
sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
- recordFinishedRequest();
+ recordFinishedRequest(resp);
callback.accept(resp);
});
} else if (request instanceof ExistsTransactionRequest) {
ensureFlushedBuider();
sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
- recordFinishedRequest();
+ recordFinishedRequest(resp);
callback.accept(resp);
});
} else if (request instanceof TransactionPreCommitRequest) {
sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
} else if (request instanceof TransactionAbortRequest) {
ensureFlushedBuider();
- sendAbort(callback);
+ sendDoAbort(callback);
} else if (request instanceof TransactionPurgeRequest) {
- sendPurge();
+ enqueuePurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}
final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
if (maybeProto.isPresent()) {
- ensureSealed();
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
final TransactionRequest<?> tmp;
switch (maybeProto.get()) {
}, enqueuedTicks);
break;
case READY:
- //no op
+ tmp = readyRequest();
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
break;
default:
throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
ensureFlushedBuider(optTicks);
enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
- recordFinishedRequest();
+ recordFinishedRequest(resp);
cb.accept(resp);
}, enqueuedTicks);
} else if (request instanceof ExistsTransactionRequest) {
ensureFlushedBuider(optTicks);
enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
- recordFinishedRequest();
+ recordFinishedRequest(resp);
cb.accept(resp);
}, enqueuedTicks);
} else if (request instanceof TransactionPreCommitRequest) {
enqueuedTicks);
} else if (request instanceof TransactionAbortRequest) {
ensureFlushedBuider(optTicks);
- enqueueAbort(callback, enqueuedTicks);
+ enqueueDoAbort(callback, enqueuedTicks);
} else if (request instanceof TransactionPurgeRequest) {
- enqueuePurge(enqueuedTicks);
+ enqueuePurge(callback, enqueuedTicks);
+ } else if (request instanceof IncrementTransactionSequenceRequest) {
+ final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
+ ensureFlushedBuider(optTicks);
+ enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
+ snapshotOnly, req.getIncrement()), callback, enqueuedTicks);
+ incrementSequence(req.getIncrement());
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}