Cluster schema resolution pipeline
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / ClusteredNetconfTopology.java
index bc472e98d8a9ee9fa79cce10a8c72049c8f38c89..6b64d9513e93110b9ff51326ab6c8477f405bfe3 100644 (file)
@@ -18,6 +18,7 @@ import akka.japi.Creator;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import io.netty.util.concurrent.EventExecutor;
+import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import javassist.ClassPool;
@@ -29,8 +30,12 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
 import org.opendaylight.netconf.topology.NetconfTopology;
@@ -41,13 +46,18 @@ import org.opendaylight.netconf.topology.TopologyManager;
 import org.opendaylight.netconf.topology.TopologyManagerCallback;
 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
 import org.opendaylight.netconf.topology.util.NodeWriter;
 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
@@ -93,6 +103,8 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements
         LOG.warn("Clustered netconf topo started");
     }
 
+
+
     @Override
     public void onSessionInitiated(final ProviderContext session) {
         dataBroker = session.getSALService(DataBroker.class);
@@ -123,6 +135,38 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements
         activeConnectors.clear();
     }
 
+    @Override
+    protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
+                                                           final NetconfNode node) {
+        //setup default values since default value is not supported yet in mdsal
+        // TODO remove this when mdsal starts supporting default values
+        final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
+        final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
+        final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
+
+        IpAddress ipAddress = node.getHost().getIpAddress();
+        InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
+                ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
+                node.getPort().getValue());
+        RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
+
+        RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
+                createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+
+        if (keepaliveDelay > 0) {
+            LOG.warn("Adding keepalive facade, for device {}", nodeId);
+            salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
+        }
+
+        NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
+                new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+
+        NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
+                processingExecutor.getExecutor(), sharedSchemaRepository, actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
+
+        return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
+    }
+
     @Override
     protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
         return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
@@ -155,6 +199,11 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements
         return Collections.emptySet();
     }
 
+    public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
+        Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
+        return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
+    }
+
     static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
 
         private final NetconfTopology netconfTopology;