2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.impl;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.japi.Creator;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.net.InetSocketAddress;
22 import java.util.Collection;
23 import java.util.Collections;
24 import javassist.ClassPool;
25 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
26 import org.opendaylight.controller.config.threadpool.ThreadPool;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
31 import org.opendaylight.controller.sal.core.api.Broker;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSessionListener;
34 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
35 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
36 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
37 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
38 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
39 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
40 import org.opendaylight.netconf.topology.NetconfTopology;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
43 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
44 import org.opendaylight.netconf.topology.TopologyManager;
45 import org.opendaylight.netconf.topology.TopologyManagerCallback;
46 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
47 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
48 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
49 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
50 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
51 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
52 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
53 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
54 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
55 import org.opendaylight.netconf.topology.util.NodeWriter;
56 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
61 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
62 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
63 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
64 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
65 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
/**
 * Netconf topology variant for clustered deployments. Connectors created by this class are built
 * from {@link ClusteredNetconfDevice}, {@link ClusteredNetconfDeviceCommunicator} and
 * {@link TopologyMountPointFacade}, and a {@link TopologyManager} typed actor is created once the
 * provider session is initiated.
 */
70 public class ClusteredNetconfTopology extends AbstractNetconfTopology implements AutoCloseable {
72 private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfTopology.class);
// Binding codec registry, initialised in the constructor from the netconf-node-topology module info.
74 private final BindingNormalizedNodeCodecRegistry codecRegistry;
// Actor system used to obtain the typed-actor extension and handed to devices and mount point facades.
76 private final ActorSystem actorSystem;
// Ownership service handed to communicators, callback factories and role change strategies.
77 private final EntityOwnershipService entityOwnershipService;
// Typed-actor handle; not set by the constructor directly, assigned in onSessionInitiated.
78 private TopologyManager topologyManager;
/**
 * Creates the clustered netconf topology, prepares the binding codec registry for the
 * netconf-node-topology model and registers this instance with the binding-aware broker.
 *
 * @param topologyId id of the topology, forwarded to the parent class
 * @param clientDispatcher netconf client dispatcher, forwarded to the parent class
 * @param bindingAwareBroker binding-aware broker, forwarded to the parent class
 * @param domBroker DOM broker, forwarded to the parent class
 * @param eventExecutor netty event executor, forwarded to the parent class
 * @param keepaliveExecutor scheduled pool, forwarded to the parent class
 * @param processingExecutor processing pool, forwarded to the parent class
 * @param schemaRepositoryProvider schema repository provider, forwarded to the parent class
 * @param actorSystem actor system used for typed actors and clustered devices
 * @param entityOwnershipService ownership service handed to communicators and role strategies
 * @throws IllegalStateException if no schema context can be created from the module info
 */
public ClusteredNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
                                final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
                                final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
                                final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
                                final ActorSystem actorSystem, final EntityOwnershipService entityOwnershipService) {
    super(topologyId, clientDispatcher,
            bindingAwareBroker, domBroker, eventExecutor,
            keepaliveExecutor, processingExecutor, schemaRepositoryProvider);

    // Build a schema context that contains only the netconf-node-topology model.
    final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
    moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
    final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
    // Fail with a diagnosable message instead of a bare IllegalStateException.
    Preconditions.checkState(schemaContextOptional.isPresent(),
            "Unable to create schema context from the netconf-node-topology module info");
    final SchemaContext topologySchemaCtx = schemaContextOptional.get();

    final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
    codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
    codecRegistry.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));

    // Fields must be assigned before registerToSal: 'this' is handed out there.
    // NOTE(review): presumably the session callback may fire during registration - confirm.
    this.actorSystem = actorSystem;
    this.entityOwnershipService = entityOwnershipService;
    registerToSal(this, this);
    // Routine lifecycle message, so INFO rather than WARN.
    LOG.info("Clustered netconf topo started");
}
/**
 * Session callback: looks up the {@link DataBroker}, prepares a node writer for this topology and
 * creates the {@link TopologyManager} typed actor, whose implementation is a
 * {@link BaseTopologyManager} produced by the anonymous {@link Creator} below.
 *
 * @param session provider context from which the {@link DataBroker} service is obtained
 */
