import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
@Nullable Consumer<Response<?, ?>> callback);
+ abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
+ @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
+
@Override
final CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent());
}
@Override
- void handleForwardedRemoteRequest(final TransactionRequest<?> request,
+ void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ if (request instanceof AbortLocalTransactionRequest) {
+ enqueueAbort(request, callback, enqueuedTicks);
+ } else {
+ throw new IllegalArgumentException("Unhandled request" + request);
+ }
+ }
+
+ private boolean handleReadRequest(final TransactionRequest<?> request,
final @Nullable Consumer<Response<?, ?>> callback) {
- if (request instanceof ModifyTransactionRequest) {
- applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
- } else if (request instanceof ReadTransactionRequest) {
+ if (request instanceof ReadTransactionRequest) {
final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
final Optional<NormalizedNode<?, ?>> result = readOnlyView().readNode(path);
callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ return true;
} else if (request instanceof ExistsTransactionRequest) {
final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
final boolean result = readOnlyView().readNode(path).isPresent();
callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ void handleReplayedRemoteRequest(final TransactionRequest<?> request,
+ final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ if (request instanceof ModifyTransactionRequest) {
+ replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks);
+ } else if (handleReadRequest(request, callback)) {
+ // No-op
+ } else if (request instanceof TransactionPurgeRequest) {
+ enqueuePurge(enqueuedTicks);
+ } else if (request instanceof IncrementTransactionSequenceRequest) {
+ // Local transactions do not have non-replayable requests which would be visible to the backend,
+ // hence we can skip sequence increments.
+ LOG.debug("Not replaying {}", request);
+ } else {
+ throw new IllegalArgumentException("Unhandled request " + request);
+ }
+ }
+
+ /**
+ * Remote-to-local equivalent of {@link #handleReplayedRemoteRequest(TransactionRequest, Consumer, long)},
+ * except it is invoked in the forwarding path from
+ * {@link RemoteProxyTransaction#forwardToLocal(LocalProxyTransaction, TransactionRequest, Consumer)}.
+ *
+ * @param request Forwarded request
+ * @param callback Callback to be invoked once the request completes
+ */
+ void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ if (request instanceof ModifyTransactionRequest) {
+ applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+ } else if (handleReadRequest(request, callback)) {
+ // No-op
} else if (request instanceof TransactionPurgeRequest) {
- purge();
+ sendPurge();
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
}
@Override
- void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
+ final void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
final Consumer<Response<?, ?>> callback) {
if (request instanceof CommitLocalTransactionRequest) {
final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
successor.abort();
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
- successor.purge();
+ successor.sendPurge();
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
if (request instanceof AbortLocalTransactionRequest) {
successor.sendAbort(request, callback);
} else if (request instanceof TransactionPurgeRequest) {
- successor.purge();
+ successor.sendPurge();
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
sendRequest(request, callback);
}
+
+ void enqueueAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ enqueueRequest(request, callback, enqueuedTicks);
+ }
}