From: Jakub Morvay Date: Wed, 16 Dec 2015 10:22:07 +0000 (+0100) Subject: Cluster schema resolution pipeline X-Git-Tag: release/beryllium~45^2~1 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=157141b95ba44d3bc142424b7e63874fa6704c6b;p=netconf.git Cluster schema resolution pipeline Change-Id: Ia469fedd25d1511308e52328be925845827968c0 Signed-off-by: Jakub Morvay --- diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/AbstractNetconfTopology.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/AbstractNetconfTopology.java index 88ed13ca6d..8f3f62140f 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/AbstractNetconfTopology.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/AbstractNetconfTopology.java @@ -32,6 +32,7 @@ import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.opendaylight.controller.sal.core.api.Provider; import org.opendaylight.netconf.client.NetconfClientDispatcher; +import org.opendaylight.netconf.client.NetconfClientSessionListener; import org.opendaylight.netconf.client.conf.NetconfClientConfiguration; import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration; import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder; @@ -78,12 +79,12 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfTopology.class); - private static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L; - private static final int DEFAULT_KEEPALIVE_DELAY = 0; - private static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false; - private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0; - private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000; - private static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5); + protected static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L; + protected static final int DEFAULT_KEEPALIVE_DELAY = 0; + protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false; + protected static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0; + protected static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000; + protected static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5); private static FilesystemSchemaSourceCache CACHE = null; //keep track of already initialized repositories to avoid adding redundant listeners @@ -92,14 +93,14 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin protected final String topologyId; private final NetconfClientDispatcher clientDispatcher; protected final BindingAwareBroker bindingAwareBroker; - private final Broker domBroker; + protected final Broker domBroker; private final EventExecutor eventExecutor; - private final ScheduledThreadPool keepaliveExecutor; - private final ThreadPool processingExecutor; - private final SharedSchemaRepository sharedSchemaRepository; + protected final ScheduledThreadPool keepaliveExecutor; + protected final ThreadPool processingExecutor; + protected final SharedSchemaRepository sharedSchemaRepository; - private SchemaSourceRegistry schemaRegistry = null; - private SchemaContextFactory schemaContextFactory = null; + protected SchemaSourceRegistry schemaRegistry = null; + protected SchemaContextFactory schemaContextFactory = null; protected DOMMountPointService mountPointService = null; protected DataBroker dataBroker = null; @@ -181,7 +182,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin return Futures.immediateFuture(null); } - private ListenableFuture setupConnection(final NodeId nodeId, + protected ListenableFuture setupConnection(final NodeId nodeId, final Node configNode) { final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class); @@ -191,8 +192,10 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode); final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator(); - final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(deviceCommunicator, netconfNode); + final NetconfClientSessionListener netconfClientSessionListener = deviceCommunicatorDTO.getSessionListener(); + final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(netconfClientSessionListener, netconfNode); final ListenableFuture future = deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig); + activeConnectors.put(nodeId, deviceCommunicatorDTO); Futures.addCallback(future, new FutureCallback() { @@ -211,7 +214,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin return future; } - private NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, + protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, final NetconfNode node) { //setup default values since default value is not supported yet in mdsal // TODO remove this when mdsal starts supporting default values @@ -242,7 +245,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), salFacade); } - public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener, NetconfNode node) { + public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener, NetconfNode node) { //setup default values since default value is not supported yet in mdsal // TODO remove this when mdsal starts supporting default values @@ -357,12 +360,12 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin } } - protected static final class NetconfConnectorDTO { + protected static class NetconfConnectorDTO { private final NetconfDeviceCommunicator communicator; private final RemoteDeviceHandler facade; - private NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler facade) { + public NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler facade) { this.communicator = communicator; this.facade = facade; } @@ -374,6 +377,10 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin public RemoteDeviceHandler getFacade() { return facade; } + + public NetconfClientSessionListener getSessionListener() { + return communicator; + } } } diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java index bc472e98d8..6b64d9513e 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java @@ -18,6 +18,7 @@ import akka.japi.Creator; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import io.netty.util.concurrent.EventExecutor; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.Collections; import javassist.ClassPool; @@ -29,8 +30,12 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.netconf.client.NetconfClientDispatcher; +import org.opendaylight.netconf.client.NetconfClientSessionListener; import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler; +import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice; +import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; +import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.AbstractNetconfTopology; import org.opendaylight.netconf.topology.NetconfTopology; @@ -41,13 +46,18 @@ import org.opendaylight.netconf.topology.TopologyManager; import org.opendaylight.netconf.topology.TopologyManagerCallback; import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory; import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter; +import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice; +import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator; +import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration; import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade; import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration; import org.opendaylight.netconf.topology.util.BaseTopologyManager; import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy; import org.opendaylight.netconf.topology.util.NodeWriter; import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator; import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry; @@ -93,6 +103,8 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements LOG.warn("Clustered netconf topo started"); } + + @Override public void onSessionInitiated(final ProviderContext session) { dataBroker = session.getSALService(DataBroker.class); @@ -123,6 +135,38 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements activeConnectors.clear(); } + @Override + protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, + final NetconfNode node) { + //setup default values since default value is not supported yet in mdsal + // TODO remove this when mdsal starts supporting default values + final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis(); + final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay(); + final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema(); + + IpAddress ipAddress = node.getHost().getIpAddress(); + InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ? + ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(), + node.getPort().getValue()); + RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address); + + RemoteDeviceHandler salFacade = + createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis); + + if (keepaliveDelay > 0) { + LOG.warn("Adding keepalive facade, for device {}", nodeId); + salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay); + } + + NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = + new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl()); + + NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade, + processingExecutor.getExecutor(), sharedSchemaRepository, actorSystem, topologyId, nodeId.getValue(), TypedActor.context()); + + return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade); + } + @Override protected RemoteDeviceHandler createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) { return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis); @@ -155,6 +199,11 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements return Collections.emptySet(); } + public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) { + Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered"); + return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener); + } + static class TopologyCallbackFactory implements TopologyManagerCallbackFactory { private final NetconfTopology netconfTopology; diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java index 4c5e04b211..9813bbc878 100644 --- a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java @@ -30,6 +30,10 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.md.sal.dom.api.DOMNotification; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.netconf.api.NetconfMessage; +import org.opendaylight.netconf.api.NetconfTerminationReason; +import org.opendaylight.netconf.client.NetconfClientSession; +import org.opendaylight.netconf.client.NetconfClientSessionListener; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.netconf.topology.NetconfTopology; @@ -37,6 +41,7 @@ import org.opendaylight.netconf.topology.NodeManager; import org.opendaylight.netconf.topology.NodeManagerCallback; import org.opendaylight.netconf.topology.RoleChangeStrategy; import org.opendaylight.netconf.topology.TopologyManager; +import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration; import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration; import org.opendaylight.netconf.topology.util.BaseNodeManager; import org.opendaylight.netconf.topology.util.BaseTopologyManager; @@ -64,7 +69,7 @@ import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -public class NetconfNodeManagerCallback implements NodeManagerCallback{ +public class NetconfNodeManagerCallback implements NodeManagerCallback, NetconfClientSessionListener{ private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class); @@ -103,7 +108,8 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ private Node currentConfig; private Node currentOperationalNode; - private ConnectionStatusListenerRegistration registration = null; + private ConnectionStatusListenerRegistration connectionStatusregistration = null; + private NetconfClientSessionListenerRegistration sessionListener = null; private ActorRef masterDataBrokerRef = null; private boolean connected = false; @@ -223,7 +229,8 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ Futures.addCallback(connectionFuture, new FutureCallback() { @Override public void onSuccess(@Nullable NetconfDeviceCapabilities result) { - registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager); + connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager); + sessionListener = topologyDispatcher.registerNetconfClientSessionListener(nodeId, NetconfNodeManagerCallback.this); } @Override @@ -269,8 +276,9 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ @Nonnull final Node configNode) { // first disconnect this node topologyDispatcher.unregisterMountPoint(nodeId); - if (registration != null) { - registration.close(); + + if (connectionStatusregistration != null) { + connectionStatusregistration.close(); } topologyDispatcher.disconnectNode(nodeId); @@ -280,7 +288,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ Futures.addCallback(connectionFuture, new FutureCallback() { @Override public void onSuccess(@Nullable NetconfDeviceCapabilities result) { - registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager); + connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this); } @Override @@ -323,8 +331,9 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ @Nonnull @Override public ListenableFuture onNodeDeleted(@Nonnull final NodeId nodeId) { // cleanup and disconnect topologyDispatcher.unregisterMountPoint(nodeId); - if (registration != null) { - registration.close(); + + if(connectionStatusregistration != null) { + connectionStatusregistration.close(); } roleChangeStrategy.unregisterRoleCandidate(); return topologyDispatcher.disconnectNode(nodeId); @@ -359,9 +368,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ @Override public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) { // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result - LOG.debug("onDeviceConnected received, registering role candidate"); connected = true; - roleChangeStrategy.registerRoleCandidate(nodeManager); if (!isMaster && masterDataBrokerRef != null) { // if we're not master but one is present already, we need to register mountpoint LOG.warn("Device connected, master already present in topology, registering mount point"); @@ -403,7 +410,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ @Override public void onDeviceDisconnected() { // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result - LOG.debug("onDeviceDisconnected received, unregistering role candidate"); + LOG.debug("onDeviceDisconnected received, unregistered role candidate"); connected = false; if (isMaster) { // announce that master mount point is going down @@ -415,7 +422,6 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects topologyDispatcher.unregisterMountPoint(new NodeId(nodeId)); } - roleChangeStrategy.unregisterRoleCandidate(); final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class); currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId)) @@ -445,7 +451,6 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ connected = false; String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON; - roleChangeStrategy.unregisterRoleCandidate(); currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId)) .addAugmentation(NetconfNode.class, new NetconfNodeBuilder() @@ -491,4 +496,28 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback{ topologyDispatcher.unregisterMountPoint(new NodeId(nodeId)); } } + + @Override + public void onSessionUp(NetconfClientSession netconfClientSession) { + //NetconfClientSession is up, we can register role candidate + LOG.debug("Netconf client session is up, registering role candidate"); + roleChangeStrategy.registerRoleCandidate(nodeManager); + } + + @Override + public void onSessionDown(NetconfClientSession netconfClientSession, Exception e) { + LOG.debug("Netconf client session is down, unregistering role candidate"); + roleChangeStrategy.unregisterRoleCandidate(); + } + + @Override + public void onSessionTerminated(NetconfClientSession netconfClientSession, NetconfTerminationReason netconfTerminationReason) { + LOG.debug("Netconf client session is down, unregistering role candidate"); + roleChangeStrategy.unregisterRoleCandidate(); + } + + @Override + public void onMessage(NetconfClientSession netconfClientSession, NetconfMessage netconfMessage) { + //NOOP + } } \ No newline at end of file diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDevice.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDevice.java new file mode 100644 index 0000000000..0040c551a2 --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDevice.java @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline; + +import akka.actor.ActorContext; +import akka.actor.ActorSystem; +import akka.actor.TypedActor; +import akka.actor.TypedProps; +import akka.dispatch.OnComplete; +import akka.japi.Creator; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.netconf.api.NetconfMessage; +import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator; +import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler; +import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice; +import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities; +import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator; +import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; +import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceRpc; +import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.NetconfMessageTransformer; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil; +import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; +import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClusteredNetconfDevice extends NetconfDevice implements EntityOwnershipListener { + + private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfDevice.class); + + private boolean isMaster = false; + private NetconfDeviceCommunicator listener; + private NetconfSessionPreferences sessionPreferences; + private SchemaRepository schemaRepo; + private final ActorSystem actorSystem; + private final String topologyId; + private final String nodeId; + private final ActorContext cachedContext; + + private MasterSourceProvider masterSourceProvider = null; + private ClusteredDeviceSourcesResolver resolver = null; + + public ClusteredNetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ExecutorService globalProcessingExecutor, SchemaRepository schemaRepo, ActorSystem actorSystem, String topologyId, String nodeId, + ActorContext cachedContext) { + super(schemaResourcesDTO, id, salFacade, globalProcessingExecutor); + this.schemaRepo = schemaRepo; + this.actorSystem = actorSystem; + this.topologyId = topologyId; + this.nodeId = nodeId; + this.cachedContext = cachedContext; + } + + @Override + public void onRemoteSessionUp(NetconfSessionPreferences remoteSessionCapabilities, NetconfDeviceCommunicator listener) { + LOG.warn("Node {} SessionUp, with capabilities {}", nodeId, remoteSessionCapabilities); + this.listener = listener; + this.sessionPreferences = remoteSessionCapabilities; + slaveSetupSchema(); + } + + + @Override + protected void handleSalInitializationSuccess(SchemaContext result, NetconfSessionPreferences remoteSessionCapabilities, DOMRpcService deviceRpc) { + super.handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc); + + final Set sourceIds = Sets.newHashSet(); + for(ModuleIdentifier id : result.getAllModuleIdentifiers()) { + sourceIds.add(SourceIdentifier.create(id.getName(), (SimpleDateFormatUtil.DEFAULT_DATE_REV == id.getRevision() ? Optional.absent() : + Optional.of(SimpleDateFormatUtil.getRevisionFormat().format(id.getRevision()))))); + } + + //TODO extract string constant to util class + LOG.debug("Creating master source provider"); + masterSourceProvider = TypedActor.get(cachedContext).typedActorOf( + new TypedProps<>(MasterSourceProvider.class, + new Creator() { + @Override + public MasterSourceProviderImpl create() throws Exception { + return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId); + } + }), "masterSourceProvider"); + } + + @Override + public void onRemoteSessionDown() { + super.onRemoteSessionDown(); + listener = null; + sessionPreferences = null; + if (masterSourceProvider != null) { + // if we have master the slave that started on this node should be already killed via PoisonPill, so stop master only now + LOG.debug("Stopping master source provider for node {}", nodeId); + TypedActor.get(actorSystem).stop(masterSourceProvider); + masterSourceProvider = null; + } else { + LOG.debug("Stopping slave source resolver for node {}", nodeId); + TypedActor.get(actorSystem).stop(resolver); + resolver = null; + } + } + + private void slaveSetupSchema() { + //TODO extract string constant to util class + resolver = TypedActor.get(cachedContext).typedActorOf( + new TypedProps<>(ClusteredDeviceSourcesResolver.class, + new Creator() { + @Override + public ClusteredDeviceSourcesResolverImpl create() throws Exception { + return new ClusteredDeviceSourcesResolverImpl(topologyId, nodeId, actorSystem, schemaRegistry, sourceRegistrations); + } + }), "clusteredDeviceSourcesResolver"); + + + final FutureCallback schemaContextFuture = new FutureCallback() { + @Override + public void onSuccess(SchemaContext schemaContext) { + LOG.debug("{}: Schema context built successfully.", id); + + final NetconfDeviceCapabilities deviceCap = sessionPreferences.getNetconfDeviceCapabilities(); + final Set providedSourcesQnames = Sets.newHashSet(); + for(ModuleIdentifier id : schemaContext.getAllModuleIdentifiers()) { + providedSourcesQnames.add(QName.create(id.getQNameModule(), id.getName())); + } + + deviceCap.addNonModuleBasedCapabilities(sessionPreferences.getNonModuleCaps()); + deviceCap.addCapabilities(providedSourcesQnames); + + ClusteredNetconfDevice.super.handleSalInitializationSuccess( + schemaContext, sessionPreferences, getDeviceSpecificRpc(schemaContext, listener)); + } + + @Override + public void onFailure(Throwable throwable) { + LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable); + handleSalInitializationFailure(throwable, listener); + } + }; + + resolver.getResolvedSources().onComplete( + new OnComplete>() { + @Override + public void onComplete(Throwable throwable, Set sourceIdentifiers) throws Throwable { + if(throwable != null) { + if(throwable instanceof MasterSourceProviderOnSameNodeException) { + //do nothing + } else { + LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable); + handleSalInitializationFailure(throwable, listener); + } + } else { + LOG.trace("{}: Trying to build schema context from {}", id, sourceIdentifiers); + Futures.addCallback(schemaContextFactory.createSchemaContext(sourceIdentifiers), schemaContextFuture); + } + } + }, actorSystem.dispatcher()); + } + + private NetconfDeviceRpc getDeviceSpecificRpc(SchemaContext result, RemoteDeviceCommunicator listener) { + return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true)); + } + + @Override + public void ownershipChanged(EntityOwnershipChange ownershipChange) { + LOG.debug("Entity ownership change received {}", ownershipChange); + if(ownershipChange.isOwner()) { + super.onRemoteSessionUp(sessionPreferences, listener); + } else if (ownershipChange.wasOwner()) { + slaveSetupSchema(); + } + } +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicator.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicator.java new file mode 100644 index 0000000000..0b191974d7 --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredNetconfDeviceCommunicator.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline; + +import java.util.ArrayList; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; +import org.opendaylight.netconf.api.NetconfMessage; +import org.opendaylight.netconf.api.NetconfTerminationReason; +import org.opendaylight.netconf.client.NetconfClientSession; +import org.opendaylight.netconf.client.NetconfClientSessionListener; +import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice; +import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; + +public class ClusteredNetconfDeviceCommunicator extends NetconfDeviceCommunicator { + + private final EntityOwnershipService ownershipService; + + private final ArrayList netconfClientSessionListeners = new ArrayList<>(); + private EntityOwnershipListenerRegistration ownershipListenerRegistration = null; + + public ClusteredNetconfDeviceCommunicator(RemoteDeviceId id, NetconfDevice remoteDevice, EntityOwnershipService ownershipService) { + super(id, remoteDevice); + this.ownershipService = ownershipService; + } + + @Override + public void onMessage(NetconfClientSession session, NetconfMessage message) { + super.onMessage(session, message); + for(NetconfClientSessionListener listener : netconfClientSessionListeners) { + listener.onMessage(session, message); + } + } + + @Override + public void onSessionDown(NetconfClientSession session, Exception e) { + super.onSessionDown(session, e); + ownershipListenerRegistration.close(); + for(NetconfClientSessionListener listener : netconfClientSessionListeners) { + listener.onSessionDown(session, e); + } + } + + @Override + public void onSessionUp(NetconfClientSession session) { + super.onSessionUp(session); + ownershipListenerRegistration = ownershipService.registerListener("netconf-node/" + id.getName(), (ClusteredNetconfDevice) remoteDevice); + for(NetconfClientSessionListener listener : netconfClientSessionListeners) { + listener.onSessionUp(session); + } + } + + @Override + public void onSessionTerminated(NetconfClientSession session, NetconfTerminationReason reason) { + super.onSessionTerminated(session, reason); + ownershipListenerRegistration.close(); + for(NetconfClientSessionListener listener : netconfClientSessionListeners) { + listener.onSessionTerminated(session, reason); + } + } + + public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(NetconfClientSessionListener listener) { + netconfClientSessionListeners.add(listener); + return new NetconfClientSessionListenerRegistration(listener); + } + + public class NetconfClientSessionListenerRegistration { + + private final NetconfClientSessionListener listener; + + public NetconfClientSessionListenerRegistration(NetconfClientSessionListener listener) { + this.listener = listener; + } + + public void close() { + netconfClientSessionListeners.remove(listener); + } + } +} diff --git a/opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDevice.java b/opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDevice.java index 23cb6ad030..f6f62fe8a2 100644 --- a/opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDevice.java @@ -7,7 +7,6 @@ */ package org.opendaylight.netconf.sal.connect.netconf; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -64,7 +63,7 @@ import org.slf4j.LoggerFactory; /** * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade */ -public final class NetconfDevice implements RemoteDevice { +public class NetconfDevice implements RemoteDevice { private static final Logger LOG = LoggerFactory.getLogger(NetconfDevice.class); @@ -95,16 +94,16 @@ public final class NetconfDevice implements RemoteDevice salFacade; private final ListeningExecutorService processingExecutor; - private final SchemaSourceRegistry schemaRegistry; + protected final SchemaSourceRegistry schemaRegistry; private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; private final NotificationHandler notificationHandler; - private final List> sourceRegistrations = Lists.newArrayList(); + protected final List> sourceRegistrations = Lists.newArrayList(); // Message transformer is constructed once the schemas are available private MessageTransformer messageTransformer; @@ -214,8 +213,7 @@ public final class NetconfDevice implements RemoteDevice listener) { + protected void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator listener) { LOG.error("{}: Initialization in sal failed, disconnecting from device", id, t); listener.close(); onRemoteSessionDown(); @@ -460,7 +458,7 @@ public final class NetconfDevice implements RemoteDevice remoteDevice; + protected final RemoteDevice remoteDevice; private final Optional overrideNetconfCapabilities; - private final RemoteDeviceId id; + protected final RemoteDeviceId id; private final Lock sessionLock = new ReentrantLock(); // TODO implement concurrent message limit