+ @Override
+ public void run() {
+ try {
+ for (; ; ) {
+ InventoryOperation op = queue.take();
+
+ ReadWriteTransaction tx;
+ try {
+ tx = txChain.newReadWriteTransaction();
+ } catch (final IllegalStateException e) {
+ txChain.close();
+ txChain = dataBroker.createTransactionChain(this);
+ tx = txChain.newReadWriteTransaction();
+ }
+ LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
+
+ int ops = 0;
+ do {
+ op.applyOperation(tx);
+
+ ops++;
+ if (ops < MAX_BATCH) {
+ op = queue.poll();
+ } else {
+ op = null;
+ }
+ } while (op != null);
+
+ LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
+
+ final CheckedFuture<Void, TransactionCommitFailedException> result = tx.submit();
+ final Object ident = tx.getIdentifier();
+ Futures.addCallback(result, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void aVoid) {
+ //NOOP
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.error("Transaction {} failed.", ident, throwable);
+ }
+ });
+ }
+ } catch (final InterruptedException e) {
+ LOG.info("Processing interrupted, terminating", e);
+ }
+
+ // Drain all events, making sure any blocked threads are unblocked
+ while (!queue.isEmpty()) {
+ queue.poll();
+ }