Merge "BUG 1839 - HTTP delete of non existing data"
[controller.git] / opendaylight / md-sal / inventory-manager / src / main / java / org / opendaylight / controller / md / inventory / manager / FlowCapableInventoryProvider.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.inventory.manager;
9
10 import java.util.ArrayList;
11 import java.util.concurrent.BlockingQueue;
12 import java.util.concurrent.LinkedBlockingDeque;
13
14 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
15 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
16 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
20 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import com.google.common.base.Preconditions;
26
27 class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
28     private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
29     private static final int QUEUE_DEPTH = 500;
30     private static final int MAX_BATCH = 100;
31
32     private final BlockingQueue<InventoryOperation> queue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
33     private final NotificationProviderService notificationService;
34
35     private final DataBroker dataBroker;
36     private BindingTransactionChain txChain;
37     private ListenerRegistration<?> listenerRegistration;
38     private Thread thread;
39
40     FlowCapableInventoryProvider(final DataBroker dataBroker, final NotificationProviderService notificationService) {
41         this.dataBroker = Preconditions.checkNotNull(dataBroker);
42         this.notificationService = Preconditions.checkNotNull(notificationService);
43     }
44
45     void start() {
46         final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
47         this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
48
49         this.txChain = (dataBroker.createTransactionChain(this));
50         thread = new Thread(this);
51         thread.setDaemon(true);
52         thread.setName("FlowCapableInventoryProvider");
53         thread.start();
54
55         LOG.info("Flow Capable Inventory Provider started.");
56     }
57
58     void enqueue(final InventoryOperation op) {
59         try {
60             queue.put(op);
61         } catch (final InterruptedException e) {
62             LOG.warn("Failed to enqueue operation {}", op, e);
63         }
64     }
65
66     @Override
67     public void run() {
68         try {
69             for (; ; ) {
70                 InventoryOperation op = queue.take();
71                 int ops = 0;
72                 final ArrayList<InventoryOperation> opsToApply = new ArrayList<>(MAX_BATCH);
73                 do {
74                     opsToApply.add(op);
75                     ops++;
76                     if (ops < MAX_BATCH) {
77                         op = queue.poll();
78                     } else {
79                         op = null;
80                     }
81                 } while (op != null);
82                 submitOperations(opsToApply);
83             }
84         } catch (final InterruptedException e) {
85             LOG.info("Processing interrupted, terminating", e);
86         }
87
88         // Drain all events, making sure any blocked threads are unblocked
89         while (!queue.isEmpty()) {
90             queue.poll();
91         }
92     }
93
94     /**
95      * Starts new empty transaction, custimizes it with submitted operations
96      * and submit it to data broker.
97      *
98      * If transaction chain failed during customization of transaction
99      * it allocates new chain and empty transaction and  customizes it
100      * with submitted operations.
101      *
102      * This does not retry failed transaction. It only retries it when
103      * chain failed during customization of transaction chain.
104      *
105      * @param opsToApply
106      */
107     private void submitOperations(final ArrayList<InventoryOperation> opsToApply) {
108         final ReadWriteTransaction tx = createCustomizedTransaction(opsToApply);
109         LOG.debug("Processed {} operations, submitting transaction {}", opsToApply.size(), tx.getIdentifier());
110         try {
111             tx.submit();
112         } catch (final IllegalStateException e) {
113             /*
114              * Transaction chain failed during doing batch, so we need to null
115              * tx chain and continue processing queue.
116              *
117              * We fail current txChain which was allocated with createTransaction.
118              */
119             failCurrentChain(txChain);
120             /*
121              * We will retry transaction once in order to not loose any data.
122              *
123              */
124             final ReadWriteTransaction retryTx = createCustomizedTransaction(opsToApply);
125             retryTx.submit();
126         }
127     }
128
129     /**
130      * Creates new empty ReadWriteTransaction. If transaction chain
131      * was failed, it will allocate new transaction chain
132      * and assign it with this Operation Executor.
133      *
134      * This call is synchronized to prevent reace with {@link #failCurrentChain(TransactionChain)}.
135      *
136      * @return New Empty ReadWrite transaction, which continues this chain or starts new transaction
137      *          chain.
138      */
139     private synchronized ReadWriteTransaction newEmptyTransaction() {
140         try {
141             if(txChain == null) {
142                 // Chain was broken so we need to replace it.
143                 txChain = dataBroker.createTransactionChain(this);
144             }
145             return txChain.newReadWriteTransaction();
146         } catch (final IllegalStateException e) {
147             LOG.debug("Chain is broken, need to allocate new transaction chain.",e);
148             /*
149              *  Chain was broken by previous transaction,
150              *  but there was race between this.
151              *  Chain will be closed by #onTransactionChainFailed method.
152              */
153             txChain = dataBroker.createTransactionChain(this);
154             return txChain.newReadWriteTransaction();
155         }
156     }
157
158     /**
159      * Creates customized not-submitted transaction, which is ready to be submitted.
160      *
161      * @param opsToApply Operations which are used to customize transaction.
162      * @return Non-empty transaction.
163      */
164     private ReadWriteTransaction createCustomizedTransaction(final ArrayList<InventoryOperation> opsToApply) {
165         final ReadWriteTransaction tx = newEmptyTransaction();
166         for(final InventoryOperation op : opsToApply) {
167             op.applyOperation(tx);
168         }
169         return tx;
170     }
171
172     private synchronized void failCurrentChain(final TransactionChain<?, ?> chain) {
173         if(txChain == chain) {
174             txChain = null;
175         }
176     }
177
178     @Override
179     public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
180                                          final Throwable cause) {
181         LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
182         chain.close();
183         if(txChain == chain) {
184             // Current chain is broken, so we will null it, in order to not use it.
185             failCurrentChain(chain);
186         }
187     }
188
189     @Override
190     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
191         // NOOP
192     }
193
194     @Override
195     public void close() throws InterruptedException {
196         LOG.info("Flow Capable Inventory Provider stopped.");
197         if (this.listenerRegistration != null) {
198             try {
199                 this.listenerRegistration.close();
200             } catch (final Exception e) {
201                 LOG.error("Failed to stop inventory provider", e);
202             }
203             listenerRegistration = null;
204         }
205
206         if (thread != null) {
207             thread.interrupt();
208             thread.join();
209             thread = null;
210         }
211         if (txChain != null) {
212             try {
213                 txChain.close();
214             } catch (final IllegalStateException e) {
215                 // It is possible chain failed and was closed by #onTransactionChainFailed
216                 LOG.debug("Chain was already closed.");
217             }
218             txChain = null;
219         }
220     }
221 }