import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import java.util.function.Consumer;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
private final TransactionIdentifier identifier;
- LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
- super(parent);
+ LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final boolean isDone) {
+ super(parent, isDone);
this.identifier = Preconditions.checkNotNull(identifier);
}
return identifier;
}
- abstract DataTreeSnapshot readOnlyView();
+ abstract @Nonnull DataTreeSnapshot readOnlyView();
- abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
+ abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request,
@Nullable Consumer<Response<?, ?>> callback);
abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
}
@Override
- final void doAbort() {
- sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> {
- LOG.debug("Transaction {} abort completed with {}", identifier, response);
- });
+ final AbortLocalTransactionRequest abortRequest() {
+ return new AbortLocalTransactionRequest(identifier, localActor());
}
@Override
private boolean handleReadRequest(final TransactionRequest<?> request,
final @Nullable Consumer<Response<?, ?>> callback) {
+ // Note we delay completion of read requests to limit the scope at which the client can run, as they have
+ // listeners, which we do not want to execute while we are reconnecting.
if (request instanceof ReadTransactionRequest) {
final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
final Optional<NormalizedNode<?, ?>> result = readOnlyView().readNode(path);
- callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ if (callback != null) {
+ // XXX: FB does not see that callback is final, on stack and has be check for non-null.
+ final Consumer<Response<?, ?>> fbIsStupid = Preconditions.checkNotNull(callback);
+ executeInActor(() -> fbIsStupid.accept(new ReadTransactionSuccess(request.getTarget(),
+ request.getSequence(), result)));
+ }
return true;
} else if (request instanceof ExistsTransactionRequest) {
final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
final boolean result = readOnlyView().readNode(path).isPresent();
- callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ if (callback != null) {
+ // XXX: FB does not see that callback is final, on stack and has be check for non-null.
+ final Consumer<Response<?, ?>> fbIsStupid = Preconditions.checkNotNull(callback);
+ executeInActor(() -> fbIsStupid.accept(new ExistsTransactionSuccess(request.getTarget(),
+ request.getSequence(), result)));
+ }
return true;
} else {
return false;
} else if (handleReadRequest(request, callback)) {
// No-op
} else if (request instanceof TransactionPurgeRequest) {
- enqueuePurge(enqueuedTicks);
+ enqueuePurge(callback, enqueuedTicks);
+ } else if (request instanceof IncrementTransactionSequenceRequest) {
+ // Local transactions do not have non-replayable requests which would be visible to the backend,
+ // hence we can skip sequence increments.
+ LOG.debug("Not replaying {}", request);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
*/
void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
if (request instanceof ModifyTransactionRequest) {
- applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+ applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback);
} else if (handleReadRequest(request, callback)) {
// No-op
} else if (request instanceof TransactionPurgeRequest) {
- sendPurge();
+ enqueuePurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
successor.abort();
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
- successor.sendPurge();
+ successor.enqueuePurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
if (request instanceof AbortLocalTransactionRequest) {
successor.sendAbort(request, callback);
} else if (request instanceof TransactionPurgeRequest) {
- successor.sendPurge();
+ successor.enqueuePurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}