Cluster schema resolution pipeline 15/31415/13
authorJakub Morvay <jmorvay@cisco.com>
Wed, 16 Dec 2015 10:22:07 +0000 (11:22 +0100)
committerTomas Cere <tcere@cisco.com>
Mon, 21 Dec 2015 12:05:40 +0000 (13:05 +0100)
Change-Id: Ia469fedd25d1511308e52328be925845827968c0
Signed-off-by: Jakub Morvay <jmorvay@cisco.com>
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/AbstractNetconfTopology.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDevice.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicator.java [new file with mode: 0644]
opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDevice.java
opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java

index 88ed13ca6de1dc3689db3afc70621d6cf3da84cd..8f3f62140f1794278db72bfd30279286c77a52ec 100644 (file)
@@ -32,6 +32,7 @@ import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.opendaylight.controller.sal.core.api.Provider;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
@@ -78,12 +79,12 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfTopology.class);
 
-    private static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L;
-    private static final int DEFAULT_KEEPALIVE_DELAY = 0;
-    private static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
-    private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
-    private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
-    private static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
+    protected static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L;
+    protected static final int DEFAULT_KEEPALIVE_DELAY = 0;
+    protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+    protected static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
+    protected static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
+    protected static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
 
     private static FilesystemSchemaSourceCache<YangTextSchemaSource> CACHE = null;
     //keep track of already initialized repositories to avoid adding redundant listeners
@@ -92,14 +93,14 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     protected final String topologyId;
     private final NetconfClientDispatcher clientDispatcher;
     protected final BindingAwareBroker bindingAwareBroker;
-    private final Broker domBroker;
+    protected final Broker domBroker;
     private final EventExecutor eventExecutor;
-    private final ScheduledThreadPool keepaliveExecutor;
-    private final ThreadPool processingExecutor;
-    private final SharedSchemaRepository sharedSchemaRepository;
+    protected final ScheduledThreadPool keepaliveExecutor;
+    protected final ThreadPool processingExecutor;
+    protected final SharedSchemaRepository sharedSchemaRepository;
 
-    private SchemaSourceRegistry schemaRegistry = null;
-    private SchemaContextFactory schemaContextFactory = null;
+    protected SchemaSourceRegistry schemaRegistry = null;
+    protected SchemaContextFactory schemaContextFactory = null;
 
     protected DOMMountPointService mountPointService = null;
     protected DataBroker dataBroker = null;
@@ -181,7 +182,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         return Futures.immediateFuture(null);
     }
 
-    private ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
+    protected ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
                                                                         final Node configNode) {
         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
 
@@ -191,8 +192,10 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
 
         final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode);
         final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
-        final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(deviceCommunicator, netconfNode);
+        final NetconfClientSessionListener netconfClientSessionListener = deviceCommunicatorDTO.getSessionListener();
+        final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(netconfClientSessionListener, netconfNode);
         final ListenableFuture<NetconfDeviceCapabilities> future = deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
+
         activeConnectors.put(nodeId, deviceCommunicatorDTO);
 
         Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
@@ -211,7 +214,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         return future;
     }
 
-    private NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
+    protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
                                                          final NetconfNode node) {
         //setup default values since default value is not supported yet in mdsal
         // TODO remove this when mdsal starts supporting default values
@@ -242,7 +245,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), salFacade);
     }
 
-    public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener, NetconfNode node) {
+    public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener, NetconfNode node) {
 
         //setup default values since default value is not supported yet in mdsal
         // TODO remove this when mdsal starts supporting default values
@@ -357,12 +360,12 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         }
     }
 
-    protected static final class NetconfConnectorDTO {
+    protected static class NetconfConnectorDTO {
 
         private final NetconfDeviceCommunicator communicator;
         private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
 
-        private NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
+        public NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
             this.communicator = communicator;
             this.facade = facade;
         }
@@ -374,6 +377,10 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         public RemoteDeviceHandler<NetconfSessionPreferences> getFacade() {
             return facade;
         }
+
+        public NetconfClientSessionListener getSessionListener() {
+            return communicator;
+        }
     }
 
 }
index bc472e98d8a9ee9fa79cce10a8c72049c8f38c89..6b64d9513e93110b9ff51326ab6c8477f405bfe3 100644 (file)
@@ -18,6 +18,7 @@ import akka.japi.Creator;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import io.netty.util.concurrent.EventExecutor;
+import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import javassist.ClassPool;
@@ -29,8 +30,12 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
 import org.opendaylight.netconf.topology.NetconfTopology;
