6890544f36c9a3105726bfb8f71de208019e566c
[openflowplugin.git] / applications / topology-manager / src / main / java / org / opendaylight / openflowplugin / applications / topology / manager / OperationProcessor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.topology.manager;
9
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.LinkedBlockingQueue;
12 import javax.annotation.PostConstruct;
13 import javax.annotation.PreDestroy;
14 import javax.inject.Inject;
15 import javax.inject.Singleton;
16 import org.opendaylight.mdsal.binding.api.DataBroker;
17 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 @Singleton
22 public final class OperationProcessor implements AutoCloseable, Runnable {
23     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
24     private static final int MAX_TRANSACTION_OPERATIONS = 100;
25     private static final int OPERATION_QUEUE_DEPTH = 500;
26     private static final String TOPOLOGY_MANAGER = "ofp-topo-processor";
27
28     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
29     private final Thread thread;
30     private final TransactionChainManager transactionChainManager;
31     private volatile boolean finishing = false;
32
33     @Inject
34     public OperationProcessor(final DataBroker dataBroker) {
35         transactionChainManager = new TransactionChainManager(dataBroker, TOPOLOGY_MANAGER);
36         transactionChainManager.activateTransactionManager();
37         transactionChainManager.initialSubmitWriteTransaction();
38
39         thread = new Thread(this);
40         thread.setDaemon(true);
41         thread.setName("ofp-topo-expo-" + FlowCapableTopologyProvider.TOPOLOGY_ID);
42     }
43
44     void enqueueOperation(final TopologyOperation task) {
45         try {
46             queue.put(task);
47         } catch (InterruptedException e) {
48             LOG.warn("Interrupted while submitting task {}", task, e);
49         }
50     }
51
52     @PostConstruct
53     public void start() {
54         thread.start();
55     }
56
57     @Override
58     public void run() {
59         while (!finishing) {
60             try {
61                 TopologyOperation op = queue.take();
62
63                 LOG.debug("New {} operation available, starting transaction", op);
64
65                 int ops = 0;
66                 do {
67                     op.applyOperation(transactionChainManager);
68
69                     ops++;
70                     if (ops < MAX_TRANSACTION_OPERATIONS) {
71                         op = queue.poll();
72                     } else {
73                         op = null;
74                     }
75
76                     LOG.debug("Next operation {}", op);
77                 } while (op != null);
78
79                 LOG.debug("Processed {} operations, submitting transaction", ops);
80                 if (!transactionChainManager.submitTransaction()) {
81                     cleanDataStoreOperQueue();
82                 }
83             } catch (final InterruptedException e) {
84                 // This should mean we're shutting down.
85                 LOG.debug("Stat Manager DS Operation thread interrupted!", e);
86                 finishing = true;
87             }
88         }
89         // Drain all events, making sure any blocked threads are unblocked
90         cleanDataStoreOperQueue();
91     }
92
93     private void cleanDataStoreOperQueue() {
94         while (!queue.isEmpty()) {
95             queue.poll();
96         }
97     }
98
99     @Override
100     @PreDestroy
101     public void close() {
102         thread.interrupt();
103         try {
104             thread.join();
105         } catch (InterruptedException e) {
106             LOG.debug("Join of thread {} was interrupted", thread.getName(), e);
107         }
108
109         transactionChainManager.close();
110
111         LOG.debug("OperationProcessor closed");
112     }
113 }