import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
+import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
private final AtomicBoolean runTask = new AtomicBoolean(true);
@GuardedBy("this")
- private final Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
- @GuardedBy("this")
- private final Queue<ReadWriteTransaction> pendingTransactions = new ArrayDeque<>();
+ private final Queue<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions = new ArrayDeque<>();
private BindingTransactionChain chain;
}
@VisibleForTesting
- TransactionInvokerImpl(final DataBroker db, final List<ReadWriteTransaction> pendingTransactions,
- final List<ReadWriteTransaction> failedTransactions,
- final Map<ReadWriteTransaction, TransactionCommand> transactionToCommand) {
+ TransactionInvokerImpl(final DataBroker db,
+ final List<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions,
+ final List<ReadWriteTransaction> failedTransactions) {
this(db, (ExecutorService) null);
// Initialize state
this.pendingTransactions.addAll(pendingTransactions);
this.failedTransactionQueue.addAll(failedTransactions);
- this.transactionToCommand.putAll(transactionToCommand);
}
@VisibleForTesting
- TransactionInvokerImpl(final DataBroker db, final List<ReadWriteTransaction> pendingTransactions) {
- this(db, pendingTransactions, Collections.emptyList(), Collections.emptyMap());
+ TransactionInvokerImpl(final DataBroker db,
+ final List<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions) {
+ this(db, pendingTransactions, Collections.emptyList());
}
@Override
List<TransactionCommand> commands = new ArrayList<>();
if (transaction != null) {
// Process all pending transactions, looking for the failed one...
- final Iterator<ReadWriteTransaction> it = pendingTransactions.iterator();
+ final Iterator<Entry<ReadWriteTransaction, TransactionCommand>> it = pendingTransactions.iterator();
while (it.hasNext()) {
- final ReadWriteTransaction current = it.next();
- if (transaction.equals(current)) {
- // .. collect current and all remaining pending transactions
- commands.add(transactionToCommand.get(current));
- it.forEachRemaining(tx -> commands.add(transactionToCommand.get(tx)));
+ final Entry<ReadWriteTransaction, TransactionCommand> current = it.next();
+ if (transaction.equals(current.getKey())) {
+ // .. collect current and all remaining pending transactions' values
+ commands.add(current.getValue());
+ it.forEachRemaining(entry -> commands.add(entry.getValue()));
break;
}
}
chain.close();
chain = db.createTransactionChain(this);
pendingTransactions.clear();
- transactionToCommand.clear();
failedTransactionQueue.clear();
}
synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
- pendingTransactions.remove(transaction);
- transactionToCommand.remove(transaction);
+ Iterator<Entry<ReadWriteTransaction, TransactionCommand>> it = pendingTransactions.iterator();
+ while (it.hasNext()) {
+ final Entry<ReadWriteTransaction, TransactionCommand> entry = it.next();
+ if (transaction.equals(entry.getKey())) {
+ it.remove();
+ break;
+ }
+ }
}
@VisibleForTesting
synchronized void recordPendingTransaction(final TransactionCommand command,
final ReadWriteTransaction transaction) {
- transactionToCommand.put(transaction, command);
- pendingTransactions.add(transaction);
+ pendingTransactions.add(new SimpleImmutableEntry<>(transaction, command));
}
@VisibleForTesting
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.powermock.reflect.Whitebox.getInternalState;
import com.google.common.collect.ImmutableList;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import org.junit.Before;
final ReadWriteTransaction tx1 = mock(ReadWriteTransaction.class);
final ReadWriteTransaction tx2 = mock(ReadWriteTransaction.class);
final ReadWriteTransaction tx3 = mock(ReadWriteTransaction.class);
-
- final Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
- transactionToCommand.put(tx1, mock(TransactionCommand.class));
+ final TransactionCommand cmd1 = mock(TransactionCommand.class);
final TransactionCommand cmd2 = mock(TransactionCommand.class);
- transactionToCommand.put(tx2, cmd2);
final TransactionCommand cmd3 = mock(TransactionCommand.class);
- transactionToCommand.put(tx3, cmd3);
final TransactionInvokerImpl invoker = new TransactionInvokerImpl(db,
// Given pending transaction order ...
- ImmutableList.of(tx1, tx2, tx3),
+ ImmutableList.of(entry(tx1, cmd1), entry(tx2, cmd2), entry(tx3, cmd3)),
// .. if tx2 fails ...
- Collections.singletonList(tx2),
- transactionToCommand);
+ Collections.singletonList(tx2));
// .. we want to replay tx2 and tx3
assertEquals(ImmutableList.of(cmd2, cmd3), invoker.extractResubmitCommands());
}
+ private static <K, V> Entry<K, V> entry(final K key, final V value) {
+ return new SimpleImmutableEntry<>(key, value);
+ }
+
@Test
public void testResetTransactionQueue() {
final TransactionInvokerImpl invoker = new TransactionInvokerImpl(db, Collections.emptyList(),
- Collections.singletonList(mock(ReadWriteTransaction.class)), Collections.emptyMap());
+ Collections.singletonList(mock(ReadWriteTransaction.class)));
invoker.resetTransactionQueue();
- assertNotNull(getInternalState(invoker, "pendingTransactions"));
- assertNotNull(getInternalState(invoker, "transactionToCommand"));
- final Queue<?> failedTransactionQueue = getInternalState(invoker, "failedTransactionQueue");
- assertEquals(0, failedTransactionQueue.size());
+ assertEmpty(getInternalState(invoker, "pendingTransactions"));
+ assertEmpty(getInternalState(invoker, "failedTransactionQueue"));
+ }
+
+ private static void assertEmpty(final Collection<?> collection) {
+ assertNotNull(collection);
+ assertEquals(0, collection.size());
}
@Test
final ReadWriteTransaction transaction = mock(ReadWriteTransaction.class);
invoker.recordPendingTransaction(command, transaction);
- Queue<ReadWriteTransaction> testPendingTransactions = getInternalState(invoker, "pendingTransactions");
- assertEquals(1, testPendingTransactions.size());
- assertTrue(testPendingTransactions.contains(transaction));
-
- assertEquals(Collections.singletonMap(transaction, command), getInternalState(invoker, "transactionToCommand"));
+ Queue<Entry<?, ?>> endingTransactions = getInternalState(invoker, "pendingTransactions");
+ assertEquals(1, endingTransactions.size());
+ assertSame(transaction, endingTransactions.element().getKey());
+ assertSame(command, endingTransactions.element().getValue());
}
@Test