MD-SAL API integration
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / transactions / md / TransactionInvokerImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.southbound.transactions.md;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.AbstractMap.SimpleImmutableEntry;
16 import java.util.ArrayDeque;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Map.Entry;
22 import java.util.Queue;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import org.checkerframework.checker.lock.qual.GuardedBy;
31 import org.opendaylight.mdsal.binding.api.DataBroker;
32 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
33 import org.opendaylight.mdsal.binding.api.Transaction;
34 import org.opendaylight.mdsal.binding.api.TransactionChain;
35 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
40     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
41     private static final int QUEUE_SIZE = 10000;
42
43     private final DataBroker db;
44     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
45     private final BlockingQueue<Transaction> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
46     private final ExecutorService executor;
47
48     private final AtomicBoolean runTask = new AtomicBoolean(true);
49
50     @GuardedBy("this")
51     private final Queue<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions = new ArrayDeque<>();
52     @GuardedBy("this")
53     private TransactionChain chain;
54
55     public TransactionInvokerImpl(final DataBroker db) {
56         this.db = db;
57         this.chain = db.createTransactionChain(this);
58         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
59         executor = Executors.newSingleThreadExecutor(threadFact);
60         executor.execute(this);
61     }
62
63     @VisibleForTesting
64     TransactionInvokerImpl(final DataBroker db, final ExecutorService executor) {
65         this.db = db;
66         this.chain = db.createTransactionChain(this);
67         this.executor = executor;
68     }
69
70     @VisibleForTesting
71     TransactionInvokerImpl(final DataBroker db,
72             final List<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions,
73             final List<ReadWriteTransaction> failedTransactions) {
74         this(db, (ExecutorService) null);
75
76         // Initialize state
77         this.pendingTransactions.addAll(pendingTransactions);
78         this.failedTransactionQueue.addAll(failedTransactions);
79     }
80
81     @VisibleForTesting
82     TransactionInvokerImpl(final DataBroker db,
83             final List<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions) {
84         this(db, pendingTransactions, Collections.emptyList());
85     }
86
87     @Override
88     public void invoke(final TransactionCommand command) {
89         // TODO what do we do if queue is full?
90         if (!inputQueue.offer(command)) {
91             LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
92         }
93     }
94
95     @Override
96     public void onTransactionChainFailed(final TransactionChain chainArg,
97             final Transaction transaction, final Throwable cause) {
98         LOG.error("Failed to write operational topology", cause);
99         offerFailedTransaction(transaction);
100     }
101
102     @Override
103     public void onTransactionChainSuccessful(final TransactionChain chainArg) {
104         // NO OP
105     }
106
107     @Override
108     public void run() {
109         while (runTask.get()) {
110             final List<TransactionCommand> commands;
111             try {
112                 commands = extractCommands();
113             } catch (InterruptedException e) {
114                 LOG.warn("Extracting commands was interrupted.", e);
115                 continue;
116             }
117
118             commands.forEach(this::executeCommand);
119         }
120     }
121
122     private synchronized void executeCommand(final TransactionCommand command) {
123         ReadWriteTransaction transactionInFlight = null;
124         try {
125             final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
126             transactionInFlight = transaction;
127             recordPendingTransaction(command, transaction);
128             command.execute(transaction);
129             Futures.addCallback(transaction.commit(), new FutureCallback<Object>() {
130                 @Override
131                 public void onSuccess(final Object result) {
132                     forgetSuccessfulTransaction(transaction);
133                     command.onSuccess();
134                 }
135
136                 @Override
137                 public void onFailure(final Throwable throwable) {
138                     command.onFailure(throwable);
139                     // NOOP - handled by failure of transaction chain
140                 }
141             }, MoreExecutors.directExecutor());
142         } catch (IllegalStateException e) {
143             if (transactionInFlight != null) {
144                 // TODO: This method should distinguish exceptions on which the command should be
145                 // retried from exceptions on which the command should NOT be retried.
146                 // Then it should retry only the commands which should be retried, otherwise
147                 // this method will retry commands which will never be successful forever.
148                 offerFailedTransaction(transactionInFlight);
149             }
150             LOG.warn("Failed to process an update notification from OVS.", e);
151         }
152     }
153
154     private void offerFailedTransaction(final Transaction transaction) {
155         if (!failedTransactionQueue.offer(transaction)) {
156             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
157         }
158     }
159
160     @VisibleForTesting
161     synchronized List<TransactionCommand> extractResubmitCommands() {
162         Transaction transaction = failedTransactionQueue.poll();
163         List<TransactionCommand> commands = new ArrayList<>();
164         if (transaction != null) {
165             // Process all pending transactions, looking for the failed one...
166             final Iterator<Entry<ReadWriteTransaction, TransactionCommand>> it = pendingTransactions.iterator();
167             while (it.hasNext()) {
168                 final Entry<ReadWriteTransaction, TransactionCommand> current = it.next();
169                 if (transaction.equals(current.getKey())) {
170                     // .. collect current and all remaining pending transactions' values
171                     commands.add(current.getValue());
172                     it.forEachRemaining(entry -> commands.add(entry.getValue()));
173                     break;
174                 }
175             }
176
177             resetTransactionQueue();
178         }
179         return commands;
180     }
181
182     @VisibleForTesting
183     synchronized void resetTransactionQueue() {
184         chain.close();
185         chain = db.createTransactionChain(this);
186         pendingTransactions.clear();
187         failedTransactionQueue.clear();
188     }
189
190     synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
191         Iterator<Entry<ReadWriteTransaction, TransactionCommand>> it = pendingTransactions.iterator();
192         while (it.hasNext()) {
193             final Entry<ReadWriteTransaction, TransactionCommand> entry = it.next();
194             if (transaction.equals(entry.getKey())) {
195                 it.remove();
196                 break;
197             }
198         }
199     }
200
201     @VisibleForTesting
202     synchronized void recordPendingTransaction(final TransactionCommand command,
203             final ReadWriteTransaction transaction) {
204         pendingTransactions.add(new SimpleImmutableEntry<>(transaction, command));
205     }
206
207     @VisibleForTesting
208     List<TransactionCommand> extractCommands() throws InterruptedException {
209         List<TransactionCommand> commands = extractResubmitCommands();
210         commands.addAll(extractCommandsFromQueue());
211         return commands;
212     }
213
214     @VisibleForTesting
215     List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
216         List<TransactionCommand> result = new ArrayList<>();
217         TransactionCommand command = inputQueue.take();
218         result.add(command);
219         inputQueue.drainTo(result);
220         return result;
221     }
222
223     @Override
224     public void close() throws InterruptedException {
225         this.executor.shutdown();
226
227         synchronized (this) {
228             this.chain.close();
229         }
230
231         if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
232             runTask.set(false);
233             this.executor.shutdownNow();
234         }
235     }
236 }