Eliminate TransactionInvokerImpl.successfulTransactionQueue
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transactions / md / TransactionInvokerImpl.java
1 /*
2  * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.ArrayList;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ThreadFactory;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /*  TODO:
36  * Copied over as-is from southbound plugin. Good candidate to be common
37  * when refactoring code.
38  */
39 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable,
40         Thread.UncaughtExceptionHandler {
41     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
42     private static final int QUEUE_SIZE = 10000;
43
44     private final DataBroker db;
45     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
46     private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
47     private final ExecutorService executor;
48
49     @GuardedBy("this")
50     private final Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
51     @GuardedBy("this")
52     private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
53
54     private BindingTransactionChain chain;
55     //This is made volatile as it is accessed from uncaught exception handler thread also
56     private volatile ReadWriteTransaction transactionInFlight = null;
57     private Iterator<TransactionCommand> commandIterator = null;
58
59     public TransactionInvokerImpl(final DataBroker db) {
60         this.db = db;
61         this.chain = db.createTransactionChain(this);
62         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
63                 .setUncaughtExceptionHandler(this).build();
64         executor = Executors.newSingleThreadExecutor(threadFact);
65         //Using the execute method here so that un caught exception handler gets triggered upon exception.
66         //The other way to do it is using submit method and wait on the future to catch any exceptions
67         executor.execute(this);
68     }
69
70     @Override
71     public void invoke(final TransactionCommand command) {
72         // TODO what do we do if queue is full?
73         if (!inputQueue.offer(command)) {
74             LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
75         }
76     }
77
78     @Override
79     public void onTransactionChainFailed(final TransactionChain<?, ?> txChain,
80             final AsyncTransaction<?, ?> transaction, final Throwable cause) {
81         offerFailedTransaction(transaction);
82     }
83
84     @Override
85     public void onTransactionChainSuccessful(final TransactionChain<?, ?> txChain) {
86         // NO OP
87     }
88
89     @Override
90     public void run() {
91         while (true) {
92             final List<TransactionCommand> commands;
93             try {
94                 commands = extractCommands();
95             } catch (InterruptedException e) {
96                 LOG.warn("Extracting commands was interrupted.", e);
97                 continue;
98             }
99             commandIterator = commands.iterator();
100             try {
101                 while (commandIterator.hasNext()) {
102                     TransactionCommand command = commandIterator.next();
103                     synchronized (this) {
104                         final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
105                         transactionInFlight = transaction;
106                         recordPendingTransaction(command, transaction);
107                         command.execute(transaction);
108                         ListenableFuture<Void> ft = transaction.submit();
109                         command.setTransactionResultFuture(ft);
110                         Futures.addCallback(ft, new FutureCallback<Void>() {
111                             @Override
112                             public void onSuccess(final Void result) {
113                                 forgetSuccessfulTransaction(transaction);
114                             }
115
116                             @Override
117                             public void onFailure(final Throwable throwable) {
118                                 // NOOP - handled by failure of transaction chain
119                             }
120                         }, MoreExecutors.directExecutor());
121                     }
122                 }
123                 transactionInFlight = null;
124             } catch (IllegalStateException e) {
125                 if (transactionInFlight != null) {
126                     // TODO: This method should distinguish exceptions on which the command should be
127                     // retried from exceptions on which the command should NOT be retried.
128                     // Then it should retry only the commands which should be retried, otherwise
129                     // this method will retry commands which will never be successful forever.
130                     offerFailedTransaction(transactionInFlight);
131                 }
132                 transactionInFlight = null;
133                 LOG.warn("Failed to process an update notification from OVS.", e);
134             }
135         }
136     }
137
138     private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
139         if (!failedTransactionQueue.offer(transaction)) {
140             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
141         }
142     }
143
144     private List<TransactionCommand> extractResubmitCommands() {
145         List<TransactionCommand> commands = new ArrayList<>();
146         synchronized (this) {
147             AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
148             if (transaction != null) {
149                 int index = pendingTransactions.lastIndexOf(transaction);
150                 //This logic needs to be revisited. Is it ok to resubmit these things again ?
151                 //are these operations idempotent ?
152                 //Does the transaction chain execute n+1th if nth one threw error ?
153                 List<ReadWriteTransaction> transactions =
154                         pendingTransactions.subList(index, pendingTransactions.size() - 1);
155                 for (ReadWriteTransaction tx: transactions) {
156                     commands.add(transactionToCommand.get(tx));
157                 }
158                 resetTransactionQueue();
159             }
160         }
161         if (commandIterator != null) {
162             while (commandIterator.hasNext()) {
163                 commands.add(commandIterator.next());
164             }
165         }
166         return commands;
167     }
168
169     private void resetTransactionQueue() {
170         chain.close();
171         chain = db.createTransactionChain(this);
172         pendingTransactions.clear();
173         transactionToCommand.clear();
174         failedTransactionQueue.clear();
175     }
176
177     synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
178         pendingTransactions.remove(transaction);
179         transactionToCommand.remove(transaction);
180     }
181
182     private synchronized void recordPendingTransaction(final TransactionCommand command,
183             final ReadWriteTransaction transaction) {
184         transactionToCommand.put(transaction, command);
185         pendingTransactions.add(transaction);
186     }
187
188     private List<TransactionCommand> extractCommands() throws InterruptedException {
189         List<TransactionCommand> commands = extractResubmitCommands();
190         if (!commands.isEmpty() && inputQueue.isEmpty()) {
191             //we got some commands to be executed let us not sit and wait on empty queue
192             return commands;
193         }
194         //pull commands from queue if not empty , otherwise wait for commands to be placed in queue.
195         commands.addAll(extractCommandsFromQueue());
196         return commands;
197     }
198
199     private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
200         List<TransactionCommand> result = new ArrayList<>();
201         TransactionCommand command = inputQueue.take();
202         result.add(command);
203         inputQueue.drainTo(result);
204         return result;
205     }
206
207     @Override
208     public void close() throws Exception {
209         this.chain.close();
210         this.executor.shutdown();
211     }
212
213     @Override
214     public void uncaughtException(final Thread thread, final Throwable ex) {
215         LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", ex);
216         if (transactionInFlight != null) {
217             offerFailedTransaction(transactionInFlight);
218         }
219         transactionInFlight = null;
220         executor.execute(this);
221     }
222 }