From: Robert Varga Date: Thu, 5 Dec 2019 21:12:31 +0000 (+0100) Subject: Eliminate TransactionInvokerImpl.successfulTransactionQueue X-Git-Tag: release/magnesium~21 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=9d714a27491cfae217dcd50341908db15a3489cc;p=ovsdb.git Eliminate TransactionInvokerImpl.successfulTransactionQueue This queue is actually just a hand-off between the datastore callback thread and the central thread. There is no need for such a queue, as we can just properly synchronize access to the critical structures. JIRA: OVSDB-428 Change-Id: Idba05aebfa7fd35dbdb706b8bfef03e3e03d7772 Signed-off-by: Robert Varga --- diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transactions/md/TransactionInvokerImpl.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transactions/md/TransactionInvokerImpl.java index 201e6b815..e9d2dc348 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transactions/md/TransactionInvokerImpl.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transactions/md/TransactionInvokerImpl.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import org.checkerframework.checker.lock.qual.GuardedBy; import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; @@ -43,12 +44,13 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha private final DataBroker db; private final BlockingQueue inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private final BlockingQueue> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); - private final BlockingQueue successfulTransactionQueue = - new LinkedBlockingQueue<>(QUEUE_SIZE); private final ExecutorService executor; - private Map transactionToCommand = new HashMap<>(); - private List pendingTransactions = new ArrayList<>(); + @GuardedBy("this") + private final Map transactionToCommand = new HashMap<>(); + @GuardedBy("this") + private final List pendingTransactions = new ArrayList<>(); + private BindingTransactionChain chain; //This is made volatile as it is accessed from uncaught exception handler thread also private volatile ReadWriteTransaction transactionInFlight = null; @@ -87,9 +89,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha @Override public void run() { while (true) { - forgetSuccessfulTransactions(); - - List commands = null; + final List commands; try { commands = extractCommands(); } catch (InterruptedException e) { @@ -100,26 +100,25 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha try { while (commandIterator.hasNext()) { TransactionCommand command = commandIterator.next(); - final ReadWriteTransaction transaction = chain.newReadWriteTransaction(); - transactionInFlight = transaction; - recordPendingTransaction(command, transaction); - command.execute(transaction); - ListenableFuture ft = transaction.submit(); - command.setTransactionResultFuture(ft); - Futures.addCallback(ft, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - if (!successfulTransactionQueue.offer(transaction)) { - LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}", - successfulTransactionQueue.size(), transaction); + synchronized (this) { + final ReadWriteTransaction transaction = chain.newReadWriteTransaction(); + transactionInFlight = transaction; + recordPendingTransaction(command, transaction); + command.execute(transaction); + ListenableFuture ft = transaction.submit(); + command.setTransactionResultFuture(ft); + Futures.addCallback(ft, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + forgetSuccessfulTransaction(transaction); } - } - @Override - public void onFailure(final Throwable throwable) { - // NOOP - handled by failure of transaction chain - } - }, MoreExecutors.directExecutor()); + @Override + public void onFailure(final Throwable throwable) { + // NOOP - handled by failure of transaction chain + } + }, MoreExecutors.directExecutor()); + } } transactionInFlight = null; } catch (IllegalStateException e) { @@ -143,19 +142,21 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha } private List extractResubmitCommands() { - AsyncTransaction transaction = failedTransactionQueue.poll(); List commands = new ArrayList<>(); - if (transaction != null) { - int index = pendingTransactions.lastIndexOf(transaction); - //This logic needs to be revisited. Is it ok to resubmit these things again ? - //are these operations idempotent ? - //Does the transaction chain execute n+1th if nth one threw error ? - List transactions = - pendingTransactions.subList(index, pendingTransactions.size() - 1); - for (ReadWriteTransaction tx: transactions) { - commands.add(transactionToCommand.get(tx)); + synchronized (this) { + AsyncTransaction transaction = failedTransactionQueue.poll(); + if (transaction != null) { + int index = pendingTransactions.lastIndexOf(transaction); + //This logic needs to be revisited. Is it ok to resubmit these things again ? + //are these operations idempotent ? + //Does the transaction chain execute n+1th if nth one threw error ? + List transactions = + pendingTransactions.subList(index, pendingTransactions.size() - 1); + for (ReadWriteTransaction tx: transactions) { + commands.add(transactionToCommand.get(tx)); + } + resetTransactionQueue(); } - resetTransactionQueue(); } if (commandIterator != null) { while (commandIterator.hasNext()) { @@ -168,13 +169,17 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha private void resetTransactionQueue() { chain.close(); chain = db.createTransactionChain(this); - pendingTransactions = new ArrayList<>(); - transactionToCommand = new HashMap<>(); + pendingTransactions.clear(); + transactionToCommand.clear(); failedTransactionQueue.clear(); - successfulTransactionQueue.clear(); } - private void recordPendingTransaction(final TransactionCommand command, + synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) { + pendingTransactions.remove(transaction); + transactionToCommand.remove(transaction); + } + + private synchronized void recordPendingTransaction(final TransactionCommand command, final ReadWriteTransaction transaction) { transactionToCommand.put(transaction, command); pendingTransactions.add(transaction); @@ -199,15 +204,6 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha return result; } - private void forgetSuccessfulTransactions() { - ReadWriteTransaction transaction = successfulTransactionQueue.poll(); - while (transaction != null) { - pendingTransactions.remove(transaction); - transactionToCommand.remove(transaction); - transaction = successfulTransactionQueue.poll(); - } - } - @Override public void close() throws Exception { this.chain.close(); diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java index b0baa6ebf..98b423e11 100644 --- a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java +++ b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java @@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.checkerframework.checker.lock.qual.GuardedBy; import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; @@ -39,14 +40,15 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha private final DataBroker db; private final BlockingQueue inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private final BlockingQueue> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); - private final BlockingQueue successfulTransactionQueue = - new LinkedBlockingQueue<>(QUEUE_SIZE); private final ExecutorService executor; private final AtomicBoolean runTask = new AtomicBoolean(true); - private Map transactionToCommand = new HashMap<>(); - private List pendingTransactions = new ArrayList<>(); + @GuardedBy("this") + private final Map transactionToCommand = new HashMap<>(); + @GuardedBy("this") + private final List pendingTransactions = new ArrayList<>(); + private BindingTransactionChain chain; public TransactionInvokerImpl(final DataBroker db) { @@ -80,9 +82,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha @Override public void run() { while (runTask.get()) { - forgetSuccessfulTransactions(); - - List commands = null; + final List commands; try { commands = extractCommands(); } catch (InterruptedException e) { @@ -93,26 +93,25 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha ReadWriteTransaction transactionInFlight = null; try { for (TransactionCommand command: commands) { - final ReadWriteTransaction transaction = chain.newReadWriteTransaction(); - transactionInFlight = transaction; - recordPendingTransaction(command, transaction); - command.execute(transaction); - Futures.addCallback(transaction.submit(), new FutureCallback() { - @Override - public void onSuccess(final Void result) { - if (!successfulTransactionQueue.offer(transaction)) { - LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}", - successfulTransactionQueue.size(), transaction); + synchronized (this) { + final ReadWriteTransaction transaction = chain.newReadWriteTransaction(); + transactionInFlight = transaction; + recordPendingTransaction(command, transaction); + command.execute(transaction); + Futures.addCallback(transaction.submit(), new FutureCallback() { + @Override + public void onSuccess(final Void result) { + forgetSuccessfulTransaction(transaction); + command.onSuccess(); } - command.onSuccess(); - } - - @Override - public void onFailure(final Throwable throwable) { - command.onFailure(throwable); - // NOOP - handled by failure of transaction chain - } - }, MoreExecutors.directExecutor()); + + @Override + public void onFailure(final Throwable throwable) { + command.onFailure(throwable); + // NOOP - handled by failure of transaction chain + } + }, MoreExecutors.directExecutor()); + } } } catch (IllegalStateException e) { if (transactionInFlight != null) { @@ -134,7 +133,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha } @VisibleForTesting - List extractResubmitCommands() { + synchronized List extractResubmitCommands() { AsyncTransaction transaction = failedTransactionQueue.poll(); List commands = new ArrayList<>(); if (transaction != null) { @@ -150,16 +149,20 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha } @VisibleForTesting - void resetTransactionQueue() { + synchronized void resetTransactionQueue() { chain.close(); chain = db.createTransactionChain(this); - pendingTransactions = new ArrayList<>(); - transactionToCommand = new HashMap<>(); + pendingTransactions.clear(); + transactionToCommand.clear(); failedTransactionQueue.clear(); - successfulTransactionQueue.clear(); } - private void recordPendingTransaction(final TransactionCommand command, + synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) { + pendingTransactions.remove(transaction); + transactionToCommand.remove(transaction); + } + + private synchronized void recordPendingTransaction(final TransactionCommand command, final ReadWriteTransaction transaction) { transactionToCommand.put(transaction, command); pendingTransactions.add(transaction); @@ -180,15 +183,6 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha return result; } - private void forgetSuccessfulTransactions() { - ReadWriteTransaction transaction = successfulTransactionQueue.poll(); - while (transaction != null) { - pendingTransactions.remove(transaction); - transactionToCommand.remove(transaction); - transaction = successfulTransactionQueue.poll(); - } - } - @Override public void close() throws InterruptedException { this.chain.close(); diff --git a/southbound/southbound-impl/src/test/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImplTest.java b/southbound/southbound-impl/src/test/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImplTest.java index 1b237aecd..58a16febc 100644 --- a/southbound/southbound-impl/src/test/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImplTest.java +++ b/southbound/southbound-impl/src/test/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImplTest.java @@ -5,13 +5,12 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.ovsdb.southbound.transactions.md; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -142,7 +141,6 @@ public class TransactionInvokerImplTest { transactionToCommand); getField(TransactionInvokerImpl.class, "failedTransactionQueue").set(transactionInvokerImpl, failedTransactionQ); - getField(TransactionInvokerImpl.class, "successfulTransactionQueue").set(transactionInvokerImpl, successfulTxQ); Whitebox.invokeMethod(transactionInvokerImpl, "resetTransactionQueue"); assertNotNull(Whitebox.getInternalState(transactionInvokerImpl, "pendingTransactions")); @@ -195,27 +193,6 @@ public class TransactionInvokerImplTest { assertEquals(testResult, Whitebox.invokeMethod(transactionInvokerImpl, "extractCommandsFromQueue")); } - @Test - public void testForgetSuccessfulTransactions() throws Exception { - ReadWriteTransaction transaction = mock(ReadWriteTransaction.class); - successfulTxQ.add(transaction); - pendingTransactions.add(transaction); - transactionToCommand.put(transaction, mock(TransactionCommand.class)); - getField(TransactionInvokerImpl.class, "successfulTransactionQueue").set(transactionInvokerImpl, successfulTxQ); - getField(TransactionInvokerImpl.class, "pendingTransactions").set(transactionInvokerImpl, pendingTransactions); - getField(TransactionInvokerImpl.class, "transactionToCommand").set(transactionInvokerImpl, - transactionToCommand); - - Whitebox.invokeMethod(transactionInvokerImpl, "forgetSuccessfulTransactions"); - - List testPendingTransactions = Whitebox.getInternalState(transactionInvokerImpl, - "pendingTransactions"); - Map testTransactionToCommand = Whitebox - .getInternalState(transactionInvokerImpl, "transactionToCommand"); - assertTrue(testPendingTransactions.isEmpty()); - assertTrue(testTransactionToCommand.isEmpty()); - } - @Test public void testClose() throws Exception { getField(TransactionInvokerImpl.class, "executor").set(transactionInvokerImpl, executor);