2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.impl;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.japi.Creator;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.net.InetSocketAddress;
22 import java.util.Collection;
23 import java.util.Collections;
24 import javassist.ClassPool;
25 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
26 import org.opendaylight.controller.config.threadpool.ThreadPool;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
31 import org.opendaylight.controller.sal.core.api.Broker;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSessionListener;
34 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
35 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
36 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
37 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
38 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
39 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
40 import org.opendaylight.netconf.topology.NetconfTopology;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
43 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
44 import org.opendaylight.netconf.topology.TopologyManager;
45 import org.opendaylight.netconf.topology.TopologyManagerCallback;
46 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
47 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
48 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
49 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
50 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
51 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
52 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
53 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
54 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
55 import org.opendaylight.netconf.topology.util.NodeWriter;
56 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
61 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
62 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
63 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
64 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
65 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
/**
 * Netconf topology variant that builds on {@link AbstractNetconfTopology} and uses an Akka
 * {@link ActorSystem} together with the {@link EntityOwnershipService}.
 *
 * <p>It creates a {@link TopologyManager} typed actor once its SAL session is initiated, builds
 * {@code ClusteredNetconfDevice} based connectors for individual nodes, and closes all active connectors
 * in {@link #close()}.
 */
70 public class ClusteredNetconfTopology extends AbstractNetconfTopology implements AutoCloseable {
72 private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfTopology.class);
// Binding codec registry created in the constructor from the netconf-node-topology module info.
74 private final BindingNormalizedNodeCodecRegistry codecRegistry;
// Actor system used to create the topology manager typed actor and handed to the devices and mount point facades.
76 private final ActorSystem actorSystem;
// Ownership service handed to the callback factories, role change strategies and device communicators.
77 private final EntityOwnershipService entityOwnershipService;
// Typed actor proxy assigned in onSessionInitiated; not assigned before that callback has run.
78 private TopologyManager topologyManager;
/**
 * Creates the clustered topology and registers it with the SAL.
 *
 * <p>The first eight arguments are forwarded unchanged to the {@link AbstractNetconfTopology} constructor.
 * Afterwards the binding codec registry for the netconf-node-topology model is built, the actor system and
 * entity ownership service are stored, and {@code registerToSal(this, this)} is invoked.
 *
 * @param topologyId               id of the topology, forwarded to the superclass
 * @param clientDispatcher         netconf client dispatcher, forwarded to the superclass
 * @param bindingAwareBroker       binding aware broker, forwarded to the superclass
 * @param domBroker                DOM broker, forwarded to the superclass
 * @param eventExecutor            event executor, forwarded to the superclass
 * @param keepaliveExecutor        scheduled pool used for keepalives, forwarded to the superclass
 * @param processingExecutor       processing thread pool, forwarded to the superclass
 * @param schemaRepositoryProvider schema repository provider, forwarded to the superclass
 * @param actorSystem              actor system used for the topology manager and the clustered devices
 * @param entityOwnershipService   ownership service used by the callbacks and communicators
 * @throws IllegalStateException if the schema context for the topology model cannot be created
 */
public ClusteredNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
                                final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
                                final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
                                final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
                                final ActorSystem actorSystem, final EntityOwnershipService entityOwnershipService) {
    super(topologyId, clientDispatcher,
            bindingAwareBroker, domBroker, eventExecutor,
            keepaliveExecutor, processingExecutor, schemaRepositoryProvider);

    this.codecRegistry = buildTopologyCodecRegistry();
    this.actorSystem = actorSystem;
    this.entityOwnershipService = entityOwnershipService;

    // Registration happens last so that all fields are assigned before the SAL can call back.
    registerToSal(this, this);
    LOG.warn("Clustered netconf topo started");
}

/**
 * Builds the binding codec registry backed by the netconf-node-topology module info.
 *
 * @return registry already updated with the runtime context of the topology schema
 * @throws IllegalStateException if no schema context could be created from the module info
 */
