Speed up inputQueue interaction
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transactions / md / TransactionInvokerImpl.java
1 /*
2  * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.ArrayList;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ThreadFactory;
25 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /*  TODO:
35  * Copied over as-is from southbound plugin. Good candidate to be common
36  * when refactoring code.
37  */
38 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable,
39         Thread.UncaughtExceptionHandler {
40     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
41     private static final int QUEUE_SIZE = 10000;
42
43     private final DataBroker db;
44     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
45     private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
46     private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
47             new LinkedBlockingQueue<>(QUEUE_SIZE);
48     private final ExecutorService executor;
49
50     private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
51     private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
52     private BindingTransactionChain chain;
53     //This is made volatile as it is accessed from uncaught exception handler thread also
54     private volatile ReadWriteTransaction transactionInFlight = null;
55     private Iterator<TransactionCommand> commandIterator = null;
56
57     public TransactionInvokerImpl(final DataBroker db) {
58         this.db = db;
59         this.chain = db.createTransactionChain(this);
60         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
61                 .setUncaughtExceptionHandler(this).build();
62         executor = Executors.newSingleThreadExecutor(threadFact);
63         //Using the execute method here so that un caught exception handler gets triggered upon exception.
64         //The other way to do it is using submit method and wait on the future to catch any exceptions
65         executor.execute(this);
66     }
67
68     @Override
69     public void invoke(final TransactionCommand command) {
70         // TODO what do we do if queue is full?
71         if (!inputQueue.offer(command)) {
72             LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
73         }
74     }
75
76     @Override
77     public void onTransactionChainFailed(final TransactionChain<?, ?> txChain,
78             final AsyncTransaction<?, ?> transaction, final Throwable cause) {
79         offerFailedTransaction(transaction);
80     }
81
82     @Override
83     public void onTransactionChainSuccessful(final TransactionChain<?, ?> txChain) {
84         // NO OP
85     }
86
87     @Override
88     public void run() {
89         while (true) {
90             forgetSuccessfulTransactions();
91
92             List<TransactionCommand> commands = null;
93             try {
94                 commands = extractCommands();
95             } catch (InterruptedException e) {
96                 LOG.warn("Extracting commands was interrupted.", e);
97                 continue;
98             }
99             commandIterator = commands.iterator();
100             try {
101                 while (commandIterator.hasNext()) {
102                     TransactionCommand command = commandIterator.next();
103                     final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
104                     transactionInFlight = transaction;
105                     recordPendingTransaction(command, transaction);
106                     command.execute(transaction);
107                     ListenableFuture<Void> ft = transaction.submit();
108                     command.setTransactionResultFuture(ft);
109                     Futures.addCallback(ft, new FutureCallback<Void>() {
110                         @Override
111                         public void onSuccess(final Void result) {
112                             if (!successfulTransactionQueue.offer(transaction)) {
113                                 LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
114                                         successfulTransactionQueue.size(), transaction);
115                             }
116                         }
117
118                         @Override
119                         public void onFailure(final Throwable throwable) {
120                             // NOOP - handled by failure of transaction chain
121                         }
122                     }, MoreExecutors.directExecutor());
123                 }
124                 transactionInFlight = null;
125             } catch (IllegalStateException e) {
126                 if (transactionInFlight != null) {
127                     // TODO: This method should distinguish exceptions on which the command should be
128                     // retried from exceptions on which the command should NOT be retried.
129                     // Then it should retry only the commands which should be retried, otherwise
130                     // this method will retry commands which will never be successful forever.
131                     offerFailedTransaction(transactionInFlight);
132                 }
133                 transactionInFlight = null;
134                 LOG.warn("Failed to process an update notification from OVS.", e);
135             }
136         }
137     }
138
139     private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
140         if (!failedTransactionQueue.offer(transaction)) {
141             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
142         }
143     }
144
145     private List<TransactionCommand> extractResubmitCommands() {
146         AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
147         List<TransactionCommand> commands = new ArrayList<>();
148         if (transaction != null) {
149             int index = pendingTransactions.lastIndexOf(transaction);
150             //This logic needs to be revisited. Is it ok to resubmit these things again ?
151             //are these operations idempotent ?
152             //Does the transaction chain execute n+1th if nth one threw error ?
153             List<ReadWriteTransaction> transactions =
154                     pendingTransactions.subList(index, pendingTransactions.size() - 1);
155             for (ReadWriteTransaction tx: transactions) {
156                 commands.add(transactionToCommand.get(tx));
157             }
158             resetTransactionQueue();
159         }
160         if (commandIterator != null) {
161             while (commandIterator.hasNext()) {
162                 commands.add(commandIterator.next());
163             }
164         }
165         return commands;
166     }
167
168     private void resetTransactionQueue() {
169         chain.close();
170         chain = db.createTransactionChain(this);
171         pendingTransactions = new ArrayList<>();
172         transactionToCommand = new HashMap<>();
173         failedTransactionQueue.clear();
174         successfulTransactionQueue.clear();
175     }
176
177     private void recordPendingTransaction(final TransactionCommand command,
178             final ReadWriteTransaction transaction) {
179         transactionToCommand.put(transaction, command);
180         pendingTransactions.add(transaction);
181     }
182
183     private List<TransactionCommand> extractCommands() throws InterruptedException {
184         List<TransactionCommand> commands = extractResubmitCommands();
185         if (!commands.isEmpty() && inputQueue.isEmpty()) {
186             //we got some commands to be executed let us not sit and wait on empty queue
187             return commands;
188         }
189         //pull commands from queue if not empty , otherwise wait for commands to be placed in queue.
190         commands.addAll(extractCommandsFromQueue());
191         return commands;
192     }
193
194     private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
195         List<TransactionCommand> result = new ArrayList<>();
196         TransactionCommand command = inputQueue.take();
197         result.add(command);
198         inputQueue.drainTo(result);
199         return result;
200     }
201
202     private void forgetSuccessfulTransactions() {
203         ReadWriteTransaction transaction = successfulTransactionQueue.poll();
204         while (transaction != null) {
205             pendingTransactions.remove(transaction);
206             transactionToCommand.remove(transaction);
207             transaction = successfulTransactionQueue.poll();
208         }
209     }
210
211     @Override
212     public void close() throws Exception {
213         this.chain.close();
214         this.executor.shutdown();
215     }
216
217     @Override
218     public void uncaughtException(final Thread thread, final Throwable ex) {
219         LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", ex);
220         if (transactionInFlight != null) {
221             offerFailedTransaction(transactionInFlight);
222         }
223         transactionInFlight = null;
224         executor.execute(this);
225     }
226 }