Bump MRI upstreams
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / transactions / md / TransactionInvokerImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.southbound.transactions.md;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.AbstractMap.SimpleImmutableEntry;
16 import java.util.ArrayDeque;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Map.Entry;
22 import java.util.Queue;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import org.checkerframework.checker.lock.qual.GuardedBy;
31 import org.opendaylight.mdsal.binding.api.DataBroker;
32 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
33 import org.opendaylight.mdsal.binding.api.Transaction;
34 import org.opendaylight.mdsal.binding.api.TransactionChain;
35 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 public final class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable,
40         AutoCloseable {
41     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
42     private static final int QUEUE_SIZE = 10000;
43
44     private final DataBroker db;
45     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
46     private final BlockingQueue<Transaction> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
47     private final ExecutorService executor;
48
49     private final AtomicBoolean runTask = new AtomicBoolean(true);
50
51     @GuardedBy("this")
52     private final Queue<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions = new ArrayDeque<>();
53     @GuardedBy("this")
54     private TransactionChain chain;
55
56     public TransactionInvokerImpl(final DataBroker db) {
57         this.db = db;
58         this.chain = db.createTransactionChain(this);
59         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
60         executor = Executors.newSingleThreadExecutor(threadFact);
61         executor.execute(this);
62     }
63
64     @VisibleForTesting
65     TransactionInvokerImpl(final DataBroker db, final ExecutorService executor) {
66         this.db = db;
67         this.chain = db.createTransactionChain(this);
68         this.executor = executor;
69     }
70
71     @VisibleForTesting
72     TransactionInvokerImpl(final DataBroker db,
73             final List<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions,
74             final List<ReadWriteTransaction> failedTransactions) {
75         this(db, (ExecutorService) null);
76
77         // Initialize state
78         this.pendingTransactions.addAll(pendingTransactions);
79         this.failedTransactionQueue.addAll(failedTransactions);
80     }
81
82     @VisibleForTesting
83     TransactionInvokerImpl(final DataBroker db,
84             final List<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions) {
85         this(db, pendingTransactions, Collections.emptyList());
86     }
87
88     @Override
89     public void invoke(final TransactionCommand command) {
90         // TODO what do we do if queue is full?
91         if (!inputQueue.offer(command)) {
92             LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
93         }
94     }
95
96     @Override
97     public void onTransactionChainFailed(final TransactionChain chainArg,
98             final Transaction transaction, final Throwable cause) {
99         LOG.error("Failed to write operational topology", cause);
100         offerFailedTransaction(transaction);
101     }
102
103     @Override
104     public void onTransactionChainSuccessful(final TransactionChain chainArg) {
105         // NO OP
106     }
107
108     @Override
109     public void run() {
110         while (runTask.get()) {
111             final List<TransactionCommand> commands;
112             try {
113                 commands = extractCommands();
114             } catch (InterruptedException e) {
115                 LOG.warn("Extracting commands was interrupted.", e);
116                 continue;
117             }
118
119             commands.forEach(this::executeCommand);
120         }
121     }
122
123     private synchronized void executeCommand(final TransactionCommand command) {
124         ReadWriteTransaction transactionInFlight = null;
125         try {
126             final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
127             transactionInFlight = transaction;
128             recordPendingTransaction(command, transaction);
129             command.execute(transaction);
130             Futures.addCallback(transaction.commit(), new FutureCallback<Object>() {
131                 @Override
132                 public void onSuccess(final Object result) {
133                     forgetSuccessfulTransaction(transaction);
134                     command.onSuccess();
135                 }
136
137                 @Override
138                 public void onFailure(final Throwable throwable) {
139                     command.onFailure(throwable);
140                     // NOOP - handled by failure of transaction chain
141                 }
142             }, MoreExecutors.directExecutor());
143         } catch (IllegalStateException e) {
144             if (transactionInFlight != null) {
145                 // TODO: This method should distinguish exceptions on which the command should be
146                 // retried from exceptions on which the command should NOT be retried.
147                 // Then it should retry only the commands which should be retried, otherwise
148                 // this method will retry commands which will never be successful forever.
149                 offerFailedTransaction(transactionInFlight);
150             }
151             LOG.warn("Failed to process an update notification from OVS.", e);
152         }
153     }
154
155     private void offerFailedTransaction(final Transaction transaction) {
156         if (!failedTransactionQueue.offer(transaction)) {
157             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
158         }
159     }
160
161     @VisibleForTesting
162     synchronized List<TransactionCommand> extractResubmitCommands() {
163         Transaction transaction = failedTransactionQueue.poll();
164         List<TransactionCommand> commands = new ArrayList<>();
165         if (transaction != null) {
166             // Process all pending transactions, looking for the failed one...
167             final Iterator<Entry<ReadWriteTransaction, TransactionCommand>> it = pendingTransactions.iterator();
168             while (it.hasNext()) {
169                 final Entry<ReadWriteTransaction, TransactionCommand> current = it.next();
170                 if (transaction.equals(current.getKey())) {
171                     // .. collect current and all remaining pending transactions' values
172                     commands.add(current.getValue());
173                     it.forEachRemaining(entry -> commands.add(entry.getValue()));
174                     break;
175                 }
176             }
177
178             resetTransactionQueue();
179         }
180         return commands;
181     }
182
183     @VisibleForTesting
184     synchronized void resetTransactionQueue() {
185         chain.close();
186         chain = db.createTransactionChain(this);
187         pendingTransactions.clear();
188         failedTransactionQueue.clear();
189     }
190
191     synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
192         Iterator<Entry<ReadWriteTransaction, TransactionCommand>> it = pendingTransactions.iterator();
193         while (it.hasNext()) {
194             final Entry<ReadWriteTransaction, TransactionCommand> entry = it.next();
195             if (transaction.equals(entry.getKey())) {
196                 it.remove();
197                 break;
198             }
199         }
200     }
201
202     @VisibleForTesting
203     synchronized void recordPendingTransaction(final TransactionCommand command,
204             final ReadWriteTransaction transaction) {
205         pendingTransactions.add(new SimpleImmutableEntry<>(transaction, command));
206     }
207
208     @VisibleForTesting
209     List<TransactionCommand> extractCommands() throws InterruptedException {
210         List<TransactionCommand> commands = extractResubmitCommands();
211         commands.addAll(extractCommandsFromQueue());
212         return commands;
213     }
214
215     @VisibleForTesting
216     List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
217         List<TransactionCommand> result = new ArrayList<>();
218         TransactionCommand command = inputQueue.take();
219         result.add(command);
220         inputQueue.drainTo(result);
221         return result;
222     }
223
224     @Override
225     public void close() throws InterruptedException {
226         this.executor.shutdown();
227
228         synchronized (this) {
229             this.chain.close();
230         }
231
232         if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
233             runTask.set(false);
234             this.executor.shutdownNow();
235         }
236     }
237 }