Migrate topology-manager to OSGi DS
[openflowplugin.git] / applications / topology-manager / src / main / java / org / opendaylight / openflowplugin / applications / topology / manager / OperationProcessor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.topology.manager;
9
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.LinkedBlockingQueue;
12 import javax.annotation.PreDestroy;
13 import javax.inject.Inject;
14 import javax.inject.Singleton;
15 import org.opendaylight.mdsal.binding.api.DataBroker;
16 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
17 import org.osgi.service.component.annotations.Activate;
18 import org.osgi.service.component.annotations.Component;
19 import org.osgi.service.component.annotations.Deactivate;
20 import org.osgi.service.component.annotations.Reference;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 @Singleton
25 @Component(service = OperationProcessor.class)
26 public final class OperationProcessor implements AutoCloseable, Runnable {
27     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
28     private static final int MAX_TRANSACTION_OPERATIONS = 100;
29     private static final int OPERATION_QUEUE_DEPTH = 500;
30     private static final String TOPOLOGY_MANAGER = "ofp-topo-processor";
31
32     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
33     private final Thread thread;
34     private final TransactionChainManager transactionChainManager;
35     private volatile boolean finishing = false;
36
37     @Inject
38     @Activate
39     public OperationProcessor(@Reference final DataBroker dataBroker) {
40         transactionChainManager = new TransactionChainManager(dataBroker, TOPOLOGY_MANAGER);
41         transactionChainManager.activateTransactionManager();
42         transactionChainManager.initialSubmitWriteTransaction();
43
44         thread = new Thread(this);
45         thread.setDaemon(true);
46         thread.setName("ofp-topo-expo-" + FlowCapableTopologyProvider.TOPOLOGY_ID);
47         thread.start();
48         LOG.debug("OperationProcessor started");
49     }
50
51     void enqueueOperation(final TopologyOperation task) {
52         try {
53             queue.put(task);
54         } catch (InterruptedException e) {
55             LOG.warn("Interrupted while submitting task {}", task, e);
56         }
57     }
58
59     @Override
60     public void run() {
61         while (!finishing) {
62             try {
63                 TopologyOperation op = queue.take();
64
65                 LOG.debug("New {} operation available, starting transaction", op);
66
67                 int ops = 0;
68                 do {
69                     op.applyOperation(transactionChainManager);
70
71                     ops++;
72                     if (ops < MAX_TRANSACTION_OPERATIONS) {
73                         op = queue.poll();
74                     } else {
75                         op = null;
76                     }
77
78                     LOG.debug("Next operation {}", op);
79                 } while (op != null);
80
81                 LOG.debug("Processed {} operations, submitting transaction", ops);
82                 if (!transactionChainManager.submitTransaction()) {
83                     cleanDataStoreOperQueue();
84                 }
85             } catch (final InterruptedException e) {
86                 // This should mean we're shutting down.
87                 LOG.debug("Stat Manager DS Operation thread interrupted!", e);
88                 finishing = true;
89             }
90         }
91         // Drain all events, making sure any blocked threads are unblocked
92         cleanDataStoreOperQueue();
93     }
94
95     private void cleanDataStoreOperQueue() {
96         while (!queue.isEmpty()) {
97             queue.poll();
98         }
99     }
100
101     @PreDestroy
102     @Deactivate
103     @Override
104     public void close() {
105         thread.interrupt();
106         try {
107             thread.join();
108         } catch (InterruptedException e) {
109             LOG.debug("Join of thread {} was interrupted", thread.getName(), e);
110         }
111
112         transactionChainManager.close();
113         LOG.debug("OperationProcessor stopped");
114     }
115 }