Bug-5198 : He Plugin:Inventory manager is deleting
[openflowplugin.git] / applications / inventory-manager / src / main / java / org / opendaylight / openflowplugin / applications / inventory / manager / FlowCapableInventoryProvider.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.inventory.manager;
9
10 import java.util.ArrayList;
11 import java.util.concurrent.BlockingQueue;
12 import java.util.concurrent.LinkedBlockingDeque;
13
14 import com.google.common.base.Optional;
15 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
18 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
19 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
20 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
23 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
24 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
26 import org.opendaylight.yangtools.concepts.ListenerRegistration;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import com.google.common.base.Preconditions;
31
32 class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
33     private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
34     private static final int QUEUE_DEPTH = 500;
35     private static final int MAX_BATCH = 100;
36
37     private final BlockingQueue<InventoryOperation> queue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
38     private final NotificationProviderService notificationService;
39     private final EntityOwnershipService eos;
40
41     private final DataBroker dataBroker;
42     private BindingTransactionChain txChain;
43     private ListenerRegistration<?> listenerRegistration;
44     private ListenerRegistration<?> tableFeatureListenerRegistration;
45     private Thread thread;
46
47     FlowCapableInventoryProvider(final DataBroker dataBroker, final NotificationProviderService notificationService, EntityOwnershipService eos) {
48         this.dataBroker = Preconditions.checkNotNull(dataBroker);
49         this.notificationService = Preconditions.checkNotNull(notificationService);
50         this.eos = eos;
51     }
52
53     void start() {
54         final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
55         this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
56
57         final NodeTablesFeatureCommitter nodeTablesFeatureCommitter =
58                 new NodeTablesFeatureCommitter(FlowCapableInventoryProvider.this);
59         this.tableFeatureListenerRegistration = this.notificationService.registerNotificationListener(nodeTablesFeatureCommitter);
60
61
62         this.txChain = (dataBroker.createTransactionChain(this));
63         thread = new Thread(this);
64         thread.setDaemon(true);
65         thread.setName("FlowCapableInventoryProvider");
66         thread.start();
67
68         LOG.info("Flow Capable Inventory Provider started.");
69     }
70
71     void enqueue(final InventoryOperation op) {
72         try {
73             queue.put(op);
74         } catch (final InterruptedException e) {
75             LOG.warn("Failed to enqueue operation {}", op, e);
76         }
77     }
78
79     @Override
80     public void run() {
81         try {
82             for (; ; ) {
83                 InventoryOperation op = queue.take();
84                 int ops = 0;
85                 final ArrayList<InventoryOperation> opsToApply = new ArrayList<>(MAX_BATCH);
86                 do {
87                     opsToApply.add(op);
88                     ops++;
89                     if (ops < MAX_BATCH) {
90                         op = queue.poll();
91                     } else {
92                         op = null;
93                     }
94                 } while (op != null);
95                 submitOperations(opsToApply);
96             }
97         } catch (final InterruptedException e) {
98             LOG.info("Processing interrupted, terminating", e);
99         }
100
101         // Drain all events, making sure any blocked threads are unblocked
102         while (!queue.isEmpty()) {
103             queue.poll();
104         }
105     }
106
107     /**
108      * Starts new empty transaction, custimizes it with submitted operations
109      * and submit it to data broker.
110      *
111      * If transaction chain failed during customization of transaction
112      * it allocates new chain and empty transaction and  customizes it
113      * with submitted operations.
114      *
115      * This does not retry failed transaction. It only retries it when
116      * chain failed during customization of transaction chain.
117      *
118      * @param opsToApply
119      */
120     private void submitOperations(final ArrayList<InventoryOperation> opsToApply) {
121         final ReadWriteTransaction tx = createCustomizedTransaction(opsToApply);
122         LOG.debug("Processed {} operations, submitting transaction {}", opsToApply.size(), tx.getIdentifier());
123         try {
124             tx.submit();
125         } catch (final IllegalStateException e) {
126             /*
127              * Transaction chain failed during doing batch, so we need to null
128              * tx chain and continue processing queue.
129              *
130              * We fail current txChain which was allocated with createTransaction.
131              */
132             failCurrentChain(txChain);
133             /*
134              * We will retry transaction once in order to not loose any data.
135              *
136              */
137             final ReadWriteTransaction retryTx = createCustomizedTransaction(opsToApply);
138             retryTx.submit();
139         }
140     }
141
142     /**
143      * Creates new empty ReadWriteTransaction. If transaction chain
144      * was failed, it will allocate new transaction chain
145      * and assign it with this Operation Executor.
146      *
147      * This call is synchronized to prevent reace with {@link #failCurrentChain(TransactionChain)}.
148      *
149      * @return New Empty ReadWrite transaction, which continues this chain or starts new transaction
150      *          chain.
151      */
152     private synchronized ReadWriteTransaction newEmptyTransaction() {
153         try {
154             if(txChain == null) {
155                 // Chain was broken so we need to replace it.
156                 txChain = dataBroker.createTransactionChain(this);
157             }
158             return txChain.newReadWriteTransaction();
159         } catch (final IllegalStateException e) {
160             LOG.debug("Chain is broken, need to allocate new transaction chain.",e);
161             /*
162              *  Chain was broken by previous transaction,
163              *  but there was race between this.
164              *  Chain will be closed by #onTransactionChainFailed method.
165              */
166             txChain = dataBroker.createTransactionChain(this);
167             return txChain.newReadWriteTransaction();
168         }
169     }
170
171     /**
172      * Creates customized not-submitted transaction, which is ready to be submitted.
173      *
174      * @param opsToApply Operations which are used to customize transaction.
175      * @return Non-empty transaction.
176      */
177     private ReadWriteTransaction createCustomizedTransaction(final ArrayList<InventoryOperation> opsToApply) {
178         final ReadWriteTransaction tx = newEmptyTransaction();
179         for(final InventoryOperation op : opsToApply) {
180             op.applyOperation(tx);
181         }
182         return tx;
183     }
184
185     private synchronized void failCurrentChain(final TransactionChain<?, ?> chain) {
186         if(txChain == chain) {
187             txChain = null;
188         }
189     }
190
191     @Override
192     public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
193                                          final Throwable cause) {
194         LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
195         chain.close();
196         if(txChain == chain) {
197             // Current chain is broken, so we will null it, in order to not use it.
198             failCurrentChain(chain);
199         }
200     }
201
202     @Override
203     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
204         // NOOP
205     }
206
207     @Override
208     public void close() throws InterruptedException {
209         LOG.info("Flow Capable Inventory Provider stopped.");
210         if (this.listenerRegistration != null) {
211             try {
212                 this.listenerRegistration.close();
213             } catch (final Exception e) {
214                 LOG.error("Failed to stop inventory provider", e);
215             }
216             listenerRegistration = null;
217         }
218
219         if (this.tableFeatureListenerRegistration != null) {
220             try {
221                 this.tableFeatureListenerRegistration.close();
222             } catch (final Exception e) {
223                 LOG.error("Failed to stop inventory provider", e);
224             }
225             tableFeatureListenerRegistration = null;
226         }
227
228         if (thread != null) {
229             thread.interrupt();
230             thread.join();
231             thread = null;
232         }
233         if (txChain != null) {
234             try {
235                 txChain.close();
236             } catch (final IllegalStateException e) {
237                 // It is possible chain failed and was closed by #onTransactionChainFailed
238                 LOG.debug("Chain was already closed.");
239             }
240             txChain = null;
241         }
242     }
243
244     public boolean deviceDataDeleteAllowed(NodeId nodeId) {
245         Entity device = new Entity("openflow",nodeId.getValue());
246         Optional<EntityOwnershipState> entityOwnershipState =  eos.getOwnershipState(device);
247         if(entityOwnershipState.isPresent()){
248             EntityOwnershipState eState = entityOwnershipState.get();
249             if(eState.isOwner()) { return true; }
250
251             return !eState.hasOwner();
252         }
253         return true;
254     }
255 }