1612c7d9fccb23af2e7238145a91d232bc722772
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / ClusteredNetconfTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.impl;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.japi.Creator;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.net.InetSocketAddress;
22 import java.util.Collection;
23 import java.util.Collections;
24 import javassist.ClassPool;
25 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
26 import org.opendaylight.controller.config.threadpool.ThreadPool;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
31 import org.opendaylight.controller.sal.core.api.Broker;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSessionListener;
34 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
35 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
36 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
37 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
38 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
39 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
40 import org.opendaylight.netconf.topology.NetconfTopology;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
43 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
44 import org.opendaylight.netconf.topology.TopologyManager;
45 import org.opendaylight.netconf.topology.TopologyManagerCallback;
46 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
47 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
48 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
49 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
50 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
51 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
52 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
53 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
54 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
55 import org.opendaylight.netconf.topology.util.NodeWriter;
56 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
61 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
62 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
63 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
64 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
65 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 public class ClusteredNetconfTopology extends AbstractNetconfTopology implements AutoCloseable {
71
72     private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfTopology.class);
73
74     private final BindingNormalizedNodeCodecRegistry codecRegistry;
75
76     private final ActorSystem actorSystem;
77     private final EntityOwnershipService entityOwnershipService;
78     private TopologyManager topologyManager;
79
80     public ClusteredNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
81                                final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
82                                final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
83                                final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
84                                final ActorSystem actorSystem, final EntityOwnershipService entityOwnershipService) {
85         super(topologyId, clientDispatcher,
86                 bindingAwareBroker, domBroker, eventExecutor,
87                 keepaliveExecutor, processingExecutor, schemaRepositoryProvider);
88
89         final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
90         moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
91         final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
92         Preconditions.checkState(schemaContextOptional.isPresent());
93         final SchemaContext topologySchemaCtx = schemaContextOptional.get();
94
95         final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
96         codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
97         codecRegistry.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));
98
99         this.actorSystem = actorSystem;
100         this.entityOwnershipService = entityOwnershipService;
101         registerToSal(this, this);
102         LOG.warn("Clustered netconf topo started");
103     }
104
105
106
107     @Override
108     public void onSessionInitiated(final ProviderContext session) {
109         dataBroker = session.getSALService(DataBroker.class);
110         final NodeWriter writer = new TopologyNodeWriter(topologyId, dataBroker);
111         TypedActorExtension typedActorExtension = TypedActor.get(this.actorSystem);
112         LOG.warn("Registering actor on path {}", actorSystem.name() + "/user/" + topologyId);
113         topologyManager = typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
114             @Override
115             public BaseTopologyManager create() throws Exception {
116                 return new BaseTopologyManager(actorSystem,
117                         codecRegistry,
118                         dataBroker,
119                         topologyId,
120                         new TopologyCallbackFactory(ClusteredNetconfTopology.this, entityOwnershipService, writer),
121                         new NetconfNodeOperationalDataAggregator(),
122                         new LoggingSalNodeWriter(writer),
123                         new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "topology-netconf", "topology-manager"));
124             }
125         }), topologyId);
126     }
127
128     @Override
129     public void close() throws Exception {
130         // close all existing connectors, delete whole topology in datastore?
131         for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
132             connectorDTO.getCommunicator().close();
133         }
134         activeConnectors.clear();
135     }
136
137     @Override
138     protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
139                                                            final NetconfNode node) {
140         //setup default values since default value is not supported in mdsal
141         final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
142         final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
143         final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
144
145         IpAddress ipAddress = node.getHost().getIpAddress();
146         InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
147                 ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
148                 node.getPort().getValue());
149         RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
150
151         RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
152                 createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker);
153
154         if (keepaliveDelay > 0) {
155             LOG.warn("Adding keepalive facade, for device {}", nodeId);
156             salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay, defaultRequestTimeoutMillis);
157         }
158
159         final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
160
161         final NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
162                 processingExecutor.getExecutor(), actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
163
164         return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
165     }
166
167     @Override
168     protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker) {
169         return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker);
170     }
171
172     @Override
173     public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
174         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
175     }
176
177     @Override
178     public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
179         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
180     }
181
182     @Override
183     public void unregisterMountPoint(final NodeId nodeId) {
184         Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
185         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
186     }
187
188     @Override
189     public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
190         Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a connection listener can be registered");
191         return ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
192     }
193
194     public Collection<ProviderFunctionality> getProviderFunctionality() {
195         return Collections.emptySet();
196     }
197
198     public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
199         Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
200         return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
201     }
202
203     static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
204
205         private final NetconfTopology netconfTopology;
206         private final EntityOwnershipService entityOwnershipService;
207         private final NodeWriter writer;
208
209         TopologyCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService, final NodeWriter writer) {
210             this.netconfTopology = netconfTopology;
211             this.entityOwnershipService = entityOwnershipService;
212             this.writer = writer;
213         }
214
215         @Override
216         public TopologyManagerCallback create(final ActorSystem actorSystem, final String topologyId) {
217             return new NetconfTopologyManagerCallback(actorSystem, topologyId, new NodeCallbackFactory(netconfTopology, entityOwnershipService), new LoggingSalNodeWriter(writer));
218         }
219     }
220
221     private static class NodeCallbackFactory implements NodeManagerCallbackFactory {
222
223         private final NetconfTopology netconfTopology;
224         private final EntityOwnershipService entityOwnershipService;
225
226         NodeCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService) {
227             this.netconfTopology = netconfTopology;
228             this.entityOwnershipService = entityOwnershipService;
229         }
230
231         @Override
232         public NodeManagerCallback create(final String nodeId, final String topologyId, final ActorSystem actorSystem) {
233             return new NetconfNodeManagerCallback(nodeId, topologyId, actorSystem, netconfTopology, new NodeRoleChangeStrategy(entityOwnershipService, "netconf-node", nodeId));
234         }
235     }
236 }