Merge "Bug 1309 - Cannot publish LinkDiscovered event"
[controller.git] / opendaylight / md-sal / topology-manager / src / main / java / org / opendaylight / md / controller / topology / manager / OperationProcessor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.md.controller.topology.manager;
9
10 import com.google.common.base.Preconditions;
11 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
12 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
13 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
14 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
15 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
16 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.LinkedBlockingQueue;
23
24 final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
25     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
26     private static final int MAX_TRANSACTION_OPERATIONS = 100;
27     private static final int OPERATION_QUEUE_DEPTH = 500;
28
29     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
30     private final DataBroker dataBroker;
31     private BindingTransactionChain transactionChain;
32
33     OperationProcessor(final DataBroker dataBroker) {
34         this.dataBroker = Preconditions.checkNotNull(dataBroker);
35         transactionChain = this.dataBroker.createTransactionChain(this);
36     }
37
38     void enqueueOperation(final TopologyOperation task) {
39         try {
40             queue.put(task);
41         } catch (InterruptedException e) {
42             LOG.warn("Interrupted while submitting task {}", task, e);
43         }
44     }
45
46     @Override
47     public void run() {
48         try {
49             for (; ; ) {
50                 TopologyOperation op = queue.take();
51
52                 LOG.debug("New {} operation available, starting transaction", op);
53
54                 final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
55
56                 int ops = 0;
57                 do {
58                     op.applyOperation(tx);
59
60                     ops++;
61                     if (ops < MAX_TRANSACTION_OPERATIONS) {
62                         op = queue.poll();
63                     } else {
64                         op = null;
65                     }
66
67                     LOG.debug("Next operation {}", op);
68                 } while (op != null);
69
70                 LOG.debug("Processed {} operations, submitting transaction", ops);
71
72                 try {
73                     tx.submit().checkedGet();
74                 } catch (final TransactionCommitFailedException e) {
75                     LOG.warn("Stat DataStoreOperation unexpected State!", e);
76                     transactionChain.close();
77                     transactionChain = dataBroker.createTransactionChain(this);
78                     cleanDataStoreOperQueue();
79                 }
80             }
81         } catch (final IllegalStateException e) {
82             LOG.warn("Stat DataStoreOperation unexpected State!", e);
83             transactionChain.close();
84             transactionChain = dataBroker.createTransactionChain(this);
85             cleanDataStoreOperQueue();
86         } catch (final InterruptedException e) {
87             LOG.warn("Stat Manager DS Operation thread interupted!", e);
88         } catch (final Exception e) {
89             LOG.warn("Stat DataStore Operation executor fail!", e);
90         }
91
92         // Drain all events, making sure any blocked threads are unblocked
93         cleanDataStoreOperQueue();
94
95     }
96
97     private void cleanDataStoreOperQueue() {
98         // Drain all events, making sure any blocked threads are unblocked
99         while (!queue.isEmpty()) {
100             queue.poll();
101         }
102     }
103
104     @Override
105     public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction, Throwable cause) {
106         LOG.error("Failed to export Topology manager operations, Transaction {} failed.", transaction.getIdentifier(), cause);
107     }
108
109     @Override
110     public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
111         //NOOP
112     }
113
114     @Override
115     public void close() throws Exception {
116         if (transactionChain != null) {
117             transactionChain.close();
118         }
119
120     }
121 }