Merge "Bug 3864: Notify netconf monitoring about changes in session"
authorTomas Cere <tcere@cisco.com>
Fri, 6 May 2016 10:45:30 +0000 (10:45 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 6 May 2016 10:45:30 +0000 (10:45 +0000)
22 files changed:
features/netconf/src/main/features/features.xml
netconf/mdsal-netconf-monitoring/src/main/java/org/opendaylight/controller/config/yang/netconf/mdsal/monitoring/MonitoringToMdsalWriter.java
netconf/mdsal-netconf-monitoring/src/test/java/org/opendaylight/controller/config/yang/netconf/mdsal/monitoring/MonitoringToMdsalWriterTest.java
netconf/mdsal-netconf-notification/src/main/java/org/opendaylight/controller/config/yang/netconf/mdsal/notification/SessionNotificationProducer.java
netconf/mdsal-netconf-notification/src/test/java/org/opendaylight/controller/config/yang/netconf/mdsal/notification/SessionNotificationProducerTest.java
netconf/netconf-api/src/main/java/org/opendaylight/netconf/api/monitoring/NetconfMonitoringService.java
netconf/netconf-api/src/main/java/org/opendaylight/netconf/api/monitoring/SessionEvent.java [new file with mode: 0644]
netconf/netconf-api/src/main/java/org/opendaylight/netconf/api/monitoring/SessionListener.java
netconf/netconf-impl/pom.xml
netconf/netconf-impl/src/main/java/org/opendaylight/controller/config/yang/config/netconf/northbound/impl/NetconfServerMonitoringModule.java
netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/NetconfServerSession.java
netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/NetconfServerSessionListener.java
netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfCapabilityMonitoringService.java [new file with mode: 0644]
netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfMonitoringServiceImpl.java
netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringService.java [new file with mode: 0644]
netconf/netconf-impl/src/main/yang/netconf-northbound-impl.yang
netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/ConcurrentClientsTest.java
netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfCapabilityMonitoringServiceTest.java [moved from netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfMonitoringServiceImplTest.java with 82% similarity]
netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringServiceTest.java [new file with mode: 0644]
netconf/netconf-mdsal-config/src/main/resources/initial/08-netconf-mdsal.xml
netconf/netconf-monitoring/src/test/java/org/opendaylight/netconf/monitoring/xml/JaxBSerializerTest.java
netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/DummyMonitoringService.java

index 493d6d97773b36d0f45e01d5ea82ad6cabd845ec..0552553cff47312ee4b5b08ac75cf6faa6463d8e 100644 (file)
@@ -52,6 +52,7 @@
     <feature version='${project.version}'>odl-netconf-mapping-api</feature>
     <feature version='${project.version}'>odl-netconf-util</feature>
     <feature version='${project.version}'>odl-netconf-netty-util</feature>
+    <feature version='${config.version}'>odl-config-netty</feature>
     <!-- Netconf server without config connector is just an empty shell -->
     <feature version='${project.version}'>odl-config-netconf-connector</feature>
     <!-- Netconf will not provide schemas without monitoring -->
@@ -68,6 +69,7 @@
     <feature version='${project.version}'>odl-netconf-netty-util</feature>
     <bundle>mvn:org.opendaylight.netconf/netconf-impl/{{VERSION}}</bundle>
     <feature version='${project.version}'>odl-netconf-notifications-api</feature>
+    <feature version='${config.version}'>odl-config-netty</feature>
     <bundle>mvn:org.opendaylight.netconf/netconf-notifications-impl/{{VERSION}}</bundle>
     <bundle>mvn:org.opendaylight.netconf/config-netconf-connector/{{VERSION}}</bundle>
   </feature>
index 11fac96f92ecc9f58943da97ab9f7503b0633dc3..7c434f2813eae2916abd909a6f4d7882c440a6a8 100644 (file)
@@ -11,6 +11,8 @@ package org.opendaylight.controller.config.yang.netconf.mdsal.monitoring;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import java.util.Collection;
+import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
@@ -23,12 +25,15 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.mon
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Schemas;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Sessions;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
-import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class MonitoringToMdsalWriter implements AutoCloseable, NetconfMonitoringService.MonitoringListener, BindingAwareProvider {
+/**
+ * Writes netconf server state changes received from NetconfMonitoringService to netconf-state datastore subtree.
+ */
+final class MonitoringToMdsalWriter implements AutoCloseable, NetconfMonitoringService.CapabilitiesListener,
+        NetconfMonitoringService.SessionsListener, BindingAwareProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(MonitoringToMdsalWriter.class);
 
@@ -48,43 +53,49 @@ final class MonitoringToMdsalWriter implements AutoCloseable, NetconfMonitoringS
 
     @Override
     public void close() {
-        deleteFromDatastore(InstanceIdentifier.create(NetconfState.class));
+        runTransaction((tx) -> tx.delete(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(NetconfState.class)));
     }
 
     @Override
     public void onSessionStarted(Session session) {
         final InstanceIdentifier<Session> sessionPath =
                 SESSIONS_INSTANCE_IDENTIFIER.child(Session.class, session.getKey());
-        putToDatastore(sessionPath, session);
+        runTransaction((tx) -> tx.put(LogicalDatastoreType.OPERATIONAL, sessionPath, session));
     }
 
     @Override
     public void onSessionEnded(Session session) {
         final InstanceIdentifier<Session> sessionPath =
                 SESSIONS_INSTANCE_IDENTIFIER.child(Session.class, session.getKey());
-        deleteFromDatastore(sessionPath);
+        runTransaction((tx) -> tx.delete(LogicalDatastoreType.OPERATIONAL, sessionPath));
+    }
+
+    @Override
+    public void onSessionsUpdated(Collection<Session> sessions) {
+        runTransaction((tx) -> updateSessions(tx, sessions));
     }
 
     @Override
     public void onCapabilitiesChanged(Capabilities capabilities) {
-        putToDatastore(CAPABILITIES_INSTANCE_IDENTIFIER, capabilities);
+        runTransaction((tx) -> tx.put(LogicalDatastoreType.OPERATIONAL, CAPABILITIES_INSTANCE_IDENTIFIER, capabilities));
     }
 
     @Override
     public void onSchemasChanged(Schemas schemas) {
-        putToDatastore(SCHEMAS_INSTANCE_IDENTIFIER, schemas);
+        runTransaction((tx) -> tx.put(LogicalDatastoreType.OPERATIONAL, SCHEMAS_INSTANCE_IDENTIFIER, schemas));
     }
 
     @Override
     public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
         dataBroker = providerContext.getSALService(DataBroker.class);
-        serverMonitoringDependency.registerListener(this);
+        serverMonitoringDependency.registerCapabilitiesListener(this);
+        serverMonitoringDependency.registerSessionsListener(this);
     }
 
-    private <T extends DataObject> void putToDatastore(InstanceIdentifier<T> path, T value) {
+    private void runTransaction(Consumer<WriteTransaction> txUser) {
         Preconditions.checkState(dataBroker != null);
         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
-        tx.put(LogicalDatastoreType.OPERATIONAL, path, value);
+        txUser.accept(tx);
         Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
             @Override
             public void onSuccess(@Nullable Void result) {
@@ -98,20 +109,11 @@ final class MonitoringToMdsalWriter implements AutoCloseable, NetconfMonitoringS
         });
     }
 
-    private <T extends DataObject> void deleteFromDatastore(InstanceIdentifier<T> path) {
-        Preconditions.checkState(dataBroker != null);
-        final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
-        tx.delete(LogicalDatastoreType.OPERATIONAL, path);
-        Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(@Nullable Void result) {
-                LOG.debug("Netconf state updated successfully");
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                LOG.warn("Unable to update netconf state", t);
-            }
-        });
+    private static void updateSessions(WriteTransaction tx, Collection<Session> sessions) {
+        for (Session session : sessions) {
+            final InstanceIdentifier<Session> sessionPath =
+                    SESSIONS_INSTANCE_IDENTIFIER.child(Session.class, session.getKey());
+            tx.put(LogicalDatastoreType.OPERATIONAL, sessionPath, session);
+        }
     }
 }
index 2a38a1ffc7fd3dd695fb4eb24023662892ba1288..ce52b2508e9e1b68357d630e1010ee714989e857 100644 (file)
@@ -16,6 +16,9 @@ import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.verify;
 
 import com.google.common.util.concurrent.Futures;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
