2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.impl;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.japi.Creator;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.util.Collection;
22 import java.util.Collections;
23 import javassist.ClassPool;
24 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
25 import org.opendaylight.controller.config.threadpool.ThreadPool;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
28 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
30 import org.opendaylight.controller.sal.core.api.Broker;
31 import org.opendaylight.netconf.client.NetconfClientDispatcher;
32 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
33 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
34 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
35 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
36 import org.opendaylight.netconf.topology.NetconfTopology;
37 import org.opendaylight.netconf.topology.NodeManagerCallback;
38 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
39 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
40 import org.opendaylight.netconf.topology.TopologyManager;
41 import org.opendaylight.netconf.topology.TopologyManagerCallback;
42 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
43 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
44 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
45 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
46 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
47 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
48 import org.opendaylight.netconf.topology.util.NodeWriter;
49 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
52 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
53 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
54 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
55 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
56 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
57 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
/**
 * Netconf topology built on {@link AbstractNetconfTopology} that additionally holds an akka
 * {@link ActorSystem} and an {@link EntityOwnershipService}, and creates a typed
 * {@link TopologyManager} actor once the provider session is initiated.
 */
61 public class ClusteredNetconfTopology extends AbstractNetconfTopology implements AutoCloseable {
63 private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfTopology.class);
// Binding codec registry, created and given its runtime context in the constructor.
65 private final BindingNormalizedNodeCodecRegistry codecRegistry;
// Actor system used to obtain the typed actor extension and handed to the mount point facades.
67 private final ActorSystem actorSystem;
// Ownership service handed to the callback factory and to the topology role change strategy.
68 private final EntityOwnershipService entityOwnershipService;
// Typed actor proxy; assigned in onSessionInitiated(), so it is unset until that callback runs.
69 private TopologyManager topologyManager;
/**
 * Creates the clustered topology: forwards the common collaborators to the superclass, builds
 * the binding codec registry from the netconf node topology module info, stores the actor
 * system and ownership service, and finally calls {@code registerToSal(this, this)}.
 *
 * @param topologyId passed to the superclass constructor
 * @param clientDispatcher passed to the superclass constructor
 * @param bindingAwareBroker passed to the superclass constructor
 * @param domBroker passed to the superclass constructor
 * @param eventExecutor passed to the superclass constructor
 * @param keepaliveExecutor passed to the superclass constructor
 * @param processingExecutor passed to the superclass constructor
 * @param schemaRepositoryProvider passed to the superclass constructor
 * @param actorSystem actor system later used to create the typed topology manager actor
 * @param entityOwnershipService ownership service later handed to the callback factories
 * @throws IllegalStateException if no schema context can be created from the module info
 */
public ClusteredNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
        final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
        final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
        final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
        final ActorSystem actorSystem, final EntityOwnershipService entityOwnershipService) {
    super(topologyId, clientDispatcher,
            bindingAwareBroker, domBroker, eventExecutor,
            keepaliveExecutor, processingExecutor, schemaRepositoryProvider);

    final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
    moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
    final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
    // Fail with an explanatory message instead of a bare IllegalStateException.
    Preconditions.checkState(schemaContextOptional.isPresent(),
            "Unable to create schema context from the netconf node topology module info");
    final SchemaContext topologySchemaCtx = schemaContextOptional.get();

    final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
    codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
    codecRegistry.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));

    this.actorSystem = actorSystem;
    this.entityOwnershipService = entityOwnershipService;
    // Registration happens last so that all fields above are assigned before this instance is handed out.
    registerToSal(this, this);
    LOG.warn("Clustered netconf topo started");
}
/**
 * Session callback: fetches the {@link DataBroker} from the provider context, creates a
 * {@code TopologyNodeWriter} for this topology and creates the typed {@link TopologyManager}
 * actor whose {@link Creator} builds a {@link BaseTopologyManager}.
 *
 * @param session provider context from which the {@link DataBroker} service is obtained
 */
97 public void onSessionInitiated(final ProviderContext session) {
98 dataBroker = session.getSALService(DataBroker.class);
99 final NodeWriter writer = new TopologyNodeWriter(topologyId, dataBroker);
100 TypedActorExtension typedActorExtension = TypedActor.get(this.actorSystem);
// Logs the path "<actor system name>/user/<topologyId>".
101 LOG.warn("Registering actor on path {}", actorSystem.name() + "/user/" + topologyId);
102 topologyManager = typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
104 public BaseTopologyManager create() throws Exception {
// NOTE(review): the argument list below and the end of the typedActorOf(...) call look truncated
// in this view (codecRegistry is never read in the visible code) - confirm against the
// BaseTopologyManager constructor before editing.
105 return new BaseTopologyManager(actorSystem,
109 new TopologyCallbackFactory(ClusteredNetconfTopology.this, entityOwnershipService, writer),
110 new NetconfNodeOperationalDataAggregator(),
111 new LoggingSalNodeWriter(writer),
112 new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "topology-netconf", "topology-manager"));
118 public void close() throws Exception {
119 // close all existing connectors, delete whole topology in datastore?
120 for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
121 connectorDTO.getCommunicator().close();
123 activeConnectors.clear();
127 protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
128 return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
132 public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
133 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
137 public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
138 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
142 public void unregisterMountPoint(final NodeId nodeId) {
143 Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
144 ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
148 public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
149 Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a connection listener can be registered");
150 return ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
154 public Collection<ProviderFunctionality> getProviderFunctionality() {
155 return Collections.emptySet();
158 static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
160 private final NetconfTopology netconfTopology;
161 private final EntityOwnershipService entityOwnershipService;
162 private final NodeWriter writer;
164 TopologyCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService, final NodeWriter writer) {
165 this.netconfTopology = netconfTopology;
166 this.entityOwnershipService = entityOwnershipService;
167 this.writer = writer;
171 public TopologyManagerCallback create(final ActorSystem actorSystem, final String topologyId) {
172 return new NetconfTopologyManagerCallback(actorSystem, topologyId, new NodeCallbackFactory(netconfTopology, entityOwnershipService), new LoggingSalNodeWriter(writer));
176 private static class NodeCallbackFactory implements NodeManagerCallbackFactory {
178 private final NetconfTopology netconfTopology;
179 private final EntityOwnershipService entityOwnershipService;
181 NodeCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService) {
182 this.netconfTopology = netconfTopology;
183 this.entityOwnershipService = entityOwnershipService;
187 public NodeManagerCallback create(final String nodeId, final String topologyId, final ActorSystem actorSystem) {
188 return new NetconfNodeManagerCallback(nodeId, topologyId, actorSystem, netconfTopology, new NodeRoleChangeStrategy(entityOwnershipService, "netconf-node", nodeId));