- public void setNotificationService(
- final NotificationProviderService notificationService) {
- this.notificationService = notificationService;
+ @Override
+ public void run() {
+ try {
+ for (;;) {
+ InventoryOperation op = queue.take();
+
+ final DataModificationTransaction tx = dataService.beginTransaction();
+ LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
+
+ int ops = 0;
+ do {
+ op.applyOperation(tx);
+
+ ops++;
+ if (ops < MAX_BATCH) {
+ op = queue.poll();
+ } else {
+ op = null;
+ }
+ } while (op != null);
+
+ LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
+
+ try {
+ final RpcResult<TransactionStatus> result = tx.commit().get();
+ if(!result.isSuccessful()) {
+ LOG.error("Transaction {} failed", tx.getIdentifier());
+ }
+ } catch (ExecutionException e) {
+ LOG.warn("Failed to commit inventory change", e.getCause());
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Processing interrupted, terminating", e);
+ }
+
+ // Drain all events, making sure any blocked threads are unblocked
+ while (!queue.isEmpty()) {
+ queue.poll();
+ }