Read/write transactions which transition to ready state
throw away their open transaction, which causes the following
exception:
Shard - member-1-shard-people-testTransactionChainWithMultipleShards: request Envelope{sessionId=1, txSequence=11, message=TransactionPurgeRequest{target=member-2-datastore-testTransactionChainWithMultipleShards-fe-0-chn-1-txn-2-2, sequence=2, replyTo=Actor[akka://cluster-test@127.0.0.1:2559/user/$a#-
493460599]}} caused failure
java.lang.NullPointerException
at org.opendaylight.controller.cluster.datastore.FrontendReadWriteTransaction.purge(FrontendReadWriteTransaction.java:113)
at org.opendaylight.controller.cluster.datastore.AbstractFrontendHistory.handleTransactionRequest(AbstractFrontendHistory.java:114)
at org.opendaylight.controller.cluster.datastore.LeaderFrontendState.handleTransactionRequest(LeaderFrontendState.java:197)
at org.opendaylight.controller.cluster.datastore.Shard.handleRequest(Shard.java:413)
at org.opendaylight.controller.cluster.datastore.Shard.handleNonRaftCommand(Shard.java:277)
Rework purge logic to talk directly to the data tree, which
prevents this from happening and simplifies the code a bit.
Change-Id: I7cc08687648d2473a712c171944a06307e4d8f9f
Signed-off-by: Robert Varga <rovarga@cisco.com>
return new TransactionPurgeResponse(id, request.getSequence());
}
- tx.purge(() -> {
+ tree.purgeTransaction(id, () -> {
purgedTransactions.add(Range.singleton(ul));
transactions.remove(id);
LOG.debug("{}: finished purging transaction {}", persistenceId(), id);
parent.abortTransaction(this, callback);
}
- final void purge(final Runnable callback) {
- if (!closed) {
- LOG.warn("Purging unclosed transaction {}", id);
- }
- parent.purgeTransaction(id, callback);
- }
-
@Override
public final String toString() {
return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot)
}
}
- @Override
- void purge(final Runnable callback) {
- openTransaction.purge(callback);
- }
-
private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope,
final long now) throws RequestException {
openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(request.getTarget(),
}
}
- @Override
- void purge(final Runnable callback) {
- openTransaction.purge(callback);
- }
-
private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
abstract @Nullable TransactionSuccess<?> handleRequest(TransactionRequest<?> request,
RequestEnvelope envelope, long now) throws RequestException;
- // Final request, needs routing to the data tree, so it can persist a tombstone
- abstract void purge(Runnable callback);
-
private void recordResponse(final long sequence, final Object response) {
if (replayQueue.isEmpty()) {
firstReplaySequence = sequence;
replicatePayload(id, AbortTransactionPayload.create(id), callback);
}
-
- @Override
- void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
- LOG.debug("{}: purging transaction {}", logContext, id);
- replicatePayload(id, PurgeTransactionPayload.create(id), callback);
- }
-
@Override
ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
final DataTreeModification snapshot = transaction.getSnapshot();
return createReadyCohort(transaction.getIdentifier(), snapshot);
}
+ void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+ LOG.debug("{}: purging transaction {}", logContext, id);
+ replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+ }
+
public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
return dataTree.takeSnapshot().readNode(path);
}
dataTree.abortTransaction(transaction, callback);
}
- @Override
- void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
- dataTree.purgeTransaction(id, callback);
- }
-
@Override
ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
Preconditions.checkState(openTransaction != null,
abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction, Runnable callback);
- abstract void purgeTransaction(TransactionIdentifier id, Runnable callback);
-
abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);