From: Robert Varga Date: Tue, 16 Jan 2024 19:09:19 +0000 (+0100) Subject: Factor out DataBrokerCommitExecutor X-Git-Tag: v9.0.0~5 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=refs%2Fchanges%2F10%2F109810%2F9 Factor out DataBrokerCommitExecutor DataBrokerCommitExecutor holds the executor service and manages its statistics and lifecycle. This leaves OSGiDOMDataBroker absolutely bare forwarder. Change-Id: I55751c49c40aba9cfad289e0a1b913241df4f66f Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DataBrokerCommitExecutor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DataBrokerCommitExecutor.java new file mode 100644 index 0000000000..bdea393fc8 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DataBrokerCommitExecutor.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl; +import org.opendaylight.yangtools.util.DurationStatisticsTracker; +import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.metatype.annotations.AttributeDefinition; +import org.osgi.service.metatype.annotations.Designate; +import org.osgi.service.metatype.annotations.ObjectClassDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Component( + service = DataBrokerCommitExecutor.class, + configurationPid = "org.opendaylight.controller.cluster.datastore.broker") +@Designate(ocd = DataBrokerCommitExecutor.Config.class) +public final class DataBrokerCommitExecutor { + @ObjectClassDefinition + public @interface Config { + @AttributeDefinition(name = "max-data-broker-future-callback-queue-size") + int callbackQueueSize() default 1000; + @AttributeDefinition(name = "max-data-broker-future-callback-pool-size") + int callbackPoolSize() default 20; + } + + private static final Logger LOG = LoggerFactory.getLogger(DataBrokerCommitExecutor.class); + + private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent(); + private final ThreadExecutorStatsMXBeanImpl threadStats; + private final CommitStatsMXBeanImpl commitStats; + private final ExecutorService executorService; + + @Activate + public DataBrokerCommitExecutor(final Config config) { + executorService = SpecialExecutors.newBlockingBoundedCachedThreadPool(config.callbackPoolSize(), + config.callbackQueueSize(), "CommitFutures", ConcurrentDOMDataBroker.class); + threadStats = ThreadExecutorStatsMXBeanImpl.create(executorService, "CommitFutureExecutorStats", + "DOMDataBroker"); + commitStats = new CommitStatsMXBeanImpl(commitStatsTracker, "DOMDataBroker"); + commitStats.register(); + LOG.info("DOM Data Broker commit exector started"); + } + + @Deactivate + void deactivate() { + LOG.info("DOM Data Broker commit exector stopping"); + commitStats.unregister(); + threadStats.unregister(); + executorService.shutdown(); + try { + executorService.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + LOG.warn("Future executor failed to finish in time, giving up", e); + } + LOG.info("DOM Data Broker commit exector stopped"); + } + + Executor executor() { + return executorService; + } + + DurationStatisticsTracker commitStatsTracker() { + return commitStatsTracker; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/OSGiDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/OSGiDOMDataBroker.java index 3d13f783e4..7788ffee3d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/OSGiDOMDataBroker.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/OSGiDOMDataBroker.java @@ -9,63 +9,32 @@ package org.opendaylight.controller.cluster.databroker; import com.google.common.annotations.Beta; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import org.eclipse.jdt.annotation.NonNull; -import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.spi.ForwardingDOMDataBroker; import org.opendaylight.mdsal.dom.spi.store.DOMStore; -import org.opendaylight.yangtools.util.DurationStatisticsTracker; -import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; -import org.osgi.service.metatype.annotations.AttributeDefinition; -import org.osgi.service.metatype.annotations.Designate; -import org.osgi.service.metatype.annotations.ObjectClassDefinition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Beta -@Component(service = DOMDataBroker.class, configurationPid = "org.opendaylight.controller.cluster.datastore.broker", - property = "type=default") -@Designate(ocd = OSGiDOMDataBroker.Config.class) +@Component(service = DOMDataBroker.class, property = "type=default") public final class OSGiDOMDataBroker extends ForwardingDOMDataBroker { - @ObjectClassDefinition - public @interface Config { - @AttributeDefinition(name = "max-data-broker-future-callback-queue-size") - int callbackQueueSize() default 1000; - @AttributeDefinition(name = "max-data-broker-future-callback-pool-size") - int callbackPoolSize() default 20; - } - private static final Logger LOG = LoggerFactory.getLogger(OSGiDOMDataBroker.class); private final @NonNull ConcurrentDOMDataBroker delegate; - private final ThreadExecutorStatsMXBeanImpl threadStats; - private final CommitStatsMXBeanImpl commitStats; - private final ExecutorService executorService; @Activate - public OSGiDOMDataBroker( + public OSGiDOMDataBroker(@Reference final DataBrokerCommitExecutor commitExecutor, @Reference(target = "(type=distributed-config)") final DOMStore configDatastore, - @Reference(target = "(type=distributed-operational)") final DOMStore operDatastore, final Config config) { - LOG.info("DOM Data Broker starting"); - final var commitStatsTracker = DurationStatisticsTracker.createConcurrent(); - - executorService = SpecialExecutors.newBlockingBoundedCachedThreadPool(config.callbackPoolSize(), - config.callbackQueueSize(), "CommitFutures", ConcurrentDOMDataBroker.class); - threadStats = ThreadExecutorStatsMXBeanImpl.create(executorService, "CommitFutureExecutorStats", - "DOMDataBroker"); - commitStats = new CommitStatsMXBeanImpl(commitStatsTracker, "DOMDataBroker"); - commitStats.register(); - + @Reference(target = "(type=distributed-operational)") final DOMStore operDatastore) { delegate = new ConcurrentDOMDataBroker(Map.of( LogicalDatastoreType.CONFIGURATION, configDatastore, LogicalDatastoreType.OPERATIONAL, operDatastore), - executorService, commitStatsTracker); + commitExecutor.executor(), commitExecutor.commitStatsTracker()); LOG.info("DOM Data Broker started"); } @@ -78,14 +47,6 @@ public final class OSGiDOMDataBroker extends ForwardingDOMDataBroker { void deactivate() { LOG.info("DOM Data Broker stopping"); delegate.close(); - commitStats.unregister(); - threadStats.unregister(); - executorService.shutdown(); - try { - executorService.awaitTermination(1, TimeUnit.MINUTES); - } catch (InterruptedException e) { - LOG.warn("Future executor failed to finish in time, giving up", e); - } LOG.info("DOM Data Broker stopped"); } }