- /*
- * We use an executor for commit ListenableFuture callbacks that favors reusing available
- * threads over creating new threads at the expense of execution time. The assumption is
- * that most ListenableFuture callbacks won't execute a lot of business logic where we want
- * it to run quicker - many callbacks will likely just handle error conditions and do
- * nothing on success. The executor queue capacity is bounded and, if the capacity is
- * reached, subsequent submitted tasks will block the caller.
- */
- Executor listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(
- PropertyUtils.getIntSystemProperty(
- FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP,
- DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE),
- PropertyUtils.getIntSystemProperty(
- FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP,
- DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE), "CommitFutures");
-
- DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores,
- new DeadlockDetectingListeningExecutorService(commitExecutor,
- TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION,
- listenableFutureExecutor));
-
- return newDataBroker;
+ SerializedDOMDataBroker sdb = new SerializedDOMDataBroker(datastores,
+ new DeadlockDetectingListeningExecutorService(commitExecutor,
+ TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER,
+ listenableFutureExecutor));
+ commitStatsTracker = sdb.getCommitStatsTracker();
+
+ final AbstractMXBean commitExecutorStatsMXBean =
+ ThreadExecutorStatsMXBeanImpl.create(commitExecutor, "CommitExecutorStats",
+ JMX_BEAN_TYPE, null);
+ if(commitExecutorStatsMXBean != null) {
+ mBeans.add(commitExecutorStatsMXBean);
+ }
+
+ if(commitStatsTracker != null) {
+ final CommitStatsMXBeanImpl commitStatsMXBean = new CommitStatsMXBeanImpl(
+ commitStatsTracker, JMX_BEAN_TYPE);
+ commitStatsMXBean.registerMBean();
+ mBeans.add(commitStatsMXBean);
+ }
+
+ final AbstractMXBean commitFutureStatsMXBean =
+ ThreadExecutorStatsMXBeanImpl.create(listenableFutureExecutor,
+ "CommitFutureExecutorStats", JMX_BEAN_TYPE, null);
+ if(commitFutureStatsMXBean != null) {
+ mBeans.add(commitFutureStatsMXBean);
+ }
+
+ sdb.setCloseable(new AutoCloseable() {
+ @Override
+ public void close() {
+ for(AbstractMXBean mBean: mBeans) {
+ mBean.unregisterMBean();
+ }
+ }
+ });
+
+ return sdb;