}
// Called with the connection locked
- final void replayMessages(final AbstractProxyTransaction successor,
- final Iterable<ConnectionEntry> enqueuedEntries) {
+ final void replayMessages(final ProxyHistory successorHistory, final Iterable<ConnectionEntry> enqueuedEntries) {
final SuccessorState local = getSuccessorState();
+ final State prevState = local.getPrevState();
+
+ final AbstractProxyTransaction successor = successorHistory.createTransactionProxy(getIdentifier(),
+ isSnapshotOnly());
+ LOG.debug("{} created successor transaction proxy {}", this, successor);
local.setSuccessor(successor);
// Replay successful requests first
* reconnecting have been forced to slow paths, which will be unlocked once we unblock the state latch
* at the end of this method.
*/
- final State prevState = local.getPrevState();
if (SEALED.equals(prevState)) {
LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
flushState(successor);
}
for (AbstractProxyTransaction t : proxies.values()) {
- LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
- final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
- t.isSnapshotOnly());
- LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
- t.replayMessages(newProxy, previousEntries);
+ LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
+ t.replayMessages(successor, previousEntries);
}
// Now look for any finalizing messages
return parent;
}
- final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+ AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
final boolean snapshotOnly) {
lock.lock();
try {
new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
transaction.recordSuccessfulRequest(successful2);
transaction.startReconnect();
- transaction.replayMessages(successor.getTransaction(), entries);
+
+ final ProxyHistory mockSuccessor = mock(ProxyHistory.class);
+ when(mockSuccessor.createTransactionProxy(TRANSACTION_ID, transaction.isSnapshotOnly()))
+ .thenReturn(successor.getTransaction());
+
+ transaction.replayMessages(mockSuccessor, entries);
final ModifyTransactionRequest transformed = successor.expectTransactionRequest(ModifyTransactionRequest.class);
Assert.assertNotNull(transformed);