@@ -41,13 +46,18 @@ import org.opendaylight.netconf.topology.TopologyManager;
 import org.opendaylight.netconf.topology.TopologyManagerCallback;
 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
 import org.opendaylight.netconf.topology.util.NodeWriter;
 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
@@ -93,6 +103,8 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements
         LOG.warn("Clustered netconf topo started");
     }
 
+
+
     @Override
     public void onSessionInitiated(final ProviderContext session) {
         dataBroker = session.getSALService(DataBroker.class);
@@ -123,6 +135,38 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements
         activeConnectors.clear();
     }
 
+    @Override
+    protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
+                                                           final NetconfNode node) {
+        //setup default values since default value is not supported yet in mdsal
+        // TODO remove this when mdsal starts supporting default values
+        final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
+        final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
+        final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
+
+        IpAddress ipAddress = node.getHost().getIpAddress();
+        InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
+                ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
+                node.getPort().getValue());
+        RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
+
+        RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
+                createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+
+        if (keepaliveDelay > 0) {
+            LOG.warn("Adding keepalive facade, for device {}", nodeId);
+            salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
+        }
+
+        NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
+                new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+
+        NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
+                processingExecutor.getExecutor(), sharedSchemaRepository, actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
+
+        return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
+    }
+
     @Override
     protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
         return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
@@ -155,6 +199,11 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements
         return Collections.emptySet();
     }
 
+    public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
+        Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
+        return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
+    }
+
     static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
 
         private final NetconfTopology netconfTopology;
index 4c5e04b211a33aaeb37a33d2a31ed44db12bfc5d..9813bbc878be3227466e114ad959e2d0713ea39d 100644 (file)
@@ -30,6 +30,10 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.client.NetconfClientSession;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.topology.NetconfTopology;
@@ -37,6 +41,7 @@ import org.opendaylight.netconf.topology.NodeManager;
 import org.opendaylight.netconf.topology.NodeManagerCallback;
 import org.opendaylight.netconf.topology.RoleChangeStrategy;
 import org.opendaylight.netconf.topology.TopologyManager;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
 import org.opendaylight.netconf.topology.util.BaseNodeManager;
 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
