ae8ea401ac50f5755e1a98610ef00571a0867dcd
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / transactions / md / TransactionInvokerImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.southbound.transactions.md;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ThreadFactoryBuilder;
14 import java.util.ArrayList;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34
35 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
36     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
37     private static final int QUEUE_SIZE = 10000;
38     private BindingTransactionChain chain;
39     private DataBroker db;
40     private BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
41     private BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
42         = new LinkedBlockingQueue<>(QUEUE_SIZE);
43     private BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
44         = new LinkedBlockingQueue<>(QUEUE_SIZE);
45     private ExecutorService executor;
46     private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
47         = new HashMap<>();
48     private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
49     private final AtomicBoolean runTask = new AtomicBoolean( true );
50
51     public TransactionInvokerImpl(DataBroker db) {
52         this.db = db;
53         this.chain = db.createTransactionChain(this);
54         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
55         executor = Executors.newSingleThreadExecutor(threadFact);
56         executor.submit(this);
57     }
58
59     @Override
60     public void invoke(final TransactionCommand command) {
61         // TODO what do we do if queue is full?
62         inputQueue.offer(command);
63     }
64
65     @Override
66     public void onTransactionChainFailed(TransactionChain<?, ?> chainArg,
67             AsyncTransaction<?, ?> transaction, Throwable cause) {
68         failedTransactionQueue.offer(transaction);
69     }
70
71     @Override
72     public void onTransactionChainSuccessful(TransactionChain<?, ?> chainArg) {
73         // NO OP
74     }
75
76     @Override
77     public void run() {
78         while (runTask.get()) {
79             forgetSuccessfulTransactions();
80             try {
81                 List<TransactionCommand> commands = extractCommands();
82                 for (TransactionCommand command: commands) {
83                     final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
84                     recordPendingTransaction(command, transaction);
85                     command.execute(transaction);
86                     Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
87                         @Override
88                         public void onSuccess(final Void result) {
89                             successfulTransactionQueue.offer(transaction);
90                         }
91
92                         @Override
93                         public void onFailure(final Throwable throwable) {
94                             // NOOP - handled by failure of transaction chain
95                         }
96                     });
97                 }
98             } catch (InterruptedException e) {
99                 LOG.warn("Exception invoking Transaction: ", e);
100             }
101         }
102     }
103
104     private List<TransactionCommand> extractResubmitCommands() {
105         AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
106         List<TransactionCommand> commands = new ArrayList<>();
107         if (transaction != null) {
108             int index = pendingTransactions.lastIndexOf(transaction);
109             List<ReadWriteTransaction> transactions =
110                     pendingTransactions.subList(index, pendingTransactions.size() - 1);
111             for (ReadWriteTransaction tx: transactions) {
112                 commands.add(transactionToCommand.get(tx));
113             }
114             resetTransactionQueue();
115         }
116         return commands;
117     }
118
119     private void resetTransactionQueue() {
120         chain.close();
121         chain = db.createTransactionChain(this);
122         pendingTransactions = new ArrayList<>();
123         transactionToCommand = new HashMap<>();
124         failedTransactionQueue.clear();
125         successfulTransactionQueue.clear();
126     }
127
128     private void recordPendingTransaction(TransactionCommand command,
129             final ReadWriteTransaction transaction) {
130         transactionToCommand.put(transaction, command);
131         pendingTransactions.add(transaction);
132     }
133
134     private List<TransactionCommand> extractCommands() throws InterruptedException {
135         List<TransactionCommand> commands = extractResubmitCommands();
136         commands.addAll(extractCommandsFromQueue());
137         return commands;
138     }
139
140     private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
141         List<TransactionCommand> result = new ArrayList<>();
142         TransactionCommand command = inputQueue.take();
143         while (command != null) {
144             result.add(command);
145             command = inputQueue.poll();
146         }
147         return result;
148     }
149
150     private void forgetSuccessfulTransactions() {
151         ReadWriteTransaction transaction = successfulTransactionQueue.poll();
152         while (transaction != null) {
153             pendingTransactions.remove(transaction);
154             transactionToCommand.remove(transaction);
155             transaction = successfulTransactionQueue.poll();
156         }
157     }
158
159     @Override
160     public void close() throws Exception {
161         this.executor.shutdown();
162         if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
163             runTask.set(false);
164             this.executor.shutdownNow();
165         }
166     }
167 }