import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfTopology.class);
- private static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L;
- private static final int DEFAULT_KEEPALIVE_DELAY = 0;
- private static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
- private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
- private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
- private static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
+ protected static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L;
+ protected static final int DEFAULT_KEEPALIVE_DELAY = 0;
+ protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+ protected static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
+ protected static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
+ protected static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
private static FilesystemSchemaSourceCache<YangTextSchemaSource> CACHE = null;
//keep track of already initialized repositories to avoid adding redundant listeners
protected final String topologyId;
private final NetconfClientDispatcher clientDispatcher;
protected final BindingAwareBroker bindingAwareBroker;
- private final Broker domBroker;
+ protected final Broker domBroker;
private final EventExecutor eventExecutor;
- private final ScheduledThreadPool keepaliveExecutor;
- private final ThreadPool processingExecutor;
- private final SharedSchemaRepository sharedSchemaRepository;
+ protected final ScheduledThreadPool keepaliveExecutor;
+ protected final ThreadPool processingExecutor;
+ protected final SharedSchemaRepository sharedSchemaRepository;
- private SchemaSourceRegistry schemaRegistry = null;
- private SchemaContextFactory schemaContextFactory = null;
+ protected SchemaSourceRegistry schemaRegistry = null;
+ protected SchemaContextFactory schemaContextFactory = null;
protected DOMMountPointService mountPointService = null;
protected DataBroker dataBroker = null;
return Futures.immediateFuture(null);
}
- private ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
+ protected ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
final Node configNode) {
final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode);
final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
- final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(deviceCommunicator, netconfNode);
+ final NetconfClientSessionListener netconfClientSessionListener = deviceCommunicatorDTO.getSessionListener();
+ final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(netconfClientSessionListener, netconfNode);
final ListenableFuture<NetconfDeviceCapabilities> future = deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
+
activeConnectors.put(nodeId, deviceCommunicatorDTO);
Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
return future;
}
- private NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
+ protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
final NetconfNode node) {
//setup default values since default value is not supported yet in mdsal
// TODO remove this when mdsal starts supporting default values
return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), salFacade);
}
- public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener, NetconfNode node) {
+ public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener, NetconfNode node) {
//setup default values since default value is not supported yet in mdsal
// TODO remove this when mdsal starts supporting default values
}
}
- protected static final class NetconfConnectorDTO {
+ protected static class NetconfConnectorDTO {
private final NetconfDeviceCommunicator communicator;
private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
- private NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
+ public NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
this.communicator = communicator;
this.facade = facade;
}
public RemoteDeviceHandler<NetconfSessionPreferences> getFacade() {
return facade;
}
+
+ public NetconfClientSessionListener getSessionListener() {
+ return communicator;
+ }
}
}
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.netty.util.concurrent.EventExecutor;
+import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import javassist.ClassPool;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.AbstractNetconfTopology;
import org.opendaylight.netconf.topology.NetconfTopology;
import org.opendaylight.netconf.topology.TopologyManagerCallback;
import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
import org.opendaylight.netconf.topology.util.BaseTopologyManager;
import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
import org.opendaylight.netconf.topology.util.NodeWriter;
import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
LOG.warn("Clustered netconf topo started");
}
+
+
@Override
public void onSessionInitiated(final ProviderContext session) {
dataBroker = session.getSALService(DataBroker.class);
activeConnectors.clear();
}
+ @Override
+ protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
+ final NetconfNode node) {
+ //setup default values since default value is not supported yet in mdsal
+ // TODO remove this when mdsal starts supporting default values
+ final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
+ final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
+ final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
+
+ IpAddress ipAddress = node.getHost().getIpAddress();
+ InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
+ ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
+ node.getPort().getValue());
+ RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
+
+ RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
+ createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+
+ if (keepaliveDelay > 0) {
+ LOG.warn("Adding keepalive facade, for device {}", nodeId);
+ salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
+ }
+
+ NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
+ new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+
+ NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
+ processingExecutor.getExecutor(), sharedSchemaRepository, actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
+
+ return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
+ }
+
@Override
protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
return Collections.emptySet();
}
+ public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
+ Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
+ return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
+ }
+
static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
private final NetconfTopology netconfTopology;
import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.client.NetconfClientSession;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.topology.NetconfTopology;
import org.opendaylight.netconf.topology.NodeManagerCallback;
import org.opendaylight.netconf.topology.RoleChangeStrategy;
import org.opendaylight.netconf.topology.TopologyManager;
+import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
import org.opendaylight.netconf.topology.util.BaseNodeManager;
import org.opendaylight.netconf.topology.util.BaseTopologyManager;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-public class NetconfNodeManagerCallback implements NodeManagerCallback{
+public class NetconfNodeManagerCallback implements NodeManagerCallback, NetconfClientSessionListener{
private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
private Node currentConfig;
private Node currentOperationalNode;
- private ConnectionStatusListenerRegistration registration = null;
+ private ConnectionStatusListenerRegistration connectionStatusregistration = null;
+ private NetconfClientSessionListenerRegistration sessionListener = null;
private ActorRef masterDataBrokerRef = null;
private boolean connected = false;
Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
@Override
public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
- registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
+ connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
+ sessionListener = topologyDispatcher.registerNetconfClientSessionListener(nodeId, NetconfNodeManagerCallback.this);
}
@Override
@Nonnull final Node configNode) {
// first disconnect this node
topologyDispatcher.unregisterMountPoint(nodeId);
- if (registration != null) {
- registration.close();
+
+ if (connectionStatusregistration != null) {
+ connectionStatusregistration.close();
}
topologyDispatcher.disconnectNode(nodeId);
Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
@Override
public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
- registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
+ connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
}
@Override
@Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
// cleanup and disconnect
topologyDispatcher.unregisterMountPoint(nodeId);
- if (registration != null) {
- registration.close();
+
+ if(connectionStatusregistration != null) {
+ connectionStatusregistration.close();
}
roleChangeStrategy.unregisterRoleCandidate();
return topologyDispatcher.disconnectNode(nodeId);
@Override
public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
// we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
- LOG.debug("onDeviceConnected received, registering role candidate");
connected = true;
- roleChangeStrategy.registerRoleCandidate(nodeManager);
if (!isMaster && masterDataBrokerRef != null) {
// if we're not master but one is present already, we need to register mountpoint
LOG.warn("Device connected, master already present in topology, registering mount point");
@Override
public void onDeviceDisconnected() {
// we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
- LOG.debug("onDeviceDisconnected received, unregistering role candidate");
+ LOG.debug("onDeviceDisconnected received, unregistered role candidate");
connected = false;
if (isMaster) {
// announce that master mount point is going down
// onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
}
- roleChangeStrategy.unregisterRoleCandidate();
final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
connected = false;
String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
- roleChangeStrategy.unregisterRoleCandidate();
currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
.addAugmentation(NetconfNode.class,
new NetconfNodeBuilder()
topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
}
}
+
+ @Override
+ public void onSessionUp(NetconfClientSession netconfClientSession) {
+ //NetconfClientSession is up, we can register role candidate
+ LOG.debug("Netconf client session is up, registering role candidate");
+ roleChangeStrategy.registerRoleCandidate(nodeManager);
+ }
+
+ @Override
+ public void onSessionDown(NetconfClientSession netconfClientSession, Exception e) {
+ LOG.debug("Netconf client session is down, unregistering role candidate");
+ roleChangeStrategy.unregisterRoleCandidate();
+ }
+
+ @Override
+ public void onSessionTerminated(NetconfClientSession netconfClientSession, NetconfTerminationReason netconfTerminationReason) {
+ LOG.debug("Netconf client session is down, unregistering role candidate");
+ roleChangeStrategy.unregisterRoleCandidate();
+ }
+
+ @Override
+ public void onMessage(NetconfClientSession netconfClientSession, NetconfMessage netconfMessage) {
+ //NOOP
+ }
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.dispatch.OnComplete;
+import akka.japi.Creator;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceRpc;
+import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClusteredNetconfDevice extends NetconfDevice implements EntityOwnershipListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfDevice.class);
+
+ private boolean isMaster = false;
+ private NetconfDeviceCommunicator listener;
+ private NetconfSessionPreferences sessionPreferences;
+ private SchemaRepository schemaRepo;
+ private final ActorSystem actorSystem;
+ private final String topologyId;
+ private final String nodeId;
+ private final ActorContext cachedContext;
+
+ private MasterSourceProvider masterSourceProvider = null;
+ private ClusteredDeviceSourcesResolver resolver = null;
+
+ public ClusteredNetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
+ final ExecutorService globalProcessingExecutor, SchemaRepository schemaRepo, ActorSystem actorSystem, String topologyId, String nodeId,
+ ActorContext cachedContext) {
+ super(schemaResourcesDTO, id, salFacade, globalProcessingExecutor);
+ this.schemaRepo = schemaRepo;
+ this.actorSystem = actorSystem;
+ this.topologyId = topologyId;
+ this.nodeId = nodeId;
+ this.cachedContext = cachedContext;
+ }
+
+ @Override
+ public void onRemoteSessionUp(NetconfSessionPreferences remoteSessionCapabilities, NetconfDeviceCommunicator listener) {
+ LOG.warn("Node {} SessionUp, with capabilities {}", nodeId, remoteSessionCapabilities);
+ this.listener = listener;
+ this.sessionPreferences = remoteSessionCapabilities;
+ slaveSetupSchema();
+ }
+
+
+ @Override
+ protected void handleSalInitializationSuccess(SchemaContext result, NetconfSessionPreferences remoteSessionCapabilities, DOMRpcService deviceRpc) {
+ super.handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
+
+ final Set<SourceIdentifier> sourceIds = Sets.newHashSet();
+ for(ModuleIdentifier id : result.getAllModuleIdentifiers()) {
+ sourceIds.add(SourceIdentifier.create(id.getName(), (SimpleDateFormatUtil.DEFAULT_DATE_REV == id.getRevision() ? Optional.<String>absent() :
+ Optional.of(SimpleDateFormatUtil.getRevisionFormat().format(id.getRevision())))));
+ }
+
+ //TODO extract string constant to util class
+ LOG.debug("Creating master source provider");
+ masterSourceProvider = TypedActor.get(cachedContext).typedActorOf(
+ new TypedProps<>(MasterSourceProvider.class,
+ new Creator<MasterSourceProviderImpl>() {
+ @Override
+ public MasterSourceProviderImpl create() throws Exception {
+ return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId);
+ }
+ }), "masterSourceProvider");
+ }
+
+ @Override
+ public void onRemoteSessionDown() {
+ super.onRemoteSessionDown();
+ listener = null;
+ sessionPreferences = null;
+ if (masterSourceProvider != null) {
+ // if we have master the slave that started on this node should be already killed via PoisonPill, so stop master only now
+ LOG.debug("Stopping master source provider for node {}", nodeId);
+ TypedActor.get(actorSystem).stop(masterSourceProvider);
+ masterSourceProvider = null;
+ } else {
+ LOG.debug("Stopping slave source resolver for node {}", nodeId);
+ TypedActor.get(actorSystem).stop(resolver);
+ resolver = null;
+ }
+ }
+
+ private void slaveSetupSchema() {
+ //TODO extract string constant to util class
+ resolver = TypedActor.get(cachedContext).typedActorOf(
+ new TypedProps<>(ClusteredDeviceSourcesResolver.class,
+ new Creator<ClusteredDeviceSourcesResolverImpl>() {
+ @Override
+ public ClusteredDeviceSourcesResolverImpl create() throws Exception {
+ return new ClusteredDeviceSourcesResolverImpl(topologyId, nodeId, actorSystem, schemaRegistry, sourceRegistrations);
+ }
+ }), "clusteredDeviceSourcesResolver");
+
+
+ final FutureCallback<SchemaContext> schemaContextFuture = new FutureCallback<SchemaContext>() {
+ @Override
+ public void onSuccess(SchemaContext schemaContext) {
+ LOG.debug("{}: Schema context built successfully.", id);
+
+ final NetconfDeviceCapabilities deviceCap = sessionPreferences.getNetconfDeviceCapabilities();
+ final Set<QName> providedSourcesQnames = Sets.newHashSet();
+ for(ModuleIdentifier id : schemaContext.getAllModuleIdentifiers()) {
+ providedSourcesQnames.add(QName.create(id.getQNameModule(), id.getName()));
+ }
+
+ deviceCap.addNonModuleBasedCapabilities(sessionPreferences.getNonModuleCaps());
+ deviceCap.addCapabilities(providedSourcesQnames);
+
+ ClusteredNetconfDevice.super.handleSalInitializationSuccess(
+ schemaContext, sessionPreferences, getDeviceSpecificRpc(schemaContext, listener));
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
+ handleSalInitializationFailure(throwable, listener);
+ }
+ };
+
+ resolver.getResolvedSources().onComplete(
+ new OnComplete<Set<SourceIdentifier>>() {
+ @Override
+ public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
+ if(throwable != null) {
+ if(throwable instanceof MasterSourceProviderOnSameNodeException) {
+ //do nothing
+ } else {
+ LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
+ handleSalInitializationFailure(throwable, listener);
+ }
+ } else {
+ LOG.trace("{}: Trying to build schema context from {}", id, sourceIdentifiers);
+ Futures.addCallback(schemaContextFactory.createSchemaContext(sourceIdentifiers), schemaContextFuture);
+ }
+ }
+ }, actorSystem.dispatcher());
+ }
+
+ private NetconfDeviceRpc getDeviceSpecificRpc(SchemaContext result, RemoteDeviceCommunicator<NetconfMessage> listener) {
+ return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
+ }
+
+ @Override
+ public void ownershipChanged(EntityOwnershipChange ownershipChange) {
+ LOG.debug("Entity ownership change received {}", ownershipChange);
+ if(ownershipChange.isOwner()) {
+ super.onRemoteSessionUp(sessionPreferences, listener);
+ } else if (ownershipChange.wasOwner()) {
+ slaveSetupSchema();
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import java.util.ArrayList;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.client.NetconfClientSession;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+
+public class ClusteredNetconfDeviceCommunicator extends NetconfDeviceCommunicator {
+
+ private final EntityOwnershipService ownershipService;
+
+ private final ArrayList<NetconfClientSessionListener> netconfClientSessionListeners = new ArrayList<>();
+ private EntityOwnershipListenerRegistration ownershipListenerRegistration = null;
+
+ public ClusteredNetconfDeviceCommunicator(RemoteDeviceId id, NetconfDevice remoteDevice, EntityOwnershipService ownershipService) {
+ super(id, remoteDevice);
+ this.ownershipService = ownershipService;
+ }
+
+ @Override
+ public void onMessage(NetconfClientSession session, NetconfMessage message) {
+ super.onMessage(session, message);
+ for(NetconfClientSessionListener listener : netconfClientSessionListeners) {
+ listener.onMessage(session, message);
+ }
+ }
+
+ @Override
+ public void onSessionDown(NetconfClientSession session, Exception e) {
+ super.onSessionDown(session, e);
+ ownershipListenerRegistration.close();
+ for(NetconfClientSessionListener listener : netconfClientSessionListeners) {
+ listener.onSessionDown(session, e);
+ }
+ }
+
+ @Override
+ public void onSessionUp(NetconfClientSession session) {
+ super.onSessionUp(session);
+ ownershipListenerRegistration = ownershipService.registerListener("netconf-node/" + id.getName(), (ClusteredNetconfDevice) remoteDevice);
+ for(NetconfClientSessionListener listener : netconfClientSessionListeners) {
+ listener.onSessionUp(session);
+ }
+ }
+
+ @Override
+ public void onSessionTerminated(NetconfClientSession session, NetconfTerminationReason reason) {
+ super.onSessionTerminated(session, reason);
+ ownershipListenerRegistration.close();
+ for(NetconfClientSessionListener listener : netconfClientSessionListeners) {
+ listener.onSessionTerminated(session, reason);
+ }
+ }
+
+ public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(NetconfClientSessionListener listener) {
+ netconfClientSessionListeners.add(listener);
+ return new NetconfClientSessionListenerRegistration(listener);
+ }
+
+ public class NetconfClientSessionListenerRegistration {
+
+ private final NetconfClientSessionListener listener;
+
+ public NetconfClientSessionListenerRegistration(NetconfClientSessionListener listener) {
+ this.listener = listener;
+ }
+
+ public void close() {
+ netconfClientSessionListeners.remove(listener);
+ }
+ }
+}
*/
package org.opendaylight.netconf.sal.connect.netconf;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
/**
* This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
*/
-public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
+public class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
private static final Logger LOG = LoggerFactory.getLogger(NetconfDevice.class);
}
};
- private final RemoteDeviceId id;
+ protected final RemoteDeviceId id;
private final boolean reconnectOnSchemasChange;
- private final SchemaContextFactory schemaContextFactory;
+ protected final SchemaContextFactory schemaContextFactory;
private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
private final ListeningExecutorService processingExecutor;
- private final SchemaSourceRegistry schemaRegistry;
+ protected final SchemaSourceRegistry schemaRegistry;
private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
private final NotificationHandler notificationHandler;
- private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
+ protected final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
// Message transformer is constructed once the schemas are available
private MessageTransformer<NetconfMessage> messageTransformer;
return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
}
- @VisibleForTesting
- void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) {
+ protected void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) {
messageTransformer = new NetconfMessageTransformer(result, true);
updateTransformer(messageTransformer);
LOG.info("{}: Netconf connector initialized successfully", id);
}
- private void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ protected void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator<NetconfMessage> listener) {
LOG.error("{}: Initialization in sal failed, disconnecting from device", id, t);
listener.close();
onRemoteSessionDown();
Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback);
}
- private NetconfDeviceRpc getDeviceSpecificRpc(final SchemaContext result) {
+ protected NetconfDeviceRpc getDeviceSpecificRpc(final SchemaContext result) {
return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
}
private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
- private final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
+ protected final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
private final Optional<NetconfSessionPreferences> overrideNetconfCapabilities;
- private final RemoteDeviceId id;
+ protected final RemoteDeviceId id;
private final Lock sessionLock = new ReentrantLock();
// TODO implement concurrent message limit