Bug 3864: Notify netconf monitoring about changes in session 09/36809/8
authorAndrej Mak <andmak@cisco.com>
Tue, 29 Mar 2016 09:26:28 +0000 (11:26 +0200)
committerAndrej Mak <andmak@cisco.com>
Tue, 3 May 2016 05:43:06 +0000 (07:43 +0200)
According to https://tools.ietf.org/html/rfc6022#section-2.1.4,
netconf-state/sessions should contain number of received rpcs, notifications etc.
Because of performance reasons, data tree is not updated after every message, but
NetconfSessionMonitoringService periodically publish updates with changed sessions
to its session listeners. MonitoringToMdsalWriter writes changes to datastore.

Change-Id: I5cdc2a18a036541fd23f430be6e19ba395ca62c9
Signed-off-by: Andrej Mak <andmak@cisco.com>
22 files changed:
features/netconf/src/main/features/features.xml
netconf/mdsal-netconf-monitoring/src/main/java/org/opendaylight/controller/config/yang/netconf/mdsal/monitoring/MonitoringToMdsalWriter.java
netconf/mdsal-netconf-monitoring/src/test/java/org/opendaylight/controller/config/yang/netconf/mdsal/monitoring/MonitoringToMdsalWriterTest.java
netconf/mdsal-netconf-notification/src/main/java/org/opendaylight/controller/config/yang/netconf/mdsal/notification/SessionNotificationProducer.java
netconf/mdsal-netconf-notification/src/test/java/org/opendaylight/controller/config/yang/netconf/mdsal/notification/SessionNotificationProducerTest.java
netconf/netconf-api/src/main/java/org/opendaylight/netconf/api/monitoring/NetconfMonitoringService.java
netconf/netconf-api/src/main/java/org/opendaylight/netconf/api/monitoring/SessionEvent.java [new file with mode: 0644]
netconf/netconf-api/src/main/java/org/opendaylight/netconf/api/monitoring/SessionListener.java
netconf/netconf-impl/pom.xml
netconf/netconf-impl/src/main/java/org/opendaylight/controller/config/yang/config/netconf/northbound/impl/NetconfServerMonitoringModule.java
netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/NetconfServerSession.java
netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/NetconfServerSessionListener.java
netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfCapabilityMonitoringService.java [new file with mode: 0644]
netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfMonitoringServiceImpl.java
netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringService.java [new file with mode: 0644]
netconf/netconf-impl/src/main/yang/netconf-northbound-impl.yang
netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/ConcurrentClientsTest.java
netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfCapabilityMonitoringServiceTest.java [moved from netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfMonitoringServiceImplTest.java with 82% similarity]
netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringServiceTest.java [new file with mode: 0644]
netconf/netconf-mdsal-config/src/main/resources/initial/08-netconf-mdsal.xml
netconf/netconf-monitoring/src/test/java/org/opendaylight/netconf/monitoring/xml/JaxBSerializerTest.java
netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/DummyMonitoringService.java

index ac0ad75e53a982c79f2e4be8140d91c04a38bfd2..8b84a07a27e0678daf29f0680f8d329af0668657 100644 (file)
@@ -51,6 +51,7 @@
     <feature version='${project.version}'>odl-netconf-mapping-api</feature>
     <feature version='${project.version}'>odl-netconf-util</feature>
     <feature version='${project.version}'>odl-netconf-netty-util</feature>
+    <feature version='${config.version}'>odl-config-netty</feature>
     <!-- Netconf server without config connector is just an empty shell -->
     <feature version='${project.version}'>odl-config-netconf-connector</feature>
     <!-- Netconf will not provide schemas without monitoring -->
@@ -67,6 +68,7 @@
     <feature version='${project.version}'>odl-netconf-netty-util</feature>
     <bundle>mvn:org.opendaylight.netconf/netconf-impl/{{VERSION}}</bundle>
     <feature version='${project.version}'>odl-netconf-notifications-api</feature>
+    <feature version='${config.version}'>odl-config-netty</feature>
     <bundle>mvn:org.opendaylight.netconf/netconf-notifications-impl/{{VERSION}}</bundle>
     <bundle>mvn:org.opendaylight.netconf/config-netconf-connector/{{VERSION}}</bundle>
   </feature>
index 11fac96f92ecc9f58943da97ab9f7503b0633dc3..7c434f2813eae2916abd909a6f4d7882c440a6a8 100644 (file)
@@ -11,6 +11,8 @@ package org.opendaylight.controller.config.yang.netconf.mdsal.monitoring;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import java.util.Collection;
+import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
@@ -23,12 +25,15 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.mon
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Schemas;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Sessions;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
-import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class MonitoringToMdsalWriter implements AutoCloseable, NetconfMonitoringService.MonitoringListener, BindingAwareProvider {
+/**
+ * Writes netconf server state changes received from NetconfMonitoringService to netconf-state datastore subtree.
+ */
+final class MonitoringToMdsalWriter implements AutoCloseable, NetconfMonitoringService.CapabilitiesListener,
+        NetconfMonitoringService.SessionsListener, BindingAwareProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(MonitoringToMdsalWriter.class);
 
@@ -48,43 +53,49 @@ final class MonitoringToMdsalWriter implements AutoCloseable, NetconfMonitoringS
 
     @Override
     public void close() {
-        deleteFromDatastore(InstanceIdentifier.create(NetconfState.class));
+        runTransaction((tx) -> tx.delete(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(NetconfState.class)));
     }
 
     @Override
     public void onSessionStarted(Session session) {
         final InstanceIdentifier<Session> sessionPath =
                 SESSIONS_INSTANCE_IDENTIFIER.child(Session.class, session.getKey());
-        putToDatastore(sessionPath, session);
+        runTransaction((tx) -> tx.put(LogicalDatastoreType.OPERATIONAL, sessionPath, session));
     }
 
     @Override
     public void onSessionEnded(Session session) {
         final InstanceIdentifier<Session> sessionPath =
                 SESSIONS_INSTANCE_IDENTIFIER.child(Session.class, session.getKey());
-        deleteFromDatastore(sessionPath);
+        runTransaction((tx) -> tx.delete(LogicalDatastoreType.OPERATIONAL, sessionPath));
+    }
+
+    @Override
+    public void onSessionsUpdated(Collection<Session> sessions) {
+        runTransaction((tx) -> updateSessions(tx, sessions));
     }
 
     @Override
     public void onCapabilitiesChanged(Capabilities capabilities) {
-        putToDatastore(CAPABILITIES_INSTANCE_IDENTIFIER, capabilities);
+        runTransaction((tx) -> tx.put(LogicalDatastoreType.OPERATIONAL, CAPABILITIES_INSTANCE_IDENTIFIER, capabilities));
     }
 
     @Override
     public void onSchemasChanged(Schemas schemas) {
-        putToDatastore(SCHEMAS_INSTANCE_IDENTIFIER, schemas);
+        runTransaction((tx) -> tx.put(LogicalDatastoreType.OPERATIONAL, SCHEMAS_INSTANCE_IDENTIFIER, schemas));
     }
 
     @Override
     public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
         dataBroker = providerContext.getSALService(DataBroker.class);
-        serverMonitoringDependency.registerListener(this);
+        serverMonitoringDependency.registerCapabilitiesListener(this);
+        serverMonitoringDependency.registerSessionsListener(this);
     }
 
-    private <T extends DataObject> void putToDatastore(InstanceIdentifier<T> path, T value) {
+    private void runTransaction(Consumer<WriteTransaction> txUser) {
         Preconditions.checkState(dataBroker != null);
         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
-        tx.put(LogicalDatastoreType.OPERATIONAL, path, value);
+        txUser.accept(tx);
         Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
             @Override
             public void onSuccess(@Nullable Void result) {
@@ -98,20 +109,11 @@ final class MonitoringToMdsalWriter implements AutoCloseable, NetconfMonitoringS
         });
     }
 
-    private <T extends DataObject> void deleteFromDatastore(InstanceIdentifier<T> path) {
-        Preconditions.checkState(dataBroker != null);
-        final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
-        tx.delete(LogicalDatastoreType.OPERATIONAL, path);
-        Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(@Nullable Void result) {
-                LOG.debug("Netconf state updated successfully");
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                LOG.warn("Unable to update netconf state", t);
-            }
-        });
+    private static void updateSessions(WriteTransaction tx, Collection<Session> sessions) {
+        for (Session session : sessions) {
+            final InstanceIdentifier<Session> sessionPath =
+                    SESSIONS_INSTANCE_IDENTIFIER.child(Session.class, session.getKey());
+            tx.put(LogicalDatastoreType.OPERATIONAL, sessionPath, session);
+        }
     }
 }
index 2a38a1ffc7fd3dd695fb4eb24023662892ba1288..ce52b2508e9e1b68357d630e1010ee714989e857 100644 (file)
@@ -16,6 +16,9 @@ import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.verify;
 
 import com.google.common.util.concurrent.Futures;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
