2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.impl;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.japi.Creator;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.net.InetSocketAddress;
22 import java.util.Collection;
23 import java.util.Collections;
24 import javassist.ClassPool;
25 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
26 import org.opendaylight.controller.config.threadpool.ThreadPool;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
31 import org.opendaylight.controller.sal.core.api.Broker;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSessionListener;
34 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
35 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
36 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
37 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
38 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
39 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
40 import org.opendaylight.netconf.topology.NetconfTopology;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
43 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
44 import org.opendaylight.netconf.topology.TopologyManager;
45 import org.opendaylight.netconf.topology.TopologyManagerCallback;
46 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
47 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
48 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
49 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
50 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
51 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
52 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
53 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
54 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
55 import org.opendaylight.netconf.topology.util.NodeWriter;
56 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
61 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
62 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
63 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
64 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
65 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
70 public class ClusteredNetconfTopology extends AbstractNetconfTopology implements AutoCloseable {
72 private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfTopology.class);
74 private final BindingNormalizedNodeCodecRegistry codecRegistry;
76 private final ActorSystem actorSystem;
77 private final EntityOwnershipService entityOwnershipService;
78 private TopologyManager topologyManager;
80 public ClusteredNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
81 final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
82 final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
83 final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
84 final ActorSystem actorSystem, final EntityOwnershipService entityOwnershipService) {
85 super(topologyId, clientDispatcher,
86 bindingAwareBroker, domBroker, eventExecutor,
87 keepaliveExecutor, processingExecutor, schemaRepositoryProvider);
89 final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
90 moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
91 final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
92 Preconditions.checkState(schemaContextOptional.isPresent());
93 final SchemaContext topologySchemaCtx = schemaContextOptional.get();
95 final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
96 codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
97 codecRegistry.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));
99 this.actorSystem = actorSystem;
100 this.entityOwnershipService = entityOwnershipService;
101 registerToSal(this, this);
102 LOG.warn("Clustered netconf topo started");
108 public void onSessionInitiated(final ProviderContext session) {
109 dataBroker = session.getSALService(DataBroker.class);
110 final NodeWriter writer = new TopologyNodeWriter(topologyId, dataBroker);
111 TypedActorExtension typedActorExtension = TypedActor.get(this.actorSystem);
112 LOG.warn("Registering actor on path {}", actorSystem.name() + "/user/" + topologyId);
113 topologyManager = typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
115 public BaseTopologyManager create() throws Exception {
116 return new BaseTopologyManager(actorSystem,
120 new TopologyCallbackFactory(ClusteredNetconfTopology.this, entityOwnershipService, writer),
121 new NetconfNodeOperationalDataAggregator(),
122 new LoggingSalNodeWriter(writer),
123 new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "topology-netconf", "topology-manager"));
129 public void close() throws Exception {
130 // close all existing connectors, delete whole topology in datastore?
131 for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
132 connectorDTO.getCommunicator().close();
134 activeConnectors.clear();
138 protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
139 final NetconfNode node) {
140 //setup default values since default value is not supported in mdsal
141 final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
142 final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
143 final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
145 IpAddress ipAddress = node.getHost().getIpAddress();
146 InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
147 ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
148 node.getPort().getValue());
149 RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
151 RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
152 createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
154 if (keepaliveDelay > 0) {
155 LOG.warn("Adding keepalive facade, for device {}", nodeId);
156 salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
159 final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
161 final NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
162 processingExecutor.getExecutor(), actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
164 return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
168 protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
169 return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
173 public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
174 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
178 public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
179 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
183 public void unregisterMountPoint(final NodeId nodeId) {
184 Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
185 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
189 public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
190 Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a connection listener can be registered");
191 return ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
194 public Collection<ProviderFunctionality> getProviderFunctionality() {
195 return Collections.emptySet();
198 public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
199 Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
200 return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
203 static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
205 private final NetconfTopology netconfTopology;
206 private final EntityOwnershipService entityOwnershipService;
207 private final NodeWriter writer;
209 TopologyCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService, final NodeWriter writer) {
210 this.netconfTopology = netconfTopology;
211 this.entityOwnershipService = entityOwnershipService;
212 this.writer = writer;
216 public TopologyManagerCallback create(final ActorSystem actorSystem, final String topologyId) {
217 return new NetconfTopologyManagerCallback(actorSystem, topologyId, new NodeCallbackFactory(netconfTopology, entityOwnershipService), new LoggingSalNodeWriter(writer));
221 private static class NodeCallbackFactory implements NodeManagerCallbackFactory {
223 private final NetconfTopology netconfTopology;
224 private final EntityOwnershipService entityOwnershipService;
226 NodeCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService) {
227 this.netconfTopology = netconfTopology;
228 this.entityOwnershipService = entityOwnershipService;
232 public NodeManagerCallback create(final String nodeId, final String topologyId, final ActorSystem actorSystem) {
233 return new NetconfNodeManagerCallback(nodeId, topologyId, actorSystem, netconfTopology, new NodeRoleChangeStrategy(entityOwnershipService, "netconf-node", nodeId));