Bump versions by x.(y+1).z
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transactions / md / TransactionInvokerImpl.java
1 /*
2  * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
9
10 import com.google.common.util.concurrent.FluentFuture;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import com.google.common.util.concurrent.ThreadFactoryBuilder;
14 import java.lang.Thread.UncaughtExceptionHandler;
15 import java.util.ArrayList;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ThreadFactory;
25 import javax.annotation.PreDestroy;
26 import javax.inject.Inject;
27 import javax.inject.Singleton;
28 import org.checkerframework.checker.lock.qual.GuardedBy;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
31 import org.opendaylight.mdsal.binding.api.Transaction;
32 import org.opendaylight.mdsal.binding.api.TransactionChain;
33 import org.osgi.service.component.annotations.Activate;
34 import org.osgi.service.component.annotations.Component;
35 import org.osgi.service.component.annotations.Deactivate;
36 import org.osgi.service.component.annotations.Reference;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /*  TODO:
41  * Copied over as-is from southbound plugin. Good candidate to be common
42  * when refactoring code.
43  */
44 @Singleton
45 @Component(service = TransactionInvoker.class)
46 public final class TransactionInvokerImpl implements TransactionInvoker, Runnable, AutoCloseable,
47         UncaughtExceptionHandler {
48     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
49     private static final int QUEUE_SIZE = 10000;
50
51     private final DataBroker db;
52     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
53     private final BlockingQueue<Transaction> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
54     private final ExecutorService executor;
55
56     @GuardedBy("this")
57     private final Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
58     @GuardedBy("this")
59     private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
60
61     private TransactionChain chain;
62     //This is made volatile as it is accessed from uncaught exception handler thread also
63     private volatile ReadWriteTransaction transactionInFlight = null;
64     private Iterator<TransactionCommand> commandIterator = null;
65
66     @Inject
67     @Activate
68     public TransactionInvokerImpl(@Reference final DataBroker db) {
69         this.db = db;
70         chain = db.createTransactionChain();
71         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
72                 .setUncaughtExceptionHandler(this).build();
73         executor = Executors.newSingleThreadExecutor(threadFact);
74         //Using the execute method here so that un caught exception handler gets triggered upon exception.
75         //The other way to do it is using submit method and wait on the future to catch any exceptions
76         executor.execute(this);
77     }
78
79     @Override
80     public void invoke(final TransactionCommand command) {
81         // TODO what do we do if queue is full?
82         if (!inputQueue.offer(command)) {
83             LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
84         }
85     }
86
87     @Override
88     public void run() {
89         while (true) {
90             final List<TransactionCommand> commands;
91             try {
92                 commands = extractCommands();
93             } catch (InterruptedException e) {
94                 LOG.warn("Extracting commands was interrupted.", e);
95                 continue;
96             }
97             commandIterator = commands.iterator();
98             try {
99                 while (commandIterator.hasNext()) {
100                     executeCommand(commandIterator.next());
101                 }
102                 transactionInFlight = null;
103             } catch (IllegalStateException e) {
104                 if (transactionInFlight != null) {
105                     // TODO: This method should distinguish exceptions on which the command should be
106                     // retried from exceptions on which the command should NOT be retried.
107                     // Then it should retry only the commands which should be retried, otherwise
108                     // this method will retry commands which will never be successful forever.
109                     offerFailedTransaction(transactionInFlight);
110                 }
111                 transactionInFlight = null;
112                 LOG.warn("Failed to process an update notification from OVS.", e);
113             }
114         }
115     }
116
117     private synchronized void executeCommand(final TransactionCommand command) {
118         final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
119         transactionInFlight = transaction;
120         recordPendingTransaction(command, transaction);
121         command.execute(transaction);
122         FluentFuture<?> ft = transaction.commit();
123         command.setTransactionResultFuture(ft);
124         ft.addCallback(new FutureCallback<Object>() {
125             @Override
126             public void onSuccess(final Object result) {
127                 forgetSuccessfulTransaction(transaction);
128                 command.onSuccess();
129             }
130
131             @Override
132             public void onFailure(final Throwable throwable) {
133                 offerFailedTransaction(transaction);
134                 command.onFailure();
135             }
136         }, MoreExecutors.directExecutor());
137     }
138
139     private void offerFailedTransaction(final Transaction transaction) {
140         if (!failedTransactionQueue.offer(transaction)) {
141             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
142         }
143     }
144
145     private List<TransactionCommand> extractResubmitCommands() {
146         List<TransactionCommand> commands = new ArrayList<>();
147         synchronized (this) {
148             Transaction transaction = failedTransactionQueue.poll();
149             if (transaction != null) {
150                 int index = pendingTransactions.lastIndexOf(transaction);
151                 //This logic needs to be revisited. Is it ok to resubmit these things again ?
152                 //are these operations idempotent ?
153                 //Does the transaction chain execute n+1th if nth one threw error ?
154                 List<ReadWriteTransaction> transactions =
155                         pendingTransactions.subList(index, pendingTransactions.size() - 1);
156                 for (ReadWriteTransaction tx: transactions) {
157                     commands.add(transactionToCommand.get(tx));
158                 }
159                 resetTransactionQueue();
160             }
161         }
162         if (commandIterator != null) {
163             while (commandIterator.hasNext()) {
164                 commands.add(commandIterator.next());
165             }
166         }
167         return commands;
168     }
169
170     private void resetTransactionQueue() {
171         chain.close();
172         chain = db.createTransactionChain();
173         pendingTransactions.clear();
174         transactionToCommand.clear();
175         failedTransactionQueue.clear();
176     }
177
178     synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
179         pendingTransactions.remove(transaction);
180         transactionToCommand.remove(transaction);
181     }
182
183     private synchronized void recordPendingTransaction(final TransactionCommand command,
184             final ReadWriteTransaction transaction) {
185         transactionToCommand.put(transaction, command);
186         pendingTransactions.add(transaction);
187     }
188
189     private List<TransactionCommand> extractCommands() throws InterruptedException {
190         List<TransactionCommand> commands = extractResubmitCommands();
191         if (!commands.isEmpty() && inputQueue.isEmpty()) {
192             //we got some commands to be executed let us not sit and wait on empty queue
193             return commands;
194         }
195         //pull commands from queue if not empty , otherwise wait for commands to be placed in queue.
196         commands.addAll(extractCommandsFromQueue());
197         return commands;
198     }
199
200     private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
201         List<TransactionCommand> result = new ArrayList<>();
202         TransactionCommand command = inputQueue.take();
203         result.add(command);
204         inputQueue.drainTo(result);
205         return result;
206     }
207
208     @PreDestroy
209     @Deactivate
210     @Override
211     public void close() {
212         chain.close();
213         executor.shutdown();
214     }
215
216     @Override
217     public void uncaughtException(final Thread thread, final Throwable ex) {
218         LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", ex);
219         if (transactionInFlight != null) {
220             offerFailedTransaction(transactionInFlight);
221         }
222         transactionInFlight = null;
223         executor.execute(this);
224     }
225 }