@@ -55,7 +58,8 @@ public class MonitoringToMdsalWriterTest {
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
 
-        doReturn(null).when(monitoring).registerListener(any());
+        doReturn(null).when(monitoring).registerCapabilitiesListener(any());
+        doReturn(null).when(monitoring).registerSessionsListener(any());
 
         doReturn(dataBroker).when(context).getSALService(DataBroker.class);
 
@@ -125,9 +129,30 @@ public class MonitoringToMdsalWriterTest {
         inOrder.verify(writeTransaction).submit();
     }
 
+    @Test
+    public void testOnSessionsUpdated() throws Exception {
+        Session session1 = new SessionBuilder()
+                .setSessionId(1L)
+                .build();
+        Session session2 = new SessionBuilder()
+                .setSessionId(2L)
+                .build();
+        List<Session> sessions = new ArrayList<>();
+        sessions.add(session1);
+        sessions.add(session2);
+        final InstanceIdentifier<Session> id1 = InstanceIdentifier.create(NetconfState.class).child(Sessions.class).child(Session.class, session1.getKey());
+        final InstanceIdentifier<Session> id2 = InstanceIdentifier.create(NetconfState.class).child(Sessions.class).child(Session.class, session2.getKey());
+        writer.onSessionInitiated(context);
+        writer.onSessionsUpdated(sessions);
+        InOrder inOrder = inOrder(writeTransaction);
+        inOrder.verify(writeTransaction).put(LogicalDatastoreType.OPERATIONAL, id1, session1);
+        inOrder.verify(writeTransaction).put(LogicalDatastoreType.OPERATIONAL, id2, session2);
+        inOrder.verify(writeTransaction).submit();
+    }
+
     @Test
     public void testOnSessionInitiated() throws Exception {
         writer.onSessionInitiated(context);
-        verify(monitoring).registerListener(writer);
+        verify(monitoring).registerCapabilitiesListener(writer);
     }
 }
