tx.submit().checkedGet(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (final Exception e) {
LOG.warn("Unable to fill the initial item list.", e);
-
- try {
- itemProducer.close();
- } catch (final DOMDataTreeProducerException exception) {
- LOG.warn("Failure while closing producer.", exception);
- }
+ closeProducer(itemProducer);
return Futures.immediateFuture(RpcResultBuilder.<ProduceTransactionsOutput>failed()
.withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
final ProduceTransactionsHandler handler = new ProduceTransactionsHandler(itemProducer,
new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(list.getIdentifier())
.toOptimized()), input);
+ // It is handler's responsibility to close itemProducer when the work is finished.
handler.doStart();
return handler.future;
}
+ private static void closeProducer(final DOMDataTreeProducer producer) {
+ try {
+ producer.close();
+ } catch (final DOMDataTreeProducerException exception) {
+ LOG.warn("Failure while closing producer.", exception);
+ }
+ }
+
@Override
ListenableFuture<Void> execWrite(final long txId) {
final int i = random.nextInt(MAX_ITEM + 1);
@Override
void runFailed(final Throwable cause) {
+ closeProducer(itemProducer);
future.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
.withError(RpcError.ErrorType.APPLICATION, "Submit failed", cause).build());
}
@Override
void runSuccessful(final long allTx) {
+ closeProducer(itemProducer);
final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
.setAllTx(allTx)
.setInsertTx(insertTx)
@Override
void runTimedOut(final Exception cause) {
+ closeProducer(itemProducer);
future.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
.withError(RpcError.ErrorType.APPLICATION,
"Final submit was timed out by the test provider or was interrupted", cause).build());