2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.topology.manager;
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.LinkedBlockingQueue;
12 import javax.annotation.PreDestroy;
13 import javax.inject.Inject;
14 import javax.inject.Singleton;
15 import org.opendaylight.mdsal.binding.api.DataBroker;
16 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
17 import org.osgi.service.component.annotations.Activate;
18 import org.osgi.service.component.annotations.Component;
19 import org.osgi.service.component.annotations.Deactivate;
20 import org.osgi.service.component.annotations.Reference;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
/**
 * Queue-backed processor of {@link TopologyOperation}s, published as an OSGi component.
 *
 * <p>Operations wait in a bounded queue and are applied through a
 * {@link TransactionChainManager}. The instance is also the {@link Runnable} given to its
 * own worker thread.
 */
25 @Component(service = OperationProcessor.class)
26 public final class OperationProcessor implements AutoCloseable, Runnable {
27 private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
// Upper bound the per-transaction operation counter is compared against in the processing loop.
28 private static final int MAX_TRANSACTION_OPERATIONS = 100;
// Capacity of the operation queue.
29 private static final int OPERATION_QUEUE_DEPTH = 500;
// Name passed to the TransactionChainManager constructor.
30 private static final String TOPOLOGY_MANAGER = "ofp-topo-processor";
// Pending operations; bounded to OPERATION_QUEUE_DEPTH entries.
32 private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
// Worker thread, constructed with this object as its Runnable.
33 private final Thread thread;
// Target that queued operations are applied to and through which the transaction is submitted.
34 private final TransactionChainManager transactionChainManager;
// NOTE(review): no read or write of this flag is visible in this excerpt — presumably it
// terminates the processing loop; confirm against the full file.
35 private volatile boolean finishing = false;
/**
 * Creates the processor on top of the given data broker.
 *
 * <p>Builds and activates the transaction chain manager, performs its initial
 * write-transaction submit, and prepares a named daemon thread that runs this object.
 * NOTE(review): the statement that actually starts the thread is not visible in this
 * excerpt — confirm it is present in the full file.
 *
 * @param dataBroker broker handed to the {@link TransactionChainManager}
 */
39 public OperationProcessor(@Reference final DataBroker dataBroker) {
40 transactionChainManager = new TransactionChainManager(dataBroker, TOPOLOGY_MANAGER);
41 transactionChainManager.activateTransactionManager();
42 transactionChainManager.initialSubmitWriteTransaction();
// The worker runs this object's Runnable implementation.
44 thread = new Thread(this);
// Daemon thread: it does not keep the JVM alive on its own.
45 thread.setDaemon(true);
46 thread.setName("ofp-topo-expo-" + FlowCapableTopologyProvider.TOPOLOGY_ID);
48 LOG.debug("OperationProcessor started");
/**
 * Hands an operation to the processor.
 * NOTE(review): the enqueue statement itself is not visible in this excerpt; presumably it
 * is a blocking put on the bounded queue — confirm.
 *
 * @param task operation to enqueue; it is included in the warning logged on interruption
 */
51 void enqueueOperation(final TopologyOperation task) {
54 } catch (InterruptedException e) {
// NOTE(review): the interruption is only logged; no visible line restores the thread's
// interrupted status (Thread.currentThread().interrupt()) — confirm this is intended.
55 LOG.warn("Interrupted while submitting task {}", task, e);
// NOTE(review): the enclosing method header and loop lines are not visible in this excerpt;
// presumably this is the body of run() — confirm.
// Block until an operation is available.
63 TopologyOperation op = queue.take();
65 LOG.debug("New {} operation available, starting transaction", op);
// Apply the operation against the transaction chain manager.
69 op.applyOperation(transactionChainManager);
// Batch limit check against MAX_TRANSACTION_OPERATIONS.
// NOTE(review): both branch bodies are missing here — presumably the next operation is
// fetched only while under the limit; confirm.
72 if (ops < MAX_TRANSACTION_OPERATIONS) {
78 LOG.debug("Next operation {}", op);
81 LOG.debug("Processed {} operations, submitting transaction", ops);
// When submitTransaction() returns false, the queue clean-up helper is invoked.
82 if (!transactionChainManager.submitTransaction()) {
83 cleanDataStoreOperQueue();
85 } catch (final InterruptedException e) {
86 // This should mean we're shutting down.
// NOTE(review): the message mentions "Stat Manager" although this class belongs to the
// topology manager package — looks copied from elsewhere; consider correcting the text.
87 LOG.debug("Stat Manager DS Operation thread interrupted!", e);
91 // Drain all events, making sure any blocked threads are unblocked
92 cleanDataStoreOperQueue();
/**
 * Clean-up helper that loops for as long as the operation queue is non-empty.
 * NOTE(review): the loop body is not visible in this excerpt — presumably it removes and
 * discards queued operations; confirm.
 */
95 private void cleanDataStoreOperQueue() {
96 while (!queue.isEmpty()) {
/**
 * Shuts the processor down: closes the transaction chain manager and logs that the
 * processor stopped.
 * NOTE(review): the statements preceding the catch are not visible in this excerpt; judging
 * by the log message they wait for the worker thread to finish (join) — confirm.
 */
104 public void close() {
108 } catch (InterruptedException e) {
// NOTE(review): the interruption is only logged; no visible line restores the caller's
// interrupted status — confirm this is intended.
109 LOG.debug("Join of thread {} was interrupted", thread.getName(), e);
112 transactionChainManager.close();
113 LOG.debug("OperationProcessor stopped");