2 * Copyright (c) 2021 Pantheon Technologies, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.test.perf;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.Iterator;
17 import java.util.Optional;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import javax.annotation.PreDestroy;
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.checkerframework.checker.nullness.qual.Nullable;
24 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
25 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
26 import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
27 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
28 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
29 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
30 import org.opendaylight.mdsal.dom.api.DOMRpcService;
31 import org.opendaylight.netconf.test.perf.notifications.NotificationsCounter;
32 import org.opendaylight.netconf.test.perf.utils.TestUtils;
33 import org.opendaylight.yang.gen.v1.org.opendaylight.coretutorials.ncmount.example.notifications.rev150611.VrfRouteNotification;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
37 import org.opendaylight.yangtools.concepts.ListenerRegistration;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
41 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
42 import org.osgi.service.component.annotations.Activate;
43 import org.osgi.service.component.annotations.Component;
44 import org.osgi.service.component.annotations.Deactivate;
45 import org.osgi.service.component.annotations.Reference;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 @Component(immediate = true)
51 public class MountedDeviceListener implements DOMMountPointListener {
// Class-wide logger.
52 private static final Logger LOG = LoggerFactory.getLogger(MountedDeviceListener.class);
// Only mounted nodes whose node id starts with this prefix get notification tracking (see onMountPointCreated).
53 private static final String TEST_NODE_PREFIX = "perf-";
// Stream name placed into the create-subscription request.
54 private static final String STREAM_DEFAULT_NAME = "STREAM-PERF-DEFAULT";
// QName passed to invokeRpc(); built from CreateSubscriptionInput.QNAME and the name "create-subscription".
55 private static final QName CREATE_SUBSCRIPTION_QNAME = QName.create(CreateSubscriptionInput.QNAME,
56 "create-subscription");
// Notification listener registrations keyed by mount point path; filled in trackNotificationsPerformance,
// emptied entry-by-entry in onMountPointRemoved.
58 private final ConcurrentMap<YangInstanceIdentifier, ListenerRegistration<?>> listeners = new ConcurrentHashMap<>();
// Used to look up a mount point by its path and to register this object as a provision listener.
59 private final DOMMountPointService mountPointService;
// Turns the binding CreateSubscriptionInput into RPC input data; also handed to every NotificationsCounter.
60 private final BindingNormalizedNodeSerializer serializer;
// Registration returned by registerProvisionListener(this) in the constructor.
61 private final ListenerRegistration<?> reg;
65 public MountedDeviceListener(final @Reference DOMMountPointService mountPointService,
66 final @Reference BindingNormalizedNodeSerializer serializer) {
67 this.mountPointService = requireNonNull(mountPointService);
68 this.serializer = requireNonNull(serializer);
69 reg = mountPointService.registerProvisionListener(this);
// Walks every registration currently stored in `listeners` through an explicit Iterator.
// NOTE(review): the enclosing method's signature and the loop body are not visible in this excerpt;
// presumably each registration is closed and removed from the map here — confirm against the full file.
76 final Iterator<ListenerRegistration<?>> it = listeners.values().iterator();
77 while (it.hasNext()) {
84 public void onMountPointCreated(final YangInstanceIdentifier path) {
85 final Optional<String> optNodeId = TestUtils.getNodeId(path);
86 if (optNodeId.isPresent() && optNodeId.get().startsWith(TEST_NODE_PREFIX)) {
87 LOG.info("Test node mounted: {}", optNodeId.get());
88 trackNotificationsPerformance(path);
// Handles removal of a mount point: the registration stored for this path, if any, is taken out of `listeners`.
93 public void onMountPointRemoved(final YangInstanceIdentifier path) {
94 final ListenerRegistration<?> listener = listeners.remove(path);
// Paths that never had a registration stored yield null and skip the guarded branch.
// NOTE(review): the body of this branch is not visible in this excerpt; presumably it closes the
// removed registration — confirm against the full file.
95 if (listener != null) {
100 private void trackNotificationsPerformance(final YangInstanceIdentifier path) {
101 // 1. get nodeId from the path
102 final String nodeId = TestUtils.getNodeId(path).get();
104 // 2. extract needed services from the mount point
105 final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
106 .orElseThrow(() -> new RuntimeException("Unable to get mountpoint"));
107 final DOMRpcService rpcService = mountPoint.getService(DOMRpcService.class)
108 .orElseThrow(() -> new RuntimeException("Unable to get RPC Service from the mountpoint"));
109 final DOMNotificationService notificationService = mountPoint.getService(DOMNotificationService.class)
110 .orElseThrow(() -> new RuntimeException("Unable to get NotificationService from the mountpoint"));
112 // 3. create a listener for the notifications
113 listeners.put(path, notificationService.registerNotificationListener(
114 new NotificationsCounter(nodeId, serializer), Absolute.of(VrfRouteNotification.QNAME)));
116 // 4. send 'create-subscription' request to the device
117 final StreamNameType streamNameType = new StreamNameType(STREAM_DEFAULT_NAME);
118 final CreateSubscriptionInputBuilder subscriptionInputBuilder = new CreateSubscriptionInputBuilder();
119 subscriptionInputBuilder.setStream(streamNameType);
120 final CreateSubscriptionInput input = subscriptionInputBuilder.build();
121 final ContainerNode inputNode = serializer.toNormalizedNodeRpcData(input);
122 final ListenableFuture<? extends DOMRpcResult> resultFuture = rpcService.invokeRpc(CREATE_SUBSCRIPTION_QNAME,
124 Futures.addCallback(resultFuture, new FutureCallback<DOMRpcResult>() {
126 public void onSuccess(@Nullable final DOMRpcResult rpcResult) {
127 LOG.info("Notification stream subscription succesfully completed");
131 public void onFailure(final Throwable throwable) {
132 LOG.error("Notification stream subscription failed");
134 }, MoreExecutors.directExecutor());