import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
isSnapshotOnly()), t -> completeRead(future, t), future);
}
- @Override
- void doAbort() {
- ensureInitializedBuilder();
- builder.setAbort();
- flushBuilder();
- }
-
private void ensureInitializedBuilder() {
if (!builderBusy) {
builder.setSequence(nextSequence());
failFuture(future, response);
}
- recordFinishedRequest();
+ recordFinishedRequest(response);
}
private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
failFuture(future, response);
}
- recordFinishedRequest();
+ recordFinishedRequest(response);
}
- private ModifyTransactionRequest abortRequest() {
+ @Override
+ ModifyTransactionRequest abortRequest() {
ensureInitializedBuilder();
builder.setAbort();
final ModifyTransactionRequest ret = builder.build();
ensureFlushedBuider();
sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
- recordFinishedRequest();
+ recordFinishedRequest(resp);
callback.accept(resp);
});
} else if (request instanceof ExistsTransactionRequest) {
ensureFlushedBuider();
sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
- recordFinishedRequest();
+ recordFinishedRequest(resp);
callback.accept(resp);
});
} else if (request instanceof TransactionPreCommitRequest) {
sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
} else if (request instanceof TransactionAbortRequest) {
ensureFlushedBuider();
- sendAbort(callback);
+ sendDoAbort(callback);
} else if (request instanceof TransactionPurgeRequest) {
- sendPurge();
+ sendPurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}
ensureFlushedBuider(optTicks);
enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
- recordFinishedRequest();
+ recordFinishedRequest(resp);
cb.accept(resp);
}, enqueuedTicks);
} else if (request instanceof ExistsTransactionRequest) {
ensureFlushedBuider(optTicks);
enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
- recordFinishedRequest();
+ recordFinishedRequest(resp);
cb.accept(resp);
}, enqueuedTicks);
} else if (request instanceof TransactionPreCommitRequest) {
enqueuedTicks);
} else if (request instanceof TransactionAbortRequest) {
ensureFlushedBuider(optTicks);
- enqueueAbort(callback, enqueuedTicks);
+ enqueueDoAbort(callback, enqueuedTicks);
} else if (request instanceof TransactionPurgeRequest) {
- enqueuePurge(enqueuedTicks);
+ enqueuePurge(callback, enqueuedTicks);
+ } else if (request instanceof IncrementTransactionSequenceRequest) {
+ final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
+ ensureFlushedBuider(optTicks);
+ enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
+ snapshotOnly, req.getIncrement()), callback, enqueuedTicks);
+ incrementSequence(req.getIncrement());
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}