@@ -64,7 +69,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-public class NetconfNodeManagerCallback implements NodeManagerCallback{
+public class NetconfNodeManagerCallback implements NodeManagerCallback, NetconfClientSessionListener{
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
 
@@ -103,7 +108,8 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
     private Node currentConfig;
     private Node currentOperationalNode;
 
-    private ConnectionStatusListenerRegistration registration = null;
+    private ConnectionStatusListenerRegistration connectionStatusregistration = null;
+    private NetconfClientSessionListenerRegistration sessionListener = null;
 
     private ActorRef masterDataBrokerRef = null;
     private boolean connected = false;
@@ -223,7 +229,8 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
             @Override
             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
-                registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
+                connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
+                sessionListener = topologyDispatcher.registerNetconfClientSessionListener(nodeId, NetconfNodeManagerCallback.this);
             }
 
             @Override
@@ -269,8 +276,9 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
                                                 @Nonnull final Node configNode) {
         // first disconnect this node
         topologyDispatcher.unregisterMountPoint(nodeId);
-        if (registration != null) {
-            registration.close();
+
+        if (connectionStatusregistration != null) {
+            connectionStatusregistration.close();
         }
         topologyDispatcher.disconnectNode(nodeId);
 
@@ -280,7 +288,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
             @Override
             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
-                registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
+                connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
             }
 
             @Override
@@ -323,8 +331,9 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
     @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
         // cleanup and disconnect
         topologyDispatcher.unregisterMountPoint(nodeId);
-        if (registration != null) {
-            registration.close();
+
+        if(connectionStatusregistration != null) {
+            connectionStatusregistration.close();
         }
         roleChangeStrategy.unregisterRoleCandidate();
         return topologyDispatcher.disconnectNode(nodeId);
@@ -359,9 +368,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
     @Override
     public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
-        LOG.debug("onDeviceConnected received, registering role candidate");
         connected = true;
-        roleChangeStrategy.registerRoleCandidate(nodeManager);
         if (!isMaster && masterDataBrokerRef != null) {
             // if we're not master but one is present already, we need to register mountpoint
             LOG.warn("Device connected, master already present in topology, registering mount point");
@@ -403,7 +410,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
     @Override
     public void onDeviceDisconnected() {
         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
-        LOG.debug("onDeviceDisconnected received, unregistering role candidate");
+        LOG.debug("onDeviceDisconnected received, unregistered role candidate");
         connected = false;
         if (isMaster) {
             // announce that master mount point is going down
@@ -415,7 +422,6 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
             // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
         }
-        roleChangeStrategy.unregisterRoleCandidate();
 
         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
@@ -445,7 +451,6 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
         connected = false;
         String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
 
-        roleChangeStrategy.unregisterRoleCandidate();
         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
                 .addAugmentation(NetconfNode.class,
                         new NetconfNodeBuilder()
@@ -491,4 +496,28 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{
             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
         }
     }
+
+    @Override
+    public void onSessionUp(NetconfClientSession netconfClientSession) {
+        //NetconfClientSession is up, we can register role candidate
+        LOG.debug("Netconf client session is up, registering role candidate");
+        roleChangeStrategy.registerRoleCandidate(nodeManager);
+    }
+
+    @Override
+    public void onSessionDown(NetconfClientSession netconfClientSession, Exception e) {
+        LOG.debug("Netconf client session is down, unregistering role candidate");
+        roleChangeStrategy.unregisterRoleCandidate();
+    }
+
+    @Override
+    public void onSessionTerminated(NetconfClientSession netconfClientSession, NetconfTerminationReason netconfTerminationReason) {
+        LOG.debug("Netconf client session is down, unregistering role candidate");
+        roleChangeStrategy.unregisterRoleCandidate();
+    }
+
+    @Override
+    public void onMessage(NetconfClientSession netconfClientSession, NetconfMessage netconfMessage) {
+        //NOOP
+    }
 }
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDevice.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDevice.java
new file mode 100644 (file)
index 0000000..0040c55
--- /dev/null
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.dispatch.OnComplete;
+import akka.japi.Creator;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceRpc;
+import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClusteredNetconfDevice extends NetconfDevice implements EntityOwnershipListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfDevice.class);
+
+    private boolean isMaster = false;
+    private NetconfDeviceCommunicator listener;
+    private NetconfSessionPreferences sessionPreferences;
+    private SchemaRepository schemaRepo;
+    private final ActorSystem actorSystem;
+    private final String topologyId;
+    private final String nodeId;
+    private final ActorContext cachedContext;
+
+    private MasterSourceProvider masterSourceProvider = null;
+    private ClusteredDeviceSourcesResolver resolver = null;
+
+    public ClusteredNetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
+                                  final ExecutorService globalProcessingExecutor, SchemaRepository schemaRepo, ActorSystem actorSystem, String topologyId, String nodeId,
+                                  ActorContext cachedContext) {
+        super(schemaResourcesDTO, id, salFacade, globalProcessingExecutor);
+        this.schemaRepo = schemaRepo;
+        this.actorSystem = actorSystem;
+        this.topologyId = topologyId;
+        this.nodeId = nodeId;
+        this.cachedContext = cachedContext;
+    }
+
+    @Override
+    public void onRemoteSessionUp(NetconfSessionPreferences remoteSessionCapabilities, NetconfDeviceCommunicator listener) {
+        LOG.warn("Node {} SessionUp, with capabilities {}", nodeId, remoteSessionCapabilities);
+        this.listener = listener;
+        this.sessionPreferences = remoteSessionCapabilities;
+        slaveSetupSchema();
+    }
+
+
+    @Override
+    protected void handleSalInitializationSuccess(SchemaContext result, NetconfSessionPreferences remoteSessionCapabilities, DOMRpcService deviceRpc) {
+        super.handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
+
+        final Set<SourceIdentifier> sourceIds = Sets.newHashSet();
+        for(ModuleIdentifier id : result.getAllModuleIdentifiers()) {
+            sourceIds.add(SourceIdentifier.create(id.getName(), (SimpleDateFormatUtil.DEFAULT_DATE_REV == id.getRevision() ? Optional.<String>absent() :
+                    Optional.of(SimpleDateFormatUtil.getRevisionFormat().format(id.getRevision())))));
+        }
+
+        //TODO extract string constant to util class
+        LOG.debug("Creating master source provider");
+        masterSourceProvider = TypedActor.get(cachedContext).typedActorOf(
+                new TypedProps<>(MasterSourceProvider.class,
+                        new Creator<MasterSourceProviderImpl>() {
+                            @Override
+                            public MasterSourceProviderImpl create() throws Exception {
+                                return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId);
+                            }
+                        }), "masterSourceProvider");
+    }
+
+    @Override
+    public void onRemoteSessionDown() {
+        super.onRemoteSessionDown();
+        listener = null;
+        sessionPreferences = null;
+        if (masterSourceProvider != null) {
+            // if we have master the slave that started on this node should be already killed via PoisonPill, so stop master only now
+            LOG.debug("Stopping master source provider for node {}", nodeId);
+            TypedActor.get(actorSystem).stop(masterSourceProvider);
+            masterSourceProvider = null;
+        } else {
+            LOG.debug("Stopping slave source resolver for node {}", nodeId);
+            TypedActor.get(actorSystem).stop(resolver);
+            resolver = null;
+        }
+    }
+
+    private void slaveSetupSchema() {
+        //TODO extract string constant to util class
+        resolver = TypedActor.get(cachedContext).typedActorOf(
+                new TypedProps<>(ClusteredDeviceSourcesResolver.class,
+                        new Creator<ClusteredDeviceSourcesResolverImpl>() {
+                            @Override
+                            public ClusteredDeviceSourcesResolverImpl create() throws Exception {
+                                return new ClusteredDeviceSourcesResolverImpl(topologyId, nodeId, actorSystem, schemaRegistry, sourceRegistrations);
+                            }
+                        }), "clusteredDeviceSourcesResolver");
+
+
+        final FutureCallback<SchemaContext> schemaContextFuture = new FutureCallback<SchemaContext>() {
+            @Override
+            public void onSuccess(SchemaContext schemaContext) {
+                LOG.debug("{}: Schema context built successfully.", id);
+
+                final NetconfDeviceCapabilities deviceCap = sessionPreferences.getNetconfDeviceCapabilities();
+                final Set<QName> providedSourcesQnames = Sets.newHashSet();
+                for(ModuleIdentifier id : schemaContext.getAllModuleIdentifiers()) {
+                    providedSourcesQnames.add(QName.create(id.getQNameModule(), id.getName()));
+                }
+
+                deviceCap.addNonModuleBasedCapabilities(sessionPreferences.getNonModuleCaps());
+                deviceCap.addCapabilities(providedSourcesQnames);
+
+                ClusteredNetconfDevice.super.handleSalInitializationSuccess(
+                        schemaContext, sessionPreferences, getDeviceSpecificRpc(schemaContext, listener));
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+                LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
+                handleSalInitializationFailure(throwable, listener);
+            }
+        };
+
+        resolver.getResolvedSources().onComplete(
+                new OnComplete<Set<SourceIdentifier>>() {
+                    @Override
+                    public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
+                        if(throwable != null) {
+                            if(throwable instanceof MasterSourceProviderOnSameNodeException) {
+                                //do nothing
+                            } else {
+                                LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
+                                handleSalInitializationFailure(throwable, listener);
+                            }
+                        } else {
+                            LOG.trace("{}: Trying to build schema context from {}", id, sourceIdentifiers);
+                            Futures.addCallback(schemaContextFactory.createSchemaContext(sourceIdentifiers), schemaContextFuture);
+                        }
+                    }
+                }, actorSystem.dispatcher());
+    }
+
+    private NetconfDeviceRpc getDeviceSpecificRpc(SchemaContext result, RemoteDeviceCommunicator<NetconfMessage> listener) {
+        return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
+    }
+
+    @Override
+    public void ownershipChanged(EntityOwnershipChange ownershipChange) {
+        LOG.debug("Entity ownership change received {}", ownershipChange);
+        if(ownershipChange.isOwner()) {
+            super.onRemoteSessionUp(sessionPreferences, listener);
+        } else if (ownershipChange.wasOwner()) {
+            slaveSetupSchema();
+        }
+    }
+}
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicator.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicator.java
new file mode 100644 (file)
index 0000000..0b19197
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import java.util.ArrayList;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.client.NetconfClientSession;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+
+public class ClusteredNetconfDeviceCommunicator extends NetconfDeviceCommunicator {
+
+    private final EntityOwnershipService ownershipService;
+
+    private final ArrayList<NetconfClientSessionListener> netconfClientSessionListeners = new ArrayList<>();
+    private EntityOwnershipListenerRegistration ownershipListenerRegistration = null;
+
+    public ClusteredNetconfDeviceCommunicator(RemoteDeviceId id, NetconfDevice remoteDevice, EntityOwnershipService ownershipService) {
+        super(id, remoteDevice);
+        this.ownershipService = ownershipService;
+    }
+
+    @Override
+    public void onMessage(NetconfClientSession session, NetconfMessage message) {
+        super.onMessage(session, message);
+        for(NetconfClientSessionListener listener : netconfClientSessionListeners) {
+            listener.onMessage(session, message);
+        }
+    }
+
+    @Override
+    public void onSessionDown(NetconfClientSession session, Exception e) {
+        super.onSessionDown(session, e);
+        ownershipListenerRegistration.close();
+        for(NetconfClientSessionListener listener : netconfClientSessionListeners) {
+            listener.onSessionDown(session, e);
+        }
+    }
+
+    @Override
+    public void onSessionUp(NetconfClientSession session) {
+        super.onSessionUp(session);
+        ownershipListenerRegistration = ownershipService.registerListener("netconf-node/" + id.getName(), (ClusteredNetconfDevice) remoteDevice);
+        for(NetconfClientSessionListener listener : netconfClientSessionListeners) {
+            listener.onSessionUp(session);
+        }
+    }
+
+    @Override
+    public void onSessionTerminated(NetconfClientSession session, NetconfTerminationReason reason) {
+        super.onSessionTerminated(session, reason);
+        ownershipListenerRegistration.close();
+        for(NetconfClientSessionListener listener : netconfClientSessionListeners) {
+            listener.onSessionTerminated(session, reason);
+        }
+    }
+
+    public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(NetconfClientSessionListener listener) {
+        netconfClientSessionListeners.add(listener);
+        return new NetconfClientSessionListenerRegistration(listener);
+    }
+
+    public class NetconfClientSessionListenerRegistration {
+
+        private final NetconfClientSessionListener listener;
+
+        public NetconfClientSessionListenerRegistration(NetconfClientSessionListener listener) {
+            this.listener = listener;
+        }
+
+        public void close() {
+            netconfClientSessionListeners.remove(listener);
+        }
+    }
+}
index 23cb6ad0300a434575731cbb8563dbdddd5a01a6..f6f62fe8a2a63d05daeaa384bf1dc95e394c6063 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.netconf.sal.connect.netconf;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -64,7 +63,7 @@ import org.slf4j.LoggerFactory;
 /**
  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
  */
