2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.impl;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.japi.Creator;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.net.InetSocketAddress;
22 import java.util.Collection;
23 import java.util.Collections;
24 import javassist.ClassPool;
25 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
26 import org.opendaylight.controller.config.threadpool.ThreadPool;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
31 import org.opendaylight.controller.sal.core.api.Broker;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSessionListener;
34 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
35 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
36 import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
37 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
38 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
39 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
40 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
41 import org.opendaylight.netconf.topology.NetconfTopology;
42 import org.opendaylight.netconf.topology.NodeManagerCallback;
43 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
44 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
45 import org.opendaylight.netconf.topology.TopologyManager;
46 import org.opendaylight.netconf.topology.TopologyManagerCallback;
47 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
48 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
49 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
50 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
51 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
52 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
53 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
54 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
55 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
56 import org.opendaylight.netconf.topology.util.NodeWriter;
57 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
58 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
62 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
63 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
64 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
65 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
66 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
67 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
/**
 * {@link AbstractNetconfTopology} variant that creates its {@link TopologyManager} as an Akka typed
 * actor and hands an {@link EntityOwnershipService} to the callback factories, role-change
 * strategies and device communicators it builds.
 */
71 public class ClusteredNetconfTopology extends AbstractNetconfTopology implements AutoCloseable {
73 private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfTopology.class);
// Binding codec registry, initialised in the constructor from the netconf-node-topology module info.
75 private final BindingNormalizedNodeCodecRegistry codecRegistry;
// Actor system used to create the typed topology-manager actor and passed on to devices and mount point facades.
77 private final ActorSystem actorSystem;
// Ownership service forwarded to callback factories, role-change strategies and device communicators.
78 private final EntityOwnershipService entityOwnershipService;
// Typed-actor proxy; assigned in onSessionInitiated, so it is unset until that callback has run.
79 private TopologyManager topologyManager;
/**
 * Builds the clustered topology: stores the clustering collaborators, prepares a binding codec
 * registry for the netconf-node-topology model and finally registers this instance with SAL.
 *
 * @param topologyId id of the topology handled by this instance
 * @param clientDispatcher netconf client dispatcher, passed to the superclass
 * @param bindingAwareBroker binding broker, passed to the superclass
 * @param domBroker DOM broker, passed to the superclass
 * @param eventExecutor event executor, passed to the superclass
 * @param keepaliveExecutor executor used for keepalive facades
 * @param processingExecutor executor used for device processing
 * @param schemaRepositoryProvider schema repository provider, passed to the superclass
 * @param actorSystem actor system in which the typed topology manager is created
 * @param entityOwnershipService ownership service handed to callbacks and communicators
 * @throws IllegalStateException if no schema context can be created from the module info
 */
public ClusteredNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
                                final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
                                final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
                                final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
                                final ActorSystem actorSystem, final EntityOwnershipService entityOwnershipService) {
    super(topologyId, clientDispatcher, bindingAwareBroker, domBroker, eventExecutor,
            keepaliveExecutor, processingExecutor, schemaRepositoryProvider);

    // Collaborators must be in place before the SAL registration at the end of this constructor.
    this.actorSystem = actorSystem;
    this.entityOwnershipService = entityOwnershipService;

    // Schema context restricted to the netconf-node-topology module.
    final ModuleInfoBackedContext moduleContext = ModuleInfoBackedContext.create();
    moduleContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
    final Optional<SchemaContext> maybeSchemaContext = moduleContext.tryToCreateSchemaContext();
    Preconditions.checkState(maybeSchemaContext.isPresent());
    final SchemaContext schemaContext = maybeSchemaContext.get();

    // Codec registry backed by javassist-generated stream writers.
    final JavassistUtils javassistUtils = JavassistUtils.forClassPool(ClassPool.getDefault());
    this.codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassistUtils));
    final BindingRuntimeContext runtimeContext = BindingRuntimeContext.create(moduleContext, schemaContext);
    this.codecRegistry.onBindingRuntimeContextUpdated(runtimeContext);

    registerToSal(this, this);
    LOG.warn("Clustered netconf topo started");
}
/**
 * Session callback: fetches the {@link DataBroker} from the provider context, creates a
 * {@link TopologyNodeWriter} for this topology and instantiates the {@link TopologyManager}
 * as an Akka typed actor whose implementation is a {@link BaseTopologyManager}.
 *
 * @param session provider context used to look up the {@link DataBroker}
 */
