- public void setNotificationService(
- final NotificationProviderService notificationService) {
- this.notificationService = notificationService;
+ @Override
+ public void run() {
+ try {
+ for (; ; ) {
+ InventoryOperation op = queue.take();
+
+ final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
+ LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
+
+ int ops = 0;
+ do {
+ op.applyOperation(tx);
+
+ ops++;
+ if (ops < MAX_BATCH) {
+ op = queue.poll();
+ } else {
+ op = null;
+ }
+ } while (op != null);
+
+ LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
+
+ final CheckedFuture<Void, TransactionCommitFailedException> result = tx.submit();
+ Futures.addCallback(result, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void aVoid) {
+ //NOOP
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ LOG.error("Transaction {} failed.", tx.getIdentifier(), throwable);
+ }
+ });
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Processing interrupted, terminating", e);
+ }
+
+ // Drain all events, making sure any blocked threads are unblocked
+ while (!queue.isEmpty()) {
+ queue.poll();
+ }