Bug 4940 - correctly implement default-request-timeout-millis
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / ClusteredNetconfTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.impl;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.japi.Creator;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.net.InetSocketAddress;
22 import java.util.Collection;
23 import java.util.Collections;
24 import javassist.ClassPool;
25 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
26 import org.opendaylight.controller.config.threadpool.ThreadPool;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
31 import org.opendaylight.controller.sal.core.api.Broker;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSessionListener;
34 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
35 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
36 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
37 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
38 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
39 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
40 import org.opendaylight.netconf.topology.NetconfTopology;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
43 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
44 import org.opendaylight.netconf.topology.TopologyManager;
45 import org.opendaylight.netconf.topology.TopologyManagerCallback;
46 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
47 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
48 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
49 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
50 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
51 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
52 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
53 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
54 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
55 import org.opendaylight.netconf.topology.util.NodeWriter;
56 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
61 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
62 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
63 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
64 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
65 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 public class ClusteredNetconfTopology extends AbstractNetconfTopology implements AutoCloseable {
71
72     private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfTopology.class);
73
74     private final BindingNormalizedNodeCodecRegistry codecRegistry;
75
76     private final ActorSystem actorSystem;
77     private final EntityOwnershipService entityOwnershipService;
78     private TopologyManager topologyManager;
79
80     public ClusteredNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
81                                final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
82                                final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
83                                final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
84                                final ActorSystem actorSystem, final EntityOwnershipService entityOwnershipService) {
85         super(topologyId, clientDispatcher,
86                 bindingAwareBroker, domBroker, eventExecutor,
87                 keepaliveExecutor, processingExecutor, schemaRepositoryProvider);
88
89         final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
90         moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
91         final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
92         Preconditions.checkState(schemaContextOptional.isPresent());
93         final SchemaContext topologySchemaCtx = schemaContextOptional.get();
94
95         final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
96         codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
97         codecRegistry.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));
98
99         this.actorSystem = actorSystem;
100         this.entityOwnershipService = entityOwnershipService;
101         registerToSal(this, this);
102         LOG.warn("Clustered netconf topo started");
103     }
104
105
106
107     @Override
108     public void onSessionInitiated(final ProviderContext session) {
109         dataBroker = session.getSALService(DataBroker.class);
110         final NodeWriter writer = new TopologyNodeWriter(topologyId, dataBroker);
111         TypedActorExtension typedActorExtension = TypedActor.get(this.actorSystem);
112         LOG.warn("Registering actor on path {}", actorSystem.name() + "/user/" + topologyId);
113         topologyManager = typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
114             @Override
115             public BaseTopologyManager create() throws Exception {
116                 return new BaseTopologyManager(actorSystem,
117                         codecRegistry,
118                         dataBroker,
119                         topologyId,
120                         new TopologyCallbackFactory(ClusteredNetconfTopology.this, entityOwnershipService, writer),
121                         new NetconfNodeOperationalDataAggregator(),
122                         new LoggingSalNodeWriter(writer),
123                         new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "topology-netconf", "topology-manager"));
124             }
125         }), topologyId);
126     }
127
128     @Override
129     public void close() throws Exception {
130         // close all existing connectors, delete whole topology in datastore?
131         for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
132             connectorDTO.getCommunicator().close();
133         }
134         activeConnectors.clear();
135     }
136
137     @Override
138     protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
139                                                            final NetconfNode node) {
140         //setup default values since default value is not supported yet in mdsal
141         // TODO remove this when mdsal starts supporting default values
142         final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
143         final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
144         final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
145
146         IpAddress ipAddress = node.getHost().getIpAddress();
147         InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
148                 ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
149                 node.getPort().getValue());
150         RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
151
152         RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
153                 createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker);
154
155         if (keepaliveDelay > 0) {
156             LOG.warn("Adding keepalive facade, for device {}", nodeId);
157             salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay, defaultRequestTimeoutMillis);
158         }
159
160         final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
161
162         final NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
163                 processingExecutor.getExecutor(), actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
164
165         return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
166     }
167
168     @Override
169     protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker) {
170         return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker);
171     }
172
173     @Override
174     public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
175         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
176     }
177
178     @Override
179     public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
180         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
181     }
182
183     @Override
184     public void unregisterMountPoint(final NodeId nodeId) {
185         Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
186         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
187     }
188
189     @Override
190     public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
191         Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a connection listener can be registered");
192         return ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
193     }
194
195     @Override
196     public Collection<ProviderFunctionality> getProviderFunctionality() {
197         return Collections.emptySet();
198     }
199
200     public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
201         Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
202         return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
203     }
204
205     static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
206
207         private final NetconfTopology netconfTopology;
208         private final EntityOwnershipService entityOwnershipService;
209         private final NodeWriter writer;
210
211         TopologyCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService, final NodeWriter writer) {
212             this.netconfTopology = netconfTopology;
213             this.entityOwnershipService = entityOwnershipService;
214             this.writer = writer;
215         }
216
217         @Override
218         public TopologyManagerCallback create(final ActorSystem actorSystem, final String topologyId) {
219             return new NetconfTopologyManagerCallback(actorSystem, topologyId, new NodeCallbackFactory(netconfTopology, entityOwnershipService), new LoggingSalNodeWriter(writer));
220         }
221     }
222
223     private static class NodeCallbackFactory implements NodeManagerCallbackFactory {
224
225         private final NetconfTopology netconfTopology;
226         private final EntityOwnershipService entityOwnershipService;
227
228         NodeCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService) {
229             this.netconfTopology = netconfTopology;
230             this.entityOwnershipService = entityOwnershipService;
231         }
232
233         @Override
234         public NodeManagerCallback create(final String nodeId, final String topologyId, final ActorSystem actorSystem) {
235             return new NetconfNodeManagerCallback(nodeId, topologyId, actorSystem, netconfTopology, new NodeRoleChangeStrategy(entityOwnershipService, "netconf-node", nodeId));
236         }
237     }
238 }