import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
+import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
private final DataBroker db;
private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
- new LinkedBlockingQueue<>(QUEUE_SIZE);
private final ExecutorService executor;
- private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
- private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+ @GuardedBy("this")
+ private final Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
+ @GuardedBy("this")
+ private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+
private BindingTransactionChain chain;
//This is made volatile as it is accessed from uncaught exception handler thread also
private volatile ReadWriteTransaction transactionInFlight = null;
@Override
public void run() {
while (true) {
- forgetSuccessfulTransactions();
-
- List<TransactionCommand> commands = null;
+ final List<TransactionCommand> commands;
try {
commands = extractCommands();
} catch (InterruptedException e) {
try {
while (commandIterator.hasNext()) {
TransactionCommand command = commandIterator.next();
- final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
- transactionInFlight = transaction;
- recordPendingTransaction(command, transaction);
- command.execute(transaction);
- ListenableFuture<Void> ft = transaction.submit();
- command.setTransactionResultFuture(ft);
- Futures.addCallback(ft, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- if (!successfulTransactionQueue.offer(transaction)) {
- LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
- successfulTransactionQueue.size(), transaction);
+ synchronized (this) {
+ final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+ transactionInFlight = transaction;
+ recordPendingTransaction(command, transaction);
+ command.execute(transaction);
+ ListenableFuture<Void> ft = transaction.submit();
+ command.setTransactionResultFuture(ft);
+ Futures.addCallback(ft, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ forgetSuccessfulTransaction(transaction);
}
- }
- @Override
- public void onFailure(final Throwable throwable) {
- // NOOP - handled by failure of transaction chain
- }
- }, MoreExecutors.directExecutor());
+ @Override
+ public void onFailure(final Throwable throwable) {
+ // NOOP - handled by failure of transaction chain
+ }
+ }, MoreExecutors.directExecutor());
+ }
}
transactionInFlight = null;
} catch (IllegalStateException e) {
}
private List<TransactionCommand> extractResubmitCommands() {
- AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
List<TransactionCommand> commands = new ArrayList<>();
- if (transaction != null) {
- int index = pendingTransactions.lastIndexOf(transaction);
- //This logic needs to be revisited. Is it ok to resubmit these things again ?
- //are these operations idempotent ?
- //Does the transaction chain execute n+1th if nth one threw error ?
- List<ReadWriteTransaction> transactions =
- pendingTransactions.subList(index, pendingTransactions.size() - 1);
- for (ReadWriteTransaction tx: transactions) {
- commands.add(transactionToCommand.get(tx));
+ synchronized (this) {
+ AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
+ if (transaction != null) {
+ int index = pendingTransactions.lastIndexOf(transaction);
+ //This logic needs to be revisited. Is it ok to resubmit these things again ?
+ //are these operations idempotent ?
+ //Does the transaction chain execute n+1th if nth one threw error ?
+ List<ReadWriteTransaction> transactions =
+ pendingTransactions.subList(index, pendingTransactions.size() - 1);
+ for (ReadWriteTransaction tx: transactions) {
+ commands.add(transactionToCommand.get(tx));
+ }
+ resetTransactionQueue();
}
- resetTransactionQueue();
}
if (commandIterator != null) {
while (commandIterator.hasNext()) {
private void resetTransactionQueue() {
chain.close();
chain = db.createTransactionChain(this);
- pendingTransactions = new ArrayList<>();
- transactionToCommand = new HashMap<>();
+ pendingTransactions.clear();
+ transactionToCommand.clear();
failedTransactionQueue.clear();
- successfulTransactionQueue.clear();
}
- private void recordPendingTransaction(final TransactionCommand command,
+ synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
+ pendingTransactions.remove(transaction);
+ transactionToCommand.remove(transaction);
+ }
+
+ private synchronized void recordPendingTransaction(final TransactionCommand command,
final ReadWriteTransaction transaction) {
transactionToCommand.put(transaction, command);
pendingTransactions.add(transaction);
return result;
}
- private void forgetSuccessfulTransactions() {
- ReadWriteTransaction transaction = successfulTransactionQueue.poll();
- while (transaction != null) {
- pendingTransactions.remove(transaction);
- transactionToCommand.remove(transaction);
- transaction = successfulTransactionQueue.poll();
- }
- }
-
@Override
public void close() throws Exception {
this.chain.close();
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
private final DataBroker db;
private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
- new LinkedBlockingQueue<>(QUEUE_SIZE);
private final ExecutorService executor;
private final AtomicBoolean runTask = new AtomicBoolean(true);
- private Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
- private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+ @GuardedBy("this")
+ private final Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
+ @GuardedBy("this")
+ private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+
private BindingTransactionChain chain;
public TransactionInvokerImpl(final DataBroker db) {
@Override
public void run() {
while (runTask.get()) {
- forgetSuccessfulTransactions();
-
- List<TransactionCommand> commands = null;
+ final List<TransactionCommand> commands;
try {
commands = extractCommands();
} catch (InterruptedException e) {
ReadWriteTransaction transactionInFlight = null;
try {
for (TransactionCommand command: commands) {
- final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
- transactionInFlight = transaction;
- recordPendingTransaction(command, transaction);
- command.execute(transaction);
- Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- if (!successfulTransactionQueue.offer(transaction)) {
- LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
- successfulTransactionQueue.size(), transaction);
+ synchronized (this) {
+ final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+ transactionInFlight = transaction;
+ recordPendingTransaction(command, transaction);
+ command.execute(transaction);
+ Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ forgetSuccessfulTransaction(transaction);
+ command.onSuccess();
}
- command.onSuccess();
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- command.onFailure(throwable);
- // NOOP - handled by failure of transaction chain
- }
- }, MoreExecutors.directExecutor());
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ command.onFailure(throwable);
+ // NOOP - handled by failure of transaction chain
+ }
+ }, MoreExecutors.directExecutor());
+ }
}
} catch (IllegalStateException e) {
if (transactionInFlight != null) {
}
@VisibleForTesting
- List<TransactionCommand> extractResubmitCommands() {
+ synchronized List<TransactionCommand> extractResubmitCommands() {
AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
List<TransactionCommand> commands = new ArrayList<>();
if (transaction != null) {
}
@VisibleForTesting
- void resetTransactionQueue() {
+ synchronized void resetTransactionQueue() {
chain.close();
chain = db.createTransactionChain(this);
- pendingTransactions = new ArrayList<>();
- transactionToCommand = new HashMap<>();
+ pendingTransactions.clear();
+ transactionToCommand.clear();
failedTransactionQueue.clear();
- successfulTransactionQueue.clear();
}
- private void recordPendingTransaction(final TransactionCommand command,
+ synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
+ pendingTransactions.remove(transaction);
+ transactionToCommand.remove(transaction);
+ }
+
+ private synchronized void recordPendingTransaction(final TransactionCommand command,
final ReadWriteTransaction transaction) {
transactionToCommand.put(transaction, command);
pendingTransactions.add(transaction);
return result;
}
- private void forgetSuccessfulTransactions() {
- ReadWriteTransaction transaction = successfulTransactionQueue.poll();
- while (transaction != null) {
- pendingTransactions.remove(transaction);
- transactionToCommand.remove(transaction);
- transaction = successfulTransactionQueue.poll();
- }
- }
-
@Override
public void close() throws InterruptedException {
this.chain.close();
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.ovsdb.southbound.transactions.md;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
transactionToCommand);
getField(TransactionInvokerImpl.class, "failedTransactionQueue").set(transactionInvokerImpl,
failedTransactionQ);
- getField(TransactionInvokerImpl.class, "successfulTransactionQueue").set(transactionInvokerImpl, successfulTxQ);
Whitebox.invokeMethod(transactionInvokerImpl, "resetTransactionQueue");
assertNotNull(Whitebox.getInternalState(transactionInvokerImpl, "pendingTransactions"));
assertEquals(testResult, Whitebox.invokeMethod(transactionInvokerImpl, "extractCommandsFromQueue"));
}
- @Test
- public void testForgetSuccessfulTransactions() throws Exception {
- ReadWriteTransaction transaction = mock(ReadWriteTransaction.class);
- successfulTxQ.add(transaction);
- pendingTransactions.add(transaction);
- transactionToCommand.put(transaction, mock(TransactionCommand.class));
- getField(TransactionInvokerImpl.class, "successfulTransactionQueue").set(transactionInvokerImpl, successfulTxQ);
- getField(TransactionInvokerImpl.class, "pendingTransactions").set(transactionInvokerImpl, pendingTransactions);
- getField(TransactionInvokerImpl.class, "transactionToCommand").set(transactionInvokerImpl,
- transactionToCommand);
-
- Whitebox.invokeMethod(transactionInvokerImpl, "forgetSuccessfulTransactions");
-
- List<ReadWriteTransaction> testPendingTransactions = Whitebox.getInternalState(transactionInvokerImpl,
- "pendingTransactions");
- Map<ReadWriteTransaction, TransactionCommand> testTransactionToCommand = Whitebox
- .getInternalState(transactionInvokerImpl, "transactionToCommand");
- assertTrue(testPendingTransactions.isEmpty());
- assertTrue(testTransactionToCommand.isEmpty());
- }
-
@Test
public void testClose() throws Exception {
getField(TransactionInvokerImpl.class, "executor").set(transactionInvokerImpl, executor);