cca624d292b7452a8b3a2dd0e6b90e1104e3b457
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transactions / md / TransactionInvokerImpl.java
1 /*
2  * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import java.util.ArrayList;
17 import java.util.HashMap;
18 import java.util.Iterator;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ThreadFactory;
26 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /*  TODO:
36  * Copied over as-is from southbound plugin. Good candidate to be common
37  * when refactoring code.
38  */
39 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable,
40         Thread.UncaughtExceptionHandler {
41     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
42     private static final int QUEUE_SIZE = 10000;
43     private BindingTransactionChain chain;
44     private final DataBroker db;
45     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
46     private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
47         = new LinkedBlockingQueue<>(QUEUE_SIZE);
48     private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
49         = new LinkedBlockingQueue<>(QUEUE_SIZE);
50     private final ExecutorService executor;
51     private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
52         = new HashMap<>();
53     private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
54     //This is made volatile as it is accessed from uncaught exception handler thread also
55     private volatile ReadWriteTransaction transactionInFlight = null;
56     private Iterator<TransactionCommand> commandIterator = null;
57
58     public TransactionInvokerImpl(DataBroker db) {
59         this.db = db;
60         this.chain = db.createTransactionChain(this);
61         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
62                 .setUncaughtExceptionHandler(this).build();
63         executor = Executors.newSingleThreadExecutor(threadFact);
64         //Using the execute method here so that un caught exception handler gets triggered upon exception.
65         //The other way to do it is using submit method and wait on the future to catch any exceptions
66         executor.execute(this);
67     }
68
69     @Override
70     public void invoke(final TransactionCommand command) {
71         // TODO what do we do if queue is full?
72         if (!inputQueue.offer(command)) {
73             LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
74         }
75     }
76
77     @Override
78     public void onTransactionChainFailed(TransactionChain<?, ?> txChain,
79             AsyncTransaction<?, ?> transaction, Throwable cause) {
80         offerFailedTransaction(transaction);
81     }
82
83     @Override
84     public void onTransactionChainSuccessful(TransactionChain<?, ?> txChain) {
85         // NO OP
86     }
87
88     @Override
89     public void run() {
90         while (true) {
91             forgetSuccessfulTransactions();
92
93             List<TransactionCommand> commands = null;
94             try {
95                 commands = extractCommands();
96             } catch (InterruptedException e) {
97                 LOG.warn("Extracting commands was interrupted.", e);
98                 continue;
99             }
100             commandIterator = commands.iterator();
101             try {
102                 while (commandIterator.hasNext()) {
103                     TransactionCommand command = commandIterator.next();
104                     final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
105                     transactionInFlight = transaction;
106                     recordPendingTransaction(command, transaction);
107                     command.execute(transaction);
108                     ListenableFuture<Void> ft = transaction.submit();
109                     command.setTransactionResultFuture(ft);
110                     Futures.addCallback(ft, new FutureCallback<Void>() {
111                         @Override
112                         public void onSuccess(final Void result) {
113                             if (!successfulTransactionQueue.offer(transaction)) {
114                                 LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
115                                         successfulTransactionQueue.size(), transaction);
116                             }
117                         }
118
119                         @Override
120                         public void onFailure(final Throwable throwable) {
121                             // NOOP - handled by failure of transaction chain
122                         }
123                     }, MoreExecutors.directExecutor());
124                 }
125                 transactionInFlight = null;
126             } catch (IllegalStateException e) {
127                 if (transactionInFlight != null) {
128                     // TODO: This method should distinguish exceptions on which the command should be
129                     // retried from exceptions on which the command should NOT be retried.
130                     // Then it should retry only the commands which should be retried, otherwise
131                     // this method will retry commands which will never be successful forever.
132                     offerFailedTransaction(transactionInFlight);
133                 }
134                 transactionInFlight = null;
135                 LOG.warn("Failed to process an update notification from OVS.", e);
136             }
137         }
138     }
139
140     private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
141         if (!failedTransactionQueue.offer(transaction)) {
142             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
143         }
144     }
145
146     private List<TransactionCommand> extractResubmitCommands() {
147         AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
148         List<TransactionCommand> commands = new ArrayList<>();
149         if (transaction != null) {
150             int index = pendingTransactions.lastIndexOf(transaction);
151             //This logic needs to be revisited. Is it ok to resubmit these things again ?
152             //are these operations idempotent ?
153             //Does the transaction chain execute n+1th if nth one threw error ?
154             List<ReadWriteTransaction> transactions =
155                     pendingTransactions.subList(index, pendingTransactions.size() - 1);
156             for (ReadWriteTransaction tx: transactions) {
157                 commands.add(transactionToCommand.get(tx));
158             }
159             resetTransactionQueue();
160         }
161         if (commandIterator != null) {
162             while (commandIterator.hasNext()) {
163                 commands.add(commandIterator.next());
164             }
165         }
166         return commands;
167     }
168
169     private void resetTransactionQueue() {
170         chain.close();
171         chain = db.createTransactionChain(this);
172         pendingTransactions = new ArrayList<>();
173         transactionToCommand = new HashMap<>();
174         failedTransactionQueue.clear();
175         successfulTransactionQueue.clear();
176     }
177
178     private void recordPendingTransaction(TransactionCommand command,
179             final ReadWriteTransaction transaction) {
180         transactionToCommand.put(transaction, command);
181         pendingTransactions.add(transaction);
182     }
183
184     private List<TransactionCommand> extractCommands() throws InterruptedException {
185         List<TransactionCommand> commands = extractResubmitCommands();
186         if (!commands.isEmpty() && inputQueue.isEmpty()) {
187             //we got some commands to be executed let us not sit and wait on empty queue
188             return commands;
189         }
190         //pull commands from queue if not empty , otherwise wait for commands to be placed in queue.
191         commands.addAll(extractCommandsFromQueue());
192         return commands;
193     }
194
195     private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
196         List<TransactionCommand> result = new ArrayList<>();
197         TransactionCommand command = inputQueue.take();
198         while (command != null) {
199             result.add(command);
200             command = inputQueue.poll();
201         }
202         return result;
203     }
204
205     private void forgetSuccessfulTransactions() {
206         ReadWriteTransaction transaction = successfulTransactionQueue.poll();
207         while (transaction != null) {
208             pendingTransactions.remove(transaction);
209             transactionToCommand.remove(transaction);
210             transaction = successfulTransactionQueue.poll();
211         }
212     }
213
214     @Override
215     public void close() throws Exception {
216         this.chain.close();
217         this.executor.shutdown();
218     }
219
220     @Override
221     public void uncaughtException(Thread thread, Throwable ex) {
222         LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", ex);
223         if (transactionInFlight != null) {
224             offerFailedTransaction(transactionInFlight);
225         }
226         transactionInFlight = null;
227         executor.execute(this);
228     }
229 }