2 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.spi;
10 import com.google.common.util.concurrent.ForwardingBlockingQueue;
11 import com.google.common.util.concurrent.ThreadFactoryBuilder;
12 import java.util.concurrent.BlockingQueue;
13 import java.util.concurrent.Executor;
14 import java.util.concurrent.LinkedBlockingQueue;
15 import java.util.concurrent.RejectedExecutionException;
16 import java.util.concurrent.RejectedExecutionHandler;
17 import java.util.concurrent.ThreadFactory;
18 import java.util.concurrent.ThreadPoolExecutor;
19 import java.util.concurrent.TimeUnit;
20 import org.eclipse.jdt.annotation.NonNullByDefault;
21 import org.osgi.service.component.annotations.Activate;
22 import org.osgi.service.component.annotations.Component;
23 import org.osgi.service.component.annotations.Deactivate;
24 import org.osgi.service.metatype.annotations.AttributeDefinition;
25 import org.osgi.service.metatype.annotations.Designate;
26 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
29 @Component(service = NetconfTopologySchemaAssembler.class, configurationPid = "org.opendaylight.netconf.topology")
30 @Designate(ocd = NetconfTopologySchemaAssembler.Configuration.class)
31 public final class NetconfTopologySchemaAssembler implements AutoCloseable {
/**
 * OSGi metatype object class definition holding the tunables of the schema assembler thread pool. Every member
 * declares a lower bound through {@code @AttributeDefinition(min = ...)} and a default value. The
 * {@code Configuration}-based constructor of the enclosing class reads all three members.
 *
 * <p>NOTE(review): under the OSGi component property type naming rules the {@code $_$} sequence in a method name
 * is expected to map to {@code -} in the configuration key (e.g. {@code assembler-min-threads}) -- confirm against
 * the specification.
 */
32 @ObjectClassDefinition
33 public @interface Configuration {
/**
 * Minimum number of threads; lower bound 0, default 1. Forwarded as the {@code minThreads} constructor argument.
 */
34 @AttributeDefinition(min = "0")
35 int assembler$_$min$_$threads() default 1;
/**
 * Maximum number of threads; lower bound 1, default 4. Forwarded as the {@code maxThreads} constructor argument.
 */
36 @AttributeDefinition(min = "1")
37 int assembler$_$max$_$threads() default 4;
/**
 * Keep-alive time; lower bound 0, default 60000. The {@code Configuration}-based constructor interprets the value
 * as milliseconds.
 */
38 @AttributeDefinition(min = "0")
39 long assembler$_$keep$_$alive$_$millis() default 60_000;
/**
 * Work queue handed to the {@code ThreadPoolExecutor} created by the enclosing class. It forwards to a
 * {@code LinkedBlockingQueue} and overrides {@code offer()}; the intent of that override is described by the
 * comment inside the method.
 */
42 private static final class SynchronousBlockingQueue extends ForwardingBlockingQueue<Runnable> {
// Backing queue; created with the no-argument constructor, i.e. without an explicit capacity limit.
43 private final LinkedBlockingQueue<Runnable> delegate = new LinkedBlockingQueue<>();
// Accessor demanded by ForwardingBlockingQueue for the queue that calls are forwarded to.
// NOTE(review): the method body is not part of this excerpt.
46 protected BlockingQueue<Runnable> delegate() {
// The single-letter parameter name trips the checkstyle parameterName rule, hence the suppression.
51 @SuppressWarnings("checkstyle:parameterName")
52 public boolean offer(final Runnable o) {
53 // ThreadPoolExecutor will spawn a new thread after core size is reached only if an offer is rejected. We
54 // always do that and recover via the execution handler.
// NOTE(review): the statement implementing the rejection described above is not part of this excerpt.
// Factory for the worker threads of the assembler's pool. Threads are named "topology-schema-assembler-<n>", where
// the %d placeholder is filled in by ThreadFactoryBuilder.
// NOTE(review): the remainder of the builder chain is not part of this excerpt, so no other thread attribute is
// documented here.
59 private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
60 .setNameFormat("topology-schema-assembler-%d")
63 private static final RejectedExecutionHandler BLOCKING_REJECTED_EXECUTION_HANDLER = (runnable, executor) -> {
64 // if maximum number of threads are reached, the threadpool would reject the execution. We override that
65 // behaviour and block until the queue accepts the task or we get interrupted
67 executor.getQueue().put(runnable);
68 } catch (InterruptedException e) {
69 throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
// Thread pool backing this assembler; assigned exactly once, in the (int, int, long, TimeUnit) constructor.
73 private final ThreadPoolExecutor executor;
/**
 * Creates the assembler together with its backing {@code ThreadPoolExecutor}. No argument checking is done here;
 * all four values are handed to the {@code ThreadPoolExecutor} constructor as they are.
 *
 * @param minThreads passed to the pool as its core pool size
 * @param maxThreads passed to the pool as its maximum pool size
 * @param keepAliveTime passed to the pool as its keep-alive time, expressed in {@code unit}
 * @param unit time unit of {@code keepAliveTime}
 */
75 public NetconfTopologySchemaAssembler(final int minThreads, final int maxThreads, final long keepAliveTime,
76 final TimeUnit unit) {
// Each instance gets its own SynchronousBlockingQueue; the thread factory and the rejection handler are the
// shared static instances declared in this class.
77 executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, unit, new SynchronousBlockingQueue(),
78 THREAD_FACTORY, BLOCKING_REJECTED_EXECUTION_HANDLER);
/**
 * Creates the assembler from a {@code Configuration} instance by delegating to the
 * (int, int, long, TimeUnit) constructor.
 *
 * @param config source of the minimum thread count, the maximum thread count and the keep-alive time; the
 *               keep-alive value is interpreted as milliseconds
 */
82 public NetconfTopologySchemaAssembler(final Configuration config) {
// Minimum threads, maximum threads and keep-alive are read from the configuration in that order.
83 this(config.assembler$_$min$_$threads(), config.assembler$_$max$_$threads(),
84 config.assembler$_$keep$_$alive$_$millis(), TimeUnit.MILLISECONDS);