Reduce use of powermockito
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / transactions / md / TransactionInvokerImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.southbound.transactions.md;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import java.util.ArrayList;
17 import java.util.HashMap;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36
37 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
38     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
39     private static final int QUEUE_SIZE = 10000;
40     private BindingTransactionChain chain;
41     private final DataBroker db;
42     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
43     private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
44         = new LinkedBlockingQueue<>(QUEUE_SIZE);
45     private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
46         = new LinkedBlockingQueue<>(QUEUE_SIZE);
47     private final ExecutorService executor;
48     private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
49         = new HashMap<>();
50     private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
51     private final AtomicBoolean runTask = new AtomicBoolean(true);
52
53     public TransactionInvokerImpl(DataBroker db) {
54         this.db = db;
55         this.chain = db.createTransactionChain(this);
56         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
57         executor = Executors.newSingleThreadExecutor(threadFact);
58         executor.execute(this);
59     }
60
61     @Override
62     public void invoke(final TransactionCommand command) {
63         // TODO what do we do if queue is full?
64         if (!inputQueue.offer(command)) {
65             LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
66         }
67     }
68
69     @Override
70     public void onTransactionChainFailed(TransactionChain<?, ?> chainArg,
71             AsyncTransaction<?, ?> transaction, Throwable cause) {
72         LOG.error("Failed to write operational topology", cause);
73         offerFailedTransaction(transaction);
74     }
75
76     @Override
77     public void onTransactionChainSuccessful(TransactionChain<?, ?> chainArg) {
78         // NO OP
79     }
80
81     @Override
82     public void run() {
83         while (runTask.get()) {
84             forgetSuccessfulTransactions();
85
86             List<TransactionCommand> commands = null;
87             try {
88                 commands = extractCommands();
89             } catch (InterruptedException e) {
90                 LOG.warn("Extracting commands was interrupted.", e);
91                 continue;
92             }
93
94             ReadWriteTransaction transactionInFlight = null;
95             try {
96                 for (TransactionCommand command: commands) {
97                     final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
98                     transactionInFlight = transaction;
99                     recordPendingTransaction(command, transaction);
100                     command.execute(transaction);
101                     Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
102                         @Override
103                         public void onSuccess(final Void result) {
104                             if (!successfulTransactionQueue.offer(transaction)) {
105                                 LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
106                                         successfulTransactionQueue.size(), transaction);
107                             }
108                             command.onSuccess();
109                         }
110
111                         @Override
112                         public void onFailure(final Throwable throwable) {
113                             command.onFailure(throwable);
114                             // NOOP - handled by failure of transaction chain
115                         }
116                     }, MoreExecutors.directExecutor());
117                 }
118             } catch (IllegalStateException e) {
119                 if (transactionInFlight != null) {
120                     // TODO: This method should distinguish exceptions on which the command should be
121                     // retried from exceptions on which the command should NOT be retried.
122                     // Then it should retry only the commands which should be retried, otherwise
123                     // this method will retry commands which will never be successful forever.
124                     offerFailedTransaction(transactionInFlight);
125                 }
126                 LOG.warn("Failed to process an update notification from OVS.", e);
127             }
128         }
129     }
130
131     private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
132         if (!failedTransactionQueue.offer(transaction)) {
133             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
134         }
135     }
136
137     @VisibleForTesting
138     List<TransactionCommand> extractResubmitCommands() {
139         AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
140         List<TransactionCommand> commands = new ArrayList<>();
141         if (transaction != null) {
142             int index = pendingTransactions.lastIndexOf(transaction);
143             List<ReadWriteTransaction> transactions =
144                     pendingTransactions.subList(index, pendingTransactions.size() - 1);
145             for (ReadWriteTransaction tx: transactions) {
146                 commands.add(transactionToCommand.get(tx));
147             }
148             resetTransactionQueue();
149         }
150         return commands;
151     }
152
153     @VisibleForTesting
154     void resetTransactionQueue() {
155         chain.close();
156         chain = db.createTransactionChain(this);
157         pendingTransactions = new ArrayList<>();
158         transactionToCommand = new HashMap<>();
159         failedTransactionQueue.clear();
160         successfulTransactionQueue.clear();
161     }
162
163     private void recordPendingTransaction(TransactionCommand command,
164             final ReadWriteTransaction transaction) {
165         transactionToCommand.put(transaction, command);
166         pendingTransactions.add(transaction);
167     }
168
169     private List<TransactionCommand> extractCommands() throws InterruptedException {
170         List<TransactionCommand> commands = extractResubmitCommands();
171         commands.addAll(extractCommandsFromQueue());
172         return commands;
173     }
174
175     @VisibleForTesting
176     List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
177         List<TransactionCommand> result = new ArrayList<>();
178         TransactionCommand command = inputQueue.take();
179         while (command != null) {
180             result.add(command);
181             command = inputQueue.poll();
182         }
183         return result;
184     }
185
186     private void forgetSuccessfulTransactions() {
187         ReadWriteTransaction transaction = successfulTransactionQueue.poll();
188         while (transaction != null) {
189             pendingTransactions.remove(transaction);
190             transactionToCommand.remove(transaction);
191             transaction = successfulTransactionQueue.poll();
192         }
193     }
194
195     @Override
196     public void close() throws InterruptedException {
197         this.chain.close();
198         this.executor.shutdown();
199         if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
200             runTask.set(false);
201             this.executor.shutdownNow();
202         }
203     }
204 }