46b7934622a7023ac3c4d848c76186b2f89c4d05
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / transactions / md / TransactionInvokerImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.southbound.transactions.md;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.ArrayList;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.LinkedBlockingQueue;
23 import java.util.concurrent.ThreadFactory;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35
36 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
37     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
38     private static final int QUEUE_SIZE = 10000;
39     private BindingTransactionChain chain;
40     private final DataBroker db;
41     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
42     private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
43         = new LinkedBlockingQueue<>(QUEUE_SIZE);
44     private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
45         = new LinkedBlockingQueue<>(QUEUE_SIZE);
46     private final ExecutorService executor;
47     private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
48         = new HashMap<>();
49     private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
50     private final AtomicBoolean runTask = new AtomicBoolean(true);
51
52     public TransactionInvokerImpl(DataBroker db) {
53         this.db = db;
54         this.chain = db.createTransactionChain(this);
55         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
56         executor = Executors.newSingleThreadExecutor(threadFact);
57         executor.execute(this);
58     }
59
60     @Override
61     public void invoke(final TransactionCommand command) {
62         // TODO what do we do if queue is full?
63         if (!inputQueue.offer(command)) {
64             LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
65         }
66     }
67
68     @Override
69     public void onTransactionChainFailed(TransactionChain<?, ?> chainArg,
70             AsyncTransaction<?, ?> transaction, Throwable cause) {
71         LOG.error("Failed to write operational topology", cause);
72         offerFailedTransaction(transaction);
73     }
74
75     @Override
76     public void onTransactionChainSuccessful(TransactionChain<?, ?> chainArg) {
77         // NO OP
78     }
79
80     @Override
81     public void run() {
82         while (runTask.get()) {
83             forgetSuccessfulTransactions();
84
85             List<TransactionCommand> commands = null;
86             try {
87                 commands = extractCommands();
88             } catch (InterruptedException e) {
89                 LOG.warn("Extracting commands was interrupted.", e);
90                 continue;
91             }
92
93             ReadWriteTransaction transactionInFlight = null;
94             try {
95                 for (TransactionCommand command: commands) {
96                     final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
97                     transactionInFlight = transaction;
98                     recordPendingTransaction(command, transaction);
99                     command.execute(transaction);
100                     Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
101                         @Override
102                         public void onSuccess(final Void result) {
103                             if (!successfulTransactionQueue.offer(transaction)) {
104                                 LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
105                                         successfulTransactionQueue.size(), transaction);
106                             }
107                             command.onSuccess();
108                         }
109
110                         @Override
111                         public void onFailure(final Throwable throwable) {
112                             command.onFailure(throwable);
113                             // NOOP - handled by failure of transaction chain
114                         }
115                     }, MoreExecutors.directExecutor());
116                 }
117             } catch (IllegalStateException e) {
118                 if (transactionInFlight != null) {
119                     // TODO: This method should distinguish exceptions on which the command should be
120                     // retried from exceptions on which the command should NOT be retried.
121                     // Then it should retry only the commands which should be retried, otherwise
122                     // this method will retry commands which will never be successful forever.
123                     offerFailedTransaction(transactionInFlight);
124                 }
125                 LOG.warn("Failed to process an update notification from OVS.", e);
126             }
127         }
128     }
129
130     private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
131         if (!failedTransactionQueue.offer(transaction)) {
132             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
133         }
134     }
135
136     private List<TransactionCommand> extractResubmitCommands() {
137         AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
138         List<TransactionCommand> commands = new ArrayList<>();
139         if (transaction != null) {
140             int index = pendingTransactions.lastIndexOf(transaction);
141             List<ReadWriteTransaction> transactions =
142                     pendingTransactions.subList(index, pendingTransactions.size() - 1);
143             for (ReadWriteTransaction tx: transactions) {
144                 commands.add(transactionToCommand.get(tx));
145             }
146             resetTransactionQueue();
147         }
148         return commands;
149     }
150
151     private void resetTransactionQueue() {
152         chain.close();
153         chain = db.createTransactionChain(this);
154         pendingTransactions = new ArrayList<>();
155         transactionToCommand = new HashMap<>();
156         failedTransactionQueue.clear();
157         successfulTransactionQueue.clear();
158     }
159
160     private void recordPendingTransaction(TransactionCommand command,
161             final ReadWriteTransaction transaction) {
162         transactionToCommand.put(transaction, command);
163         pendingTransactions.add(transaction);
164     }
165
166     private List<TransactionCommand> extractCommands() throws InterruptedException {
167         List<TransactionCommand> commands = extractResubmitCommands();
168         commands.addAll(extractCommandsFromQueue());
169         return commands;
170     }
171
172     private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
173         List<TransactionCommand> result = new ArrayList<>();
174         TransactionCommand command = inputQueue.take();
175         while (command != null) {
176             result.add(command);
177             command = inputQueue.poll();
178         }
179         return result;
180     }
181
182     private void forgetSuccessfulTransactions() {
183         ReadWriteTransaction transaction = successfulTransactionQueue.poll();
184         while (transaction != null) {
185             pendingTransactions.remove(transaction);
186             transactionToCommand.remove(transaction);
187             transaction = successfulTransactionQueue.poll();
188         }
189     }
190
191     @Override
192     public void close() throws InterruptedException {
193         this.chain.close();
194         this.executor.shutdown();
195         if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
196             runTask.set(false);
197             this.executor.shutdownNow();
198         }
199     }
200 }