Merge "Bug 2103: Make sure InventoryManager survives Transaction failure."
[controller.git] / opendaylight / md-sal / inventory-manager / src / main / java / org / opendaylight / controller / md / inventory / manager / FlowCapableInventoryProvider.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.inventory.manager;
9
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.LinkedBlockingDeque;
12
13 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
14 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
15 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
16 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
20 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import com.google.common.base.Preconditions;
26 import com.google.common.util.concurrent.CheckedFuture;
27 import com.google.common.util.concurrent.FutureCallback;
28 import com.google.common.util.concurrent.Futures;
29
30 class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
31     private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
32     private static final int QUEUE_DEPTH = 500;
33     private static final int MAX_BATCH = 100;
34
35     private final BlockingQueue<InventoryOperation> queue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
36     private final NotificationProviderService notificationService;
37
38     private final DataBroker dataBroker;
39     private BindingTransactionChain txChain;
40     private ListenerRegistration<?> listenerRegistration;
41     private Thread thread;
42
43     FlowCapableInventoryProvider(final DataBroker dataBroker, final NotificationProviderService notificationService) {
44         this.dataBroker = Preconditions.checkNotNull(dataBroker);
45         this.notificationService = Preconditions.checkNotNull(notificationService);
46     }
47
48     void start() {
49         final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
50         this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
51
52         this.txChain = dataBroker.createTransactionChain(this);
53         thread = new Thread(this);
54         thread.setDaemon(true);
55         thread.setName("FlowCapableInventoryProvider");
56         thread.start();
57
58         LOG.info("Flow Capable Inventory Provider started.");
59     }
60
61     void enqueue(final InventoryOperation op) {
62         try {
63             queue.put(op);
64         } catch (final InterruptedException e) {
65             LOG.warn("Failed to enqueue operation {}", op, e);
66         }
67     }
68
69     @Override
70     public void close() throws InterruptedException {
71         LOG.info("Flow Capable Inventory Provider stopped.");
72         if (this.listenerRegistration != null) {
73             try {
74                 this.listenerRegistration.close();
75             } catch (final Exception e) {
76                 LOG.error("Failed to stop inventory provider", e);
77             }
78             listenerRegistration = null;
79         }
80
81         if (thread != null) {
82             thread.interrupt();
83             thread.join();
84             thread = null;
85         }
86         if (txChain != null) {
87             txChain.close();
88             txChain = null;
89         }
90
91
92     }
93
94     @Override
95     public void run() {
96         try {
97             for (; ; ) {
98                 InventoryOperation op = queue.take();
99
100                 ReadWriteTransaction tx;
101                 try {
102                     tx = txChain.newReadWriteTransaction();
103                 } catch (final IllegalStateException e) {
104                     txChain.close();
105                     txChain = dataBroker.createTransactionChain(this);
106                     tx = txChain.newReadWriteTransaction();
107                 }
108                 LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
109
110                 int ops = 0;
111                 do {
112                     op.applyOperation(tx);
113
114                     ops++;
115                     if (ops < MAX_BATCH) {
116                         op = queue.poll();
117                     } else {
118                         op = null;
119                     }
120                 } while (op != null);
121
122                 LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
123
124                 final CheckedFuture<Void, TransactionCommitFailedException> result = tx.submit();
125                 final Object ident = tx.getIdentifier();
126                 Futures.addCallback(result, new FutureCallback<Void>() {
127                     @Override
128                     public void onSuccess(final Void aVoid) {
129                         //NOOP
130                     }
131
132                     @Override
133                     public void onFailure(final Throwable throwable) {
134                         LOG.error("Transaction {} failed.", ident, throwable);
135                     }
136                 });
137             }
138         } catch (final InterruptedException e) {
139             LOG.info("Processing interrupted, terminating", e);
140         }
141
142         // Drain all events, making sure any blocked threads are unblocked
143         while (!queue.isEmpty()) {
144             queue.poll();
145         }
146     }
147
148     @Override
149     public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
150                                          final Throwable cause) {
151         LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
152
153     }
154
155     @Override
156     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
157         // NOOP
158     }
159 }