BUG:5888 - Changing FRM from clustered DCN to clustered DTCN
[openflowplugin.git] / applications / topology-manager / src / main / java / org / opendaylight / openflowplugin / applications / topology / manager / OperationProcessor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.topology.manager;
9
10 import com.google.common.base.Preconditions;
11 import java.util.concurrent.BlockingQueue;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
14 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
15 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
16 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
24     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
25     private static final int MAX_TRANSACTION_OPERATIONS = 100;
26     private static final int OPERATION_QUEUE_DEPTH = 500;
27
28     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
29     private final DataBroker dataBroker;
30     private BindingTransactionChain transactionChain;
31     private volatile boolean finishing = false;
32
33     OperationProcessor(final DataBroker dataBroker) {
34         this.dataBroker = Preconditions.checkNotNull(dataBroker);
35         transactionChain = this.dataBroker.createTransactionChain(this);
36     }
37
38     void enqueueOperation(final TopologyOperation task) {
39         try {
40             queue.put(task);
41         } catch (InterruptedException e) {
42             LOG.warn("Interrupted while submitting task {}", task, e);
43         }
44     }
45
46     @Override
47     public void run() {
48             while (!finishing) {
49                 try {
50                     TopologyOperation op = queue.take();
51
52                     LOG.debug("New {} operation available, starting transaction", op);
53
54                     final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
55
56                     int ops = 0;
57                     do {
58                         op.applyOperation(tx);
59
60                         ops++;
61                         if (ops < MAX_TRANSACTION_OPERATIONS) {
62                             op = queue.poll();
63                         } else {
64                             op = null;
65                         }
66
67                         LOG.debug("Next operation {}", op);
68                     } while (op != null);
69
70                     LOG.debug("Processed {} operations, submitting transaction", ops);
71
72                     try {
73                         tx.submit().checkedGet();
74                     } catch (final TransactionCommitFailedException e) {
75                         LOG.warn("Stat DataStoreOperation unexpected State!", e);
76                         transactionChain.close();
77                         transactionChain = dataBroker.createTransactionChain(this);
78                         cleanDataStoreOperQueue();
79                     }
80
81                 } catch (final IllegalStateException e) {
82                     LOG.warn("Stat DataStoreOperation unexpected State!", e);
83                     transactionChain.close();
84                     transactionChain = dataBroker.createTransactionChain(this);
85                     cleanDataStoreOperQueue();
86                 } catch (final InterruptedException e) {
87                     LOG.warn("Stat Manager DS Operation thread interupted!", e);
88                     finishing = true;
89                 } catch (final Exception e) {
90                     LOG.warn("Stat DataStore Operation executor fail!", e);
91                 }
92             }
93         // Drain all events, making sure any blocked threads are unblocked
94         cleanDataStoreOperQueue();
95     }
96
97     private void cleanDataStoreOperQueue() {
98         while (!queue.isEmpty()) {
99             queue.poll();
100         }
101     }
102
103     @Override
104     public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction, Throwable cause) {
105         LOG.warn("Failed to export Topology manager operations, Transaction {} failed: {}", transaction.getIdentifier(), cause.getMessage());
106         LOG.debug("Failed to export Topology manager operations.. ", cause);
107         transactionChain.close();
108         transactionChain = dataBroker.createTransactionChain(this);
109         cleanDataStoreOperQueue();
110     }
111
112     @Override
113     public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
114         //NOOP
115     }
116
117     @Override
118     public void close() throws Exception {
119         if (transactionChain != null) {
120             transactionChain.close();
121         }
122
123     }
124 }