-public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
+public class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfDevice.class);
 
@@ -95,16 +94,16 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferenc
         }
     };
 
-    private final RemoteDeviceId id;
+    protected final RemoteDeviceId id;
     private final boolean reconnectOnSchemasChange;
 
-    private final SchemaContextFactory schemaContextFactory;
+    protected final SchemaContextFactory schemaContextFactory;
     private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
     private final ListeningExecutorService processingExecutor;
-    private final SchemaSourceRegistry schemaRegistry;
+    protected final SchemaSourceRegistry schemaRegistry;
     private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
     private final NotificationHandler notificationHandler;
-    private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
+    protected final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
 
     // Message transformer is constructed once the schemas are available
     private MessageTransformer<NetconfMessage> messageTransformer;
@@ -214,8 +213,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferenc
         return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
     }
 
-    @VisibleForTesting
-    void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) {
+    protected void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) {
         messageTransformer = new NetconfMessageTransformer(result, true);
 
         updateTransformer(messageTransformer);
@@ -226,7 +224,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferenc
         LOG.info("{}: Netconf connector initialized successfully", id);
     }
 
-    private void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+    protected void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator<NetconfMessage> listener) {
         LOG.error("{}: Initialization in sal failed, disconnecting from device", id, t);
         listener.close();
         onRemoteSessionDown();
@@ -460,7 +458,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferenc
             Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback);
         }
 
-        private NetconfDeviceRpc getDeviceSpecificRpc(final SchemaContext result) {
+        protected NetconfDeviceRpc getDeviceSpecificRpc(final SchemaContext result) {
             return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
         }
 
index d174d9546abec51ac1ba482a79875511712e7eed..27b05ee5ef36d8378a99a4614730aeca2a501f2b 100644 (file)
@@ -48,9 +48,9 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
 
-    private final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
+    protected final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
     private final Optional<NetconfSessionPreferences> overrideNetconfCapabilities;
-    private final RemoteDeviceId id;
+    protected final RemoteDeviceId id;
     private final Lock sessionLock = new ReentrantLock();
 
     // TODO implement concurrent message limit