We still have a tiny race window where we do not correctly handle
reconnection, leading to an ISE splat -- this time in the final
stage of transaction completion.
The problem is that a reconnect attempt is happening while we are
waiting for the transaction purge to complete. At this point the
transaction has been completely resolved and the only remaining
task is to inform the backend that we have received all of the
state and hence it can throw it out (we will remove our state once
purge completes).
We still allocate a live transaction in the local history, and
replaying the purge request does not logically close it, leading
to the splat.
To fix this, we really need to allocate a non-open tx successor,
which will not trip the successor local history. All the required
knowledge already resides in AbstractProxyHistory, except we do
not have a dedicated 'purging' state.
This patch takes the first step: moving the allocation call to
the appropriate place.
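The race above can be illustrated with a minimal sketch. All names here (ProxyState, LocalHistorySketch, isOpen(), openTransactions()) are hypothetical and not the actual AbstractProxyHistory API; the sketch only shows why a dedicated non-open "purging" state keeps a successor allocated during purge from registering as a live transaction:

```java
// Hypothetical sketch -- ProxyState and LocalHistorySketch are
// illustrative names, not the real AbstractProxyHistory types.
enum ProxyState {
    OPEN, SEALED, DONE, PURGING;

    // Only OPEN transactions count as live in the local history;
    // a PURGING successor must not.
    boolean isOpen() {
        return this == OPEN;
    }
}

class LocalHistorySketch {
    private int openCount = 0;

    // Allocating a successor in the PURGING state does not bump the
    // live-transaction count, so a later purge completion cannot find
    // an unexpectedly open transaction and splat.
    void allocate(final ProxyState state) {
        if (state.isOpen()) {
            openCount++;
        }
    }

    int openTransactions() {
        return openCount;
    }
}
```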
Change-Id: If82957019b478f4d5132edda4d38e6bc026aa0ab
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit cc5009b8f3ea91f64ee48cda815c6a5e73a8a1af)
}
// Called with the connection locked
- final void replayMessages(final AbstractProxyTransaction successor,
- final Iterable<ConnectionEntry> enqueuedEntries) {
+ final void replayMessages(final ProxyHistory successorHistory, final Iterable<ConnectionEntry> enqueuedEntries) {
final SuccessorState local = getSuccessorState();
+ final State prevState = local.getPrevState();
+
+ final AbstractProxyTransaction successor = successorHistory.createTransactionProxy(getIdentifier(),
+ isSnapshotOnly());
+ LOG.debug("{} created successor transaction proxy {}", this, successor);
local.setSuccessor(successor);
// Replay successful requests first
* reconnecting have been forced to slow paths, which will be unlocked once we unblock the state latch
* at the end of this method.
*/
- final State prevState = local.getPrevState();
if (SEALED.equals(prevState)) {
LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
flushState(successor);
}
for (AbstractProxyTransaction t : proxies.values()) {
- LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
- final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
- t.isSnapshotOnly());
- LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
- t.replayMessages(newProxy, previousEntries);
+ LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
+ t.replayMessages(successor, previousEntries);
}
// Now look for any finalizing messages
return parent;
}
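The control-flow change in the hunks above can be sketched as follows. This is a simplification with illustrative names (SuccessorHistory, replayAll; proxies reduced to strings), not the real ProxyHistory/AbstractProxyTransaction signatures: instead of ProxyHistory pre-building each successor proxy and handing it to replayMessages(), each old transaction now receives the successor history and allocates its own successor, which is exactly the hook where a non-open successor can later be chosen:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the refactored replay path; names are
// hypothetical, not the actual OpenDaylight API.
class ReplayFlowSketch {
    // Stand-in for the successor ProxyHistory: the only capability the
    // replay path needs from it is successor-proxy allocation.
    interface SuccessorHistory {
        String createTransactionProxy(String txId, boolean snapshotOnly);
    }

    // After the refactoring, allocation happens per transaction inside
    // the replay path, driven by the successor history.
    static List<String> replayAll(final List<String> txIds, final SuccessorHistory history) {
        final List<String> successors = new ArrayList<>();
        for (String txId : txIds) {
            successors.add(history.createTransactionProxy(txId, false));
        }
        return successors;
    }
}
```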
- final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+ AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
final boolean snapshotOnly) {
lock.lock();
try {
new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
transaction.recordSuccessfulRequest(successful2);
transaction.startReconnect();
- transaction.replayMessages(successor.getTransaction(), entries);
+
+ final ProxyHistory mockSuccessor = mock(ProxyHistory.class);
+ when(mockSuccessor.createTransactionProxy(TRANSACTION_ID, transaction.isSnapshotOnly()))
+ .thenReturn(successor.getTransaction());
+
+ transaction.replayMessages(mockSuccessor, entries);
final ModifyTransactionRequest transformed = successor.expectTransactionRequest(ModifyTransactionRequest.class);
Assert.assertNotNull(transformed);