\ No newline at end of file
index 06b3e05e5691c3043d262d306a2ce9d00c80c892..54f67d39789233959e2b481723788bd2dc9699d3 100644 (file)
@@ -46,7 +46,7 @@ public class SessionNotificationProducer extends OperationalDatastoreListener<Se
             switch (modificationType) {
                 case WRITE:
                     final Session created = rootNode.getDataAfter();
-                    if (created != null) {
+                    if (created != null && rootNode.getDataBefore() == null) {
                         publishStartedSession(created);
                     }
                     break;
index 5f8e5ea9a940ab7dba531391baa3dbcb0c8a108b..e0ddae7feee1d97b9288fafc431915987aa0db4e 100644 (file)
@@ -11,6 +11,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 import java.util.Collections;
@@ -28,6 +29,7 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.mon
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.SessionBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionEnd;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionStart;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.ZeroBasedCounter32;
 
 public class SessionNotificationProducerTest {
 
@@ -56,6 +58,22 @@ public class SessionNotificationProducerTest {
         Assert.assertEquals(session.getUsername(), value.getUsername());
     }
 
+    @Test
+    public void testOnDataChangedSessionUpdated() throws Exception {
+        final DataTreeModification<Session> treeChange = mock(DataTreeModification.class);
+        final DataObjectModification<Session> changeObject = mock(DataObjectModification.class);
+        final Session sessionBefore = createSessionWithInRpcCount(1, 0);
+        final Session sessionAfter = createSessionWithInRpcCount(1, 1);
+        doReturn(sessionBefore).when(changeObject).getDataBefore();
+        doReturn(sessionAfter).when(changeObject).getDataAfter();
+        doReturn(DataObjectModification.ModificationType.WRITE).when(changeObject).getModificationType();
+        doReturn(changeObject).when(treeChange).getRootNode();
+        publisher.onDataTreeChanged(Collections.singleton(treeChange));
+        //session didn't start, only stats changed. No notification should be produced
+        verify(registration, never()).onSessionStarted(any());
+        verify(registration, never()).onSessionEnded(any());
+    }
+
     @Test
     public void testOnDataChangedSessionDeleted() throws Exception {
         final Session session = createSession(1);
@@ -70,10 +88,15 @@ public class SessionNotificationProducerTest {
     }
 
     private Session createSession(long id) {
+        return createSessionWithInRpcCount(id, 0);
+    }
+
+    private Session createSessionWithInRpcCount(long id, long inRpc) {
         return new SessionBuilder()
                 .setSessionId(id)
                 .setSourceHost(new Host("0.0.0.0".toCharArray()))
                 .setUsername("user")
+                .setInRpcs(new ZeroBasedCounter32(inRpc))
                 .build();
     }
 
@@ -83,6 +106,7 @@ public class SessionNotificationProducerTest {
         final DataObjectModification<Session> changeObject = mock(DataObjectModification.class);
         switch (type) {
             case WRITE:
+                doReturn(null).when(changeObject).getDataBefore();
                 doReturn(session).when(changeObject).getDataAfter();
                 break;
             case DELETE:
index da186336cd7e63ee09f77ef5f13e24d605998fd3..e60b71eb1f2dcdd0a7476fc0e20744da55605def 100644 (file)
@@ -8,15 +8,22 @@
 package org.opendaylight.netconf.api.monitoring;
 
 import com.google.common.base.Optional;
+import java.util.Collection;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Capabilities;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Schemas;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Sessions;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
 
-public interface NetconfMonitoringService extends CapabilityListener, SessionListener {
+public interface NetconfMonitoringService {
 
     Sessions getSessions();
 
+    /**
+     * Returns session monitoring service session listener, which is used to notify monitoring service about state of session.
+     * @return session listener
+     */
+    SessionListener getSessionListener();
+
     Schemas getSchemas();
 
     String getSchemaForCapability(String moduleName, Optional<String> revision);
@@ -24,13 +31,35 @@ public interface NetconfMonitoringService extends CapabilityListener, SessionLis
     Capabilities getCapabilities();
 
     /**
-     * Allows push based state information transfer. After the listener is registered, current state is pushed to the listener.
+     * Allows push based capabilities information transfer. After the listener is registered, current state is pushed to the listener.
      * @param listener Monitoring listener
      * @return listener registration
      */
-    AutoCloseable registerListener(MonitoringListener listener);
+    AutoCloseable registerCapabilitiesListener(CapabilitiesListener listener);
+
+    /**
+     * Allows push based sessions information transfer.
+     * @param listener Monitoring listener
+     * @return listener registration
+     */
+    AutoCloseable registerSessionsListener(SessionsListener listener);
+
+    interface CapabilitiesListener {
+
+        /**
+         * Callback used to notify about a change in used capabilities
+         * @param capabilities resulting capabilities
+         */
+        void onCapabilitiesChanged(Capabilities capabilities);
+
+        /**
+         * Callback used to notify about a change in used schemas
+         * @param schemas resulting schemas
+         */
+        void onSchemasChanged(Schemas schemas);
+    }
 
-    interface MonitoringListener {
+    interface SessionsListener {
         /**
          * Callback used to notify about netconf session start
          * @param session started session
@@ -44,15 +73,12 @@ public interface NetconfMonitoringService extends CapabilityListener, SessionLis
         void onSessionEnded(Session session);
 
         /**
-         * Callback used to notify about a change in used capabilities
-         * @param capabilities actual capabilities
+         * Callback used to notify about activity in netconf session, like
+         * rpc or notification. It is triggered at regular time interval. Session parameter
+         * contains only sessions which state was changed.
+         * @param sessions updated sessions
          */
-        void onCapabilitiesChanged(Capabilities capabilities);
+        void onSessionsUpdated(Collection<Session> sessions);
 
-        /**
-         * Callback used to notify about a change in used schemas
-         * @param schemas actual schemas
-         */
-        void onSchemasChanged(Schemas schemas);
     }
 }
diff --git a/netconf/netconf-api/src/main/java/org/opendaylight/netconf/api/monitoring/SessionEvent.java b/netconf/netconf-api/src/main/java/org/opendaylight/netconf/api/monitoring/SessionEvent.java
new file mode 100644 (file)
index 0000000..bd30c22
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.api.monitoring;
+
+/**
+ * Class represents change in netconf session.
+ */
+public class SessionEvent {
+    private final NetconfManagementSession session;
+    private final Type type;
+
+    private SessionEvent(NetconfManagementSession session, Type type) {
+        this.session = session;
+        this.type = type;
+    }
+
+    /**
+     * Returns session, where event occurred
+     * @return session
+     */
+    public NetconfManagementSession getSession() {
+        return session;
+    }
+
+    /**
+     * Returns event type
+     * @return type
+     */
+    public Type getType() {
+        return type;
+    }
+
+    public static SessionEvent inRpcSuccess(NetconfManagementSession session) {
+        return new SessionEvent(session, Type.IN_RPC_SUCCESS);
+    }
+
+    public static SessionEvent inRpcFail(NetconfManagementSession session) {
+        return new SessionEvent(session, Type.IN_RPC_FAIL);
+    }
+
+    public static SessionEvent outRpcError(NetconfManagementSession session) {
+        return new SessionEvent(session, Type.OUT_RPC_ERROR);
+    }
+
+    public static SessionEvent notification(NetconfManagementSession session) {
+        return new SessionEvent(session, Type.NOTIFICATION);
+    }
+
+    /**
+     * Session event type
+     */
+    public enum Type {
+
+        /**
+         * Correct rpc message received
+         */
+        IN_RPC_SUCCESS,
+
+        /**
+         * Incorrect rpc message received
+         */
+        IN_RPC_FAIL,
+
+        /**
+         * rpc-reply messages sent that contained an rpc-error element.
+         */
+        OUT_RPC_ERROR,
+
+        /**
+         *  Notification message sent
+         */
+        NOTIFICATION
+    }
+}
index f568eaa867241877b0f4c7a2af5083d5fa56a87c..74df29cc789b0c977ad4eccd9b3fa910cf4df74e 100644 (file)
@@ -12,7 +12,23 @@ package org.opendaylight.netconf.api.monitoring;
  * Created by mmarsale on 13.2.2015.
  */
 public interface SessionListener {
+
+    /**
+     * Callback used to notify about netconf session start
+     * @param session started session
+     */
     void onSessionUp(NetconfManagementSession session);
 
+    /**
+     * Callback used to notify about netconf session end
+     * @param session ended session
+     */
     void onSessionDown(NetconfManagementSession session);
+
+    /**
+     * Callback used to notify about activity in netconf session, like
+     * rpc or notification
+     * @param event session event, contains session and type of event
+     */
+    void onSessionEvent(SessionEvent event);
 }
index a98c76cb86763598c4e3820c260c6d8a04d755e6..2f18c3e9d17323866b5cb37cc17ea562a9e9c9f3 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>protocol-framework</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>threadpool-config-api</artifactId>
+    </dependency>
     <!-- test dependencies -->
     <dependency>
       <groupId>org.opendaylight.yangtools</groupId>
index 5a7aa06ad72148600a024fc0e211545f1f498c1e..02521e81258ddec99fd1c317ee265aec0a2287c5 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.config.yang.config.netconf.northbound.impl;
 
+import com.google.common.base.Optional;
 import org.opendaylight.netconf.impl.osgi.NetconfMonitoringServiceImpl;
 
 public class NetconfServerMonitoringModule extends org.opendaylight.controller.config.yang.config.netconf.northbound.impl.AbstractNetconfServerMonitoringModule {
@@ -26,7 +27,9 @@ public class NetconfServerMonitoringModule extends org.opendaylight.controller.c
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-        return new NetconfMonitoringServiceImpl(getAggregatorDependency());
+        return new NetconfMonitoringServiceImpl(getAggregatorDependency(),
+                Optional.fromNullable(getScheduledThreadpoolDependency()),
+                getMonitoringUpdateInterval());
     }
 
 }
index b6c88a60b255418dddde5fdcfbc6454ff104408c..f14a8f0ae021e4316d2916984ac9d46499b0e3ef 100644 (file)
@@ -29,6 +29,7 @@ import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
 import org.opendaylight.netconf.nettyutil.AbstractNetconfSession;
 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder;
 import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
+import org.opendaylight.netconf.notifications.NetconfNotification;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
@@ -52,15 +53,17 @@ public final class NetconfServerSession extends AbstractNetconfSession<NetconfSe
     private static final DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 
     private final NetconfHelloMessageAdditionalHeader header;
+    private final NetconfServerSessionListener sessionListener;
 
     private ZonedDateTime loginTime;
-    private long inRpcSuccess, inRpcFail, outRpcError;
+    private long inRpcSuccess, inRpcFail, outRpcError, outNotification;
     private volatile boolean delayedClose;
 
     public NetconfServerSession(final NetconfServerSessionListener sessionListener, final Channel channel, final long sessionId,
             final NetconfHelloMessageAdditionalHeader header) {
         super(sessionListener, channel, sessionId);
         this.header = header;
+        this.sessionListener = sessionListener;
         LOG.debug("Session {} created", toString());
     }
 
@@ -82,6 +85,10 @@ public final class NetconfServerSession extends AbstractNetconfSession<NetconfSe
     @Override
     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
         final ChannelFuture channelFuture = super.sendMessage(netconfMessage);
+        if (netconfMessage instanceof NetconfNotification) {
+            outNotification++;
+            sessionListener.onNotification(this, (NetconfNotification) netconfMessage);
+        }
         // delayed close was set, close after the message was sent
         if(delayedClose) {
             channelFuture.addListener(new ChannelFutureListener() {
@@ -137,7 +144,7 @@ public final class NetconfServerSession extends AbstractNetconfSession<NetconfSe
         builder.setUsername(header.getUserName());
         builder.setTransport(getTransportForString(header.getTransport()));
 
-        builder.setOutNotifications(new ZeroBasedCounter32(0L));
+        builder.setOutNotifications(new ZeroBasedCounter32(outNotification));
 
         builder.setKey(new SessionKey(getSessionId()));
 
index f3a26ce40b97494d71a00cf10b72c6f3f0069d62..4117c3189f29e8cf13a1913238e54ec7f9995518 100644 (file)
@@ -13,14 +13,17 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import org.opendaylight.controller.config.util.xml.DocumentedException;
 import org.opendaylight.controller.config.util.xml.XmlUtil;
-import org.opendaylight.netconf.util.messages.SendErrorExceptionUtil;
 import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.api.NetconfSessionListener;
 import org.opendaylight.netconf.api.NetconfTerminationReason;
 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.api.monitoring.SessionEvent;
+import org.opendaylight.netconf.api.monitoring.SessionListener;
 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
 import org.opendaylight.netconf.impl.osgi.NetconfOperationRouter;
 import org.opendaylight.netconf.util.messages.SubtreeFilter;
+import org.opendaylight.netconf.notifications.NetconfNotification;
+import org.opendaylight.netconf.util.messages.SendErrorExceptionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
@@ -30,22 +33,20 @@ import org.w3c.dom.Node;
 public class NetconfServerSessionListener implements NetconfSessionListener<NetconfServerSession> {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfServerSessionListener.class);
-    private final NetconfMonitoringService monitoringService;
+    private final SessionListener monitoringSessionListener;
     private final NetconfOperationRouter operationRouter;
     private final AutoCloseable onSessionDownCloseable;
 
     public NetconfServerSessionListener(final NetconfOperationRouter operationRouter, final NetconfMonitoringService monitoringService,
                                         final AutoCloseable onSessionDownCloseable) {
         this.operationRouter = operationRouter;
-        this.monitoringService = monitoringService;
+        this.monitoringSessionListener = monitoringService.getSessionListener();
         this.onSessionDownCloseable = onSessionDownCloseable;
     }
 
     @Override
     public void onSessionUp(final NetconfServerSession netconfNetconfServerSession) {
-        monitoringService.onSessionUp(netconfNetconfServerSession);
-        // FIXME monitoring service should be also notified about all the other changes to netconf session (from ietf-netconf-monitoring point of view)
-        // This means also notifying after every message is processed
+        monitoringSessionListener.onSessionUp(netconfNetconfServerSession);
     }
 
     @Override
@@ -55,7 +56,7 @@ public class NetconfServerSessionListener implements NetconfSessionListener<Netc
     }
 
     public void onDown(final NetconfServerSession netconfNetconfServerSession) {
-        monitoringService.onSessionDown(netconfNetconfServerSession);
+        monitoringSessionListener.onSessionDown(netconfNetconfServerSession);
 
         try {
             operationRouter.close();
@@ -87,19 +88,27 @@ public class NetconfServerSessionListener implements NetconfSessionListener<Netc
                     session);
             LOG.debug("Responding with message {}", message);
             session.sendMessage(message);
+            monitoringSessionListener.onSessionEvent(SessionEvent.inRpcSuccess(session));
         } catch (final RuntimeException e) {
             // TODO: should send generic error or close session?
             LOG.error("Unexpected exception", e);
             session.onIncommingRpcFail();
+            monitoringSessionListener.onSessionEvent(SessionEvent.inRpcFail(session));
             throw new IllegalStateException("Unable to process incoming message " + netconfMessage, e);
         } catch (DocumentedException e) {
             LOG.trace("Error occurred while processing message",e);
             session.onOutgoingRpcError();
             session.onIncommingRpcFail();
+            monitoringSessionListener.onSessionEvent(SessionEvent.inRpcFail(session));
+            monitoringSessionListener.onSessionEvent(SessionEvent.outRpcError(session));
             SendErrorExceptionUtil.sendErrorMessage(session, e, netconfMessage);
         }
     }
 
+    public void onNotification(final NetconfServerSession session, final NetconfNotification notification) {
+        monitoringSessionListener.onSessionEvent(SessionEvent.notification(session));
+    }
+
     private NetconfMessage processDocument(final NetconfMessage netconfMessage, final NetconfServerSession session)
             throws DocumentedException {
 
diff --git a/netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfCapabilityMonitoringService.java b/netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfCapabilityMonitoringService.java
new file mode 100644 (file)
index 0000000..afc1da6
--- /dev/null
@@ -0,0 +1,267 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.impl.osgi;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.config.util.capability.BasicCapability;
+import org.opendaylight.controller.config.util.capability.Capability;
+import org.opendaylight.netconf.api.monitoring.CapabilityListener;
+import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
+import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
+import org.opendaylight.netconf.notifications.BaseNotificationPublisherRegistration;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.Yang;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Capabilities;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Schemas;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SchemasBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.Schema;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.SchemaBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.SchemaKey;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.ChangedByBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.changed.by.server.or.user.ServerBuilder;
+
+class NetconfCapabilityMonitoringService implements CapabilityListener, AutoCloseable {
+
+    private static final Schema.Location NETCONF_LOCATION = new Schema.Location(Schema.Location.Enumeration.NETCONF);
+    private static final List<Schema.Location> NETCONF_LOCATIONS = ImmutableList.of(NETCONF_LOCATION);
+    private static final BasicCapability CANDIDATE_CAPABILITY = new BasicCapability("urn:ietf:params:netconf:capability:candidate:1.0");
+    private static final Function<Capability, Uri> CAPABILITY_TO_URI = new Function<Capability, Uri>() {
+        @Override
+        public Uri apply(final Capability input) {
+            return new Uri(input.getCapabilityUri());
+        }
+    };
+
+    private final Set<NetconfManagementSession> sessions = new HashSet<>();
+    private final NetconfOperationServiceFactory netconfOperationProvider;
+    private final Map<Uri, Capability> capabilities = new HashMap<>();
+    private final Map<String, Map<String, String>> mappedModulesToRevisionToSchema = Maps.newHashMap();
+
+
+    private final Set<NetconfMonitoringService.CapabilitiesListener> listeners = Sets.newHashSet();
+    private volatile BaseNotificationPublisherRegistration notificationPublisher;
+
+    NetconfCapabilityMonitoringService(final NetconfOperationServiceFactory netconfOperationProvider) {
+        this.netconfOperationProvider = netconfOperationProvider;
+        netconfOperationProvider.registerCapabilityListener(this);
+    }
+
+    synchronized Schemas getSchemas() {
+        try {
+            return transformSchemas(netconfOperationProvider.getCapabilities());
+        } catch (final RuntimeException e) {
+            throw e;
+        } catch (final Exception e) {
+            throw new IllegalStateException("Exception while closing", e);
+        }
+    }
+
+    synchronized String getSchemaForModuleRevision(final String moduleName, final Optional<String> revision) {
+
+        Map<String, String> revisionMapRequest = mappedModulesToRevisionToSchema.get(moduleName);
+        Preconditions.checkState(revisionMapRequest != null, "Capability for module %s not present, " + ""
+                + "available modules : %s", moduleName, Collections2.transform(capabilities.values(), CAPABILITY_TO_URI));
+
+        if (revision.isPresent()) {
+            String schema = revisionMapRequest.get(revision.get());
+
+            Preconditions.checkState(schema != null,
+                    "Capability for module %s:%s not present, available revisions for module: %s", moduleName,
+                    revision.get(), revisionMapRequest.keySet());
+
+            return schema;
+        } else {
+            Preconditions.checkState(revisionMapRequest.size() == 1,
+                    "Expected 1 capability for module %s, available revisions : %s", moduleName,
+                    revisionMapRequest.keySet());
+            //Only one revision is present, so return it
+            return revisionMapRequest.values().iterator().next();
+        }
+    }
+
+    private void updateCapabilityToSchemaMap(final Set<Capability> added, final Set<Capability> removed) {
+        for (final Capability cap : added) {
+            if (!isValidModuleCapability(cap)){
+                continue;
+            }
+
+            final String currentModuleName = cap.getModuleName().get();
+            Map<String, String> revisionMap = mappedModulesToRevisionToSchema.get(currentModuleName);
+            if (revisionMap == null) {
+                revisionMap = Maps.newHashMap();
+                mappedModulesToRevisionToSchema.put(currentModuleName, revisionMap);
+            }
+
+            final String currentRevision = cap.getRevision().get();
+            revisionMap.put(currentRevision, cap.getCapabilitySchema().get());
+        }
+        for (final Capability cap : removed) {
+            if (!isValidModuleCapability(cap)){
+                continue;
+            }
+            final Map<String, String> revisionMap = mappedModulesToRevisionToSchema.get(cap.getModuleName().get());
+            if (revisionMap != null) {
+                revisionMap.remove(cap.getRevision().get());
+                if (revisionMap.isEmpty()) {
+                    mappedModulesToRevisionToSchema.remove(cap.getModuleName().get());
+                }
+            }
+        }
+    }
+
+    private static boolean isValidModuleCapability(Capability cap) {
+        return cap.getModuleName().isPresent()
+                && cap.getRevision().isPresent()
+                && cap.getCapabilitySchema().isPresent();
+    }
+
+
+    synchronized Capabilities getCapabilities() {
+        return new CapabilitiesBuilder().setCapability(Lists.newArrayList(capabilities.keySet())).build();
+    }
+
+    synchronized AutoCloseable registerListener(final NetconfMonitoringService.CapabilitiesListener listener) {
+        listeners.add(listener);
+        listener.onCapabilitiesChanged(getCapabilities());
+        listener.onSchemasChanged(getSchemas());
+        return new AutoCloseable() {
+            @Override
+            public void close() throws Exception {
+                synchronized (NetconfCapabilityMonitoringService.this) {
+                    listeners.remove(listener);
+                }
+            }
+        };
+    }
+
+    private static Schemas transformSchemas(final Set<Capability> caps) {
+        final List<Schema> schemas = new ArrayList<>(caps.size());
+        for (final Capability cap : caps) {
+            if (cap.getCapabilitySchema().isPresent()) {
+                final SchemaBuilder builder = new SchemaBuilder();
+
+                Preconditions.checkState(isValidModuleCapability(cap));
+
+                builder.setNamespace(new Uri(cap.getModuleNamespace().get()));
+
+                final String version = cap.getRevision().get();
+                builder.setVersion(version);
+
+                final String identifier = cap.getModuleName().get();
+                builder.setIdentifier(identifier);
+
+                builder.setFormat(Yang.class);
+
+                builder.setLocation(transformLocations(cap.getLocation()));
+
+                builder.setKey(new SchemaKey(Yang.class, identifier, version));
+
+                schemas.add(builder.build());
+            }
+        }
+
+        return new SchemasBuilder().setSchema(schemas).build();
+    }
+
+    private static List<Schema.Location> transformLocations(final Collection<String> locations) {
+        if (locations.isEmpty()) {
+            return NETCONF_LOCATIONS;
+        }
+
+        final Builder<Schema.Location> b = ImmutableList.builder();
+        b.add(NETCONF_LOCATION);
+
+        for (final String location : locations) {
+            b.add(new Schema.Location(new Uri(location)));
+        }
+
+        return b.build();
+    }
+
+    private static Set<Capability> setupCapabilities(final Set<Capability> caps) {
+        Set<Capability> capabilities = new HashSet<>(caps);
+        capabilities.add(CANDIDATE_CAPABILITY);
+        // TODO rollback on error not supported EditConfigXmlParser:100
+        // [RFC6241] 8.5.  Rollback-on-Error Capability
+        // capabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:rollback-on-error:1.0"));
+        return capabilities;
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        listeners.clear();
+        sessions.clear();
+        capabilities.clear();
+    }
+
+    @Override
+    public synchronized void onCapabilitiesChanged(Set<Capability> added, Set<Capability> removed) {
+        onCapabilitiesAdded(added);
+        onCapabilitiesRemoved(removed);
+        updateCapabilityToSchemaMap(added, removed);
+        notifyCapabilityChanged(getCapabilities());
+
+        // publish notification to notification collector about changed capabilities
+        if (notificationPublisher != null) {
+            notificationPublisher.onCapabilityChanged(computeDiff(added, removed));
+        }
+    }
+
+    private void notifyCapabilityChanged(Capabilities capabilities) {
+        for (NetconfMonitoringService.CapabilitiesListener listener : listeners) {
+            listener.onCapabilitiesChanged(capabilities);
+            listener.onSchemasChanged(getSchemas());
+        }
+    }
+
+
+    private static NetconfCapabilityChange computeDiff(final Set<Capability> added, final Set<Capability> removed) {
+        final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder();
+        netconfCapabilityChangeBuilder.setChangedBy(new ChangedByBuilder().setServerOrUser(new ServerBuilder().setServer(true).build()).build());
+        netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(Collections2.transform(removed, CAPABILITY_TO_URI)));
+        netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(Collections2.transform(added, CAPABILITY_TO_URI)));
+        // TODO modified should be computed ... but why ?
+        netconfCapabilityChangeBuilder.setModifiedCapability(Collections.<Uri>emptyList());
+        return netconfCapabilityChangeBuilder.build();
+    }
+
+
+    private void onCapabilitiesAdded(final Set<Capability> addedCaps) {
+        this.capabilities.putAll(Maps.uniqueIndex(setupCapabilities(addedCaps), CAPABILITY_TO_URI));
+    }
+
+    private void onCapabilitiesRemoved(final Set<Capability> removedCaps) {
+        for (final Capability addedCap : removedCaps) {
+            capabilities.remove(CAPABILITY_TO_URI.apply(addedCap));
+        }
+    }
+
+    void setNotificationPublisher(final BaseNotificationPublisherRegistration notificationPublisher) {
+        this.notificationPublisher = notificationPublisher;
+    }
+}
index 72850737a2b3fefe966c61d287a69513c98b20dd..9c8905afffc451e1b809c8f2d5ec7258d8812414 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  */
 package org.opendaylight.netconf.impl.osgi;
 
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableList.Builder;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.config.util.capability.BasicCapability;
-import org.opendaylight.controller.config.util.capability.Capability;
-import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.api.monitoring.SessionListener;
 import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
 import org.opendaylight.netconf.notifications.BaseNotificationPublisherRegistration;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.Yang;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Capabilities;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Schemas;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SchemasBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Sessions;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SessionsBuilder;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.Schema;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.SchemaBuilder;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.SchemaKey;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.ChangedByBuilder;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.changed.by.server.or.user.ServerBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class NetconfMonitoringServiceImpl implements NetconfMonitoringService, AutoCloseable {
 
-    private static final Schema.Location NETCONF_LOCATION = new Schema.Location(Schema.Location.Enumeration.NETCONF);
-    private static final List<Schema.Location> NETCONF_LOCATIONS = ImmutableList.of(NETCONF_LOCATION);
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfMonitoringServiceImpl.class);
-    private static final Function<NetconfManagementSession, Session> SESSION_FUNCTION = new Function<NetconfManagementSession, Session>() {
-        @Override
-        public Session apply(@Nonnull final NetconfManagementSession input) {
-            return input.toManagementSession();
-        }
-    };
-    private static final Function<Capability, Uri> CAPABILITY_TO_URI = new Function<Capability, Uri>() {
-        @Override
-        public Uri apply(final Capability input) {
-            return new Uri(input.getCapabilityUri());
-        }
-    };
+    private final NetconfCapabilityMonitoringService capabilityMonitoring;
+    private final NetconfSessionMonitoringService sessionMonitoring;
 
-    private final Set<NetconfManagementSession> sessions = new HashSet<>();
-    private final NetconfOperationServiceFactory netconfOperationProvider;
-    private final Map<Uri, Capability> capabilities = new HashMap<>();
-    private final Map<String, Map<String, String>> mappedModulesToRevisionToSchema = Maps.newHashMap();
-
-    private final Set<MonitoringListener> listeners = Sets.newHashSet();
-    private volatile BaseNotificationPublisherRegistration notificationPublisher;
-
-    public NetconfMonitoringServiceImpl(final NetconfOperationServiceFactory netconfOperationProvider) {
-        this.netconfOperationProvider = netconfOperationProvider;
-        netconfOperationProvider.registerCapabilityListener(this);
+    public NetconfMonitoringServiceImpl(NetconfOperationServiceFactory opProvider) {
+        this(opProvider, Optional.absent(), 0);
     }
 
-    @Override
-    public synchronized void onSessionUp(final NetconfManagementSession session) {
-        LOG.debug("Session {} up", session);
-        Preconditions.checkState(!sessions.contains(session), "Session %s was already added", session);
-        sessions.add(session);
-        notifySessionUp(session);
-    }
+    public NetconfMonitoringServiceImpl(NetconfOperationServiceFactory opProvider,
+                                        Optional<ScheduledThreadPool> threadPool,
+                                        long updateInterval) {
+        this.capabilityMonitoring = new NetconfCapabilityMonitoringService(opProvider);
+        this.sessionMonitoring = new NetconfSessionMonitoringService(threadPool, updateInterval);
 
-    @Override
-    public synchronized void onSessionDown(final NetconfManagementSession session) {
-        LOG.debug("Session {} down", session);
-        Preconditions.checkState(sessions.contains(session), "Session %s not present", session);
-        sessions.remove(session);
-        notifySessionDown(session);
     }
 
     @Override
-    public synchronized Sessions getSessions() {
-        return new SessionsBuilder().setSession(ImmutableList.copyOf(Collections2.transform(sessions, SESSION_FUNCTION))).build();
+    public Sessions getSessions() {
+        return sessionMonitoring.getSessions();
     }
 
     @Override
-    public synchronized Schemas getSchemas() {
-        try {
-            return transformSchemas(netconfOperationProvider.getCapabilities());
-        } catch (final RuntimeException e) {
-            throw e;
-        } catch (final Exception e) {
-            throw new IllegalStateException("Exception while closing", e);
-        }
+    public SessionListener getSessionListener() {
+        return sessionMonitoring;
     }
 
     @Override
-    public synchronized String getSchemaForCapability(final String moduleName, final Optional<String> revision) {
-
-        Map<String, String> revisionMapRequest = mappedModulesToRevisionToSchema.get(moduleName);
-        Preconditions.checkState(revisionMapRequest != null, "Capability for module %s not present, " + ""
-                + "available modules : %s", moduleName, Collections2.transform(capabilities.values(), CAPABILITY_TO_URI));
-
-        if (revision.isPresent()) {
-            String schema = revisionMapRequest.get(revision.get());
-
-            Preconditions.checkState(schema != null,
-                    "Capability for module %s:%s not present, available revisions for module: %s", moduleName,
-                    revision.get(), revisionMapRequest.keySet());
-
-            return schema;
-        } else {
-            Preconditions.checkState(revisionMapRequest.size() == 1,
-                    "Expected 1 capability for module %s, available revisions : %s", moduleName,
-                    revisionMapRequest.keySet());
-            return revisionMapRequest.values().iterator().next();
-        }
-    }
-
-    private synchronized void updateCapabilityToSchemaMap(final Set<Capability> added, final Set<Capability> removed) {
-        for (final Capability cap : added) {
-            if (!isValidModuleCapability(cap)){
-                continue;
-            }
-
-            final String currentModuleName = cap.getModuleName().get();
-            Map<String, String> revisionMap = mappedModulesToRevisionToSchema.get(currentModuleName);
-            if (revisionMap == null) {
-                revisionMap = Maps.newHashMap();
-                mappedModulesToRevisionToSchema.put(currentModuleName, revisionMap);
-            }
-
-            final String currentRevision = cap.getRevision().get();
-            revisionMap.put(currentRevision, cap.getCapabilitySchema().get());
-        }
-        for (final Capability cap : removed) {
-            if (!isValidModuleCapability(cap)){
-                continue;
-            }
-            final Map<String, String> revisionMap = mappedModulesToRevisionToSchema.get(cap.getModuleName().get());
-            if (revisionMap != null) {
-                revisionMap.remove(cap.getRevision().get());
-                if (revisionMap.isEmpty()) {
-                    mappedModulesToRevisionToSchema.remove(cap.getModuleName().get());
-                }
-            }
-        }
-    }
-
-    private boolean isValidModuleCapability(Capability cap) {
-        return cap.getModuleName().isPresent()
-                && cap.getRevision().isPresent()
-                && cap.getCapabilitySchema().isPresent();
+    public Schemas getSchemas() {
+        return capabilityMonitoring.getSchemas();
     }
 
     @Override
-    public synchronized Capabilities getCapabilities() {
-        return new CapabilitiesBuilder().setCapability(Lists.newArrayList(capabilities.keySet())).build();
+    public String getSchemaForCapability(String moduleName, Optional<String> revision) {
+        return capabilityMonitoring.getSchemaForModuleRevision(moduleName, revision);
     }
 
     @Override
-    public synchronized AutoCloseable registerListener(final MonitoringListener listener) {
-        listeners.add(listener);
-        listener.onCapabilitiesChanged(getCapabilities());
-        listener.onSchemasChanged(getSchemas());
-        return new AutoCloseable() {
-            @Override
-            public void close() throws Exception {
-                listeners.remove(listener);
-            }
-        };
-    }
-
-    private static Schemas transformSchemas(final Set<Capability> caps) {
-        final List<Schema> schemas = new ArrayList<>(caps.size());
-        for (final Capability cap : caps) {
-            if (cap.getCapabilitySchema().isPresent()) {
-                final SchemaBuilder builder = new SchemaBuilder();
-                Preconditions.checkState(cap.getModuleNamespace().isPresent());
-                builder.setNamespace(new Uri(cap.getModuleNamespace().get()));
-
-                Preconditions.checkState(cap.getRevision().isPresent());
-                final String version = cap.getRevision().get();
-                builder.setVersion(version);
-
-                Preconditions.checkState(cap.getModuleName().isPresent());
-                final String identifier = cap.getModuleName().get();
-                builder.setIdentifier(identifier);
-
-                builder.setFormat(Yang.class);
-
-                builder.setLocation(transformLocations(cap.getLocation()));
-
-                builder.setKey(new SchemaKey(Yang.class, identifier, version));
-
-                schemas.add(builder.build());
-            }
-        }
-
-        return new SchemasBuilder().setSchema(schemas).build();
-    }
-
-    private static List<Schema.Location> transformLocations(final Collection<String> locations) {
-        if (locations.isEmpty()) {
-            return NETCONF_LOCATIONS;
-        }
-
-        final Builder<Schema.Location> b = ImmutableList.builder();
-        b.add(NETCONF_LOCATION);
-
-        for (final String location : locations) {
-            b.add(new Schema.Location(new Uri(location)));
-        }
-
-        return b.build();
-    }
-
-    public static Set<Capability> setupCapabilities(final Set<Capability> caps) {
-        Set<Capability> capabilities = new HashSet<>(caps);
-        capabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:candidate:1.0"));
-        // TODO rollback on error not supported EditConfigXmlParser:100
-        // [RFC6241] 8.5.  Rollback-on-Error Capability
-        // capabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:rollback-on-error:1.0"));
-        return capabilities;
+    public Capabilities getCapabilities() {
+        return capabilityMonitoring.getCapabilities();
     }
 
     @Override
-    public synchronized void close() throws Exception {
-        listeners.clear();
-        sessions.clear();
-        capabilities.clear();
+    public AutoCloseable registerCapabilitiesListener(CapabilitiesListener listener) {
+        return capabilityMonitoring.registerListener(listener);
     }
 
     @Override
-    public void onCapabilitiesChanged(Set<Capability> added, Set<Capability> removed) {
-        onCapabilitiesAdded(added);
-        onCapabilitiesRemoved(removed);
-        updateCapabilityToSchemaMap(added, removed);
-        notifyCapabilityChanged(getCapabilities());
-
-        // publish notification to notification collector about changed capabilities
-        if (notificationPublisher != null) {
-            notificationPublisher.onCapabilityChanged(computeDiff(added, removed));
-        }
-    }
-
-    private void notifyCapabilityChanged(Capabilities capabilities) {
-        for (MonitoringListener listener : listeners) {
-            listener.onCapabilitiesChanged(capabilities);
-            listener.onSchemasChanged(getSchemas());
-        }
-    }
-
-    private void notifySessionUp(NetconfManagementSession managementSession) {
-        Session session = SESSION_FUNCTION.apply(managementSession);
-        for (MonitoringListener listener : listeners) {
-            listener.onSessionStarted(session);
-        }
+    public AutoCloseable registerSessionsListener(SessionsListener listener) {
+        return sessionMonitoring.registerListener(listener);
     }
 
-    private void notifySessionDown(NetconfManagementSession managementSession) {
-        Session session = SESSION_FUNCTION.apply(managementSession);
-        for (MonitoringListener listener : listeners) {
-            listener.onSessionEnded(session);
-        }
+    public void setNotificationPublisher(BaseNotificationPublisherRegistration notificationPublisher) {
+        this.capabilityMonitoring.setNotificationPublisher(notificationPublisher);
     }
 
-    static NetconfCapabilityChange computeDiff(final Set<Capability> added, final Set<Capability> removed) {
-        final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder();
-        netconfCapabilityChangeBuilder.setChangedBy(new ChangedByBuilder().setServerOrUser(new ServerBuilder().setServer(true).build()).build());
-        netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(Collections2.transform(removed, CAPABILITY_TO_URI)));
-        netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(Collections2.transform(added, CAPABILITY_TO_URI)));
-        // TODO modified should be computed ... but why ?
-        netconfCapabilityChangeBuilder.setModifiedCapability(Collections.<Uri>emptyList());
-        return netconfCapabilityChangeBuilder.build();
-    }
-
-
-    private synchronized void onCapabilitiesAdded(final Set<Capability> addedCaps) {
-        this.capabilities.putAll(Maps.uniqueIndex(setupCapabilities(addedCaps), CAPABILITY_TO_URI));
-    }
-
-    private synchronized void onCapabilitiesRemoved(final Set<Capability> addedCaps) {
-        for (final Capability addedCap : addedCaps) {
-            capabilities.remove(CAPABILITY_TO_URI.apply(addedCap));
-        }
-    }
-
-    public void setNotificationPublisher(final BaseNotificationPublisherRegistration notificationPublisher) {
-        this.notificationPublisher = notificationPublisher;
+    @Override
+    public void close() throws Exception {
+        capabilityMonitoring.close();
+        sessionMonitoring.close();
     }
 }
diff --git a/netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringService.java b/netconf/netconf-impl/src/main/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringService.java
new file mode 100644 (file)
index 0000000..68883fd
--- /dev/null
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.impl.osgi;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
+import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
+import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.api.monitoring.SessionEvent;
+import org.opendaylight.netconf.api.monitoring.SessionListener;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Sessions;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SessionsBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements {@link SessionListener} to receive updates about Netconf sessions. Instance notifies its listeners
+ * about session start and end. It also publishes on regular interval list of sessions,
+ * where events like rpc or notification happened.
+ */
+class NetconfSessionMonitoringService implements SessionListener, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfSessionMonitoringService.class);
+
+    private final Set<NetconfManagementSession> sessions = Sets.newHashSet();
+    private final Set<NetconfManagementSession> changedSessions = Sets.newHashSet();
+    private final Set<NetconfMonitoringService.SessionsListener> listeners = Sets.newHashSet();
+    private final ScheduledExecutorService executor;
+    private final long updateInterval;
+    private boolean running;
+
+    /**
+     * @param schedulingThreadPool thread pool for scheduling session stats updates. If not present, updates won't be scheduled.
+     * @param updateInterval update interval. If is less than 0, updates won't be scheduled
+     */
+    NetconfSessionMonitoringService(Optional<ScheduledThreadPool> schedulingThreadPool, long updateInterval) {
+        this.updateInterval = updateInterval;
+        if (schedulingThreadPool.isPresent() && updateInterval > 0) {
+            this.executor =  schedulingThreadPool.get().getExecutor();
+            LOG.info("/netconf-state/sessions will be updated every {} seconds.", updateInterval);
+        } else {
+            LOG.info("Scheduling thread pool is present = {}, update interval {}: /netconf-state/sessions won't be updated.",
+                    schedulingThreadPool.isPresent(), updateInterval);
+            this.executor = null;
+        }
+    }
+
+    synchronized Sessions getSessions() {
+        final Collection<Session> managementSessions = Collections2.transform(sessions, NetconfManagementSession::toManagementSession);
+        return new SessionsBuilder().setSession(ImmutableList.copyOf(managementSessions)).build();
+    }
+
+    @Override
+    public synchronized void onSessionUp(final NetconfManagementSession session) {
+        LOG.debug("Session {} up", session);
+        Preconditions.checkState(!sessions.contains(session), "Session %s was already added", session);
+        sessions.add(session);
+        notifySessionUp(session);
+    }
+
+    @Override
+    public synchronized void onSessionDown(final NetconfManagementSession session) {
+        LOG.debug("Session {} down", session);
+        Preconditions.checkState(sessions.contains(session), "Session %s not present", session);
+        sessions.remove(session);
+        changedSessions.remove(session);
+        notifySessionDown(session);
+    }
+
+    @Override
+    public synchronized void onSessionEvent(SessionEvent event) {
+        changedSessions.add(event.getSession());
+    }
+
+    synchronized AutoCloseable registerListener(final NetconfMonitoringService.SessionsListener listener) {
+        listeners.add(listener);
+        if (!running) {
+            startUpdateSessionStats();
+        }
+        return new AutoCloseable() {
+            @Override
+            public void close() throws Exception {
+                listeners.remove(listener);
+            }
+        };
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        stopUpdateSessionStats();
+        listeners.clear();
+        sessions.clear();
+    }
+
+    private synchronized void updateSessionStats() {
+        if (changedSessions.isEmpty()) {
+            return;
+        }
+        final List<Session> changed = changedSessions.stream()
+                .map(NetconfManagementSession::toManagementSession)
+                .collect(Collectors.toList());
+        final ImmutableList<Session> sessionImmutableList = ImmutableList.copyOf(changed);
+        for (NetconfMonitoringService.SessionsListener listener : listeners) {
+            listener.onSessionsUpdated(sessionImmutableList);
+        }
+        changedSessions.clear();
+    }
+
+    private void notifySessionUp(NetconfManagementSession managementSession) {
+        Session session = managementSession.toManagementSession();
+        for (NetconfMonitoringService.SessionsListener listener : listeners) {
+            listener.onSessionStarted(session);
+        }
+    }
+
+    private void notifySessionDown(NetconfManagementSession managementSession) {
+        Session session = managementSession.toManagementSession();
+        for (NetconfMonitoringService.SessionsListener listener : listeners) {
+            listener.onSessionEnded(session);
+        }
+    }
+
+    private void startUpdateSessionStats() {
+        if (executor != null) {
+            executor.scheduleAtFixedRate(this::updateSessionStats, 1, updateInterval, TimeUnit.SECONDS);
+            running = true;
+        }
+    }
+
+    private void stopUpdateSessionStats() {
+        if (executor != null) {
+            executor.shutdownNow();
+            running = false;
+        }
+    }
+}
index 7ad1fef55da1975e486012e690b8de0d929f3a15..aa6485a00ad0215488cb7cc798e70cb7680a2f57 100644 (file)
@@ -9,6 +9,7 @@ module netconf-northbound-impl {
     import netconf-northbound-mapper { prefix nnm; revision-date 2015-01-14; }
     import netconf-northbound { prefix nn; revision-date 2015-01-14; }
     import netty {prefix netty; }
+    import threadpool {prefix th;}
 
     description
         "This module contains the base YANG definitions for
@@ -106,6 +107,20 @@ module netconf-northbound-impl {
                     }
                 }
             }
+            container scheduled-threadpool {
+                uses config:service-ref {
+                    refine type {
+                        mandatory false;
+                        config:required-identity th:scheduled-threadpool;
+                    }
+                }
+                description "Dedicated to update netconf-state/sessions subtree on session change.";
+            }
+            leaf monitoring-update-interval {
+                description "Specifies interval in seconds after which session stats are updated. If zero, stats won't be updated.";
+                type uint32;
+                default 0;
+            }
 
         }
     }
index 35abee564046d5249f47cc3efd18090bc39d963b..420a8f5d59ec0a6eca3dd299f737cd22ac44af25 100644 (file)
@@ -12,7 +12,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anySetOf;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -52,6 +51,8 @@ import org.junit.runners.Parameterized;
 import org.opendaylight.controller.config.util.capability.Capability;
 import org.opendaylight.controller.config.util.xml.DocumentedException;
 import org.opendaylight.controller.config.util.xml.XmlUtil;
+import org.opendaylight.netconf.api.monitoring.SessionEvent;
+import org.opendaylight.netconf.api.monitoring.SessionListener;
 import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
 import org.opendaylight.netconf.api.monitoring.CapabilityListener;
@@ -119,15 +120,17 @@ public class ConcurrentClientsTest {
 
     public static NetconfMonitoringService createMockedMonitoringService() {
         NetconfMonitoringService monitoring = mock(NetconfMonitoringService.class);
-        doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
-        doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
+        final SessionListener sessionListener = mock(SessionListener.class);
+        doNothing().when(sessionListener).onSessionUp(any(NetconfServerSession.class));
+        doNothing().when(sessionListener).onSessionDown(any(NetconfServerSession.class));
+        doNothing().when(sessionListener).onSessionEvent(any(SessionEvent.class));
         doReturn(new AutoCloseable() {
             @Override
             public void close() throws Exception {
 
             }
-        }).when(monitoring).registerListener(any(NetconfMonitoringService.MonitoringListener.class));
-        doNothing().when(monitoring).onCapabilitiesChanged(anySetOf(Capability.class), anySetOf(Capability.class));
+        }).when(monitoring).registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
+        doReturn(sessionListener).when(monitoring).getSessionListener();
         doReturn(new CapabilitiesBuilder().setCapability(Collections.<Uri>emptyList()).build()).when(monitoring).getCapabilities();
         return monitoring;
     }
similarity index 82%
rename from netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfMonitoringServiceImplTest.java
rename to netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfCapabilityMonitoringServiceTest.java
index 1359a12cd329cc6edd7b336fe9e14a69ff34c6fc..b69a2f2a03128c84071adb85c493437097d2cbfe 100644 (file)
@@ -47,7 +47,7 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.not
 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
 import org.opendaylight.yangtools.yang.model.api.Module;
 
-public class NetconfMonitoringServiceImplTest {
+public class NetconfCapabilityMonitoringServiceTest {
 
     private static final String TEST_MODULE_CONTENT = "content";
     private static final String TEST_MODULE_CONTENT2 = "content2";
@@ -78,11 +78,11 @@ public class NetconfMonitoringServiceImplTest {
     @Mock
     private NetconfManagementSession sessionMock;
     @Mock
-    private NetconfMonitoringService.MonitoringListener listener;
+    private NetconfMonitoringService.CapabilitiesListener listener;
     @Mock
     private BaseNotificationPublisherRegistration notificationPublisher;
 
-    private NetconfMonitoringServiceImpl monitoringService;
+    private NetconfCapabilityMonitoringService monitoringService;
 
     @BeforeClass
     public static void suiteSetUp() throws Exception {
@@ -111,19 +111,17 @@ public class NetconfMonitoringServiceImplTest {
         CAPABILITIES.add(new BasicCapability("urn:ietf:params:xml:ns:yang:ietf-inet-types?module=ietf-inet-types&amp;revision=2010-09-24"));
 
         doReturn(CAPABILITIES).when(operationServiceFactoryMock).getCapabilities();
-        doReturn(null).when(operationServiceFactoryMock).registerCapabilityListener(any(NetconfMonitoringServiceImpl.class));
+        doReturn(null).when(operationServiceFactoryMock).registerCapabilityListener(any(NetconfCapabilityMonitoringService.class));
 
         doReturn(SESSION).when(sessionMock).toManagementSession();
         doNothing().when(listener).onCapabilitiesChanged(any());
         doNothing().when(listener).onSchemasChanged(any());
-        doNothing().when(listener).onSessionStarted(any());
-        doNothing().when(listener).onSessionEnded(any());
 
         doNothing().when(notificationPublisher).onCapabilityChanged(any());
         doNothing().when(notificationPublisher).onSessionStarted(any());
         doNothing().when(notificationPublisher).onSessionEnded(any());
 
-        monitoringService = new NetconfMonitoringServiceImpl(operationServiceFactoryMock);
+        monitoringService = new NetconfCapabilityMonitoringService(operationServiceFactoryMock);
         monitoringService.onCapabilitiesChanged(CAPABILITIES, Collections.emptySet());
         monitoringService.setNotificationPublisher(notificationPublisher);
         monitoringService.registerListener(listener);
@@ -132,13 +130,9 @@ public class NetconfMonitoringServiceImplTest {
 
     @Test
     public void testListeners() throws Exception {
-        monitoringService.onSessionUp(sessionMock);
         HashSet<Capability> added = new HashSet<>();
         added.add(new BasicCapability("toAdd"));
         monitoringService.onCapabilitiesChanged(added, Collections.emptySet());
-        monitoringService.onSessionDown(sessionMock);
-        verify(listener).onSessionStarted(any());
-        verify(listener).onSessionEnded(any());
         //onCapabilitiesChanged and onSchemasChanged are invoked also after listener registration
         verify(listener, times(2)).onCapabilitiesChanged(any());
         verify(listener, times(2)).onSchemasChanged(any());
@@ -157,14 +151,14 @@ public class NetconfMonitoringServiceImplTest {
     public void testGetSchemaForCapability() throws Exception {
         //test multiple revisions of the same capability
         monitoringService.onCapabilitiesChanged(Collections.singleton(moduleCapability2), Collections.emptySet());
-        final String schema = monitoringService.getSchemaForCapability(TEST_MODULE_NAME, Optional.of(TEST_MODULE_REV));
+        final String schema = monitoringService.getSchemaForModuleRevision(TEST_MODULE_NAME, Optional.of(TEST_MODULE_REV));
         Assert.assertEquals(TEST_MODULE_CONTENT, schema);
-        final String schema2 = monitoringService.getSchemaForCapability(TEST_MODULE_NAME, Optional.of(TEST_MODULE_REV2));
+        final String schema2 = monitoringService.getSchemaForModuleRevision(TEST_MODULE_NAME, Optional.of(TEST_MODULE_REV2));
         Assert.assertEquals(TEST_MODULE_CONTENT2, schema2);
         //remove one revision
         monitoringService.onCapabilitiesChanged(Collections.emptySet(), Collections.singleton(moduleCapability1));
         //only one revision present
-        final String schema3 = monitoringService.getSchemaForCapability(TEST_MODULE_NAME, Optional.absent());
+        final String schema3 = monitoringService.getSchemaForModuleRevision(TEST_MODULE_NAME, Optional.absent());
         Assert.assertEquals(TEST_MODULE_CONTENT2, schema3);
     }
 
@@ -183,11 +177,8 @@ public class NetconfMonitoringServiceImplTest {
 
     @Test
     public void testClose() throws Exception {
-        monitoringService.onSessionUp(sessionMock);
-        Assert.assertFalse(monitoringService.getSessions().getSession().isEmpty());
         Assert.assertFalse(monitoringService.getCapabilities().getCapability().isEmpty());
         monitoringService.close();
-        Assert.assertTrue(monitoringService.getSessions().getSession().isEmpty());
         Assert.assertTrue(monitoringService.getCapabilities().getCapability().isEmpty());
     }
 
@@ -231,22 +222,4 @@ public class NetconfMonitoringServiceImplTest {
         Assert.assertEquals(Collections.emptySet(), new HashSet<>(afterRemove.getAddedCapability()));
     }
 
-    @Test
-    public void testOnSessionUpAndDown() throws Exception {
-        monitoringService.onSessionUp(sessionMock);
-        ArgumentCaptor<Session> sessionUpCaptor = ArgumentCaptor.forClass(Session.class);
-        verify(listener).onSessionStarted(sessionUpCaptor.capture());
-        final Session sesionUp = sessionUpCaptor.getValue();
-        Assert.assertEquals(SESSION.getSessionId(), sesionUp.getSessionId());
-        Assert.assertEquals(SESSION.getSourceHost(), sesionUp.getSourceHost());
-        Assert.assertEquals(SESSION.getUsername(), sesionUp.getUsername());
-
-        monitoringService.onSessionDown(sessionMock);
-        ArgumentCaptor<Session> sessionDownCaptor = ArgumentCaptor.forClass(Session.class);
-        verify(listener).onSessionEnded(sessionDownCaptor.capture());
-        final Session sessionDown = sessionDownCaptor.getValue();
-        Assert.assertEquals(SESSION.getSessionId(), sessionDown.getSessionId());
-        Assert.assertEquals(SESSION.getSourceHost(), sessionDown.getSourceHost());
-        Assert.assertEquals(SESSION.getUsername(), sessionDown.getUsername());
-    }
 }
diff --git a/netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringServiceTest.java b/netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/osgi/NetconfSessionMonitoringServiceTest.java
new file mode 100644 (file)
index 0000000..bb7dab8
--- /dev/null
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.impl.osgi;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.base.Optional;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
+import org.opendaylight.controller.config.util.capability.BasicCapability;
+import org.opendaylight.controller.config.util.capability.Capability;
+import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
+import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.api.monitoring.SessionEvent;
+import org.opendaylight.netconf.notifications.BaseNotificationPublisherRegistration;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.SessionBuilder;
+
+public class NetconfSessionMonitoringServiceTest {
+
+    private static final Session SESSION_1 = new SessionBuilder()
+            .setSessionId(1L)
+            .setSourceHost(new Host("0.0.0.0".toCharArray()))
+            .setUsername("admin")
+            .build();
+    private static final Session SESSION_2 = new SessionBuilder()
+            .setSessionId(2L)
+            .setSourceHost(new Host("0.0.0.0".toCharArray()))
+            .setUsername("admin")
+            .build();
+
+    @Mock
+    private NetconfManagementSession sessionMock1;
+    @Mock
+    private NetconfManagementSession sessionMock2;
+    @Mock
+    private NetconfMonitoringService.SessionsListener listener;
+    @Mock
+    private BaseNotificationPublisherRegistration notificationPublisher;
+
+    private NetconfSessionMonitoringService monitoringService;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        doReturn(SESSION_1).when(sessionMock1).toManagementSession();
+        doReturn(SESSION_2).when(sessionMock2).toManagementSession();
+        doNothing().when(listener).onSessionStarted(any());
+        doNothing().when(listener).onSessionEnded(any());
+
+        doNothing().when(notificationPublisher).onCapabilityChanged(any());
+        doNothing().when(notificationPublisher).onSessionStarted(any());
+        doNothing().when(notificationPublisher).onSessionEnded(any());
+
+        monitoringService = new NetconfSessionMonitoringService(Optional.absent(), 0);
+        monitoringService.registerListener(listener);
+    }
+
+    @Test
+    public void testListeners() throws Exception {
+        monitoringService.onSessionUp(sessionMock1);
+        HashSet<Capability> added = new HashSet<>();
+        added.add(new BasicCapability("toAdd"));
+        monitoringService.onSessionDown(sessionMock1);
+        verify(listener).onSessionStarted(any());
+        verify(listener).onSessionEnded(any());
+    }
+
+
+    @Test
+    public void testClose() throws Exception {
+        monitoringService.onSessionUp(sessionMock1);
+        Assert.assertFalse(monitoringService.getSessions().getSession().isEmpty());
+        monitoringService.close();
+        Assert.assertTrue(monitoringService.getSessions().getSession().isEmpty());
+    }
+
+
+    @Test
+    public void testOnSessionUpAndDown() throws Exception {
+        monitoringService.onSessionUp(sessionMock1);
+        ArgumentCaptor<Session> sessionUpCaptor = ArgumentCaptor.forClass(Session.class);
+        verify(listener).onSessionStarted(sessionUpCaptor.capture());
+        final Session sesionUp = sessionUpCaptor.getValue();
+        Assert.assertEquals(SESSION_1.getSessionId(), sesionUp.getSessionId());
+        Assert.assertEquals(SESSION_1.getSourceHost(), sesionUp.getSourceHost());
+        Assert.assertEquals(SESSION_1.getUsername(), sesionUp.getUsername());
+
+        monitoringService.onSessionDown(sessionMock1);
+        ArgumentCaptor<Session> sessionDownCaptor = ArgumentCaptor.forClass(Session.class);
+        verify(listener).onSessionEnded(sessionDownCaptor.capture());
+        final Session sessionDown = sessionDownCaptor.getValue();
+        Assert.assertEquals(SESSION_1.getSessionId(), sessionDown.getSessionId());
+        Assert.assertEquals(SESSION_1.getSourceHost(), sessionDown.getSourceHost());
+        Assert.assertEquals(SESSION_1.getUsername(), sessionDown.getUsername());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testListenerUpdateSession() throws Exception {
+        ScheduledThreadPool threadPool = mock(ScheduledThreadPool.class);
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+        doReturn(executor).when(threadPool).getExecutor();
+        monitoringService = new NetconfSessionMonitoringService(Optional.of(threadPool), 1);
+        monitoringService.registerListener(listener);
+        monitoringService.onSessionUp(sessionMock1);
+        monitoringService.onSessionUp(sessionMock2);
+        monitoringService.onSessionEvent(SessionEvent.inRpcSuccess(sessionMock1));
+        ArgumentCaptor<Collection> captor =
+                ArgumentCaptor.forClass(Collection.class);
+        verify(listener, timeout(2000)).onSessionsUpdated(captor.capture());
+        final Collection<Session> value = captor.getValue();
+        Assert.assertTrue(value.contains(SESSION_1));
+        Assert.assertFalse(value.contains(SESSION_2));
+        monitoringService.close();
+    }
+}
index ccd416379640f2e9fa0d668d9031cf1a301514b4..387c36d27ac1c288b1ef35584a31ac3b28d1dacf 100644 (file)
                   <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:netconf:north:mapper">dom:netconf-northbound-mapper</type>
                   <name>mapper-aggregator</name>
               </aggregator>
+              <scheduled-threadpool xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:netconf:northbound:impl">
+                  <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadpool</type>
+                  <name>global-netconf-ssh-scheduled-executor</name>
+              </scheduled-threadpool>
+              <monitoring-update-interval xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:netconf:northbound:impl">6</monitoring-update-interval>
           </module>
 
           <module>
index af332963e61a61a64c1effca215aee4fdcf6c285..156fe9c31a65dd4d6c3cd054a44e98fb2e9ca301 100644 (file)
@@ -20,6 +20,7 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.config.util.xml.XmlUtil;
 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.api.monitoring.SessionListener;
 import org.opendaylight.netconf.monitoring.xml.model.NetconfState;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
@@ -78,7 +79,6 @@ public class JaxBSerializerTest {
 
         final NetconfState model = new NetconfState(monitoringService);
         final String xml = XmlUtil.toString(new JaxBSerializer().toXml(model)).replaceAll("\\s", "");
-        System.out.println(xml);
         assertThat(xml, CoreMatchers.containsString(
                 "<schema>" +
                 "<format>yang</format>" +
index 60f08e19ca72e2f0fda84210b9cde446608944b5..a4144b87003345595c8e65357440151a101dcffd 100644 (file)
@@ -18,8 +18,8 @@ import java.util.Collections;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.config.util.capability.Capability;
-import org.opendaylight.netconf.api.monitoring.NetconfManagementSession;
 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.netconf.api.monitoring.SessionListener;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.Yang;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Capabilities;
@@ -86,6 +86,11 @@ public class DummyMonitoringService implements NetconfMonitoringService {
         return EMPTY_SESSIONS;
     }
 
+    @Override
+    public SessionListener getSessionListener() {
+        return null;
+    }
+
     @Override
     public Schemas getSchemas() {
         return schemas;
@@ -108,22 +113,13 @@ public class DummyMonitoringService implements NetconfMonitoringService {
     }
 
     @Override
-    public AutoCloseable registerListener(MonitoringListener listener) {
+    public AutoCloseable registerCapabilitiesListener(CapabilitiesListener listener) {
         return null;
     }
 
     @Override
-    public void onSessionUp(NetconfManagementSession session) {
-
-    }
-
-    @Override
-    public void onSessionDown(NetconfManagementSession session) {
-
+    public AutoCloseable registerSessionsListener(SessionsListener listener) {
+        return null;
     }
 
-    @Override
-    public void onCapabilitiesChanged(Set<Capability> added, Set<Capability> removed) {
-
-    }
 }