108 public void onSessionInitiated(final ProviderContext session) {
109 dataBroker = session.getSALService(DataBroker.class);
110 final NodeWriter writer = new TopologyNodeWriter(topologyId, dataBroker);
111 TypedActorExtension typedActorExtension = TypedActor.get(this.actorSystem);
// NOTE(review): logged at WARN although this reads like a routine lifecycle message - confirm the intended level.
112 LOG.warn("Registering actor on path {}", actorSystem.name() + "/user/" + topologyId);
// The handle returned by typedActorOf is typed as TopologyManager; the Creator supplies the backing instance.
113 topologyManager = typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
115 public BaseTopologyManager create() throws Exception {
116 return new BaseTopologyManager(actorSystem,
// The callback factory receives the enclosing topology instance, hence the qualified 'this'.
120 new TopologyCallbackFactory(ClusteredNetconfTopology.this, entityOwnershipService, writer),
121 new NetconfNodeOperationalDataAggregator(),
// The same node writer is wrapped in a LoggingSalNodeWriter before being handed to the manager.
122 new LoggingSalNodeWriter(writer),
123 new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "topology-netconf", "topology-manager"));
129 public void close() throws Exception {
130 // close all existing connectors, delete whole topology in datastore?
131 for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
132 connectorDTO.getCommunicator().close();
134 activeConnectors.clear();
138 protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
139 final NetconfNode node) {
140 //setup default values since default value is not supported yet in mdsal
141 // TODO remove this when mdsal starts supporting default values
142 final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
143 final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
144 final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
146 IpAddress ipAddress = node.getHost().getIpAddress();
147 InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
148 ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
149 node.getPort().getValue());
150 RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
152 RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
153 createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
155 if (keepaliveDelay > 0) {
156 LOG.warn("Adding keepalive facade, for device {}", nodeId);
157 salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
160 final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
162 final NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
163 processingExecutor.getExecutor(), actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
165 return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
169 protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
170 return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
174 public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
175 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
179 public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
180 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
184 public void unregisterMountPoint(final NodeId nodeId) {
185 Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
186 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
190 public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
191 Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a connection listener can be registered");
192 return ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
195 public Collection<ProviderFunctionality> getProviderFunctionality() {
196 return Collections.emptySet();
199 public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
200 Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
201 return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
204 static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
206 private final NetconfTopology netconfTopology;
207 private final EntityOwnershipService entityOwnershipService;
208 private final NodeWriter writer;
210 TopologyCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService, final NodeWriter writer) {
211 this.netconfTopology = netconfTopology;
212 this.entityOwnershipService = entityOwnershipService;
213 this.writer = writer;
217 public TopologyManagerCallback create(final ActorSystem actorSystem, final String topologyId) {
218 return new NetconfTopologyManagerCallback(actorSystem, topologyId, new NodeCallbackFactory(netconfTopology, entityOwnershipService), new LoggingSalNodeWriter(writer));
/**
 * Factory for per-node callbacks; keeps the topology and the ownership service that each created
 * callback is built from.
 */
222 private static class NodeCallbackFactory implements NodeManagerCallbackFactory {
224 private final NetconfTopology netconfTopology;
225 private final EntityOwnershipService entityOwnershipService;
/**
 * @param netconfTopology topology passed to every created node callback
 * @param entityOwnershipService ownership service used for the node role change strategy
 */
227 NodeCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService) {
228 this.netconfTopology = netconfTopology;
229 this.entityOwnershipService = entityOwnershipService;
/**
 * Creates a {@link NetconfNodeManagerCallback} for one node, together with a
 * {@link NodeRoleChangeStrategy} built from the ownership service, the literal "netconf-node"
 * and the node id.
 *
 * @param nodeId id of the node the callback is created for
 * @param topologyId id of the topology the node belongs to
 * @param actorSystem actor system passed to the callback
 * @return the new node manager callback
 */
233 public NodeManagerCallback create(final String nodeId, final String topologyId, final ActorSystem actorSystem) {
234 return new NetconfNodeManagerCallback(nodeId, topologyId, actorSystem, netconfTopology, new NodeRoleChangeStrategy(entityOwnershipService, "netconf-node", nodeId));