import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
@GuardedBy("lock")
@Override
- void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
+ void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
// First look for our Create message
- for (ConnectionEntry e : previousEntries) {
+ Iterator<ConnectionEntry> it = previousEntries.iterator();
+ while (it.hasNext()) {
+ final ConnectionEntry e = it.next();
final Request<?, ?> req = e.getRequest();
if (identifier.equals(req.getTarget())) {
Verify.verify(req instanceof LocalHistoryRequest);
if (req instanceof CreateLocalHistoryRequest) {
successor.connection.sendRequest(req, e.getCallback());
+ it.remove();
break;
}
}
}
// Now look for any finalizing messages
- for (ConnectionEntry e : previousEntries) {
+ it = previousEntries.iterator();
+ while (it.hasNext()) {
+ final ConnectionEntry e = it.next();
final Request<?, ?> req = e.getRequest();
if (identifier.equals(req.getTarget())) {
Verify.verify(req instanceof LocalHistoryRequest);
- successor.connection.sendRequest(req, e.getCallback());
+ if (req instanceof DestroyLocalHistoryRequest) {
+ successor.connection.sendRequest(req, e.getCallback());
+ it.remove();
+ break;
+ }
}
}
}
}
@Override
- void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
- final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
+ void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
+ final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
+ // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
+ // period required to get into the queue.
if (request instanceof TransactionRequest) {
- replayTransactionRequest((TransactionRequest<?>) request, callback);
+ forwardTransactionRequest((TransactionRequest<?>) request, callback);
} else if (request instanceof LocalHistoryRequest) {
- replayTo.accept(request, callback);
+ forwardTo.accept(request, callback);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
}
- private void replayTransactionRequest(final TransactionRequest<?> request,
+ private void forwardTransactionRequest(final TransactionRequest<?> request,
final Consumer<Response<?, ?>> callback) throws RequestException {
final AbstractProxyTransaction proxy;
throw new RequestReplayException("Failed to find proxy for %s", request);
}
- proxy.replayRequest(request, callback);
+ proxy.forwardRequest(request, callback);
}
}
return identifier;
}
+ final long currentTime() {
+ return connection.currentTime();
+ }
+
final ActorRef localActor() {
return connection.localActor();
}
}
}
+ final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ connection.enqueueRequest(request, callback, enqueuedTicks);
+ }
+
final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
connection.sendRequest(request, callback);
}