Bump upstreams
[netconf.git] / netconf / tools / netconf-test-perf / src / main / java / org / opendaylight / netconf / test / perf / MountedDeviceListener.java
1 /*
2  * Copyright (c) 2021 Pantheon Technologies, s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.test.perf;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import javax.annotation.PreDestroy;
19 import javax.inject.Inject;
20 import javax.inject.Singleton;
21 import org.checkerframework.checker.nullness.qual.Nullable;
22 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
23 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
24 import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
25 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
26 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
27 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
28 import org.opendaylight.mdsal.dom.api.DOMRpcService;
29 import org.opendaylight.netconf.test.perf.notifications.NotificationsCounter;
30 import org.opendaylight.netconf.test.perf.utils.TestUtils;
31 import org.opendaylight.yang.gen.v1.org.opendaylight.coretutorials.ncmount.example.notifications.rev150611.VrfRouteNotification;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
35 import org.opendaylight.yangtools.concepts.Registration;
36 import org.opendaylight.yangtools.yang.common.QName;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
39 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
40 import org.osgi.service.component.annotations.Activate;
41 import org.osgi.service.component.annotations.Component;
42 import org.osgi.service.component.annotations.Deactivate;
43 import org.osgi.service.component.annotations.Reference;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 @Singleton
48 @Component(immediate = true)
49 public final class MountedDeviceListener implements DOMMountPointListener {
50     private static final Logger LOG = LoggerFactory.getLogger(MountedDeviceListener.class);
51     private static final String TEST_NODE_PREFIX = "perf-";
52     private static final String STREAM_DEFAULT_NAME = "STREAM-PERF-DEFAULT";
53     private static final QName CREATE_SUBSCRIPTION_QNAME = QName.create(CreateSubscriptionInput.QNAME,
54         "create-subscription");
55
56     private final ConcurrentMap<YangInstanceIdentifier, Registration> listeners = new ConcurrentHashMap<>();
57     private final DOMMountPointService mountPointService;
58     private final BindingNormalizedNodeSerializer serializer;
59     private final Registration reg;
60
61     @Inject
62     @Activate
63     public MountedDeviceListener(final @Reference DOMMountPointService mountPointService,
64                                  final @Reference BindingNormalizedNodeSerializer serializer) {
65         this.mountPointService = requireNonNull(mountPointService);
66         this.serializer = requireNonNull(serializer);
67         reg = mountPointService.registerProvisionListener(this);
68     }
69
70     @PreDestroy
71     @Deactivate
72     public void stop() {
73         reg.close();
74         final var it = listeners.values().iterator();
75         while (it.hasNext()) {
76             it.next().close();
77             it.remove();
78         }
79     }
80
81     @Override
82     public void onMountPointCreated(final YangInstanceIdentifier path) {
83         TestUtils.getNodeId(path).ifPresent(nodeId -> {
84             if (nodeId.startsWith(TEST_NODE_PREFIX)) {
85                 LOG.info("Test node mounted: {}", nodeId);
86                 trackNotificationsPerformance(path);
87             }
88         });
89     }
90
91     @Override
92     public void onMountPointRemoved(final YangInstanceIdentifier path) {
93         final var listener = listeners.remove(path);
94         if (listener != null) {
95             listener.close();
96         }
97     }
98
99     private void trackNotificationsPerformance(final YangInstanceIdentifier path) {
100         // 1. get nodeId from the path
101         final String nodeId = TestUtils.getNodeId(path).orElseThrow();
102
103         // 2. extract needed services from the mount point
104         final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
105             .orElseThrow(() -> new RuntimeException("Unable to get mountpoint"));
106         final DOMRpcService rpcService = mountPoint.getService(DOMRpcService.class)
107             .orElseThrow(() -> new RuntimeException("Unable to get RPC Service from the mountpoint"));
108         final DOMNotificationService notificationService = mountPoint.getService(DOMNotificationService.class)
109             .orElseThrow(() -> new RuntimeException("Unable to get NotificationService from the mountpoint"));
110
111         // 3. create a listener for the notifications
112         listeners.put(path, notificationService.registerNotificationListener(
113             new NotificationsCounter(nodeId, serializer), Absolute.of(VrfRouteNotification.QNAME)));
114
115         // 4. send 'create-subscription' request to the device
116         final StreamNameType streamNameType = new StreamNameType(STREAM_DEFAULT_NAME);
117         final CreateSubscriptionInputBuilder subscriptionInputBuilder = new CreateSubscriptionInputBuilder();
118         subscriptionInputBuilder.setStream(streamNameType);
119         final CreateSubscriptionInput input = subscriptionInputBuilder.build();
120         final ContainerNode inputNode = serializer.toNormalizedNodeRpcData(input);
121         final ListenableFuture<? extends DOMRpcResult> resultFuture = rpcService.invokeRpc(CREATE_SUBSCRIPTION_QNAME,
122             inputNode);
123         Futures.addCallback(resultFuture, new FutureCallback<DOMRpcResult>() {
124             @Override
125             public void onSuccess(@Nullable final DOMRpcResult rpcResult) {
126                 LOG.info("Notification stream subscription succesfully completed");
127             }
128
129             @Override
130             public void onFailure(final Throwable throwable) {
131                 LOG.error("Notification stream subscription failed");
132             }
133         }, MoreExecutors.directExecutor());
134     }
135 }