import com.google.common.util.concurrent.Futures;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class FlowCapableInventoryProvider implements AutoCloseable, Runnable {
+class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
private static final int QUEUE_DEPTH = 500;
private static final int MAX_BATCH = 100;
private final NotificationProviderService notificationService;
private final DataBroker dataBroker;
+ private BindingTransactionChain txChain;
private ListenerRegistration<?> listenerRegistration;
private Thread thread;
final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
+ this.txChain = dataBroker.createTransactionChain(this);
thread = new Thread(this);
thread.setDaemon(true);
thread.setName("FlowCapableInventoryProvider");
thread.join();
thread = null;
}
+ if(txChain != null) {
+ txChain.close();
+ txChain = null;
+ }
}
for (; ; ) {
InventoryOperation op = queue.take();
- final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
+ final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
int ops = 0;
final CheckedFuture<Void, TransactionCommitFailedException> result = tx.submit();
Futures.addCallback(result, new FutureCallback<Void>() {
@Override
- public void onSuccess(Void aVoid) {
+ public void onSuccess(final Void aVoid) {
//NOOP
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.error("Transaction {} failed.", tx.getIdentifier(), throwable);
}
});
queue.poll();
}
}
+
+ @Override
+ public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
+ final Throwable cause) {
+ LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
+
+ }
+
+ @Override
+ public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+ // NOOP
+ }
}