Eliminate TransactionInvokerImpl.successfulTransactionQueue 44/86244/16
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 5 Dec 2019 21:12:31 +0000 (22:12 +0100)
committerStephen Kitt <skitt@redhat.com>
Tue, 7 Jan 2020 16:04:52 +0000 (16:04 +0000)
This queue is actually just a hand-off between the datastore callback
thread and the central thread. There is no need for such a queue, as
we can just properly synchronize access to the critical structures.

JIRA: OVSDB-428
Change-Id: Idba05aebfa7fd35dbdb706b8bfef03e3e03d7772
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transactions/md/TransactionInvokerImpl.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java
southbound/southbound-impl/src/test/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImplTest.java

index 201e6b8154230aef185a40b3933bd9370459f2c9..e9d2dc348a3040ddf407ee3c77b273dcdd1236e1 100644 (file)
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -43,12 +44,13 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     private final DataBroker db;
     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
-            new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final ExecutorService executor;
 
-    private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
-    private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+    @GuardedBy("this")
+    private final Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
+    @GuardedBy("this")
+    private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+
     private BindingTransactionChain chain;
     //This is made volatile as it is accessed from uncaught exception handler thread also
     private volatile ReadWriteTransaction transactionInFlight = null;
@@ -87,9 +89,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     @Override
     public void run() {
         while (true) {
-            forgetSuccessfulTransactions();
-
-            List<TransactionCommand> commands = null;
+            final List<TransactionCommand> commands;
             try {
                 commands = extractCommands();
             } catch (InterruptedException e) {
@@ -100,26 +100,25 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
             try {
                 while (commandIterator.hasNext()) {
                     TransactionCommand command = commandIterator.next();
-                    final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
-                    transactionInFlight = transaction;
-                    recordPendingTransaction(command, transaction);
-                    command.execute(transaction);
-                    ListenableFuture<Void> ft = transaction.submit();
-                    command.setTransactionResultFuture(ft);
-                    Futures.addCallback(ft, new FutureCallback<Void>() {
-                        @Override
-                        public void onSuccess(final Void result) {
-                            if (!successfulTransactionQueue.offer(transaction)) {
-                                LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
-                                        successfulTransactionQueue.size(), transaction);
+                    synchronized (this) {
+                        final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+                        transactionInFlight = transaction;
+                        recordPendingTransaction(command, transaction);
+                        command.execute(transaction);
+                        ListenableFuture<Void> ft = transaction.submit();
+                        command.setTransactionResultFuture(ft);
+                        Futures.addCallback(ft, new FutureCallback<Void>() {
+                            @Override
+                            public void onSuccess(final Void result) {
+                                forgetSuccessfulTransaction(transaction);
                             }
-                        }
 
-                        @Override
-                        public void onFailure(final Throwable throwable) {
-                            // NOOP - handled by failure of transaction chain
-                        }
-                    }, MoreExecutors.directExecutor());
+                            @Override
+                            public void onFailure(final Throwable throwable) {
+                                // NOOP - handled by failure of transaction chain
+                            }
+                        }, MoreExecutors.directExecutor());
+                    }
                 }
                 transactionInFlight = null;
             } catch (IllegalStateException e) {
@@ -143,19 +142,21 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     }
 
     private List<TransactionCommand> extractResubmitCommands() {
-        AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
         List<TransactionCommand> commands = new ArrayList<>();
-        if (transaction != null) {
-            int index = pendingTransactions.lastIndexOf(transaction);
-            //This logic needs to be revisited. Is it ok to resubmit these things again ?
-            //are these operations idempotent ?
-            //Does the transaction chain execute n+1th if nth one threw error ?
-            List<ReadWriteTransaction> transactions =
-                    pendingTransactions.subList(index, pendingTransactions.size() - 1);
-            for (ReadWriteTransaction tx: transactions) {
-                commands.add(transactionToCommand.get(tx));
+        synchronized (this) {
+            AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
+            if (transaction != null) {
+                int index = pendingTransactions.lastIndexOf(transaction);
+                //This logic needs to be revisited. Is it ok to resubmit these things again ?
+                //are these operations idempotent ?
+                //Does the transaction chain execute n+1th if nth one threw error ?
+                List<ReadWriteTransaction> transactions =
+                        pendingTransactions.subList(index, pendingTransactions.size() - 1);
+                for (ReadWriteTransaction tx: transactions) {
+                    commands.add(transactionToCommand.get(tx));
+                }
+                resetTransactionQueue();
             }
-            resetTransactionQueue();
         }
         if (commandIterator != null) {
             while (commandIterator.hasNext()) {
@@ -168,13 +169,17 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     private void resetTransactionQueue() {
         chain.close();
         chain = db.createTransactionChain(this);
-        pendingTransactions = new ArrayList<>();
-        transactionToCommand = new HashMap<>();
+        pendingTransactions.clear();
+        transactionToCommand.clear();
         failedTransactionQueue.clear();
-        successfulTransactionQueue.clear();
     }
 
-    private void recordPendingTransaction(final TransactionCommand command,
+    synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
+        pendingTransactions.remove(transaction);
+        transactionToCommand.remove(transaction);
+    }
+
+    private synchronized void recordPendingTransaction(final TransactionCommand command,
             final ReadWriteTransaction transaction) {
         transactionToCommand.put(transaction, command);
         pendingTransactions.add(transaction);
@@ -199,15 +204,6 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         return result;
     }
 
-    private void forgetSuccessfulTransactions() {
-        ReadWriteTransaction transaction = successfulTransactionQueue.poll();
-        while (transaction != null) {
-            pendingTransactions.remove(transaction);
-            transactionToCommand.remove(transaction);
-            transaction = successfulTransactionQueue.poll();
-        }
-    }
-
     @Override
     public void close() throws Exception {
         this.chain.close();
index b0baa6ebfc6a4eeff16718d137735d44c58c1b8d..98b423e11054ea2345c0dfc1f1adc691721e619c 100644 (file)
@@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -39,14 +40,15 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     private final DataBroker db;
     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
-            new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final ExecutorService executor;
 
     private final AtomicBoolean runTask = new AtomicBoolean(true);
 
-    private Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
-    private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+    @GuardedBy("this")
+    private final Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
+    @GuardedBy("this")
+    private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+
     private BindingTransactionChain chain;
 
     public TransactionInvokerImpl(final DataBroker db) {
@@ -80,9 +82,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     @Override
     public void run() {
         while (runTask.get()) {
-            forgetSuccessfulTransactions();
-
-            List<TransactionCommand> commands = null;
+            final List<TransactionCommand> commands;
             try {
                 commands = extractCommands();
             } catch (InterruptedException e) {
@@ -93,26 +93,25 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
             ReadWriteTransaction transactionInFlight = null;
             try {
                 for (TransactionCommand command: commands) {
-                    final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
-                    transactionInFlight = transaction;
-                    recordPendingTransaction(command, transaction);
-                    command.execute(transaction);
-                    Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
-                        @Override
-                        public void onSuccess(final Void result) {
-                            if (!successfulTransactionQueue.offer(transaction)) {
-                                LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
-                                        successfulTransactionQueue.size(), transaction);
+                    synchronized (this) {
+                        final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+                        transactionInFlight = transaction;
+                        recordPendingTransaction(command, transaction);
+                        command.execute(transaction);
+                        Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+                            @Override
+                            public void onSuccess(final Void result) {
+                                forgetSuccessfulTransaction(transaction);
+                                command.onSuccess();
                             }
-                            command.onSuccess();
-                        }
-
-                        @Override
-                        public void onFailure(final Throwable throwable) {
-                            command.onFailure(throwable);
-                            // NOOP - handled by failure of transaction chain
-                        }
-                    }, MoreExecutors.directExecutor());
+
+                            @Override
+                            public void onFailure(final Throwable throwable) {
+                                command.onFailure(throwable);
+                                // NOOP - handled by failure of transaction chain
+                            }
+                        }, MoreExecutors.directExecutor());
+                    }
                 }
             } catch (IllegalStateException e) {
                 if (transactionInFlight != null) {
@@ -134,7 +133,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     }
 
     @VisibleForTesting
-    List<TransactionCommand> extractResubmitCommands() {
+    synchronized List<TransactionCommand> extractResubmitCommands() {
         AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
         List<TransactionCommand> commands = new ArrayList<>();
         if (transaction != null) {
@@ -150,16 +149,20 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     }
 
     @VisibleForTesting
-    void resetTransactionQueue() {
+    synchronized void resetTransactionQueue() {
         chain.close();
         chain = db.createTransactionChain(this);
-        pendingTransactions = new ArrayList<>();
-        transactionToCommand = new HashMap<>();
+        pendingTransactions.clear();
+        transactionToCommand.clear();
         failedTransactionQueue.clear();
-        successfulTransactionQueue.clear();
     }
 
-    private void recordPendingTransaction(final TransactionCommand command,
+    synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
+        pendingTransactions.remove(transaction);
+        transactionToCommand.remove(transaction);
+    }
+
+    private synchronized void recordPendingTransaction(final TransactionCommand command,
             final ReadWriteTransaction transaction) {
         transactionToCommand.put(transaction, command);
         pendingTransactions.add(transaction);
@@ -180,15 +183,6 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         return result;
     }
 
-    private void forgetSuccessfulTransactions() {
-        ReadWriteTransaction transaction = successfulTransactionQueue.poll();
-        while (transaction != null) {
-            pendingTransactions.remove(transaction);
-            transactionToCommand.remove(transaction);
-            transaction = successfulTransactionQueue.poll();
-        }
-    }
-
     @Override
     public void close() throws InterruptedException {
         this.chain.close();
index 1b237aecdb838d6d6a31b2deafeccb8ccb6322e5..58a16febc2bfb248b99a84455a2bfcdfe83505a5 100644 (file)
@@ -5,13 +5,12 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.ovsdb.southbound.transactions.md;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -142,7 +141,6 @@ public class TransactionInvokerImplTest {
             transactionToCommand);
         getField(TransactionInvokerImpl.class, "failedTransactionQueue").set(transactionInvokerImpl,
             failedTransactionQ);
-        getField(TransactionInvokerImpl.class, "successfulTransactionQueue").set(transactionInvokerImpl, successfulTxQ);
 
         Whitebox.invokeMethod(transactionInvokerImpl, "resetTransactionQueue");
         assertNotNull(Whitebox.getInternalState(transactionInvokerImpl, "pendingTransactions"));
@@ -195,27 +193,6 @@ public class TransactionInvokerImplTest {
         assertEquals(testResult, Whitebox.invokeMethod(transactionInvokerImpl, "extractCommandsFromQueue"));
     }
 
-    @Test
-    public void testForgetSuccessfulTransactions() throws Exception {
-        ReadWriteTransaction transaction = mock(ReadWriteTransaction.class);
-        successfulTxQ.add(transaction);
-        pendingTransactions.add(transaction);
-        transactionToCommand.put(transaction, mock(TransactionCommand.class));
-        getField(TransactionInvokerImpl.class, "successfulTransactionQueue").set(transactionInvokerImpl, successfulTxQ);
-        getField(TransactionInvokerImpl.class, "pendingTransactions").set(transactionInvokerImpl, pendingTransactions);
-        getField(TransactionInvokerImpl.class, "transactionToCommand").set(transactionInvokerImpl,
-            transactionToCommand);
-
-        Whitebox.invokeMethod(transactionInvokerImpl, "forgetSuccessfulTransactions");
-
-        List<ReadWriteTransaction> testPendingTransactions = Whitebox.getInternalState(transactionInvokerImpl,
-                "pendingTransactions");
-        Map<ReadWriteTransaction, TransactionCommand> testTransactionToCommand = Whitebox
-                .getInternalState(transactionInvokerImpl, "transactionToCommand");
-        assertTrue(testPendingTransactions.isEmpty());
-        assertTrue(testTransactionToCommand.isEmpty());
-    }
-
     @Test
     public void testClose() throws Exception {
         getField(TransactionInvokerImpl.class, "executor").set(transactionInvokerImpl, executor);