Merge "BUG 1839 - HTTP delete of non existing data"
[controller.git] / opendaylight / md-sal / topology-manager / src / main / java / org / opendaylight / md / controller / topology / manager / OperationProcessor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.md.controller.topology.manager;
9
10 import com.google.common.base.Preconditions;
11 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
12 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
13 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
14 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
15 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
16 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.LinkedBlockingQueue;
23
24 final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
25     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
26     private static final int MAX_TRANSACTION_OPERATIONS = 100;
27     private static final int OPERATION_QUEUE_DEPTH = 500;
28
29     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
30     private final DataBroker dataBroker;
31     private BindingTransactionChain transactionChain;
32     private volatile boolean finishing = false;
33
34     OperationProcessor(final DataBroker dataBroker) {
35         this.dataBroker = Preconditions.checkNotNull(dataBroker);
36         transactionChain = this.dataBroker.createTransactionChain(this);
37     }
38
39     void enqueueOperation(final TopologyOperation task) {
40         try {
41             queue.put(task);
42         } catch (InterruptedException e) {
43             LOG.warn("Interrupted while submitting task {}", task, e);
44         }
45     }
46
47     @Override
48     public void run() {
49             while (!finishing) {
50                 try {
51                     TopologyOperation op = queue.take();
52
53                     LOG.debug("New {} operation available, starting transaction", op);
54
55                     final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
56
57                     int ops = 0;
58                     do {
59                         op.applyOperation(tx);
60
61                         ops++;
62                         if (ops < MAX_TRANSACTION_OPERATIONS) {
63                             op = queue.poll();
64                         } else {
65                             op = null;
66                         }
67
68                         LOG.debug("Next operation {}", op);
69                     } while (op != null);
70
71                     LOG.debug("Processed {} operations, submitting transaction", ops);
72
73                     try {
74                         tx.submit().checkedGet();
75                     } catch (final TransactionCommitFailedException e) {
76                         LOG.warn("Stat DataStoreOperation unexpected State!", e);
77                         transactionChain.close();
78                         transactionChain = dataBroker.createTransactionChain(this);
79                         cleanDataStoreOperQueue();
80                     }
81
82                 } catch (final IllegalStateException e) {
83                     LOG.warn("Stat DataStoreOperation unexpected State!", e);
84                     transactionChain.close();
85                     transactionChain = dataBroker.createTransactionChain(this);
86                     cleanDataStoreOperQueue();
87                 } catch (final InterruptedException e) {
88                     LOG.warn("Stat Manager DS Operation thread interupted!", e);
89                     finishing = true;
90                 } catch (final Exception e) {
91                     LOG.warn("Stat DataStore Operation executor fail!", e);
92                 }
93             }
94         // Drain all events, making sure any blocked threads are unblocked
95         cleanDataStoreOperQueue();
96     }
97
98     private void cleanDataStoreOperQueue() {
99         while (!queue.isEmpty()) {
100             queue.poll();
101         }
102     }
103
104     @Override
105     public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction, Throwable cause) {
106         LOG.error("Failed to export Topology manager operations, Transaction {} failed.", transaction.getIdentifier(), cause);
107         transactionChain.close();
108         transactionChain = dataBroker.createTransactionChain(this);
109         cleanDataStoreOperQueue();
110     }
111
112     @Override
113     public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
114         //NOOP
115     }
116
117     @Override
118     public void close() throws Exception {
119         if (transactionChain != null) {
120             transactionChain.close();
121         }
122
123     }
124 }