import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
+public final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
private static final int MAX_TRANSACTION_OPERATIONS = 100;
private static final int OPERATION_QUEUE_DEPTH = 500;
private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
private final DataBroker dataBroker;
+ private final Thread thread;
private BindingTransactionChain transactionChain;
private volatile boolean finishing = false;
- OperationProcessor(final DataBroker dataBroker) {
+ public OperationProcessor(final DataBroker dataBroker) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
transactionChain = this.dataBroker.createTransactionChain(this);
+
+ thread = new Thread(this);
+ thread.setDaemon(true);
+ thread.setName("FlowCapableTopologyExporter-" + FlowCapableTopologyProvider.TOPOLOGY_ID);
}
void enqueueOperation(final TopologyOperation task) {
}
}
+ public void start() {
+ thread.start();
+ }
+
@Override
public void run() {
while (!finishing) {
} while (op != null);
LOG.debug("Processed {} operations, submitting transaction", ops);
-
- try {
- tx.submit().checkedGet();
- } catch (final TransactionCommitFailedException e) {
- LOG.warn("Stat DataStoreOperation unexpected State!", e);
- transactionChain.close();
- transactionChain = dataBroker.createTransactionChain(this);
- cleanDataStoreOperQueue();
- }
-
+ submitTransaction(tx);
} catch (final IllegalStateException e) {
LOG.warn("Stat DataStoreOperation unexpected State!", e);
transactionChain.close();
transactionChain = dataBroker.createTransactionChain(this);
cleanDataStoreOperQueue();
} catch (final InterruptedException e) {
- LOG.warn("Stat Manager DS Operation thread interupted!", e);
+ // This should mean we're shutting down.
+ LOG.debug("Stat Manager DS Operation thread interrupted!", e);
finishing = true;
} catch (final Exception e) {
LOG.warn("Stat DataStore Operation executor fail!", e);
cleanDataStoreOperQueue();
}
+ private void submitTransaction(ReadWriteTransaction tx) {
+ try {
+ tx.submit().checkedGet();
+ } catch (final TransactionCommitFailedException e) {
+ LOG.warn("Stat DataStoreOperation unexpected State!", e);
+ transactionChain.close();
+ transactionChain = dataBroker.createTransactionChain(this);
+ cleanDataStoreOperQueue();
+ }
+ }
+
private void cleanDataStoreOperQueue() {
while (!queue.isEmpty()) {
queue.poll();
}
@Override
- public void close() throws Exception {
+ public void close() {
+ thread.interrupt();
+ try {
+ thread.join();
+ } catch(InterruptedException e) {
+ LOG.debug("Join of thread {} was interrupted", thread.getName(), e);
+ }
+
if (transactionChain != null) {
transactionChain.close();
}
+ LOG.debug("OperationProcessor closed");
}
}