import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
// FIXME: make this tuneable
private static final int REQUEST_MAX_MODIFICATIONS = 1000;
- private final Collection<TransactionRequest<?>> successfulRequests = new ArrayList<>();
private final ModifyTransactionRequestBuilder builder;
private boolean builderBusy;
if (response instanceof TransactionSuccess) {
// Happy path
- successfulRequests.add(request);
+ recordSuccessfulRequest(request);
} else {
recordFailedResponse(response);
}
} else {
failFuture(future, response);
}
+
+ recordFinishedRequest();
}
private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
} else {
failFuture(future, response);
}
+
+ recordFinishedRequest();
}
@Override
// No-op
}
- @Override
- void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
- super.replaySuccessfulRequests(successor);
-
- for (TransactionRequest<?> req : successfulRequests) {
- LOG.debug("Forwarding request {} to successor {}", req, successor);
- successor.handleForwardedRemoteRequest(req, null);
- }
- successfulRequests.clear();
- }
-
@Override
void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
final Consumer<Response<?, ?>> callback) throws RequestException {
throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
}
}
+ } else if (request instanceof ReadTransactionRequest) {
+ ensureFlushedBuider();
+ sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+ ((ReadTransactionRequest) request).getPath()), callback);
+ } else if (request instanceof ExistsTransactionRequest) {
+ ensureFlushedBuider();
+ sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+ ((ExistsTransactionRequest) request).getPath()), callback);
+ } else if (request instanceof TransactionPreCommitRequest) {
+ ensureFlushedBuider();
+ sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ } else if (request instanceof TransactionDoCommitRequest) {
+ ensureFlushedBuider();
+ sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ } else if (request instanceof TransactionAbortRequest) {
+ ensureFlushedBuider();
+ sendAbort(callback);
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}