3312779c8ef1c98926ac3cb1d5bd558ad3dde016
[openflowplugin.git] / applications / topology-manager / src / main / java / org / opendaylight / openflowplugin / applications / topology / manager / OperationProcessor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.topology.manager;
9
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.LinkedBlockingQueue;
12 import javax.annotation.PostConstruct;
13 import javax.annotation.PreDestroy;
14 import javax.inject.Inject;
15 import javax.inject.Singleton;
16 import org.apache.aries.blueprint.annotation.service.Reference;
17 import org.opendaylight.mdsal.binding.api.DataBroker;
18 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 @Singleton
23 public final class OperationProcessor implements AutoCloseable, Runnable {
24     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
25     private static final int MAX_TRANSACTION_OPERATIONS = 100;
26     private static final int OPERATION_QUEUE_DEPTH = 500;
27     private static final String TOPOLOGY_MANAGER = "ofp-topo-processor";
28
29     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
30     private final Thread thread;
31     private final TransactionChainManager transactionChainManager;
32     private volatile boolean finishing = false;
33
34     @Inject
35     public OperationProcessor(@Reference final DataBroker dataBroker) {
36         transactionChainManager = new TransactionChainManager(dataBroker, TOPOLOGY_MANAGER);
37         transactionChainManager.activateTransactionManager();
38         transactionChainManager.initialSubmitWriteTransaction();
39
40         thread = new Thread(this);
41         thread.setDaemon(true);
42         thread.setName("ofp-topo-expo-" + FlowCapableTopologyProvider.TOPOLOGY_ID);
43     }
44
45     void enqueueOperation(final TopologyOperation task) {
46         try {
47             queue.put(task);
48         } catch (InterruptedException e) {
49             LOG.warn("Interrupted while submitting task {}", task, e);
50         }
51     }
52
53     @PostConstruct
54     public void start() {
55         thread.start();
56     }
57
58     @Override
59     public void run() {
60         while (!finishing) {
61             try {
62                 TopologyOperation op = queue.take();
63
64                 LOG.debug("New {} operation available, starting transaction", op);
65
66                 int ops = 0;
67                 do {
68                     op.applyOperation(transactionChainManager);
69
70                     ops++;
71                     if (ops < MAX_TRANSACTION_OPERATIONS) {
72                         op = queue.poll();
73                     } else {
74                         op = null;
75                     }
76
77                     LOG.debug("Next operation {}", op);
78                 } while (op != null);
79
80                 LOG.debug("Processed {} operations, submitting transaction", ops);
81                 if (!transactionChainManager.submitTransaction()) {
82                     cleanDataStoreOperQueue();
83                 }
84             } catch (final InterruptedException e) {
85                 // This should mean we're shutting down.
86                 LOG.debug("Stat Manager DS Operation thread interrupted!", e);
87                 finishing = true;
88             }
89         }
90         // Drain all events, making sure any blocked threads are unblocked
91         cleanDataStoreOperQueue();
92     }
93
94     private void cleanDataStoreOperQueue() {
95         while (!queue.isEmpty()) {
96             queue.poll();
97         }
98     }
99
100     @Override
101     @PreDestroy
102     public void close() {
103         thread.interrupt();
104         try {
105             thread.join();
106         } catch (InterruptedException e) {
107             LOG.debug("Join of thread {} was interrupted", thread.getName(), e);
108         }
109
110         transactionChainManager.close();
111
112         LOG.debug("OperationProcessor closed");
113     }
114 }