@@ -55,7 +58,8 @@ public class MonitoringToMdsalWriterTest {
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
 
-        doReturn(null).when(monitoring).registerListener(any());
+        doReturn(null).when(monitoring).registerCapabilitiesListener(any());
+        doReturn(null).when(monitoring).registerSessionsListener(any());
 
         doReturn(dataBroker).when(context).getSALService(DataBroker.class);
 
@@ -125,9 +129,30 @@ public class MonitoringToMdsalWriterTest {
         inOrder.verify(writeTransaction).submit();
     }
 
+    @Test
+    public void testOnSessionsUpdated() throws Exception {
+        Session session1 = new SessionBuilder()
+                .setSessionId(1L)
+                .build();
+        Session session2 = new SessionBuilder()
+                .setSessionId(2L)
+                .build();
+        List<Session> sessions = new ArrayList<>();
+        sessions.add(session1);
+        sessions.add(session2);
+        final InstanceIdentifier<Session> id1 = InstanceIdentifier.create(NetconfState.class).child(Sessions.class).child(Session.class, session1.getKey());
+        final InstanceIdentifier<Session> id2 = InstanceIdentifier.create(NetconfState.class).child(Sessions.class).child(Session.class, session2.getKey());
+        writer.onSessionInitiated(context);
+        writer.onSessionsUpdated(sessions);
+        InOrder inOrder = inOrder(writeTransaction);
+        inOrder.verify(writeTransaction).put(LogicalDatastoreType.OPERATIONAL, id1, session1);
+        inOrder.verify(writeTransaction).put(LogicalDatastoreType.OPERATIONAL, id2, session2);
+        inOrder.verify(writeTransaction).submit();
+    }
+
     @Test
     public void testOnSessionInitiated() throws Exception {
         writer.onSessionInitiated(context);
-        verify(monitoring).registerListener(writer);
+        verify(monitoring).registerCapabilitiesListener(writer);
     }
 }
