Merge "Removed getClass comparison in FlowComparator."
[controller.git] / opendaylight / md-sal / inventory-manager / src / main / java / org / opendaylight / controller / md / inventory / manager / FlowCapableInventoryProvider.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.inventory.manager;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.LinkedBlockingDeque;
16 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
17 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
18 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
20 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
22 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
23 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
24 import org.opendaylight.yangtools.concepts.ListenerRegistration;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
29     private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
30     private static final int QUEUE_DEPTH = 500;
31     private static final int MAX_BATCH = 100;
32
33     private final BlockingQueue<InventoryOperation> queue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
34     private final NotificationProviderService notificationService;
35
36     private final DataBroker dataBroker;
37     private BindingTransactionChain txChain;
38     private ListenerRegistration<?> listenerRegistration;
39     private Thread thread;
40
41     FlowCapableInventoryProvider(final DataBroker dataBroker, final NotificationProviderService notificationService) {
42         this.dataBroker = Preconditions.checkNotNull(dataBroker);
43         this.notificationService = Preconditions.checkNotNull(notificationService);
44     }
45
46     void start() {
47         final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
48         this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
49
50         this.txChain =  dataBroker.createTransactionChain(this);
51         thread = new Thread(this);
52         thread.setDaemon(true);
53         thread.setName("FlowCapableInventoryProvider");
54         thread.start();
55
56         LOG.info("Flow Capable Inventory Provider started.");
57     }
58
59     void enqueue(final InventoryOperation op) {
60         try {
61             queue.put(op);
62         } catch (InterruptedException e) {
63             LOG.warn("Failed to enqueue operation {}", op, e);
64         }
65     }
66
67     @Override
68     public void close() throws InterruptedException {
69         LOG.info("Flow Capable Inventory Provider stopped.");
70         if (this.listenerRegistration != null) {
71             try {
72                 this.listenerRegistration.close();
73             } catch (Exception e) {
74                 LOG.error("Failed to stop inventory provider", e);
75             }
76             listenerRegistration = null;
77         }
78
79         if (thread != null) {
80             thread.interrupt();
81             thread.join();
82             thread = null;
83         }
84         if(txChain != null) {
85             txChain.close();
86             txChain = null;
87         }
88
89
90     }
91
92     @Override
93     public void run() {
94         try {
95             for (; ; ) {
96                 InventoryOperation op = queue.take();
97
98                 final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
99                 LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
100
101                 int ops = 0;
102                 do {
103                     op.applyOperation(tx);
104
105                     ops++;
106                     if (ops < MAX_BATCH) {
107                         op = queue.poll();
108                     } else {
109                         op = null;
110                     }
111                 } while (op != null);
112
113                 LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
114
115                 final CheckedFuture<Void, TransactionCommitFailedException> result = tx.submit();
116                 Futures.addCallback(result, new FutureCallback<Void>() {
117                     @Override
118                     public void onSuccess(final Void aVoid) {
119                         //NOOP
120                     }
121
122                     @Override
123                     public void onFailure(final Throwable throwable) {
124                         LOG.error("Transaction {} failed.", tx.getIdentifier(), throwable);
125                     }
126                 });
127             }
128         } catch (InterruptedException e) {
129             LOG.info("Processing interrupted, terminating", e);
130         }
131
132         // Drain all events, making sure any blocked threads are unblocked
133         while (!queue.isEmpty()) {
134             queue.poll();
135         }
136     }
137
138     @Override
139     public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
140             final Throwable cause) {
141         LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
142
143     }
144
145     @Override
146     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
147         // NOOP
148     }
149 }