<type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-async-data-broker</type>
<name>inmemory-data-broker</name>
</async-data-broker>
+ <notification-queue-depth>65536</notification-queue-depth>
+ <notification-queue-spin>1</notification-queue-spin>
+ <notification-queue-park>30</notification-queue-park>
</module>
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-data-compatible-broker</type>
*/
package org.opendaylight.controller.config.yang.md.sal.dom.impl;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.MutableClassToInstanceMap;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
import org.opendaylight.controller.sal.dom.broker.BrokerImpl;
import org.opendaylight.controller.sal.dom.broker.GlobalBundleScanningSchemaServiceImpl;
-/**
-*
-*/
public final class DomBrokerImplModule extends org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractDomBrokerImplModule
{
}
@Override
- public void validate(){
+ public void validate() {
super.validate();
+ final long depth = getNotificationQueueDepth().getValue();
+ Preconditions.checkArgument(Long.lowestOneBit(depth) == Long.highestOneBit(depth), "Queue depth %s is not power-of-two", depth);
}
@Override
final ClassToInstanceMap<BrokerService> services = MutableClassToInstanceMap.create();
- // TODO: retrieve from config subsystem
- final int queueDepth = 1024;
-
- final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(queueDepth);
+ final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(getNotificationQueueDepth().getValue().intValue(),
+ getNotificationQueueSpin().longValue(), getNotificationQueuePark().longValue(), TimeUnit.MILLISECONDS);
services.putInstance(DOMNotificationService.class, domNotificationRouter);
services.putInstance(DOMNotificationPublishService.class, domNotificationRouter);
private volatile Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> listeners = ImmutableMultimap.of();
private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners = ListenerRegistry.create();
- private DOMNotificationRouter(final ExecutorService executor, final Disruptor<DOMNotificationRouterEvent> disruptor) {
+ @SuppressWarnings("unchecked")
+ private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
this.executor = Preconditions.checkNotNull(executor);
- this.disruptor = Preconditions.checkNotNull(disruptor);
+
+ disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, strategy);
+ disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
+ disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
+ disruptor.start();
}
- @SuppressWarnings("unchecked")
public static DOMNotificationRouter create(final int queueDepth) {
final ExecutorService executor = Executors.newCachedThreadPool();
- final Disruptor<DOMNotificationRouterEvent> disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, DEFAULT_STRATEGY);
- disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
- disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
- disruptor.start();
+ return new DOMNotificationRouter(executor, queueDepth, DEFAULT_STRATEGY);
+ }
+
+ public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime, final TimeUnit unit) {
+ final ExecutorService executor = Executors.newCachedThreadPool();
+ final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit);
- return new DOMNotificationRouter(executor, disruptor);
+ return new DOMNotificationRouter(executor, queueDepth, strategy);
}
@Override
config:java-name-prefix SchemaServiceImplSingleton;
}
+ typedef max-queue-depth {
+ type uint32 {
+ range 1..1073741824;
+ }
+ }
+
augment "/config:modules/config:module/config:configuration" {
case dom-broker-impl {
when "/config:modules/config:module/config:type = 'dom-broker-impl'";
}
}
}
+
+ leaf notification-queue-depth {
+ description "Maximum number of elements in the notification queue, must be power-of-two.";
+ type max-queue-depth;
+ default 65536;
+ }
+ leaf notification-queue-spin {
+ description "Number of milliseconds notification queue should spin for new requests before parking.";
+ type uint16;
+ units milliseconds;
+ default 1;
+ }
+ leaf notification-queue-park {
+ description "Number of milliseconds notification queue should park for new requests before blocking.";
+ type uint16;
+ units milliseconds;
+ default 30;
+ }
}
}