Use ArrayDeque for TransactionInvokerImpl.pendingTransactions
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / transactions / md / TransactionInvokerImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.southbound.transactions.md;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.ArrayDeque;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Queue;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import org.checkerframework.checker.lock.qual.GuardedBy;
31 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
41     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
42     private static final int QUEUE_SIZE = 10000;
43
44     private final DataBroker db;
45     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
46     private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
47     private final ExecutorService executor;
48
49     private final AtomicBoolean runTask = new AtomicBoolean(true);
50
51     @GuardedBy("this")
52     private final Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
53     @GuardedBy("this")
54     private final Queue<ReadWriteTransaction> pendingTransactions = new ArrayDeque<>();
55
56     private BindingTransactionChain chain;
57
58     public TransactionInvokerImpl(final DataBroker db) {
59         this.db = db;
60         this.chain = db.createTransactionChain(this);
61         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
62         executor = Executors.newSingleThreadExecutor(threadFact);
63         executor.execute(this);
64     }
65
66     @VisibleForTesting
67     TransactionInvokerImpl(final DataBroker db, final ExecutorService executor) {
68         this.db = db;
69         this.chain = db.createTransactionChain(this);
70         this.executor = executor;
71     }
72
73     @VisibleForTesting
74     TransactionInvokerImpl(final DataBroker db, final List<ReadWriteTransaction> pendingTransactions,
75             final List<ReadWriteTransaction> failedTransactions,
76             final Map<ReadWriteTransaction, TransactionCommand> transactionToCommand) {
77         this(db, (ExecutorService) null);
78
79         // Initialize state
80         this.pendingTransactions.addAll(pendingTransactions);
81         this.failedTransactionQueue.addAll(failedTransactions);
82         this.transactionToCommand.putAll(transactionToCommand);
83     }
84
85     @VisibleForTesting
86     TransactionInvokerImpl(final DataBroker db, final List<ReadWriteTransaction> pendingTransactions) {
87         this(db, pendingTransactions, Collections.emptyList(), Collections.emptyMap());
88     }
89
90     @Override
91     public void invoke(final TransactionCommand command) {
92         // TODO what do we do if queue is full?
93         if (!inputQueue.offer(command)) {
94             LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
95         }
96     }
97
98     @Override
99     public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
100             final AsyncTransaction<?, ?> transaction, final Throwable cause) {
101         LOG.error("Failed to write operational topology", cause);
102         offerFailedTransaction(transaction);
103     }
104
105     @Override
106     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
107         // NO OP
108     }
109
110     @Override
111     public void run() {
112         while (runTask.get()) {
113             final List<TransactionCommand> commands;
114             try {
115                 commands = extractCommands();
116             } catch (InterruptedException e) {
117                 LOG.warn("Extracting commands was interrupted.", e);
118                 continue;
119             }
120
121             ReadWriteTransaction transactionInFlight = null;
122             try {
123                 for (TransactionCommand command: commands) {
124                     synchronized (this) {
125                         final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
126                         transactionInFlight = transaction;
127                         recordPendingTransaction(command, transaction);
128                         command.execute(transaction);
129                         Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
130                             @Override
131                             public void onSuccess(final Void result) {
132                                 forgetSuccessfulTransaction(transaction);
133                                 command.onSuccess();
134                             }
135
136                             @Override
137                             public void onFailure(final Throwable throwable) {
138                                 command.onFailure(throwable);
139                                 // NOOP - handled by failure of transaction chain
140                             }
141                         }, MoreExecutors.directExecutor());
142                     }
143                 }
144             } catch (IllegalStateException e) {
145                 if (transactionInFlight != null) {
146                     // TODO: This method should distinguish exceptions on which the command should be
147                     // retried from exceptions on which the command should NOT be retried.
148                     // Then it should retry only the commands which should be retried, otherwise
149                     // this method will retry commands which will never be successful forever.
150                     offerFailedTransaction(transactionInFlight);
151                 }
152                 LOG.warn("Failed to process an update notification from OVS.", e);
153             }
154         }
155     }
156
157     private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
158         if (!failedTransactionQueue.offer(transaction)) {
159             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
160         }
161     }
162
163     @VisibleForTesting
164     synchronized List<TransactionCommand> extractResubmitCommands() {
165         AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
166         List<TransactionCommand> commands = new ArrayList<>();
167         if (transaction != null) {
168             // Process all pending transactions, looking for the failed one...
169             final Iterator<ReadWriteTransaction> it = pendingTransactions.iterator();
170             while (it.hasNext()) {
171                 final ReadWriteTransaction current = it.next();
172                 if (transaction.equals(current)) {
173                     // .. collect current and all remaining pending transactions
174                     commands.add(transactionToCommand.get(current));
175                     it.forEachRemaining(tx -> commands.add(transactionToCommand.get(tx)));
176                     break;
177                 }
178             }
179
180             resetTransactionQueue();
181         }
182         return commands;
183     }
184
185     @VisibleForTesting
186     synchronized void resetTransactionQueue() {
187         chain.close();
188         chain = db.createTransactionChain(this);
189         pendingTransactions.clear();
190         transactionToCommand.clear();
191         failedTransactionQueue.clear();
192     }
193
194     synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
195         pendingTransactions.remove(transaction);
196         transactionToCommand.remove(transaction);
197     }
198
199     @VisibleForTesting
200     synchronized void recordPendingTransaction(final TransactionCommand command,
201             final ReadWriteTransaction transaction) {
202         transactionToCommand.put(transaction, command);
203         pendingTransactions.add(transaction);
204     }
205
206     @VisibleForTesting
207     List<TransactionCommand> extractCommands() throws InterruptedException {
208         List<TransactionCommand> commands = extractResubmitCommands();
209         commands.addAll(extractCommandsFromQueue());
210         return commands;
211     }
212
213     @VisibleForTesting
214     List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
215         List<TransactionCommand> result = new ArrayList<>();
216         TransactionCommand command = inputQueue.take();
217         result.add(command);
218         inputQueue.drainTo(result);
219         return result;
220     }
221
222     @Override
223     public void close() throws InterruptedException {
224         this.chain.close();
225         this.executor.shutdown();
226         if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
227             runTask.set(false);
228             this.executor.shutdownNow();
229         }
230     }
231 }