Speed up inputQueue interaction
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / transactions / md / TransactionInvokerImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.southbound.transactions.md;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import com.google.common.util.concurrent.ThreadFactoryBuilder;
14 import java.util.ArrayList;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
35     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
36     private static final int QUEUE_SIZE = 10000;
37
38     private final DataBroker db;
39     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
40     private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
41     private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
42             new LinkedBlockingQueue<>(QUEUE_SIZE);
43     private final ExecutorService executor;
44
45     private final AtomicBoolean runTask = new AtomicBoolean(true);
46
47     private Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
48     private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
49     private BindingTransactionChain chain;
50
51     public TransactionInvokerImpl(final DataBroker db) {
52         this.db = db;
53         this.chain = db.createTransactionChain(this);
54         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
55         executor = Executors.newSingleThreadExecutor(threadFact);
56         executor.execute(this);
57     }
58
59     @Override
60     public void invoke(final TransactionCommand command) {
61         // TODO what do we do if queue is full?
62         if (!inputQueue.offer(command)) {
63             LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
64         }
65     }
66
67     @Override
68     public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
69             final AsyncTransaction<?, ?> transaction, final Throwable cause) {
70         LOG.error("Failed to write operational topology", cause);
71         offerFailedTransaction(transaction);
72     }
73
74     @Override
75     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
76         // NO OP
77     }
78
79     @Override
80     public void run() {
81         while (runTask.get()) {
82             forgetSuccessfulTransactions();
83
84             List<TransactionCommand> commands = null;
85             try {
86                 commands = extractCommands();
87             } catch (InterruptedException e) {
88                 LOG.warn("Extracting commands was interrupted.", e);
89                 continue;
90             }
91
92             ReadWriteTransaction transactionInFlight = null;
93             try {
94                 for (TransactionCommand command: commands) {
95                     final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
96                     transactionInFlight = transaction;
97                     recordPendingTransaction(command, transaction);
98                     command.execute(transaction);
99                     Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
100                         @Override
101                         public void onSuccess(final Void result) {
102                             if (!successfulTransactionQueue.offer(transaction)) {
103                                 LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
104                                         successfulTransactionQueue.size(), transaction);
105                             }
106                             command.onSuccess();
107                         }
108
109                         @Override
110                         public void onFailure(final Throwable throwable) {
111                             command.onFailure(throwable);
112                             // NOOP - handled by failure of transaction chain
113                         }
114                     }, MoreExecutors.directExecutor());
115                 }
116             } catch (IllegalStateException e) {
117                 if (transactionInFlight != null) {
118                     // TODO: This method should distinguish exceptions on which the command should be
119                     // retried from exceptions on which the command should NOT be retried.
120                     // Then it should retry only the commands which should be retried, otherwise
121                     // this method will retry commands which will never be successful forever.
122                     offerFailedTransaction(transactionInFlight);
123                 }
124                 LOG.warn("Failed to process an update notification from OVS.", e);
125             }
126         }
127     }
128
129     private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
130         if (!failedTransactionQueue.offer(transaction)) {
131             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
132         }
133     }
134
135     private List<TransactionCommand> extractResubmitCommands() {
136         AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
137         List<TransactionCommand> commands = new ArrayList<>();
138         if (transaction != null) {
139             int index = pendingTransactions.lastIndexOf(transaction);
140             List<ReadWriteTransaction> transactions =
141                     pendingTransactions.subList(index, pendingTransactions.size() - 1);
142             for (ReadWriteTransaction tx: transactions) {
143                 commands.add(transactionToCommand.get(tx));
144             }
145             resetTransactionQueue();
146         }
147         return commands;
148     }
149
150     private void resetTransactionQueue() {
151         chain.close();
152         chain = db.createTransactionChain(this);
153         pendingTransactions = new ArrayList<>();
154         transactionToCommand = new HashMap<>();
155         failedTransactionQueue.clear();
156         successfulTransactionQueue.clear();
157     }
158
159     private void recordPendingTransaction(final TransactionCommand command,
160             final ReadWriteTransaction transaction) {
161         transactionToCommand.put(transaction, command);
162         pendingTransactions.add(transaction);
163     }
164
165     private List<TransactionCommand> extractCommands() throws InterruptedException {
166         List<TransactionCommand> commands = extractResubmitCommands();
167         commands.addAll(extractCommandsFromQueue());
168         return commands;
169     }
170
171     private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
172         List<TransactionCommand> result = new ArrayList<>();
173         TransactionCommand command = inputQueue.take();
174         result.add(command);
175         inputQueue.drainTo(result);
176         return result;
177     }
178
179     private void forgetSuccessfulTransactions() {
180         ReadWriteTransaction transaction = successfulTransactionQueue.poll();
181         while (transaction != null) {
182             pendingTransactions.remove(transaction);
183             transactionToCommand.remove(transaction);
184             transaction = successfulTransactionQueue.poll();
185         }
186     }
187
188     @Override
189     public void close() throws InterruptedException {
190         this.chain.close();
191         this.executor.shutdown();
192         if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
193             runTask.set(false);
194             this.executor.shutdownNow();
195         }
196     }
197 }