Stream name should be used instead of BASE_STREAM_NAME 67/92767/10
authorIaroslav <iaroslav.kholiavko@pantheon.tech>
Tue, 29 Sep 2020 09:43:41 +0000 (12:43 +0300)
committerTomas Cere <tomas.cere@pantheon.tech>
Mon, 22 Mar 2021 12:51:36 +0000 (12:51 +0000)
Refactored onNotification method to use stream parameter instead of "BASE_STREAM_NAME".
Fix ConcurrentModificationException while closing listeners/publishers.

JIRA: NETCONF-306
Change-Id: I3f08a294adf53e34c31dd23ca8d9627a46fb585c
Signed-off-by: Iaroslav <iaroslav.kholiavko@pantheon.tech>
Signed-off-by: Vladyslav Marchenko <vladyslav.marchenko@pantheon.tech>
netconf/mdsal-netconf-notification/src/main/java/org/opendaylight/netconf/mdsal/notification/impl/NetconfNotificationManager.java
netconf/mdsal-netconf-notification/src/test/java/org/opendaylight/netconf/mdsal/notification/impl/NetconfNotificationManagerTest.java

index 60221039d09df7d79a14899517f6dce440f872cf..d26b2cbd4ac5af7a406dcf617286579d0270dae8 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multiset;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -101,8 +102,8 @@ public class NetconfNotificationManager implements NetconfNotificationCollector,
             LOG.debug("Notification of type {} detected: {}", stream, notification);
         }
 
-        for (final GenericNotificationListenerReg listenerReg : notificationListeners.get(BASE_STREAM_NAME)) {
-            listenerReg.getListener().onNotification(BASE_STREAM_NAME, notification);
+        for (final GenericNotificationListenerReg listenerReg : notificationListeners.get(stream)) {
+            listenerReg.getListener().onNotification(stream, notification);
         }
     }
 
@@ -115,7 +116,7 @@ public class NetconfNotificationManager implements NetconfNotificationCollector,
 
         LOG.trace("Notification listener registered for stream: {}", stream);
 