\ No newline at end of file
index 06b3e05e5691c3043d262d306a2ce9d00c80c892..54f67d39789233959e2b481723788bd2dc9699d3 100644 (file)
@@ -46,7 +46,7 @@ public class SessionNotificationProducer extends OperationalDatastoreListener<Se
             switch (modificationType) {
                 case WRITE:
                     final Session created = rootNode.getDataAfter();
-                    if (created != null) {
+                    if (created != null && rootNode.getDataBefore() == null) {
                         publishStartedSession(created);
                     }
                     break;
index 5f8e5ea9a940ab7dba531391baa3dbcb0c8a108b..e0ddae7feee1d97b9288fafc431915987aa0db4e 100644 (file)
@@ -11,6 +11,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 import java.util.Collections;
@@ -28,6 +29,7 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.mon
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.SessionBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionEnd;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionStart;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.ZeroBasedCounter32;
 
 public class SessionNotificationProducerTest {
 
@@ -56,6 +58,22 @@ public class SessionNotificationProducerTest {
         Assert.assertEquals(session.getUsername(), value.getUsername());
     }
 
+    @Test
+    public void testOnDataChangedSessionUpdated() throws Exception {
+        final DataTreeModification<Session> treeChange = mock(DataTreeModification.class);
+        final DataObjectModification<Session> changeObject = mock(DataObjectModification.class);
+        final Session sessionBefore = createSessionWithInRpcCount(1, 0);
+        final Session sessionAfter = createSessionWithInRpcCount(1, 1);
+        doReturn(sessionBefore).when(changeObject).getDataBefore();
+        doReturn(sessionAfter).when(changeObject).getDataAfter();
+        doReturn(DataObjectModification.ModificationType.WRITE).when(changeObject).getModificationType();
+        doReturn(changeObject).when(treeChange).getRootNode();
+        publisher.onDataTreeChanged(Collections.singleton(treeChange));
+        //session didn't start, only stats changed. No notification should be produced
+        verify(registration, never()).onSessionStarted(any());
+        verify(registration, never()).onSessionEnded(any());
+    }
+
     @Test
     public void testOnDataChangedSessionDeleted() throws Exception {
         final Session session = createSession(1);
@@ -70,10 +88,15 @@ public class SessionNotificationProducerTest {
     }
 
     private Session createSession(long id) {
+        return createSessionWithInRpcCount(id, 0);
+    }
+
+    private Session createSessionWithInRpcCount(long id, long inRpc) {
         return new SessionBuilder()
                 .setSessionId(id)
                 .setSourceHost(new Host("0.0.0.0".toCharArray()))
                 .setUsername("user")
+                .setInRpcs(new ZeroBasedCounter32(inRpc))
                 .build();
     }
 
@@ -83,6 +106,7 @@ public class SessionNotificationProducerTest {
         final DataObjectModification<Session> changeObject = mock(DataObjectModification.class);
         switch (type) {
             case WRITE:
+                doReturn(null).when(changeObject).getDataBefore();
                 doReturn(session).when(changeObject).getDataAfter();
                 break;
             case DELETE:
index da186336cd7e63ee09f77ef5f13e24d605998fd3..e60b71eb1f2dcdd0a7476fc0e20744da55605def 100644 (file)
@@ -8,15 +8,22 @@
 package org.opendaylight.netconf.api.monitoring;
 
 import com.google.common.base.Optional;
+import java.util.Collection;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Capabilities;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Schemas;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Sessions;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
 
-public interface NetconfMonitoringService extends CapabilityListener, SessionListener {
+public interface NetconfMonitoringService {
 
     Sessions getSessions();
 
+    /**
+     * Returns session monitoring service session listener, which is used to notify monitoring service about state of session.
+     * @return session listener
+     */
+    SessionListener getSessionListener();
+
     Schemas getSchemas();
 
     String getSchemaForCapability(String moduleName, Optional<String> revision);
@@ -24,13 +31,35 @@ public interface NetconfMonitoringService extends CapabilityListener, SessionLis
     Capabilities getCapabilities();
 
     /**
-     * Allows push based state information transfer. After the listener is registered, current state is pushed to the listener.
+     * Allows push based capabilities information transfer. After the listener is registered, current state is pushed to the listener.
      * @param listener Monitoring listener
      * @return listener registration
      */
-    AutoCloseable registerListener(MonitoringListener listener);
+    AutoCloseable registerCapabilitiesListener(CapabilitiesListener listener);
+
+    /**
+     * Allows push based sessions information transfer.
+     * @param listener Monitoring listener
+     * @return listener registration
+     */
+    AutoCloseable registerSessionsListener(SessionsListener listener);
+
+    interface CapabilitiesListener {
+
+        /**
+         * Callback used to notify about a change in used capabilities
+         * @param capabilities resulting capabilities
+         */
+        void onCapabilitiesChanged(Capabilities capabilities);
+
+        /**
+         * Callback used to notify about a change in used schemas
+         * @param schemas resulting schemas
+         */
+        void onSchemasChanged(Schemas schemas);
+    }
 
-    interface MonitoringListener {
+    interface SessionsListener {
         /**
          * Callback used to notify about netconf session start
          * @param session started session
@@ -44,15 +73,12 @@ public interface NetconfMonitoringService extends CapabilityListener, SessionLis
         void onSessionEnded(Session session);
 
         /**
-         * Callback used to notify about a change in used capabilities
-         * @param capabilities actual capabilities
+         * Callback used to notify about activity in netconf session, like
+         * rpc or notification. It is triggered at regular time interval. Session parameter
+         * contains only sessions which state was changed.
+         * @param sessions updated sessions
          */
-        void onCapabilitiesChanged(Capabilities capabilities);
+        void onSessionsUpdated(Collection<Session> sessions);
 
-        /**
-         * Callback used to notify about a change in used schemas
-         * @param schemas actual schemas
-         */
-        void onSchemasChanged(Schemas schemas);
     }
 }
diff --git a/netconf/netconf-api/src/main/java/org/opendaylight/netconf/api/monitoring/SessionEvent.java b/netconf/netconf-api/src/main/java/org/opendaylight/netconf/api/monitoring/SessionEvent.java
new file mode 100644 (file)
index 0000000..bd30c22
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.api.monitoring;
+
+/**
+ * Class represents change in netconf session.
+ */
+public class SessionEvent {
+    private final NetconfManagementSession session;
+    private final Type type;
+
+    private SessionEvent(NetconfManagementSession session, Type type) {
+        this.session = session;
+        this.type = type;
+    }
+
+    /**
+     * Returns session, where event occurred
+     * @return session
+     */
+    public NetconfManagementSession getSession() {
+        return session;
+    }
+
+    /**
+     * Returns event type
+     * @return type
+     */
+    public Type getType() {
+        return type;
+    }
+
+    public static SessionEvent inRpcSuccess(NetconfManagementSession session) {
+        return new SessionEvent(session, Type.IN_RPC_SUCCESS);
+    }
+
+    public static SessionEvent inRpcFail(NetconfManagementSession session) {
+        return new SessionEvent(session, Type.IN_RPC_FAIL);
+    }
+
+    public static SessionEvent outRpcError(NetconfManagementSession session) {
+        return new SessionEvent(session, Type.OUT_RPC_ERROR);
+    }
+
+    public static SessionEvent notification(NetconfManagementSession session) {
+        return new SessionEvent(session, Type.NOTIFICATION);
+    }
+
+    /**
+     * Session event type
+     */
+    public enum Type {
+
+        /**
+         * Correct rpc message received
+         */
+        IN_RPC_SUCCESS,
+
+        /**
+         * Incorrect rpc message received
+         */
+        IN_RPC_FAIL,
+
+        /**
+         * rpc-reply messages sent that contained an rpc-error element.
+         */
+        OUT_RPC_ERROR,
+
+        /**
+         *  Notification message sent
+         */
+        NOTIFICATION
+    }
+}
index f568eaa867241877b0f4c7a2af5083d5fa56a87c..74df29cc789b0c977ad4eccd9b3fa910cf4df74e 100644 (file)
@@ -12,7 +12,23 @@ package org.opendaylight.netconf.api.monitoring;
  * Created by mmarsale on 13.2.2015.
  */
 public interface SessionListener {
+
+    /**
+     * Callback used to notify about netconf session start
+     * @param session started session
+     */
     void onSessionUp(NetconfManagementSession session);
 
+    /**
+     * Callback used to notify about netconf session end
+     * @param session ended session
+     */
     void onSessionDown(NetconfManagementSession session);
+
+    /**
+     * Callback used to notify about activity in netconf session, like
+     * rpc or notification
+     * @param event session event, contains session and type of event
+     */
+    void onSessionEvent(SessionEvent event);
 }
index a98c76cb86763598c4e3820c260c6d8a04d755e6..2f18c3e9d17323866b5cb37cc17ea562a9e9c9f3 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>protocol-framework</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>threadpool-config-api</artifactId>
+    </dependency>
     <!-- test dependencies -->
     <dependency>
       <groupId>org.opendaylight.yangtools</groupId>
index 5a7aa06ad72148600a024fc0e211545f1f498c1e..02521e81258ddec99fd1c317ee265aec0a2287c5 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.config.yang.config.netconf.northbound.impl;
 
+import com.google.common.base.Optional;
 import org.opendaylight.netconf.impl.osgi.NetconfMonitoringServiceImpl;
 
 public class NetconfServerMonitoringModule extends org.opendaylight.controller.config.yang.config.netconf.northbound.impl.AbstractNetconfServerMonitoringModule {
@@ -26,7 +27,9 @@ public class NetconfServerMonitoringModule extends org.opendaylight.controller.c
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-        return new NetconfMonitoringServiceImpl(getAggregatorDependency());
+        return new NetconfMonitoringServiceImpl(getAggregatorDependency(),
+                Optional.fromNullable(getScheduledThreadpoolDependency()),
+                getMonitoringUpdateInterval());
     }
 
 }
index b6c88a60b255418dddde5fdcfbc6454ff104408c..f14a8f0ae021e4316d2916984ac9d46499b0e3ef 100644 (file)
@@ -29,6 +29,7 @@ import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
 import org.opendaylight.netconf.nettyutil.AbstractNetconfSession;
 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder;
 import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
+import org.opendaylight.netconf.notifications.NetconfNotification;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
@@ -52,15 +53,17 @@ public final class NetconfServerSession extends AbstractNetconfSession<NetconfSe
     private static final DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 
     private final NetconfHelloMessageAdditionalHeader header;
+    private final NetconfServerSessionListener sessionListener;
 
     private ZonedDateTime loginTime;
-    private long inRpcSuccess, inRpcFail, outRpcError;
+    private long inRpcSuccess, inRpcFail, outRpcError, outNotification;
     private volatile boolean delayedClose;
 
     public NetconfServerSession(final NetconfServerSessionListener sessionListener, final Channel channel, final long sessionId,
             final NetconfHelloMessageAdditionalHeader header) {
         super(sessionListener, channel, sessionId);
         this.header = header;
+        this.sessionListener = sessionListener;
         LOG.debug("Session {} created", toString());
     }
 
@@ -82,6 +85,10 @@ public final class NetconfServerSession extends AbstractNetconfSession<NetconfSe
     @Override
     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
         final ChannelFuture channelFuture = super.sendMessage(netconfMessage);
+        if (netconfMessage instanceof NetconfNotification) {
+            outNotification++;
+            sessionListener.onNotification(this, (NetconfNotification) netconfMessage);
+        }
         // delayed close was set, close after the message was sent
         if(delayedClose) {
             channelFuture.addListener(new ChannelFutureListener() {
@@ -137,7 +144,7 @@ public final class NetconfServerSession extends AbstractNetconfSession<NetconfSe
         builder.setUsername(header.getUserName());
         builder.setTransport(getTransportForString(header.getTransport()));
 
-        builder.setOutNotifications(new ZeroBasedCounter32(0L));
+        builder.setOutNotifications(new ZeroBasedCounter32(outNotification));
 
         builder.setKey(new SessionKey(getSessionId()));
 
index f3a26ce40b97494d71a00cf10b72c6f3f0069d62..4117c3189f29e8cf13a1913238e54ec7f9995518 100644 (file)
@@ -13,14 +13,17 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import org.opendaylight.controller.config.util.xml.DocumentedException;
 import org.opendaylight.controller.config.util.xml.XmlUtil;
-import org.opendaylight.netconf.util.messages.SendErrorExceptionUtil;
 import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.api.NetconfSessionListener;
 import org.opendaylight.netconf.api.NetconfTerminationReason;
 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.api.monitoring.SessionEvent;
+import org.opendaylight.netconf.api.monitoring.SessionListener;
 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
 import org.opendaylight.netconf.impl.osgi.NetconfOperationRouter;
 import org.opendaylight.netconf.util.messages.SubtreeFilter;
+import org.opendaylight.netconf.notifications.NetconfNotification;
+import org.opendaylight.netconf.util.messages.SendErrorExceptionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
@@ -30,22 +33,20 @@ import org.w3c.dom.Node;
 public class NetconfServerSessionListener implements NetconfSessionListener<NetconfServerSession> {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfServerSessionListener.class);
-    private final NetconfMonitoringService monitoringService;
+    private final SessionListener monitoringSessionListener;
     private final NetconfOperationRouter operationRouter;
     private final AutoCloseable onSessionDownCloseable;
 
     public NetconfServerSessionListener(final NetconfOperationRouter operationRouter, final NetconfMonitoringService monitoringService,
                                         final AutoCloseable onSessionDownCloseable) {
         this.operationRouter = operationRouter;
-        this.monitoringService = monitoringService;
+        this.monitoringSessionListener = monitoringService.getSessionListener();
         this.onSessionDownCloseable = onSessionDownCloseable;
     }
 
     @Override
     public void onSessionUp(final NetconfServerSession netconfNetconfServerSession) {
-        monitoringService.onSessionUp(netconfNetconfServerSession);
-        // FIXME monitoring service should be also notified about all the other changes to netconf session (from ietf-netconf-monitoring point of view)
-        // This means also notifying after every message is processed
+        monitoringSessionListener.onSessionUp(netconfNetconfServerSession);
     }
 
     @Override
@@ -55,7 +56,7 @@ public class NetconfServerSessionListener implements NetconfSessionListener<Netc
     }
 
     public void onDown(final NetconfServerSession netconfNetconfServerSession) {
-        monitoringService.onSessionDown(netconfNetconfServerSession);
+        monitoringSessionListener.onSessionDown(netconfNetconfServerSession);
 
         try {
             operationRouter.close();
@@ -87,19 +88,27 @@ public class NetconfServerSessionListener implements NetconfSessionListener<Netc
                     session);
             LOG.debug("Responding with message {}", message);
             session.sendMessage(message);
+            monitoringSessionListener.onSessionEvent(SessionEvent.inRpcSuccess(session));
         } catch (final RuntimeException e) {
             // TODO: should send generic error or close session?
             LOG.error("Unexpected exception", e);
             session.onIncommingRpcFail();
+            monitoringSessionListener.onSessionEvent(SessionEvent.inRpcFail(session));
             throw new IllegalStateException("Unable to process incoming message " + netconfMessage, e);
         } catch (DocumentedException e) {
             LOG.trace("Error occurred while processing message",e);
             session.onOutgoingRpcError();
             session.onIncommingRpcFail();
+            monitoringSessionListener.onSessionEvent(SessionEvent.inRpcFail(session));
+            monitoringSessionListener.onSessionEvent(SessionEvent.outRpcError(session));
             SendErrorExceptionUtil.sendErrorMessage(session, e, netconfMessage);
         }
     }
 
+    public void onNotification(final NetconfServerSession session, final NetconfNotification notification) {
+        monitoringSessionListener.onSessionEvent(SessionEvent.notification(session));
+    }
+
     private NetconfMessage processDocument(final NetconfMessage netconfMessage, final NetconfServerSession session)
             throws DocumentedException {
 
diff --git a/netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfCapabilityMonitoringService.java b/netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfCapabilityMonitoringService.java
new file mode 100644 (file)
index 0000000..afc1da6
--- /dev/null
@@ -0,0 +1,267 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.impl.osgi;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.config.util.capability.BasicCapability;
+import org.opendaylight.controller.config.util.capability.Capability;
+import org.opendaylight.netconf.api.monitoring.CapabilityListener;
+import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
+import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
+import org.opendaylight.netconf.notifications.BaseNotificationPublisherRegistration;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.Yang;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Capabilities;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Schemas;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SchemasBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.Schema;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.SchemaBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.SchemaKey;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.ChangedByBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.changed.by.server.or.user.ServerBuilder;
+
+class NetconfCapabilityMonitoringService implements CapabilityListener, AutoCloseable {
+
+    private static final Schema.Location NETCONF_LOCATION = new Schema.Location(Schema.Location.Enumeration.NETCONF);
+    private static final List<Schema.Location> NETCONF_LOCATIONS = ImmutableList.of(NETCONF_LOCATION);
+    private static final BasicCapability CANDIDATE_CAPABILITY = new BasicCapability("urn:ietf:params:netconf:capability:candidate:1.0");
+    private static final Function<Capability, Uri> CAPABILITY_TO_URI = new Function<Capability, Uri>() {
+        @Override
+        public Uri apply(final Capability input) {
+            return new Uri(input.getCapabilityUri());
+        }
+    };
+
+    private final Set<NetconfManagementSession> sessions = new HashSet<>();
+    private final NetconfOperationServiceFactory netconfOperationProvider;
+    private final Map<Uri, Capability> capabilities = new HashMap<>();
+    private final Map<String, Map<String, String>> mappedModulesToRevisionToSchema = Maps.newHashMap();
+
+
+    private final Set<NetconfMonitoringService.CapabilitiesListener> listeners = Sets.newHashSet();
+    private volatile BaseNotificationPublisherRegistration notificationPublisher;
+
+    NetconfCapabilityMonitoringService(final NetconfOperationServiceFactory netconfOperationProvider) {
+        this.netconfOperationProvider = netconfOperationProvider;
+        netconfOperationProvider.registerCapabilityListener(this);
+    }
+
+    synchronized Schemas getSchemas() {
+        try {
+            return transformSchemas(netconfOperationProvider.getCapabilities());
+        } catch (final RuntimeException e) {
+            throw e;
+        } catch (final Exception e) {
+            throw new IllegalStateException("Exception while closing", e);
+        }
+    }
+
+    synchronized String getSchemaForModuleRevision(final String moduleName, final Optional<String> revision) {
+
+        Map<String, String> revisionMapRequest = mappedModulesToRevisionToSchema.get(moduleName);
+        Preconditions.checkState(revisionMapRequest != null, "Capability for module %s not present, " + ""
+                + "available modules : %s", moduleName, Collections2.transform(capabilities.values(), CAPABILITY_TO_URI));
+
+        if (revision.isPresent()) {
+            String schema = revisionMapRequest.get(revision.get());
+
+            Preconditions.checkState(schema != null,
+                    "Capability for module %s:%s not present, available revisions for module: %s", moduleName,
+                    revision.get(), revisionMapRequest.keySet());
+
+            return schema;
+        } else {
+            Preconditions.checkState(revisionMapRequest.size() == 1,
+                    "Expected 1 capability for module %s, available revisions : %s", moduleName,
+                    revisionMapRequest.keySet());
+            //Only one revision is present, so return it
+            return revisionMapRequest.values().iterator().next();
+        }
+    }
+
+    private void updateCapabilityToSchemaMap(final Set<Capability> added, final Set<Capability> removed) {
+        for (final Capability cap : added) {
+            if (!isValidModuleCapability(cap)){
+                continue;
+            }
+
+            final String currentModuleName = cap.getModuleName().get();
+            Map<String, String> revisionMap = mappedModulesToRevisionToSchema.get(currentModuleName);
+            if (revisionMap == null) {
+                revisionMap = Maps.newHashMap();
+                mappedModulesToRevisionToSchema.put(currentModuleName, revisionMap);
+            }
+
+            final String currentRevision = cap.getRevision().get();
+            revisionMap.put(currentRevision, cap.getCapabilitySchema().get());
+        }
+        for (final Capability cap : removed) {
+            if (!isValidModuleCapability(cap)){
+                continue;
+            }
+            final Map<String, String> revisionMap = mappedModulesToRevisionToSchema.get(cap.getModuleName().get());
+            if (revisionMap != null) {
+                revisionMap.remove(cap.getRevision().get());
+                if (revisionMap.isEmpty()) {
+                    mappedModulesToRevisionToSchema.remove(cap.getModuleName().get());
+                }
+            }
+        }
+    }
+
+    private static boolean isValidModuleCapability(Capability cap) {
+        return cap.getModuleName().isPresent()
+                && cap.getRevision().isPresent()
+                && cap.getCapabilitySchema().isPresent();
+    }
+
+
+    synchronized Capabilities getCapabilities() {
+        return new CapabilitiesBuilder().setCapability(Lists.newArrayList(capabilities.keySet())).build();
+    }
+
+    synchronized AutoCloseable registerListener(final NetconfMonitoringService.CapabilitiesListener listener) {
+        listeners.add(listener);
+        listener.onCapabilitiesChanged(getCapabilities());
+        listener.onSchemasChanged(getSchemas());
+        return new AutoCloseable() {
+            @Override
+            public void close() throws Exception {
+                synchronized (NetconfCapabilityMonitoringService.this) {
+                    listeners.remove(listener);
+                }
+            }
+        };
+    }
+
+    private static Schemas transformSchemas(final Set<Capability> caps) {
+        final List<Schema> schemas = new ArrayList<>(caps.size());
+        for (final Capability cap : caps) {
+            if (cap.getCapabilitySchema().isPresent()) {
+                final SchemaBuilder builder = new SchemaBuilder();
+
+                Preconditions.checkState(isValidModuleCapability(cap));
+
+                builder.setNamespace(new Uri(cap.getModuleNamespace().get()));
+
+                final String version = cap.getRevision().get();
+                builder.setVersion(version);
+
+                final String identifier = cap.getModuleName().get();
+                builder.setIdentifier(identifier);
+
+                builder.setFormat(Yang.class);
+
+                builder.setLocation(transformLocations(cap.getLocation()));
+
+                builder.setKey(new SchemaKey(Yang.class, identifier, version));
+
+                schemas.add(builder.build());
+            }
+        }
+
+        return new SchemasBuilder().setSchema(schemas).build();
+    }
+
+    private static List<Schema.Location> transformLocations(final Collection<String> locations) {
+        if (locations.isEmpty()) {
+            return NETCONF_LOCATIONS;
+        }
+
+        final Builder<Schema.Location> b = ImmutableList.builder();
+        b.add(NETCONF_LOCATION);
+
+        for (final String location : locations) {
+            b.add(new Schema.Location(new Uri(location)));
+        }
+
+        return b.build();
+    }
+
+    private static Set<Capability> setupCapabilities(final Set<Capability> caps) {
+        Set<Capability> capabilities = new HashSet<>(caps);
+        capabilities.add(CANDIDATE_CAPABILITY);
+        // TODO rollback on error not supported EditConfigXmlParser:100
+        // [RFC6241] 8.5.  Rollback-on-Error Capability
+        // capabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:rollback-on-error:1.0"));
+        return capabilities;
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        listeners.clear();
+        sessions.clear();
+        capabilities.clear();
+    }
+
+    @Override
+    public synchronized void onCapabilitiesChanged(Set<Capability> added, Set<Capability> removed) {
+        onCapabilitiesAdded(added);
+        onCapabilitiesRemoved(removed);
+        updateCapabilityToSchemaMap(added, removed);
+        notifyCapabilityChanged(getCapabilities());
+
+        // publish notification to notification collector about changed capabilities
+        if (notificationPublisher != null) {
+            notificationPublisher.onCapabilityChanged(computeDiff(added, removed));
+        }
+    }
+
+    private void notifyCapabilityChanged(Capabilities capabilities) {
+        for (NetconfMonitoringService.CapabilitiesListener listener : listeners) {
+            listener.onCapabilitiesChanged(capabilities);
+            listener.onSchemasChanged(getSchemas());
+        }
+    }
+
+
+    private static NetconfCapabilityChange computeDiff(final Set<Capability> added, final Set<Capability> removed) {
+        final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder();
+        netconfCapabilityChangeBuilder.setChangedBy(new ChangedByBuilder().setServerOrUser(new ServerBuilder().setServer(true).build()).build());
+        netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(Collections2.transform(removed, CAPABILITY_TO_URI)));
+        netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(Collections2.transform(added, CAPABILITY_TO_URI)));
+        // TODO modified should be computed ... but why ?
+        netconfCapabilityChangeBuilder.setModifiedCapability(Collections.<Uri>emptyList());
+        return netconfCapabilityChangeBuilder.build();
+    }
+
+
+    private void onCapabilitiesAdded(final Set<Capability> addedCaps) {
+        this.capabilities.putAll(Maps.uniqueIndex(setupCapabilities(addedCaps), CAPABILITY_TO_URI));
+    }
+
+    private void onCapabilitiesRemoved(final Set<Capability> removedCaps) {
+        for (final Capability addedCap : removedCaps) {
+            capabilities.remove(CAPABILITY_TO_URI.apply(addedCap));
+        }
+    }
+
+    void setNotificationPublisher(final BaseNotificationPublisherRegistration notificationPublisher) {
+        this.notificationPublisher = notificationPublisher;
+    }
+}
index 72850737a2b3fefe966c61d287a69513c98b20dd..9c8905afffc451e1b809c8f2d5ec7258d8812414 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  */
 package org.opendaylight.netconf.impl.osgi;
 
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableList.Builder;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.config.util.capability.BasicCapability;
-import org.opendaylight.controller.config.util.capability.Capability;
-import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.api.monitoring.SessionListener;
 import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
 import org.opendaylight.netconf.notifications.BaseNotificationPublisherRegistration;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.Yang;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Capabilities;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Schemas;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SchemasBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Sessions;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SessionsBuilder;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.Schema;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.SchemaBuilder;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.SchemaKey;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.ChangedByBuilder;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.changed.by.server.or.user.ServerBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class NetconfMonitoringServiceImpl implements NetconfMonitoringService, AutoCloseable {
 
-    private static final Schema.Location NETCONF_LOCATION = new Schema.Location(Schema.Location.Enumeration.NETCONF);
-    private static final List<Schema.Location> NETCONF_LOCATIONS = ImmutableList.of(NETCONF_LOCATION);
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfMonitoringServiceImpl.class);
-    private static final Function<NetconfManagementSession, Session> SESSION_FUNCTION = new Function<NetconfManagementSession, Session>() {
-        @Override
-        public Session apply(@Nonnull final NetconfManagementSession input) {
-            return input.toManagementSession();
-        }
-    };
-    private static final Function<Capability, Uri> CAPABILITY_TO_URI = new Function<Capability, Uri>() {
-        @Override
-        public Uri apply(final Capability input) {
-            return new Uri(input.getCapabilityUri());
-        }
-    };
+    private final NetconfCapabilityMonitoringService capabilityMonitoring;
+    private final NetconfSessionMonitoringService sessionMonitoring;
 
-    private final Set<NetconfManagementSession> sessions = new HashSet<>();
-    private final NetconfOperationServiceFactory netconfOperationProvider;
-    private final Map<Uri, Capability> capabilities = new HashMap<>();
-    private final Map<String, Map<String, String>> mappedModulesToRevisionToSchema = Maps.newHashMap();
-
-    private final Set<MonitoringListener> listeners = Sets.newHashSet();
-    private volatile BaseNotificationPublisherRegistration notificationPublisher;
-
-    public NetconfMonitoringServiceImpl(final NetconfOperationServiceFactory netconfOperationProvider) {
-        this.netconfOperationProvider = netconfOperationProvider;
-        netconfOperationProvider.registerCapabilityListener(this);
+    public NetconfMonitoringServiceImpl(NetconfOperationServiceFactory opProvider) {
+        this(opProvider, Optional.absent(), 0);
     }
 
-    @Override
-    public synchronized void onSessionUp(final NetconfManagementSession session) {
-        LOG.debug("Session {} up", session);
-        Preconditions.checkState(!sessions.contains(session), "Session %s was already added", session);
-        sessions.add(session);
-        notifySessionUp(session);
-    }
+    public NetconfMonitoringServiceImpl(NetconfOperationServiceFactory opProvider,
+                                        Optional<ScheduledThreadPool> threadPool,
+                                        long updateInterval) {
+        this.capabilityMonitoring = new NetconfCapabilityMonitoringService(opProvider);
+        this.sessionMonitoring = new NetconfSessionMonitoringService(threadPool, updateInterval);
 
-    @Override
-    public synchronized void onSessionDown(final NetconfManagementSession session) {
-        LOG.debug("Session {} down", session);
-        Preconditions.checkState(sessions.contains(session), "Session %s not present", session);
-        sessions.remove(session);
-        notifySessionDown(session);
     }
 
     @Override
-    public synchronized Sessions getSessions() {
-        return new SessionsBuilder().setSession(ImmutableList.copyOf(Collections2.transform(sessions, SESSION_FUNCTION))).build();
+    public Sessions getSessions() {
+        return sessionMonitoring.getSessions();
     }
 
     @Override
-    public synchronized Schemas getSchemas() {
-        try {
-            return transformSchemas(netconfOperationProvider.getCapabilities());
-        } catch (final RuntimeException e) {
-            throw e;
-        } catch (final Exception e) {
-            throw new IllegalStateException("Exception while closing", e);
-        }
+    public SessionListener getSessionListener() {
+        return sessionMonitoring;
     }
 
     @Override
-    public synchronized String getSchemaForCapability(final String moduleName, final Optional<String> revision) {
-
-        Map<String, String> revisionMapRequest = mappedModulesToRevisionToSchema.get(moduleName);
-        Preconditions.checkState(revisionMapRequest != null, "Capability for module %s not present, " + ""
-                + "available modules : %s", moduleName, Collections2.transform(capabilities.values(), CAPABILITY_TO_URI));
-
-        if (revision.isPresent()) {
-            String schema = revisionMapRequest.get(revision.get());
-
-            Preconditions.checkState(schema != null,
-                    "Capability for module %s:%s not present, available revisions for module: %s", moduleName,
-                    revision.get(), revisionMapRequest.keySet());
-
-            return schema;
-        } else {
-            Preconditions.checkState(revisionMapRequest.size() == 1,
-                    "Expected 1 capability for module %s, available revisions : %s", moduleName,
-                    revisionMapRequest.keySet());
-            return revisionMapRequest.values().iterator().next();
-        }
-    }
-
-    private synchronized void updateCapabilityToSchemaMap(final Set<Capability> added, final Set<Capability> removed) {
-        for (final Capability cap : added) {
-            if (!isValidModuleCapability(cap)){
-                continue;
-            }
-
-            final String currentModuleName = cap.getModuleName().get();
-            Map<String, String> revisionMap = mappedModulesToRevisionToSchema.get(currentModuleName);
-            if (revisionMap == null) {
-                revisionMap = Maps.newHashMap();
-                mappedModulesToRevisionToSchema.put(currentModuleName, revisionMap);
-            }
-
-            final String currentRevision = cap.getRevision().get();
-            revisionMap.put(currentRevision, cap.getCapabilitySchema().get());
-        }
-        for (final Capability cap : removed) {
-            if (!isValidModuleCapability(cap)){
-                continue;
-            }
-            final Map<String, String> revisionMap = mappedModulesToRevisionToSchema.get(cap.getModuleName().get());
-            if (revisionMap != null) {
-                revisionMap.remove(cap.getRevision().get());
-                if (revisionMap.isEmpty()) {
-                    mappedModulesToRevisionToSchema.remove(cap.getModuleName().get());
-                }
-            }
-        }
-    }
-
-    private boolean isValidModuleCapability(Capability cap) {
-        return cap.getModuleName().isPresent()
-                && cap.getRevision().isPresent()
-                && cap.getCapabilitySchema().isPresent();
+    public Schemas getSchemas() {
+        return capabilityMonitoring.getSchemas();
     }
 
     @Override
-    public synchronized Capabilities getCapabilities() {
-        return new CapabilitiesBuilder().setCapability(Lists.newArrayList(capabilities.keySet())).build();
+    public String getSchemaForCapability(String moduleName, Optional<String> revision) {
+        return capabilityMonitoring.getSchemaForModuleRevision(moduleName, revision);
     }
 
     @Override
-    public synchronized AutoCloseable registerListener(final MonitoringListener listener) {
-        listeners.add(listener);
-        listener.onCapabilitiesChanged(getCapabilities());
-        listener.onSchemasChanged(getSchemas());
-        return new AutoCloseable() {
-            @Override
-            public void close() throws Exception {
-                listeners.remove(listener);
-            }
-        };
-    }
-
-    private static Schemas transformSchemas(final Set<Capability> caps) {
-        final List<Schema> schemas = new ArrayList<>(caps.size());
-        for (final Capability cap : caps) {
-            if (cap.getCapabilitySchema().isPresent()) {
-                final SchemaBuilder builder = new SchemaBuilder();
-                Preconditions.checkState(cap.getModuleNamespace().isPresent());
-                builder.setNamespace(new Uri(cap.getModuleNamespace().get()));
-
-                Preconditions.checkState(cap.getRevision().isPresent());
-                final String version = cap.getRevision().get();
-                builder.setVersion(version);
-
-                Preconditions.checkState(cap.getModuleName().isPresent());
-                final String identifier = cap.getModuleName().get();
-                builder.setIdentifier(identifier);
-
-                builder.setFormat(Yang.class);
-
-                builder.setLocation(transformLocations(cap.getLocation()));
-
-                builder.setKey(new SchemaKey(Yang.class, identifier, version));
-
-                schemas.add(builder.build());
-            }
-        }
-
-        return new SchemasBuilder().setSchema(schemas).build();
-    }
-
-    private static List<Schema.Location> transformLocations(final Collection<String> locations) {
-        if (locations.isEmpty()) {
-            return NETCONF_LOCATIONS;
-        }
-
-        final Builder<Schema.Location> b = ImmutableList.builder();
-        b.add(NETCONF_LOCATION);
-
-        for (final String location : locations) {
-            b.add(new Schema.Location(new Uri(location)));
-        }
-
-        return b.build();
-    }
-
-    public static Set<Capability> setupCapabilities(final Set<Capability> caps) {
-        Set<Capability> capabilities = new HashSet<>(caps);
-        capabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:candidate:1.0"));
-        // TODO rollback on error not supported EditConfigXmlParser:100
-        // [RFC6241] 8.5.  Rollback-on-Error Capability
-        // capabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:rollback-on-error:1.0"));
-        return capabilities;
+    public Capabilities getCapabilities() {
+        return capabilityMonitoring.getCapabilities();
     }
 
     @Override
-    public synchronized void close() throws Exception {
-        listeners.clear();
-        sessions.clear();
-        capabilities.clear();
+    public AutoCloseable registerCapabilitiesListener(CapabilitiesListener listener) {
+        return capabilityMonitoring.registerListener(listener);
     }
 
     @Override
-    public void onCapabilitiesChanged(Set<Capability> added, Set<Capability> removed) {
-        onCapabilitiesAdded(added);
-        onCapabilitiesRemoved(removed);
-        updateCapabilityToSchemaMap(added, removed);
-        notifyCapabilityChanged(getCapabilities());
-
-        // publish notification to notification collector about changed capabilities
-        if (notificationPublisher != null) {
-            notificationPublisher.onCapabilityChanged(computeDiff(added, removed));
-        }
-    }
-
-    private void notifyCapabilityChanged(Capabilities capabilities) {
-        for (MonitoringListener listener : listeners) {
-            listener.onCapabilitiesChanged(capabilities);
-            listener.onSchemasChanged(getSchemas());
-        }
-    }
-
-    private void notifySessionUp(NetconfManagementSession managementSession) {
-        Session session = SESSION_FUNCTION.apply(managementSession);
-        for (MonitoringListener listener : listeners) {
-            listener.onSessionStarted(session);
-        }
+    public AutoCloseable registerSessionsListener(SessionsListener listener) {
+        return sessionMonitoring.registerListener(listener);
     }
 
-    private void notifySessionDown(NetconfManagementSession managementSession) {
-        Session session = SESSION_FUNCTION.apply(managementSession);
-        for (MonitoringListener listener : listeners) {
-            listener.onSessionEnded(session);
-        }
+    public void setNotificationPublisher(BaseNotificationPublisherRegistration notificationPublisher) {
+        this.capabilityMonitoring.setNotificationPublisher(notificationPublisher);
     }
 
-    static NetconfCapabilityChange computeDiff(final Set<Capability> added, final Set<Capability> removed) {
-        final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder();
-        netconfCapabilityChangeBuilder.setChangedBy(new ChangedByBuilder().setServerOrUser(new ServerBuilder().setServer(true).build()).build());
-        netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(Collections2.transform(removed, CAPABILITY_TO_URI)));
-        netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(Collections2.transform(added, CAPABILITY_TO_URI)));
-        // TODO modified should be computed ... but why ?
-        netconfCapabilityChangeBuilder.setModifiedCapability(Collections.<Uri>emptyList());
-        return netconfCapabilityChangeBuilder.build();
-    }
-
-
-    private synchronized void onCapabilitiesAdded(final Set<Capability> addedCaps) {
-        this.capabilities.putAll(Maps.uniqueIndex(setupCapabilities(addedCaps), CAPABILITY_TO_URI));
-    }
-
-    private synchronized void onCapabilitiesRemoved(final Set<Capability> addedCaps) {
-        for (final Capability addedCap : addedCaps) {
-            capabilities.remove(CAPABILITY_TO_URI.apply(addedCap));
-        }
-    }
-
-    public void setNotificationPublisher(final BaseNotificationPublisherRegistration notificationPublisher) {
-        this.notificationPublisher = notificationPublisher;
+    @Override
+    public void close() throws Exception {
+        capabilityMonitoring.close();
+        sessionMonitoring.close();
     }
 }
diff --git a/netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringService.java b/netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringService.java
new file mode 100644 (file)
index 0000000..68883fd
--- /dev/null
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.impl.osgi;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
+import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
+import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.api.monitoring.SessionEvent;
+import org.opendaylight.netconf.api.monitoring.SessionListener;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Sessions;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SessionsBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements {@link SessionListener} to receive updates about Netconf sessions. Instance notifies its listeners
+ * about session start and end. It also publishes on regular interval list of sessions,
+ * where events like rpc or notification happened.
+ */
+class NetconfSessionMonitoringService implements SessionListener, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfSessionMonitoringService.class);
+
+    private final Set<NetconfManagementSession> sessions = Sets.newHashSet();
+    private final Set<NetconfManagementSession> changedSessions = Sets.newHashSet();
+    private final Set<NetconfMonitoringService.SessionsListener> listeners = Sets.newHashSet();
+    private final ScheduledExecutorService executor;
+    private final long updateInterval;
+    private boolean running;
+
+    /**
+     * @param schedulingThreadPool thread pool for scheduling session stats updates. If not present, updates won't be scheduled.
+     * @param updateInterval update interval. If is less than 0, updates won't be scheduled
+     */
+    NetconfSessionMonitoringService(Optional<ScheduledThreadPool> schedulingThreadPool, long updateInterval) {
+        this.updateInterval = updateInterval;
+        if (schedulingThreadPool.isPresent() && updateInterval > 0) {
+            this.executor =  schedulingThreadPool.get().getExecutor();
+            LOG.info("/netconf-state/sessions will be updated every {} seconds.", updateInterval);
+        } else {
+            LOG.info("Scheduling thread pool is present = {}, update interval {}: /netconf-state/sessions won't be updated.",
+                    schedulingThreadPool.isPresent(), updateInterval);
+            this.executor = null;
+        }
+    }
+
+    synchronized Sessions getSessions() {
+        final Collection<Session> managementSessions = Collections2.transform(sessions, NetconfManagementSession::toManagementSession);
+        return new SessionsBuilder().setSession(ImmutableList.copyOf(managementSessions)).build();
+    }
+
+    @Override
+    public synchronized void onSessionUp(final NetconfManagementSession session) {
+        LOG.debug("Session {} up", session);
+        Preconditions.checkState(!sessions.contains(session), "Session %s was already added", session);
+        sessions.add(session);
+        notifySessionUp(session);
+    }
+
+    @Override
+    public synchronized void onSessionDown(final NetconfManagementSession session) {
+        LOG.debug("Session {} down", session);
+        Preconditions.checkState(sessions.contains(session), "Session %s not present", session);
+        sessions.remove(session);
+        changedSessions.remove(session);
+        notifySessionDown(session);
+    }
+
+    @Override
+    public synchronized void onSessionEvent(SessionEvent event) {
+        changedSessions.add(event.getSession());
+    }
+
+    synchronized AutoCloseable registerListener(final NetconfMonitoringService.SessionsListener listener) {
+        listeners.add(listener);
+        if (!running) {
+            startUpdateSessionStats();
+        }
+        return new AutoCloseable() {
+            @Override
+            public void close() throws Exception {
+                listeners.remove(listener);
+            }
+        };
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        stopUpdateSessionStats();
+        listeners.clear();
+        sessions.clear();
+    }
+
+    private synchronized void updateSessionStats() {
+        if (changedSessions.isEmpty()) {
+            return;
+        }
+        final List<Session> changed = changedSessions.stream()
+                .map(NetconfManagementSession::toManagementSession)
+                .collect(Collectors.toList());
+        final ImmutableList<Session> sessionImmutableList = ImmutableList.copyOf(changed);
+        for (NetconfMonitoringService.SessionsListener listener : listeners) {
+            listener.onSessionsUpdated(sessionImmutableList);
+        }
+        changedSessions.clear();
+    }
+
+    private void notifySessionUp(NetconfManagementSession managementSession) {
+        Session session = managementSession.toManagementSession();
+        for (NetconfMonitoringService.SessionsListener listener : listeners) {
+            listener.onSessionStarted(session);
+        }
+    }
+
+    private void notifySessionDown(NetconfManagementSession managementSession) {
+        Session session = managementSession.toManagementSession();
+        for (NetconfMonitoringService.SessionsListener listener : listeners) {
+            listener.onSessionEnded(session);
+        }
+    }
+
+    private void startUpdateSessionStats() {
+        if (executor != null) {
+            executor.scheduleAtFixedRate(this::updateSessionStats, 1, updateInterval, TimeUnit.SECONDS);
+            running = true;
+        }
+    }
+
+    private void stopUpdateSessionStats() {
+        if (executor != null) {
+            executor.shutdownNow();
+            running = false;
+        }
+    }
+}
index 7ad1fef55da1975e486012e690b8de0d929f3a15..aa6485a00ad0215488cb7cc798e70cb7680a2f57 100644 (file)
@@ -9,6 +9,7 @@ module netconf-northbound-impl {
     import netconf-northbound-mapper { prefix nnm; revision-date 2015-01-14; }
     import netconf-northbound { prefix nn; revision-date 2015-01-14; }
     import netty {prefix netty; }
+    import threadpool {prefix th;}
 
     description
         "This module contains the base YANG definitions for
@@ -106,6 +107,20 @@ module netconf-northbound-impl {
                     }
                 }
             }
+            container scheduled-threadpool {
+                uses config:service-ref {
+                    refine type {
+                        mandatory false;
+                        config:required-identity th:scheduled-threadpool;
+                    }
+                }
+                description "Dedicated to update netconf-state/sessions subtree on session change.";
+            }
+            leaf monitoring-update-interval {
+                description "Specifies interval in seconds after which session stats are updated. If zero, stats won't be updated.";
+                type uint32;
+                default 0;
+            }
 
         }
     }
index 35abee564046d5249f47cc3efd18090bc39d963b..420a8f5d59ec0a6eca3dd299f737cd22ac44af25 100644 (file)
@@ -12,7 +12,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anySetOf;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -52,6 +51,8 @@ import org.junit.runners.Parameterized;
 import org.opendaylight.controller.config.util.capability.Capability;
 import org.opendaylight.controller.config.util.xml.DocumentedException;
 import org.opendaylight.controller.config.util.xml.XmlUtil;
+import org.opendaylight.netconf.api.monitoring.SessionEvent;
+import org.opendaylight.netconf.api.monitoring.SessionListener;
 import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
 import org.opendaylight.netconf.api.monitoring.CapabilityListener;
@@ -119,15 +120,17 @@ public class ConcurrentClientsTest {
 
     public static NetconfMonitoringService createMockedMonitoringService() {
         NetconfMonitoringService monitoring = mock(NetconfMonitoringService.class);
-        doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
-        doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
+        final SessionListener sessionListener = mock(SessionListener.class);
+        doNothing().when(sessionListener).onSessionUp(any(NetconfServerSession.class));
+        doNothing().when(sessionListener).onSessionDown(any(NetconfServerSession.class));
+        doNothing().when(sessionListener).onSessionEvent(any(SessionEvent.class));
         doReturn(new AutoCloseable() {
             @Override
             public void close() throws Exception {
 
             }
-        }).when(monitoring).registerListener(any(NetconfMonitoringService.MonitoringListener.class));
-        doNothing().when(monitoring).onCapabilitiesChanged(anySetOf(Capability.class), anySetOf(Capability.class));
+        }).when(monitoring).registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
+        doReturn(sessionListener).when(monitoring).getSessionListener();
         doReturn(new CapabilitiesBuilder().setCapability(Collections.<Uri>emptyList()).build()).when(monitoring).getCapabilities();
         return monitoring;
     }
similarity index 82%
rename from netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfMonitoringServiceImplTest.java
rename to netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfCapabilityMonitoringServiceTest.java
index 1359a12cd329cc6edd7b336fe9e14a69ff34c6fc..b69a2f2a03128c84071adb85c493437097d2cbfe 100644 (file)
@@ -47,7 +47,7 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.not
 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
 import org.opendaylight.yangtools.yang.model.api.Module;
 
-public class NetconfMonitoringServiceImplTest {
+public class NetconfCapabilityMonitoringServiceTest {
 
     private static final String TEST_MODULE_CONTENT = "content";
     private static final String TEST_MODULE_CONTENT2 = "content2";
@@ -78,11 +78,11 @@ public class NetconfMonitoringServiceImplTest {
     @Mock
     private NetconfManagementSession sessionMock;
     @Mock
-    private NetconfMonitoringService.MonitoringListener listener;
+    private NetconfMonitoringService.CapabilitiesListener listener;
     @Mock
     private BaseNotificationPublisherRegistration notificationPublisher;
 
-    private NetconfMonitoringServiceImpl monitoringService;
+    private NetconfCapabilityMonitoringService monitoringService;
 
     @BeforeClass
     public static void suiteSetUp() throws Exception {
@@ -111,19 +111,17 @@ public class NetconfMonitoringServiceImplTest {
         CAPABILITIES.add(new BasicCapability("urn:ietf:params:xml:ns:yang:ietf-inet-types?module=ietf-inet-types&amp;revision=2010-09-24"));
 
         doReturn(CAPABILITIES).when(operationServiceFactoryMock).getCapabilities();
-        doReturn(null).when(operationServiceFactoryMock).registerCapabilityListener(any(NetconfMonitoringServiceImpl.class));
+        doReturn(null).when(operationServiceFactoryMock).registerCapabilityListener(any(NetconfCapabilityMonitoringService.class));
 
         doReturn(SESSION).when(sessionMock).toManagementSession();
         doNothing().when(listener).onCapabilitiesChanged(any());
         doNothing().when(listener).onSchemasChanged(any());
-        doNothing().when(listener).onSessionStarted(any());
-        doNothing().when(listener).onSessionEnded(any());
 
         doNothing().when(notificationPublisher).onCapabilityChanged(any());
         doNothing().when(notificationPublisher).onSessionStarted(any());
         doNothing().when(notificationPublisher).onSessionEnded(any());
 
-        monitoringService = new NetconfMonitoringServiceImpl(operationServiceFactoryMock);
+        monitoringService = new NetconfCapabilityMonitoringService(operationServiceFactoryMock);
         monitoringService.onCapabilitiesChanged(CAPABILITIES, Collections.emptySet());
         monitoringService.setNotificationPublisher(notificationPublisher);
         monitoringService.registerListener(listener);
@@ -132,13 +130,9 @@ public class NetconfMonitoringServiceImplTest {
 
     @Test
     public void testListeners() throws Exception {
-        monitoringService.onSessionUp(sessionMock);
         HashSet<Capability> added = new HashSet<>();
         added.add(new BasicCapability("toAdd"));
         monitoringService.onCapabilitiesChanged(added, Collections.emptySet());
-        monitoringService.onSessionDown(sessionMock);
-        verify(listener).onSessionStarted(any());
-        verify(listener).onSessionEnded(any());
         //onCapabilitiesChanged and onSchemasChanged are invoked also after listener registration
         verify(listener, times(2)).onCapabilitiesChanged(any());
         verify(listener, times(2)).onSchemasChanged(any());
@@ -157,14 +151,14 @@ public class NetconfMonitoringServiceImplTest {
     public void testGetSchemaForCapability() throws Exception {
         //test multiple revisions of the same capability
         monitoringService.onCapabilitiesChanged(Collections.singleton(moduleCapability2), Collections.emptySet());
-        final String schema = monitoringService.getSchemaForCapability(TEST_MODULE_NAME, Optional.of(TEST_MODULE_REV));
+        final String schema = monitoringService.getSchemaForModuleRevision(TEST_MODULE_NAME, Optional.of(TEST_MODULE_REV));
         Assert.assertEquals(TEST_MODULE_CONTENT, schema);
-        final String schema2 = monitoringService.getSchemaForCapability(TEST_MODULE_NAME, Optional.of(TEST_MODULE_REV2));
+        final String schema2 = monitoringService.getSchemaForModuleRevision(TEST_MODULE_NAME, Optional.of(TEST_MODULE_REV2));
         Assert.assertEquals(TEST_MODULE_CONTENT2, schema2);
         //remove one revision
         monitoringService.onCapabilitiesChanged(Collections.emptySet(), Collections.singleton(moduleCapability1));
         //only one revision present
-        final String schema3 = monitoringService.getSchemaForCapability(TEST_MODULE_NAME, Optional.absent());
+        final String schema3 = monitoringService.getSchemaForModuleRevision(TEST_MODULE_NAME, Optional.absent());
         Assert.assertEquals(TEST_MODULE_CONTENT2, schema3);
     }
 
@@ -183,11 +177,8 @@ public class NetconfMonitoringServiceImplTest {
 
     @Test
     public void testClose() throws Exception {
-        monitoringService.onSessionUp(sessionMock);
-        Assert.assertFalse(monitoringService.getSessions().getSession().isEmpty());
         Assert.assertFalse(monitoringService.getCapabilities().getCapability().isEmpty());
         monitoringService.close();
-        Assert.assertTrue(monitoringService.getSessions().getSession().isEmpty());
         Assert.assertTrue(monitoringService.getCapabilities().getCapability().isEmpty());
     }
 
@@ -231,22 +222,4 @@ public class NetconfMonitoringServiceImplTest {
         Assert.assertEquals(Collections.emptySet(), new HashSet<>(afterRemove.getAddedCapability()));
     }
 
-    @Test
-    public void testOnSessionUpAndDown() throws Exception {
-        monitoringService.onSessionUp(sessionMock);
-        ArgumentCaptor<Session> sessionUpCaptor = ArgumentCaptor.forClass(Session.class);
-        verify(listener).onSessionStarted(sessionUpCaptor.capture());
-        final Session sesionUp = sessionUpCaptor.getValue();
-        Assert.assertEquals(SESSION.getSessionId(), sesionUp.getSessionId());
-        Assert.assertEquals(SESSION.getSourceHost(), sesionUp.getSourceHost());
-        Assert.assertEquals(SESSION.getUsername(), sesionUp.getUsername());
-
-        monitoringService.onSessionDown(sessionMock);
-        ArgumentCaptor<Session> sessionDownCaptor = ArgumentCaptor.forClass(Session.class);
-        verify(listener).onSessionEnded(sessionDownCaptor.capture());
-        final Session sessionDown = sessionDownCaptor.getValue();
-        Assert.assertEquals(SESSION.getSessionId(), sessionDown.getSessionId());
-        Assert.assertEquals(SESSION.getSourceHost(), sessionDown.getSourceHost());
-        Assert.assertEquals(SESSION.getUsername(), sessionDown.getUsername());
-    }
 }
diff --git a/netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringServiceTest.java b/netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringServiceTest.java
new file mode 100644 (file)
index 0000000..bb7dab8
--- /dev/null
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.impl.osgi;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.base.Optional;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
+import org.opendaylight.controller.config.util.capability.BasicCapability;
+import org.opendaylight.controller.config.util.capability.Capability;
+import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
+import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.api.monitoring.SessionEvent;
+import org.opendaylight.netconf.notifications.BaseNotificationPublisherRegistration;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.SessionBuilder;
+
+public class NetconfSessionMonitoringServiceTest {
+
+    private static final Session SESSION_1 = new SessionBuilder()
+            .setSessionId(1L)
+            .setSourceHost(new Host("0.0.0.0".toCharArray()))
+            .setUsername("admin")
+            .build();
+    private static final Session SESSION_2 = new SessionBuilder()
+            .setSessionId(2L)
+            .setSourceHost(new Host("0.0.0.0".toCharArray()))
+            .setUsername("admin")
+            .build();
+
+    @Mock
+    private NetconfManagementSession sessionMock1;
+    @Mock
+    private NetconfManagementSession sessionMock2;
+    @Mock
+    private NetconfMonitoringService.SessionsListener listener;
+    @Mock
+    private BaseNotificationPublisherRegistration notificationPublisher;
+
+    private NetconfSessionMonitoringService monitoringService;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        doReturn(SESSION_1).when(sessionMock1).toManagementSession();
+        doReturn(SESSION_2).when(sessionMock2).toManagementSession();
+        doNothing().when(listener).onSessionStarted(any());
+        doNothing().when(listener).onSessionEnded(any());
+
+        doNothing().when(notificationPublisher).onCapabilityChanged(any());
+        doNothing().when(notificationPublisher).onSessionStarted(any());
+        doNothing().when(notificationPublisher).onSessionEnded(any());
+
+        monitoringService = new NetconfSessionMonitoringService(Optional.absent(), 0);
+        monitoringService.registerListener(listener);
+    }
+
+    @Test
+    public void testListeners() throws Exception {
+        monitoringService.onSessionUp(sessionMock1);
+        HashSet<Capability> added = new HashSet<>();
+        added.add(new BasicCapability("toAdd"));
+        monitoringService.onSessionDown(sessionMock1);
+        verify(listener).onSessionStarted(any());
+        verify(listener).onSessionEnded(any());
+    }
+
+
+    @Test
+    public void testClose() throws Exception {
+        monitoringService.onSessionUp(sessionMock1);
+        Assert.assertFalse(monitoringService.getSessions().getSession().isEmpty());
+        monitoringService.close();
+        Assert.assertTrue(monitoringService.getSessions().getSession().isEmpty());
+    }
+
+
+    @Test
+    public void testOnSessionUpAndDown() throws Exception {
+        monitoringService.onSessionUp(sessionMock1);
+        ArgumentCaptor<Session> sessionUpCaptor = ArgumentCaptor.forClass(Session.class);
+        verify(listener).onSessionStarted(sessionUpCaptor.capture());
+        final Session sesionUp = sessionUpCaptor.getValue();
+        Assert.assertEquals(SESSION_1.getSessionId(), sesionUp.getSessionId());
+        Assert.assertEquals(SESSION_1.getSourceHost(), sesionUp.getSourceHost());
+        Assert.assertEquals(SESSION_1.getUsername(), sesionUp.getUsername());
+
+        monitoringService.onSessionDown(sessionMock1);
+        ArgumentCaptor<Session> sessionDownCaptor = ArgumentCaptor.forClass(Session.class);
+        verify(listener).onSessionEnded(sessionDownCaptor.capture());
+        final Session sessionDown = sessionDownCaptor.getValue();
+        Assert.assertEquals(SESSION_1.getSessionId(), sessionDown.getSessionId());
+        Assert.assertEquals(SESSION_1.getSourceHost(), sessionDown.getSourceHost());
+        Assert.assertEquals(SESSION_1.getUsername(), sessionDown.getUsername());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testListenerUpdateSession() throws Exception {
+        ScheduledThreadPool threadPool = mock(ScheduledThreadPool.class);
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+        doReturn(executor).when(threadPool).getExecutor();
+        monitoringService = new NetconfSessionMonitoringService(Optional.of(threadPool), 1);
+        monitoringService.registerListener(listener);
+        monitoringService.onSessionUp(sessionMock1);
+        monitoringService.onSessionUp(sessionMock2);
+        monitoringService.onSessionEvent(SessionEvent.inRpcSuccess(sessionMock1));
+        ArgumentCaptor<Collection> captor =
+                ArgumentCaptor.forClass(Collection.class);
+        verify(listener, timeout(2000)).onSessionsUpdated(captor.capture());
+        final Collection<Session> value = captor.getValue();
+        Assert.assertTrue(value.contains(SESSION_1));
+        Assert.assertFalse(value.contains(SESSION_2));
+        monitoringService.close();
+    }
+}
index ccd416379640f2e9fa0d668d9031cf1a301514b4..387c36d27ac1c288b1ef35584a31ac3b28d1dacf 100644 (file)
                   <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:netconf:north:mapper">dom:netconf-northbound-mapper</type>
                   <name>mapper-aggregator</name>
               </aggregator>
+              <scheduled-threadpool xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:netconf:northbound:impl">
+                  <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadpool</type>
+                  <name>global-netconf-ssh-scheduled-executor</name>
+              </scheduled-threadpool>
+              <monitoring-update-interval xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:netconf:northbound:impl">6</monitoring-update-interval>
           </module>
 
           <module>
index af332963e61a61a64c1effca215aee4fdcf6c285..156fe9c31a65dd4d6c3cd054a44e98fb2e9ca301 100644 (file)
@@ -20,6 +20,7 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.config.util.xml.XmlUtil;
 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.api.monitoring.SessionListener;
 import org.opendaylight.netconf.monitoring.xml.model.NetconfState;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
@@ -78,7 +79,6 @@ public class JaxBSerializerTest {
 
         final NetconfState model = new NetconfState(monitoringService);
         final String xml = XmlUtil.toString(new JaxBSerializer().toXml(model)).replaceAll("\\s", "");
-        System.out.println(xml);
         assertThat(xml, CoreMatchers.containsString(
                 "<schema>" +
                 "<format>yang</format>" +
index 60f08e19ca72e2f0fda84210b9cde446608944b5..a4144b87003345595c8e65357440151a101dcffd 100644 (file)
@@ -18,8 +18,8 @@ import java.util.Collections;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.config.util.capability.Capability;
-import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.api.monitoring.SessionListener;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.Yang;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Capabilities;
@@ -86,6 +86,11 @@ public class DummyMonitoringService implements NetconfMonitoringService {
         return EMPTY_SESSIONS;
     }
 
+    @Override
+    public SessionListener getSessionListener() {
+        return null;
+    }
+
     @Override
     public Schemas getSchemas() {
         return schemas;
@@ -108,22 +113,13 @@ public class DummyMonitoringService implements NetconfMonitoringService {
     }
 
     @Override
-    public AutoCloseable registerListener(MonitoringListener listener) {
+    public AutoCloseable registerCapabilitiesListener(CapabilitiesListener listener) {
         return null;
     }
 
     @Override
-    public void onSessionUp(NetconfManagementSession session) {
-
-    }
-
-    @Override
-    public void onSessionDown(NetconfManagementSession session) {
-
+    public AutoCloseable registerSessionsListener(SessionsListener listener) {
+        return null;
     }
 
-    @Override
-    public void onCapabilitiesChanged(Set<Capability> added, Set<Capability> removed) {
-
-    }
 }