Add tool to measure southbound notification performance
[netconf.git] / netconf / tools / netconf-test-perf / src / main / java / org / opendaylight / netconf / test / perf / MountedDeviceListener.java
1 /*
2  * Copyright (c) 2021 Pantheon Technologies, s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.test.perf;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.Iterator;
17 import java.util.Optional;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import javax.annotation.PreDestroy;
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.checkerframework.checker.nullness.qual.Nullable;
24 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
25 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
26 import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
27 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
28 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
29 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
30 import org.opendaylight.mdsal.dom.api.DOMRpcService;
31 import org.opendaylight.netconf.test.perf.notifications.NotificationsCounter;
32 import org.opendaylight.netconf.test.perf.utils.TestUtils;
33 import org.opendaylight.yang.gen.v1.org.opendaylight.coretutorials.ncmount.example.notifications.rev150611.VrfRouteNotification;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
37 import org.opendaylight.yangtools.concepts.ListenerRegistration;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
41 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
42 import org.osgi.service.component.annotations.Activate;
43 import org.osgi.service.component.annotations.Component;
44 import org.osgi.service.component.annotations.Deactivate;
45 import org.osgi.service.component.annotations.Reference;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 @Singleton
50 @Component(immediate = true)
51 public class MountedDeviceListener implements DOMMountPointListener {
52     private static final Logger LOG = LoggerFactory.getLogger(MountedDeviceListener.class);
53     private static final String TEST_NODE_PREFIX = "perf-";
54     private static final String STREAM_DEFAULT_NAME = "STREAM-PERF-DEFAULT";
55     private static final QName CREATE_SUBSCRIPTION_QNAME = QName.create(CreateSubscriptionInput.QNAME,
56         "create-subscription");
57
58     private final ConcurrentMap<YangInstanceIdentifier, ListenerRegistration<?>> listeners = new ConcurrentHashMap<>();
59     private final DOMMountPointService mountPointService;
60     private final BindingNormalizedNodeSerializer serializer;
61     private final ListenerRegistration<?> reg;
62
63     @Inject
64     @Activate
65     public MountedDeviceListener(final @Reference DOMMountPointService mountPointService,
66                                  final @Reference BindingNormalizedNodeSerializer serializer) {
67         this.mountPointService = requireNonNull(mountPointService);
68         this.serializer = requireNonNull(serializer);
69         reg = mountPointService.registerProvisionListener(this);
70     }
71
72     @PreDestroy
73     @Deactivate
74     public void stop() {
75         reg.close();
76         final Iterator<ListenerRegistration<?>> it = listeners.values().iterator();
77         while (it.hasNext()) {
78             it.next().close();
79             it.remove();
80         }
81     }
82
83     @Override
84     public void onMountPointCreated(final YangInstanceIdentifier path) {
85         final Optional<String> optNodeId = TestUtils.getNodeId(path);
86         if (optNodeId.isPresent() && optNodeId.get().startsWith(TEST_NODE_PREFIX)) {
87             LOG.info("Test node mounted: {}", optNodeId.get());
88             trackNotificationsPerformance(path);
89         }
90     }
91
92     @Override
93     public void onMountPointRemoved(final YangInstanceIdentifier path) {
94         final ListenerRegistration<?> listener = listeners.remove(path);
95         if (listener != null) {
96             listener.close();
97         }
98     }
99
100     private void trackNotificationsPerformance(final YangInstanceIdentifier path) {
101         // 1. get nodeId from the path
102         final String nodeId = TestUtils.getNodeId(path).get();
103
104         // 2. extract needed services from the mount point
105         final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
106             .orElseThrow(() -> new RuntimeException("Unable to get mountpoint"));
107         final DOMRpcService rpcService = mountPoint.getService(DOMRpcService.class)
108             .orElseThrow(() -> new RuntimeException("Unable to get RPC Service from the mountpoint"));
109         final DOMNotificationService notificationService = mountPoint.getService(DOMNotificationService.class)
110             .orElseThrow(() -> new RuntimeException("Unable to get NotificationService from the mountpoint"));
111
112         // 3. create a listener for the notifications
113         listeners.put(path, notificationService.registerNotificationListener(
114             new NotificationsCounter(nodeId, serializer), Absolute.of(VrfRouteNotification.QNAME)));
115
116         // 4. send 'create-subscription' request to the device
117         final StreamNameType streamNameType = new StreamNameType(STREAM_DEFAULT_NAME);
118         final CreateSubscriptionInputBuilder subscriptionInputBuilder = new CreateSubscriptionInputBuilder();
119         subscriptionInputBuilder.setStream(streamNameType);
120         final CreateSubscriptionInput input = subscriptionInputBuilder.build();
121         final ContainerNode inputNode = serializer.toNormalizedNodeRpcData(input);
122         final ListenableFuture<? extends DOMRpcResult> resultFuture = rpcService.invokeRpc(CREATE_SUBSCRIPTION_QNAME,
123             inputNode);
124         Futures.addCallback(resultFuture, new FutureCallback<DOMRpcResult>() {
125             @Override
126             public void onSuccess(@Nullable final DOMRpcResult rpcResult) {
127                 LOG.info("Notification stream subscription succesfully completed");
128             }
129
130             @Override
131             public void onFailure(final Throwable throwable) {
132                 LOG.error("Notification stream subscription failed");
133             }
134         }, MoreExecutors.directExecutor());
135     }
136 }