import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@GuardedBy("this")
private final Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
@GuardedBy("this")
- private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+ private final Queue<ReadWriteTransaction> pendingTransactions = new ArrayDeque<>();
private BindingTransactionChain chain;
AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
List<TransactionCommand> commands = new ArrayList<>();
if (transaction != null) {
- int index = pendingTransactions.lastIndexOf(transaction);
- for (ReadWriteTransaction tx : pendingTransactions.subList(index, pendingTransactions.size())) {
- commands.add(transactionToCommand.get(tx));
+ // Process all pending transactions, looking for the failed one...
+ final Iterator<ReadWriteTransaction> it = pendingTransactions.iterator();
+ while (it.hasNext()) {
+ final ReadWriteTransaction current = it.next();
+ if (transaction.equals(current)) {
+ // .. collect current and all remaining pending transactions
+ commands.add(transactionToCommand.get(current));
+ it.forEachRemaining(tx -> commands.add(transactionToCommand.get(tx)));
+ break;
+ }
}
+
resetTransactionQueue();
}
return commands;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
final ReadWriteTransaction transaction = mock(ReadWriteTransaction.class);
invoker.recordPendingTransaction(command, transaction);
- List<ReadWriteTransaction> testPendingTransactions = getInternalState(invoker, "pendingTransactions");
+ Queue<ReadWriteTransaction> testPendingTransactions = getInternalState(invoker, "pendingTransactions");
assertEquals(1, testPendingTransactions.size());
assertTrue(testPendingTransactions.contains(transaction));