+ /**
+ * This method is overridden to ensure the previous Tx's ready operations complete
+ * before we initiate the next Tx in the chain to avoid creation failures if the
+ * previous Tx's ready operations haven't completed yet.
+ */
+ @Override
+ protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName) {
+ // Read current state atomically
+ final State localState = currentState;
+
+ // There are no outstanding futures, shortcut
+ final Future<?> previous = localState.previousFuture();
+ if (previous == null) {
+ return parent.findPrimaryShard(shardName);
+ }
+
+ final String previousTransactionId;
+
+ if(localState instanceof Pending){
+ previousTransactionId = ((Pending) localState).getIdentifier().toString();
+ LOG.debug("Waiting for ready futures with pending Tx {}", previousTransactionId);
+ } else {
+ previousTransactionId = "";
+ LOG.debug("Waiting for ready futures on chain {}", getTransactionChainId());
+ }
+
+ // Add a callback for completion of the combined Futures.
+ final Promise<PrimaryShardInfo> returnPromise = akka.dispatch.Futures.promise();
+
+ final OnComplete onComplete = new OnComplete() {
+ @Override
+ public void onComplete(final Throwable failure, final Object notUsed) {
+ if (failure != null) {
+ // A Ready Future failed so fail the returned Promise.
+ LOG.error("Ready future failed for Tx {}", previousTransactionId);
+ returnPromise.failure(failure);
+ } else {
+ LOG.debug("Previous Tx {} readied - proceeding to FindPrimaryShard",
+ previousTransactionId);
+
+ // Send the FindPrimaryShard message and use the resulting Future to complete the
+ // returned Promise.
+ returnPromise.completeWith(parent.findPrimaryShard(shardName));
+ }
+ }
+ };