109 public void onSessionInitiated(final ProviderContext session) {
110 dataBroker = session.getSALService(DataBroker.class);
111 final NodeWriter writer = new TopologyNodeWriter(topologyId, dataBroker);
112 TypedActorExtension typedActorExtension = TypedActor.get(this.actorSystem);
113 LOG.warn("Registering actor on path {}", actorSystem.name() + "/user/" + topologyId);
// The Creator defers construction of the manager implementation to the typed-actor machinery.
114 topologyManager = typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
116 public BaseTopologyManager create() throws Exception {
117 return new BaseTopologyManager(actorSystem,
// Factory for topology-level callbacks; it shares the node writer created above.
121 new TopologyCallbackFactory(ClusteredNetconfTopology.this, entityOwnershipService, writer),
122 new NetconfNodeOperationalDataAggregator(),
// The same writer again, wrapped in a LoggingSalNodeWriter.
123 new LoggingSalNodeWriter(writer),
124 new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "topology-netconf", "topology-manager"));
130 public void close() throws Exception {
131 // close all existing connectors, delete whole topology in datastore?
132 for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
133 connectorDTO.getCommunicator().close();
135 activeConnectors.clear();
139 protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
140 final NetconfNode node) {
141 //setup default values since default value is not supported yet in mdsal
142 // TODO remove this when mdsal starts supporting default values
143 final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
144 final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
145 final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
147 IpAddress ipAddress = node.getHost().getIpAddress();
148 InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
149 ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
150 node.getPort().getValue());
151 RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
153 RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
154 createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
156 if (keepaliveDelay > 0) {
157 LOG.warn("Adding keepalive facade, for device {}", nodeId);
158 salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
161 NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
162 new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
164 NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
165 processingExecutor.getExecutor(), sharedSchemaRepository, actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
167 return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
171 protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
172 return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
176 public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
177 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
181 public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
182 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
186 public void unregisterMountPoint(final NodeId nodeId) {
187 Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
188 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
192 public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
193 Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a connection listener can be registered");
194 return ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
198 public Collection<ProviderFunctionality> getProviderFunctionality() {
199 return Collections.emptySet();
202 public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
203 Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
204 return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
207 static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
209 private final NetconfTopology netconfTopology;
210 private final EntityOwnershipService entityOwnershipService;
211 private final NodeWriter writer;
213 TopologyCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService, final NodeWriter writer) {
214 this.netconfTopology = netconfTopology;
215 this.entityOwnershipService = entityOwnershipService;
216 this.writer = writer;
220 public TopologyManagerCallback create(final ActorSystem actorSystem, final String topologyId) {
221 return new NetconfTopologyManagerCallback(actorSystem, topologyId, new NodeCallbackFactory(netconfTopology, entityOwnershipService), new LoggingSalNodeWriter(writer));
225 private static class NodeCallbackFactory implements NodeManagerCallbackFactory {
227 private final NetconfTopology netconfTopology;
228 private final EntityOwnershipService entityOwnershipService;
230 NodeCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService) {
231 this.netconfTopology = netconfTopology;
232 this.entityOwnershipService = entityOwnershipService;
236 public NodeManagerCallback create(final String nodeId, final String topologyId, final ActorSystem actorSystem) {
237 return new NetconfNodeManagerCallback(nodeId, topologyId, actorSystem, netconfTopology, new NodeRoleChangeStrategy(entityOwnershipService, "netconf-node", nodeId));