+ /**
+ * Starts new empty transaction, custimizes it with submitted operations
+ * and submit it to data broker.
+ *
+ * If transaction chain failed during customization of transaction
+ * it allocates new chain and empty transaction and customizes it
+ * with submitted operations.
+ *
+ * This does not retry failed transaction. It only retries it when
+ * chain failed during customization of transaction chain.
+ *
+ * @param opsToApply
+ */
+ private void submitOperations(final ArrayList<InventoryOperation> opsToApply) {
+ final ReadWriteTransaction tx = createCustomizedTransaction(opsToApply);
+ LOG.debug("Processed {} operations, submitting transaction {}", opsToApply.size(), tx.getIdentifier());
+ try {
+ tx.submit();
+ } catch (final IllegalStateException e) {
+ /*
+ * Transaction chain failed during doing batch, so we need to null
+ * tx chain and continue processing queue.
+ *
+ * We fail current txChain which was allocated with createTransaction.
+ */
+ failCurrentChain(txChain);
+ /*
+ * We will retry transaction once in order to not loose any data.
+ *
+ */
+ final ReadWriteTransaction retryTx = createCustomizedTransaction(opsToApply);
+ retryTx.submit();
+ }
+ }
+
+ /**
+ * Creates new empty ReadWriteTransaction. If transaction chain
+ * was failed, it will allocate new transaction chain
+ * and assign it with this Operation Executor.
+ *
+ * This call is synchronized to prevent reace with {@link #failCurrentChain(TransactionChain)}.
+ *
+ * @return New Empty ReadWrite transaction, which continues this chain or starts new transaction
+ * chain.
+ */
+ private synchronized ReadWriteTransaction newEmptyTransaction() {
+ try {
+ if(txChain == null) {
+ // Chain was broken so we need to replace it.
+ txChain = dataBroker.createTransactionChain(this);
+ }
+ return txChain.newReadWriteTransaction();
+ } catch (final IllegalStateException e) {
+ LOG.debug("Chain is broken, need to allocate new transaction chain.",e);
+ /*
+ * Chain was broken by previous transaction,
+ * but there was race between this.
+ * Chain will be closed by #onTransactionChainFailed method.
+ */
+ txChain = dataBroker.createTransactionChain(this);
+ return txChain.newReadWriteTransaction();
+ }
+ }
+
+ /**
+ * Creates customized not-submitted transaction, which is ready to be submitted.
+ *
+ * @param opsToApply Operations which are used to customize transaction.
+ * @return Non-empty transaction.
+ */
+ private ReadWriteTransaction createCustomizedTransaction(final ArrayList<InventoryOperation> opsToApply) {
+ final ReadWriteTransaction tx = newEmptyTransaction();
+ for(final InventoryOperation op : opsToApply) {
+ op.applyOperation(tx);
+ }
+ return tx;
+ }
+
+ private synchronized void failCurrentChain(final TransactionChain<?, ?> chain) {
+ if(txChain == chain) {
+ txChain = null;
+ }
+ }
+