More netconf-topology module unit tests
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / ClusteredNetconfTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.impl;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.japi.Creator;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.net.InetSocketAddress;
22 import java.util.Collection;
23 import java.util.Collections;
24 import javassist.ClassPool;
25 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
26 import org.opendaylight.controller.config.threadpool.ThreadPool;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
31 import org.opendaylight.controller.sal.core.api.Broker;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSessionListener;
34 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
35 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
36 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
37 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
38 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
39 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
40 import org.opendaylight.netconf.topology.NetconfTopology;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
43 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
44 import org.opendaylight.netconf.topology.TopologyManager;
45 import org.opendaylight.netconf.topology.TopologyManagerCallback;
46 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
47 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
48 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
49 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
50 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
51 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
52 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
53 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
54 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
55 import org.opendaylight.netconf.topology.util.NodeWriter;
56 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
61 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
62 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
63 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
64 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
65 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 public class ClusteredNetconfTopology extends AbstractNetconfTopology implements AutoCloseable {
71
72     private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfTopology.class);
73
74     private final BindingNormalizedNodeCodecRegistry codecRegistry;
75
76     private final ActorSystem actorSystem;
77     private final EntityOwnershipService entityOwnershipService;
78     private TopologyManager topologyManager;
79
80     public ClusteredNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
81                                final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
82                                final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
83                                final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
84                                final ActorSystem actorSystem, final EntityOwnershipService entityOwnershipService) {
85         super(topologyId, clientDispatcher,
86                 bindingAwareBroker, domBroker, eventExecutor,
87                 keepaliveExecutor, processingExecutor, schemaRepositoryProvider);
88
89         final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
90         moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
91         final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
92         Preconditions.checkState(schemaContextOptional.isPresent());
93         final SchemaContext topologySchemaCtx = schemaContextOptional.get();
94
95         final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
96         codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
97         codecRegistry.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));
98
99         this.actorSystem = actorSystem;
100         this.entityOwnershipService = entityOwnershipService;
101         registerToSal(this, this);
102         LOG.warn("Clustered netconf topo started");
103     }
104
105
106
107     @Override
108     public void onSessionInitiated(final ProviderContext session) {
109         dataBroker = session.getSALService(DataBroker.class);
110         final NodeWriter writer = new TopologyNodeWriter(topologyId, dataBroker);
111         TypedActorExtension typedActorExtension = TypedActor.get(this.actorSystem);
112         LOG.warn("Registering actor on path {}", actorSystem.name() + "/user/" + topologyId);
113         topologyManager = typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
114             @Override
115             public BaseTopologyManager create() throws Exception {
116                 return new BaseTopologyManager(actorSystem,
117                         codecRegistry,
118                         dataBroker,
119                         topologyId,
120                         new TopologyCallbackFactory(ClusteredNetconfTopology.this, entityOwnershipService, writer),
121                         new NetconfNodeOperationalDataAggregator(),
122                         new LoggingSalNodeWriter(writer),
123                         new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "topology-netconf", "topology-manager"));
124             }
125         }), topologyId);
126     }
127
128     @Override
129     public void close() throws Exception {
130         // close all existing connectors, delete whole topology in datastore?
131         for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
132             connectorDTO.getCommunicator().close();
133         }
134         activeConnectors.clear();
135     }
136
137     @Override
138     protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
139                                                            final NetconfNode node) {
140         //setup default values since default value is not supported yet in mdsal
141         // TODO remove this when mdsal starts supporting default values
142         final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
143         final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
144         final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
145
146         IpAddress ipAddress = node.getHost().getIpAddress();
147         InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
148                 ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
149                 node.getPort().getValue());
150         RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
151
152         RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
153                 createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
154
155         if (keepaliveDelay > 0) {
156             LOG.warn("Adding keepalive facade, for device {}", nodeId);
157             salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
158         }
159
160         final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
161
162         final NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
163                 processingExecutor.getExecutor(), actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
164
165         return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
166     }
167
168     @Override
169     protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
170         return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
171     }
172
173     @Override
174     public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
175         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
176     }
177
178     @Override
179     public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
180         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
181     }
182
183     @Override
184     public void unregisterMountPoint(final NodeId nodeId) {
185         Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
186         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
187     }
188
189     @Override
190     public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
191         Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a connection listener can be registered");
192         return ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
193     }
194
195     public Collection<ProviderFunctionality> getProviderFunctionality() {
196         return Collections.emptySet();
197     }
198
199     public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
200         Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
201         return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
202     }
203
204     static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
205
206         private final NetconfTopology netconfTopology;
207         private final EntityOwnershipService entityOwnershipService;
208         private final NodeWriter writer;
209
210         TopologyCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService, final NodeWriter writer) {
211             this.netconfTopology = netconfTopology;
212             this.entityOwnershipService = entityOwnershipService;
213             this.writer = writer;
214         }
215
216         @Override
217         public TopologyManagerCallback create(final ActorSystem actorSystem, final String topologyId) {
218             return new NetconfTopologyManagerCallback(actorSystem, topologyId, new NodeCallbackFactory(netconfTopology, entityOwnershipService), new LoggingSalNodeWriter(writer));
219         }
220     }
221
222     private static class NodeCallbackFactory implements NodeManagerCallbackFactory {
223
224         private final NetconfTopology netconfTopology;
225         private final EntityOwnershipService entityOwnershipService;
226
227         NodeCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService) {
228             this.netconfTopology = netconfTopology;
229             this.entityOwnershipService = entityOwnershipService;
230         }
231
232         @Override
233         public NodeManagerCallback create(final String nodeId, final String topologyId, final ActorSystem actorSystem) {
234             return new NetconfNodeManagerCallback(nodeId, topologyId, actorSystem, netconfTopology, new NodeRoleChangeStrategy(entityOwnershipService, "netconf-node", nodeId));
235         }
236     }
237 }