9815231653ab4272fc50f3c7cad672c0b29bea3f
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / ClusteredNetconfTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.impl;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.japi.Creator;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.net.InetSocketAddress;
22 import java.util.Collection;
23 import java.util.Collections;
24 import javassist.ClassPool;
25 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
26 import org.opendaylight.controller.config.threadpool.ThreadPool;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
31 import org.opendaylight.controller.sal.core.api.Broker;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSessionListener;
34 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
35 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
36 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
37 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
38 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
39 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
40 import org.opendaylight.netconf.topology.NetconfTopology;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
43 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
44 import org.opendaylight.netconf.topology.TopologyManager;
45 import org.opendaylight.netconf.topology.TopologyManagerCallback;
46 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
47 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
48 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
49 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
50 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
51 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
52 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
53 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
54 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
55 import org.opendaylight.netconf.topology.util.NodeWriter;
56 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
61 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
62 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
63 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
64 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
65 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 public class ClusteredNetconfTopology extends AbstractNetconfTopology implements AutoCloseable {
71
72     private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfTopology.class);
73
74     private final BindingNormalizedNodeCodecRegistry codecRegistry;
75
76     private final ActorSystem actorSystem;
77     private final EntityOwnershipService entityOwnershipService;
78     private TopologyManager topologyManager;
79
80     public ClusteredNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
81                                final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
82                                final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
83                                final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
84                                final ActorSystem actorSystem, final EntityOwnershipService entityOwnershipService) {
85         super(topologyId, clientDispatcher,
86                 bindingAwareBroker, domBroker, eventExecutor,
87                 keepaliveExecutor, processingExecutor, schemaRepositoryProvider);
88
89         final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
90         moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
91         final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
92         Preconditions.checkState(schemaContextOptional.isPresent());
93         final SchemaContext topologySchemaCtx = schemaContextOptional.get();
94
95         final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
96         codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
97         codecRegistry.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));
98
99         this.actorSystem = actorSystem;
100         this.entityOwnershipService = entityOwnershipService;
101         registerToSal(this, this);
102         LOG.warn("Clustered netconf topo started");
103     }
104
105
106
107     @Override
108     public void onSessionInitiated(final ProviderContext session) {
109         dataBroker = session.getSALService(DataBroker.class);
110         final NodeWriter writer = new TopologyNodeWriter(topologyId, dataBroker);
111         TypedActorExtension typedActorExtension = TypedActor.get(this.actorSystem);
112         LOG.warn("Registering actor on path {}", actorSystem.name() + "/user/" + topologyId);
113         topologyManager = typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
114             @Override
115             public BaseTopologyManager create() throws Exception {
116                 return new BaseTopologyManager(actorSystem,
117                         codecRegistry,
118                         dataBroker,
119                         topologyId,
120                         new TopologyCallbackFactory(ClusteredNetconfTopology.this, entityOwnershipService, writer),
121                         new NetconfNodeOperationalDataAggregator(),
122                         new LoggingSalNodeWriter(writer),
123                         new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "topology-netconf", "topology-manager"));
124             }
125         }), topologyId);
126     }
127
128     @Override
129     public void close() throws Exception {
130         // close all existing connectors, delete whole topology in datastore?
131         for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
132             connectorDTO.getCommunicator().close();
133         }
134         activeConnectors.clear();
135     }
136
137     @Override
138     protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
139                                                            final NetconfNode node) {
140         //setup default values since default value is not supported in mdsal
141         final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
142         final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
143         final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
144
145         IpAddress ipAddress = node.getHost().getIpAddress();
146         InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
147                 ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
148                 node.getPort().getValue());
149         RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
150
151         RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
152                 createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker);
153
154         if (keepaliveDelay > 0) {
155             LOG.warn("Adding keepalive facade, for device {}", nodeId);
156             salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay, defaultRequestTimeoutMillis);
157         }
158
159         final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
160
161         final NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
162                 processingExecutor.getExecutor(), actorSystem, topologyId, nodeId.getValue(), TypedActor.context(),
163                 reconnectOnChangedSchema);
164
165         final int rpcMessageLimit =
166                 node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit();
167
168         if (rpcMessageLimit < 1) {
169             LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
170         }
171
172         return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService, rpcMessageLimit), salFacade);
173     }
174
175     @Override
176     protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker) {
177         return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker);
178     }
179
180     @Override
181     public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
182         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
183     }
184
185     @Override
186     public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
187         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
188     }
189
190     @Override
191     public void unregisterMountPoint(final NodeId nodeId) {
192         Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
193         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
194     }
195
196     @Override
197     public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
198         Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a connection listener can be registered");
199         return ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
200     }
201
202     public Collection<ProviderFunctionality> getProviderFunctionality() {
203         return Collections.emptySet();
204     }
205
206     public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
207         Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
208         return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
209     }
210
211     static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
212
213         private final NetconfTopology netconfTopology;
214         private final EntityOwnershipService entityOwnershipService;
215         private final NodeWriter writer;
216
217         TopologyCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService, final NodeWriter writer) {
218             this.netconfTopology = netconfTopology;
219             this.entityOwnershipService = entityOwnershipService;
220             this.writer = writer;
221         }
222
223         @Override
224         public TopologyManagerCallback create(final ActorSystem actorSystem, final String topologyId) {
225             return new NetconfTopologyManagerCallback(actorSystem, topologyId, new NodeCallbackFactory(netconfTopology, entityOwnershipService), new LoggingSalNodeWriter(writer));
226         }
227     }
228
229     private static class NodeCallbackFactory implements NodeManagerCallbackFactory {
230
231         private final NetconfTopology netconfTopology;
232         private final EntityOwnershipService entityOwnershipService;
233
234         NodeCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService) {
235             this.netconfTopology = netconfTopology;
236             this.entityOwnershipService = entityOwnershipService;
237         }
238
239         @Override
240         public NodeManagerCallback create(final String nodeId, final String topologyId, final ActorSystem actorSystem) {
241             return new NetconfNodeManagerCallback(nodeId, topologyId, actorSystem, netconfTopology, new NodeRoleChangeStrategy(entityOwnershipService, "netconf-node", nodeId));
242         }
243     }
244 }