* previous Tx's ready operations haven't completed yet.
*/
@Override
- protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName) {
+ protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName, final String txId) {
// Read current state atomically
final State localState = currentState;
// There are no outstanding futures, shortcut
final Future<?> previous = localState.previousFuture();
if (previous == null) {
- return parent.findPrimaryShard(shardName);
+ return parent.findPrimaryShard(shardName, txId);
}
final String previousTransactionId;
if(localState instanceof Pending){
previousTransactionId = ((Pending) localState).getIdentifier().toString();
- LOG.debug("Waiting for ready futures with pending Tx {}", previousTransactionId);
+ LOG.debug("Tx: {} - waiting for ready futures with pending Tx {}", txId, previousTransactionId);
} else {
previousTransactionId = "";
LOG.debug("Waiting for ready futures on chain {}", getTransactionChainId());
public void onComplete(final Throwable failure, final Object notUsed) {
if (failure != null) {
// A Ready Future failed so fail the returned Promise.
- LOG.error("Ready future failed for Tx {}", previousTransactionId);
+ LOG.error("Tx: {} - ready future failed for previous Tx {}", txId, previousTransactionId);
returnPromise.failure(failure);
} else {
- LOG.debug("Previous Tx {} readied - proceeding to FindPrimaryShard",
- previousTransactionId);
+ LOG.debug("Tx: {} - previous Tx {} readied - proceeding to FindPrimaryShard",
+ txId, previousTransactionId);
// Send the FindPrimaryShard message and use the resulting Future to complete the
// returned Promise.
- returnPromise.completeWith(parent.findPrimaryShard(shardName));
+ returnPromise.completeWith(parent.findPrimaryShard(shardName, txId));
}
}
};