-        final GenericNotificationListenerReg reg = new GenericNotificationListenerReg(listener) {
+        final GenericNotificationListenerReg reg = new GenericNotificationListenerReg(listener, stream) {
             @Override
             public void close() {
                 synchronized (NetconfNotificationManager.this) {
@@ -125,7 +126,7 @@ public class NetconfNotificationManager implements NetconfNotificationCollector,
             }
         };
 
-        notificationListeners.put(BASE_STREAM_NAME, reg);
+        notificationListeners.put(stream, reg);
         return reg;
     }
 
@@ -159,13 +160,15 @@ public class NetconfNotificationManager implements NetconfNotificationCollector,
     @Override
     public synchronized void close() {
         // Unregister all listeners
-        for (final GenericNotificationListenerReg genericNotificationListenerReg : notificationListeners.values()) {
-            genericNotificationListenerReg.close();
+        // Use new list to avoid ConcurrentModificationException
+        for (final GenericNotificationListenerReg listenerReg : new ArrayList<>(notificationListeners.values())) {
+            listenerReg.close();
         }
         notificationListeners.clear();
 
         // Unregister all publishers
-        for (final GenericNotificationPublisherReg notificationPublisher : notificationPublishers) {
+        // Use new list to avoid ConcurrentModificationException
+        for (final GenericNotificationPublisherReg notificationPublisher : new ArrayList<>(notificationPublishers)) {
             notificationPublisher.close();
         }
         notificationPublishers.clear();
@@ -350,9 +353,12 @@ public class NetconfNotificationManager implements NetconfNotificationCollector,
 
     private class GenericNotificationListenerReg implements NotificationListenerRegistration {
         private final NetconfNotificationListener listener;
+        private final StreamNameType listenedStream;
 
-        GenericNotificationListenerReg(final NetconfNotificationListener listener) {
+        GenericNotificationListenerReg(final NetconfNotificationListener listener,
+                                       final StreamNameType listenedStream) {
             this.listener = listener;
+            this.listenedStream = listenedStream;
         }
 
         public NetconfNotificationListener getListener() {
@@ -361,7 +367,7 @@ public class NetconfNotificationManager implements NetconfNotificationCollector,
 
         @Override
         public void close() {
-            notificationListeners.remove(BASE_STREAM_NAME, this);
+            notificationListeners.remove(listenedStream, this);
         }
     }
 }
index 87cc01fbab7075fcd4f50798de448a6f53aa68ed..0e3838db83326617f032467ccb2076ce92b5be65 100644 (file)
@@ -10,8 +10,10 @@ package org.opendaylight.netconf.mdsal.notification.impl;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
@@ -25,19 +27,19 @@ import java.util.Iterator;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.binding.dom.codec.impl.DefaultBindingDOMCodecFactory;
 import org.opendaylight.mdsal.binding.generator.impl.DefaultBindingRuntimeGenerator;
+import org.opendaylight.netconf.api.xml.XmlUtil;
 import org.opendaylight.netconf.mdsal.notification.impl.ops.NotificationsTransformUtil;
 import org.opendaylight.netconf.notifications.BaseNotificationPublisherRegistration;
 import org.opendaylight.netconf.notifications.NetconfNotification;
 import org.opendaylight.netconf.notifications.NetconfNotificationCollector;
 import org.opendaylight.netconf.notifications.NetconfNotificationListener;
-import org.opendaylight.netconf.notifications.NetconfNotificationRegistry;
 import org.opendaylight.netconf.notifications.NotificationListenerRegistration;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
 import org.opendaylight.yangtools.yang.model.parser.api.YangParserException;
@@ -45,10 +47,7 @@ import org.opendaylight.yangtools.yang.parser.impl.YangParserFactoryImpl;
 
 @RunWith(MockitoJUnitRunner.class)
 public class NetconfNotificationManagerTest {
-
     public static final String RFC3339_DATE_FORMAT_WITH_MILLIS_BLUEPRINT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
-    @Mock
-    private NetconfNotificationRegistry notificationRegistry;
 
     @Test
     public void testEventTime() throws Exception {
@@ -161,6 +160,35 @@ public class NetconfNotificationManagerTest {
         verifyNoMoreInteractions(listener);
     }
 
+    @Test
+    public void testCustomNotificationListeners() throws Exception {
+        final NetconfNotificationManager netconfNotificationManager = createManager();
+
+        final StreamNameType testStreamName = new StreamNameType("TEST_STREAM");
+        final Stream testStream = new StreamBuilder().setName(testStreamName).build();
+
+        final NetconfNotificationListener listenerBase = mock(NetconfNotificationListener.class);
+        netconfNotificationManager.registerNotificationListener(
+            NetconfNotificationManager.BASE_NETCONF_STREAM.getName(), listenerBase);
+
+        final NetconfNotificationListener listener = mock(NetconfNotificationListener.class);
+        netconfNotificationManager.registerNotificationListener(testStream.getName(), listener);
+
+        doNothing().when(listener).onNotification(eq(testStreamName), any(NetconfNotification.class));
+
+        final NetconfNotification notification = new NetconfNotification(
+            XmlUtil.readXmlToDocument("<notification/>"));
+        netconfNotificationManager.onNotification(testStream.getName(), notification);
+
+        verify(listener).onNotification(eq(testStream.getName()), eq(notification));
+
+        netconfNotificationManager.close();
+        netconfNotificationManager.onNotification(testStream.getName(), notification);
+
+        verifyNoMoreInteractions(listener);
+        verify(listenerBase, never()).onNotification(eq(testStream.getName()), eq(notification));
+    }
+
     @Test
     public void testClose() throws Exception {
         final NetconfNotificationManager netconfNotificationManager = createManager();