*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
final class RemoteProxyTransaction extends AbstractProxyTransaction {
private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
+ private static final Function<Exception, Exception> NOOP_EXCEPTION_MAPPER = ex -> ex;
+
// FIXME: make this tuneable
private static final int REQUEST_MAX_MODIFICATIONS = 1000;
@Override
void doDelete(final YangInstanceIdentifier path) {
- appendModification(new TransactionDelete(path), Optional.absent());
+ appendModification(new TransactionDelete(path), Optional.empty());
}
@Override
void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- appendModification(new TransactionMerge(path, data), Optional.absent());
+ appendModification(new TransactionMerge(path, data), Optional.empty());
}
@Override
void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- appendModification(new TransactionWrite(path, data), Optional.absent());
+ appendModification(new TransactionWrite(path, data), Optional.empty());
}
- private <T> CheckedFuture<T, ReadFailedException> sendReadRequest(final AbstractReadTransactionRequest<?> request,
+ private <T> FluentFuture<T> sendReadRequest(final AbstractReadTransactionRequest<?> request,
final Consumer<Response<?, ?>> completer, final ListenableFuture<T> future) {
// Check if a previous operation failed. If it has, do not bother sending anything and report a failure
final Exception local = operationFailure;
if (local != null) {
- return Futures.immediateFailedCheckedFuture(new ReadFailedException("Previous operation failed", local));
+ return FluentFutures.immediateFailedFluentFuture(
+ new ReadFailedException("Previous operation failed", local));
}
// Make sure we send any modifications before issuing a read
ensureFlushedBuider();
sendRequest(request, completer);
- return MappingCheckedFuture.create(future, ReadFailedException.MAPPER);
+ return FluentFuture.from(future);
}
@Override
- CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
+ FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
final SettableFuture<Boolean> future = SettableFuture.create();
return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
isSnapshotOnly()), t -> completeExists(future, t), future);
}
@Override
- CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
+ FluentFuture<Optional<NormalizedNode<?, ?>>> doRead(final YangInstanceIdentifier path) {
final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
isSnapshotOnly()), t -> completeRead(future, t), future);
}
private void ensureFlushedBuider() {
- ensureFlushedBuider(Optional.absent());
+ ensureFlushedBuider(Optional.empty());
}
private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
}
private void appendModification(final TransactionModification modification) {
- appendModification(modification, Optional.absent());
+ appendModification(modification, Optional.empty());
}
private void appendModification(final TransactionModification modification, final Optional<Long> enqueuedTicks) {
// Happy path
recordSuccessfulRequest(request);
} else {
- recordFailedResponse(response);
+ recordFailedResponse(response, NOOP_EXCEPTION_MAPPER);
}
}
- private Exception recordFailedResponse(final Response<?, ?> response) {
+ private <X extends Exception> X recordFailedResponse(final Response<?, ?> response,
+ final Function<Exception, X> exMapper) {
final Exception failure;
if (response instanceof RequestFailure) {
failure = ((RequestFailure<?, ?>) response).getCause();
LOG.debug("Transaction {} failed", getIdentifier(), failure);
operationFailure = failure;
}
- return failure;
+ return exMapper.apply(failure);
}
- private void failFuture(final SettableFuture<?> future, final Response<?, ?> response) {
- future.setException(recordFailedResponse(response));
+ private void failReadFuture(final SettableFuture<?> future, final Response<?, ?> response) {
+ future.setException(recordFailedResponse(response, ReadFailedException.MAPPER));
}
private void completeExists(final SettableFuture<Boolean> future, final Response<?, ?> response) {
if (response instanceof ExistsTransactionSuccess) {
future.set(((ExistsTransactionSuccess) response).getExists());
} else {
- failFuture(future, response);
+ failReadFuture(future, response);
}
recordFinishedRequest(response);
if (response instanceof ReadTransactionSuccess) {
future.set(((ReadTransactionSuccess) response).getData());
} else {
- failFuture(future, response);
+ failReadFuture(future, response);
}
recordFinishedRequest(response);
void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
if (request instanceof ModifyTransactionRequest) {
- final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
-
- req.getModifications().forEach(this::appendModification);
-
- final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
- if (maybeProto.isPresent()) {
- // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
- // until we know what we are going to do.
- if (markSealed()) {
- sealOnly();
- }
-
- final TransactionRequest<?> tmp;
- switch (maybeProto.get()) {
- case ABORT:
- tmp = abortRequest();
- sendRequest(tmp, resp -> {
- completeModify(tmp, resp);
- callback.accept(resp);
- });
- break;
- case SIMPLE:
- tmp = commitRequest(false);
- sendRequest(tmp, resp -> {
- completeModify(tmp, resp);
- callback.accept(resp);
- });
- break;
- case THREE_PHASE:
- tmp = commitRequest(true);
- sendRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- callback.accept(resp);
- });
- break;
- case READY:
- tmp = readyRequest();
- sendRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- callback.accept(resp);
- });
- break;
- default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
- }
- }
+ handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request);
} else if (request instanceof ReadTransactionRequest) {
ensureFlushedBuider();
sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
}
}
+ private void handleForwardedModifyTransactionRequest(final Consumer<Response<?, ?>> callback,
+ final ModifyTransactionRequest req) {
+ req.getModifications().forEach(this::appendModification);
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+ if (maybeProto.isPresent()) {
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
+
+ final TransactionRequest<?> tmp;
+ switch (maybeProto.get()) {
+ case ABORT:
+ tmp = abortRequest();
+ sendRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ callback.accept(resp);
+ });
+ break;
+ case SIMPLE:
+ tmp = commitRequest(false);
+ sendRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ callback.accept(resp);
+ });
+ break;
+ case THREE_PHASE:
+ tmp = commitRequest(true);
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
+ break;
+ case READY:
+ tmp = readyRequest();
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ }
+ }
+ }
+
@Override
void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
final Consumer<Response<?, ?>> callback) {
@Override
void handleReplayedRemoteRequest(final TransactionRequest<?> request,
- final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
- final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { };
+ @Nullable final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { /* NOOP */ };
final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
if (request instanceof ModifyTransactionRequest) {
- final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
- for (TransactionModification mod : req.getModifications()) {
- appendModification(mod, optTicks);
- }
-
- final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
- if (maybeProto.isPresent()) {
- // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
- // until we know what we are going to do.
- if (markSealed()) {
- sealOnly();
- }
-
- final TransactionRequest<?> tmp;
- switch (maybeProto.get()) {
- case ABORT:
- tmp = abortRequest();
- enqueueRequest(tmp, resp -> {
- completeModify(tmp, resp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- case SIMPLE:
- tmp = commitRequest(false);
- enqueueRequest(tmp, resp -> {
- completeModify(tmp, resp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- case THREE_PHASE:
- tmp = commitRequest(true);
- enqueueRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- case READY:
- tmp = readyRequest();
- enqueueRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
- }
- }
+ handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request);
} else if (request instanceof ReadTransactionRequest) {
ensureFlushedBuider(optTicks);
enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
throw new IllegalArgumentException("Unhandled request {}" + request);
}
}
+
+ private void handleReplayedModifyTransactionRequest(final long enqueuedTicks, final Consumer<Response<?, ?>> cb,
+ final ModifyTransactionRequest req) {
+ req.getModifications().forEach(this::appendModification);
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+ if (maybeProto.isPresent()) {
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
+
+ final TransactionRequest<?> tmp;
+ switch (maybeProto.get()) {
+ case ABORT:
+ tmp = abortRequest();
+ enqueueRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case SIMPLE:
+ tmp = commitRequest(false);
+ enqueueRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case THREE_PHASE:
+ tmp = commitRequest(true);
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case READY:
+ tmp = readyRequest();
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ }
+ }
+ }
}