import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
@Override
CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
- final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(),
- modification, coordinated);
+ final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, nextSequence(),
+ localActor(), modification, coordinated);
modification = new FailedDataTreeModification(this::submittedException);
return ret;
}
@Override
void handleForwardedRemoteRequest(final TransactionRequest<?> request,
final @Nullable Consumer<Response<?, ?>> callback) {
- LOG.debug("Applying forwaded request {}", request);
+ LOG.debug("Applying forwarded request {}", request);
if (request instanceof ModifyTransactionRequest) {
applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+ } else if (request instanceof ReadTransactionRequest) {
+ final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
+ final Optional<NormalizedNode<?, ?>> result = modification.readNode(path);
+ callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ } else if (request instanceof ExistsTransactionRequest) {
+ final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
+ final boolean result = modification.readNode(path).isPresent();
+ callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ } else if (request instanceof TransactionPreCommitRequest) {
+ sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ } else if (request instanceof TransactionDoCommitRequest) {
+ sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ } else if (request instanceof TransactionAbortRequest) {
+ sendAbort(callback);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}