import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
+import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
private volatile Exception operationFailure;
RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
- final boolean snapshotOnly, final boolean sendReadyOnSeal) {
- super(parent);
+ final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) {
+ super(parent, isDone);
this.snapshotOnly = snapshotOnly;
this.sendReadyOnSeal = sendReadyOnSeal;
builder = new ModifyTransactionRequestBuilder(identifier, localActor());
isSnapshotOnly()), t -> completeRead(future, t), future);
}
- @Override
- void doAbort() {
- ensureInitializedBuilder();
- builder.setAbort();
- flushBuilder();
- }
-
private void ensureInitializedBuilder() {
if (!builderBusy) {
builder.setSequence(nextSequence());
}
private void ensureFlushedBuider() {
- if (builderBusy) {
- flushBuilder();
- }
+ ensureFlushedBuider(Optional.absent());
}
private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
}
}
- private void flushBuilder() {
- flushBuilder(Optional.absent());
- }
-
private void flushBuilder(final Optional<Long> enqueuedTicks) {
final ModifyTransactionRequest request = builder.build();
builderBusy = false;
recordFinishedRequest(response);
}
- private ModifyTransactionRequest abortRequest() {
+ @Override
+ ModifyTransactionRequest abortRequest() {
ensureInitializedBuilder();
builder.setAbort();
- final ModifyTransactionRequest ret = builder.build();
builderBusy = false;
- return ret;
+ return builder.build();
}
@Override
ModifyTransactionRequest commitRequest(final boolean coordinated) {
ensureInitializedBuilder();
builder.setCommit(coordinated);
+ builderBusy = false;
+ return builder.build();
+ }
- final ModifyTransactionRequest ret = builder.build();
+ private ModifyTransactionRequest readyRequest() {
+ ensureInitializedBuilder();
+ builder.setReady();
builderBusy = false;
- return ret;
+ return builder.build();
}
@Override
- void doSeal() {
+ boolean sealAndSend(final Optional<Long> enqueuedTicks) {
if (sendReadyOnSeal) {
ensureInitializedBuilder();
builder.setReady();
- flushBuilder();
+ flushBuilder(enqueuedTicks);
}
+ return super.sealAndSend(enqueuedTicks);
}
@Override
- void flushState(final AbstractProxyTransaction successor) {
- if (builderBusy) {
- final ModifyTransactionRequest request = builder.build();
- builderBusy = false;
- forwardToSuccessor(successor, request, null);
+ java.util.Optional<ModifyTransactionRequest> flushState() {
+ if (!builderBusy) {
+ return java.util.Optional.empty();
}
+
+ final ModifyTransactionRequest request = builder.build();
+ builderBusy = false;
+ return java.util.Optional.of(request);
}
@Override
successor.handleForwardedRequest(request, callback);
}
- private void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
if (request instanceof ModifyTransactionRequest) {
- final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
-
- req.getModifications().forEach(this::appendModification);
-
- final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
- if (maybeProto.isPresent()) {
- ensureSealed();
-
- final TransactionRequest<?> tmp;
- switch (maybeProto.get()) {
- case ABORT:
- tmp = abortRequest();
- sendRequest(tmp, resp -> {
- completeModify(tmp, resp);
- callback.accept(resp);
- });
- break;
- case SIMPLE:
- tmp = commitRequest(false);
- sendRequest(tmp, resp -> {
- completeModify(tmp, resp);
- callback.accept(resp);
- });
- break;
- case THREE_PHASE:
- tmp = commitRequest(true);
- sendRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- callback.accept(resp);
- });
- break;
- case READY:
- //no op
- break;
- default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
- }
- }
+ handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request);
} else if (request instanceof ReadTransactionRequest) {
ensureFlushedBuider();
sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
} else if (request instanceof TransactionAbortRequest) {
ensureFlushedBuider();
- sendAbort(callback);
+ sendDoAbort(callback);
} else if (request instanceof TransactionPurgeRequest) {
- sendPurge();
+ enqueuePurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}
}
+ private void handleForwardedModifyTransactionRequest(final Consumer<Response<?, ?>> callback,
+ final ModifyTransactionRequest req) {
+ req.getModifications().forEach(this::appendModification);
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+ if (maybeProto.isPresent()) {
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
+
+ final TransactionRequest<?> tmp;
+ switch (maybeProto.get()) {
+ case ABORT:
+ tmp = abortRequest();
+ sendRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ callback.accept(resp);
+ });
+ break;
+ case SIMPLE:
+ tmp = commitRequest(false);
+ sendRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ callback.accept(resp);
+ });
+ break;
+ case THREE_PHASE:
+ tmp = commitRequest(true);
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
+ break;
+ case READY:
+ tmp = readyRequest();
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ }
+ }
+ }
+
@Override
void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
final Consumer<Response<?, ?>> callback) {
@Override
void handleReplayedRemoteRequest(final TransactionRequest<?> request,
- final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
- final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { };
+ @Nullable final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { /* NOOP */ };
final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
if (request instanceof ModifyTransactionRequest) {
- final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
- for (TransactionModification mod : req.getModifications()) {
- appendModification(mod, optTicks);
- }
-
- final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
- if (maybeProto.isPresent()) {
- ensureSealed();
-
- final TransactionRequest<?> tmp;
- switch (maybeProto.get()) {
- case ABORT:
- tmp = abortRequest();
- enqueueRequest(tmp, resp -> {
- completeModify(tmp, resp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- case SIMPLE:
- tmp = commitRequest(false);
- enqueueRequest(tmp, resp -> {
- completeModify(tmp, resp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- case THREE_PHASE:
- tmp = commitRequest(true);
- enqueueRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- case READY:
- //no op
- break;
- default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
- }
- }
+ handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request);
} else if (request instanceof ReadTransactionRequest) {
ensureFlushedBuider(optTicks);
enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
enqueuedTicks);
} else if (request instanceof TransactionAbortRequest) {
ensureFlushedBuider(optTicks);
- enqueueAbort(callback, enqueuedTicks);
+ enqueueDoAbort(callback, enqueuedTicks);
} else if (request instanceof TransactionPurgeRequest) {
- enqueuePurge(enqueuedTicks);
+ enqueuePurge(callback, enqueuedTicks);
} else if (request instanceof IncrementTransactionSequenceRequest) {
final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
ensureFlushedBuider(optTicks);
throw new IllegalArgumentException("Unhandled request {}" + request);
}
}
+
+ private void handleReplayedModifyTransactionRequest(final long enqueuedTicks, final Consumer<Response<?, ?>> cb,
+ final ModifyTransactionRequest req) {
+ req.getModifications().forEach(this::appendModification);
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+ if (maybeProto.isPresent()) {
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
+
+ final TransactionRequest<?> tmp;
+ switch (maybeProto.get()) {
+ case ABORT:
+ tmp = abortRequest();
+ enqueueRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case SIMPLE:
+ tmp = commitRequest(false);
+ enqueueRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case THREE_PHASE:
+ tmp = commitRequest(true);
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case READY:
+ tmp = readyRequest();
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ }
+ }
+ }
}