private static BindingNormalizedNodeCodecRegistry buildTopologyCodecRegistry() {
    final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
    moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));

    final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
    // Give the failure a message; a bare IllegalStateException is hard to diagnose from a log.
    Preconditions.checkState(schemaContextOptional.isPresent(),
            "Unable to create schema context for the netconf-node-topology model");
    final SchemaContext topologySchemaCtx = schemaContextOptional.get();

    final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
    final BindingNormalizedNodeCodecRegistry registry =
            new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
    registry.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));
    return registry;
}
/**
 * Stores the {@link DataBroker} obtained from the given session and creates the {@link TopologyManager}
 * typed actor, backed by a {@code BaseTopologyManager}, for this topology.
 *
 * NOTE(review): in this view the BaseTopologyManager argument list (between actorSystem and the callback
 * factory) and the tail of the typedActorOf(...) call appear truncated - confirm against the full file
 * before changing this method.
 *
 * @param session provider context used to look up the DataBroker
 */
108 public void onSessionInitiated(final ProviderContext session) {
109 dataBroker = session.getSALService(DataBroker.class);
// Datastore-backed writer; it is wrapped in LoggingSalNodeWriter before being handed to the manager below.
110 final NodeWriter writer = new TopologyNodeWriter(topologyId, dataBroker);
111 TypedActorExtension typedActorExtension = TypedActor.get(this.actorSystem);
112 LOG.warn("Registering actor on path {}", actorSystem.name() + "/user/" + topologyId);
// The Creator defers construction of the manager implementation to the typed actor extension.
113 topologyManager = typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
115 public BaseTopologyManager create() throws Exception {
116 return new BaseTopologyManager(actorSystem,
120 new TopologyCallbackFactory(ClusteredNetconfTopology.this, entityOwnershipService, writer),
121 new NetconfNodeOperationalDataAggregator(),
122 new LoggingSalNodeWriter(writer),
123 new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "topology-netconf", "topology-manager"));
129 public void close() throws Exception {
130 // close all existing connectors, delete whole topology in datastore?
131 for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
132 connectorDTO.getCommunicator().close();
134 activeConnectors.clear();
138 protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
139 final NetconfNode node) {
140 //setup default values since default value is not supported in mdsal
141 final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
142 final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
143 final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
145 IpAddress ipAddress = node.getHost().getIpAddress();
146 InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
147 ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
148 node.getPort().getValue());
149 RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
151 RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
152 createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker);
154 if (keepaliveDelay > 0) {
155 LOG.warn("Adding keepalive facade, for device {}", nodeId);
156 salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay, defaultRequestTimeoutMillis);
159 final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
161 final NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
162 processingExecutor.getExecutor(), actorSystem, topologyId, nodeId.getValue(), TypedActor.context(),
163 reconnectOnChangedSchema);
165 final int rpcMessageLimit =
166 node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit();
168 if (rpcMessageLimit < 1) {
169 LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
172 return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService, rpcMessageLimit), salFacade);
176 protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker) {
177 return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker);
181 public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
182 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
186 public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
187 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
191 public void unregisterMountPoint(final NodeId nodeId) {
192 Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
193 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
197 public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
198 Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a connection listener can be registered");
199 return ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
202 public Collection<ProviderFunctionality> getProviderFunctionality() {
203 return Collections.emptySet();
206 public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
207 Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
208 return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
211 static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
213 private final NetconfTopology netconfTopology;
214 private final EntityOwnershipService entityOwnershipService;
215 private final NodeWriter writer;
217 TopologyCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService, final NodeWriter writer) {
218 this.netconfTopology = netconfTopology;
219 this.entityOwnershipService = entityOwnershipService;
220 this.writer = writer;
224 public TopologyManagerCallback create(final ActorSystem actorSystem, final String topologyId) {
225 return new NetconfTopologyManagerCallback(actorSystem, topologyId, new NodeCallbackFactory(netconfTopology, entityOwnershipService), new LoggingSalNodeWriter(writer));
229 private static class NodeCallbackFactory implements NodeManagerCallbackFactory {
231 private final NetconfTopology netconfTopology;
232 private final EntityOwnershipService entityOwnershipService;
234 NodeCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService) {
235 this.netconfTopology = netconfTopology;
236 this.entityOwnershipService = entityOwnershipService;
240 public NodeManagerCallback create(final String nodeId, final String topologyId, final ActorSystem actorSystem) {
241 return new NetconfNodeManagerCallback(nodeId, topologyId, actorSystem, netconfTopology, new NodeRoleChangeStrategy(entityOwnershipService, "netconf-node", nodeId));