final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
- this.txChain = dataBroker.createTransactionChain(this);
+ this.txChain = dataBroker.createTransactionChain(this);
thread = new Thread(this);
thread.setDaemon(true);
thread.setName("FlowCapableInventoryProvider");
thread.join();
thread = null;
}
- if(txChain != null) {
+ if (txChain != null) {
txChain.close();
txChain = null;
}
@Override
public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
- final Throwable cause) {
- LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
+ final Throwable cause) {
+ LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
}
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class OperationProcessor implements Runnable {
+final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
private static final int MAX_TRANSACTION_OPERATIONS = 100;
private static final int OPERATION_QUEUE_DEPTH = 500;
private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
private final DataBroker dataBroker;
+ private final BindingTransactionChain transactionChain;
OperationProcessor(final DataBroker dataBroker) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
+ transactionChain = this.dataBroker.createTransactionChain(this);
}
void enqueueOperation(final TopologyOperation task) {
TopologyOperation op = queue.take();
LOG.debug("New operations available, starting transaction");
- final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
+ final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
+
int ops = 0;
do {
queue.poll();
}
}
+
+ @Override
+ public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction, Throwable cause) {
+ LOG.error("Failed to export Topology manager operations, Transaction {} failed.", transaction.getIdentifier(), cause);
+ }
+
+ @Override
+ public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+ //NOOP
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (transactionChain != null) {
+ transactionChain.close();
+ }
+
+ }
}