6b64d9513e93110b9ff51326ab6c8477f405bfe3
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / ClusteredNetconfTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.impl;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.japi.Creator;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.net.InetSocketAddress;
22 import java.util.Collection;
23 import java.util.Collections;
24 import javassist.ClassPool;
25 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
26 import org.opendaylight.controller.config.threadpool.ThreadPool;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
31 import org.opendaylight.controller.sal.core.api.Broker;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSessionListener;
34 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
35 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
36 import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
37 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
38 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
39 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
40 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
41 import org.opendaylight.netconf.topology.NetconfTopology;
42 import org.opendaylight.netconf.topology.NodeManagerCallback;
43 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
44 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
45 import org.opendaylight.netconf.topology.TopologyManager;
46 import org.opendaylight.netconf.topology.TopologyManagerCallback;
47 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
48 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
49 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDevice;
50 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator;
51 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
52 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
53 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
54 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
55 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
56 import org.opendaylight.netconf.topology.util.NodeWriter;
57 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
58 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
62 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
63 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
64 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
65 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
66 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
67 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 public class ClusteredNetconfTopology extends AbstractNetconfTopology implements AutoCloseable {
72
73     private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfTopology.class);
74
75     private final BindingNormalizedNodeCodecRegistry codecRegistry;
76
77     private final ActorSystem actorSystem;
78     private final EntityOwnershipService entityOwnershipService;
79     private TopologyManager topologyManager;
80
81     public ClusteredNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
82                                final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
83                                final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
84                                final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
85                                final ActorSystem actorSystem, final EntityOwnershipService entityOwnershipService) {
86         super(topologyId, clientDispatcher,
87                 bindingAwareBroker, domBroker, eventExecutor,
88                 keepaliveExecutor, processingExecutor, schemaRepositoryProvider);
89
90         final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
91         moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
92         final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
93         Preconditions.checkState(schemaContextOptional.isPresent());
94         final SchemaContext topologySchemaCtx = schemaContextOptional.get();
95
96         final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
97         codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
98         codecRegistry.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));
99
100         this.actorSystem = actorSystem;
101         this.entityOwnershipService = entityOwnershipService;
102         registerToSal(this, this);
103         LOG.warn("Clustered netconf topo started");
104     }
105
106
107
108     @Override
109     public void onSessionInitiated(final ProviderContext session) {
110         dataBroker = session.getSALService(DataBroker.class);
111         final NodeWriter writer = new TopologyNodeWriter(topologyId, dataBroker);
112         TypedActorExtension typedActorExtension = TypedActor.get(this.actorSystem);
113         LOG.warn("Registering actor on path {}", actorSystem.name() + "/user/" + topologyId);
114         topologyManager = typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
115             @Override
116             public BaseTopologyManager create() throws Exception {
117                 return new BaseTopologyManager(actorSystem,
118                         codecRegistry,
119                         dataBroker,
120                         topologyId,
121                         new TopologyCallbackFactory(ClusteredNetconfTopology.this, entityOwnershipService, writer),
122                         new NetconfNodeOperationalDataAggregator(),
123                         new LoggingSalNodeWriter(writer),
124                         new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "topology-netconf", "topology-manager"));
125             }
126         }), topologyId);
127     }
128
129     @Override
130     public void close() throws Exception {
131         // close all existing connectors, delete whole topology in datastore?
132         for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
133             connectorDTO.getCommunicator().close();
134         }
135         activeConnectors.clear();
136     }
137
138     @Override
139     protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
140                                                            final NetconfNode node) {
141         //setup default values since default value is not supported yet in mdsal
142         // TODO remove this when mdsal starts supporting default values
143         final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
144         final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
145         final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
146
147         IpAddress ipAddress = node.getHost().getIpAddress();
148         InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
149                 ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
150                 node.getPort().getValue());
151         RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
152
153         RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
154                 createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
155
156         if (keepaliveDelay > 0) {
157             LOG.warn("Adding keepalive facade, for device {}", nodeId);
158             salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
159         }
160
161         NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
162                 new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
163
164         NetconfDevice device = new ClusteredNetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
165                 processingExecutor.getExecutor(), sharedSchemaRepository, actorSystem, topologyId, nodeId.getValue(), TypedActor.context());
166
167         return new NetconfConnectorDTO(new ClusteredNetconfDeviceCommunicator(remoteDeviceId, device, entityOwnershipService), salFacade);
168     }
169
170     @Override
171     protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
172         return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
173     }
174
175     @Override
176     public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
177         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
178     }
179
180     @Override
181     public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
182         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
183     }
184
185     @Override
186     public void unregisterMountPoint(final NodeId nodeId) {
187         Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
188         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
189     }
190
191     @Override
192     public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
193         Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a connection listener can be registered");
194         return ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
195     }
196
197     @Override
198     public Collection<ProviderFunctionality> getProviderFunctionality() {
199         return Collections.emptySet();
200     }
201
202     public NetconfClientSessionListenerRegistration registerNetconfClientSessionListener(final NodeId node, final NetconfClientSessionListener listener) {
203         Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a session listener can be registered");
204         return ((ClusteredNetconfDeviceCommunicator) activeConnectors.get(node).getCommunicator()).registerNetconfClientSessionListener(listener);
205     }
206
207     static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
208
209         private final NetconfTopology netconfTopology;
210         private final EntityOwnershipService entityOwnershipService;
211         private final NodeWriter writer;
212
213         TopologyCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService, final NodeWriter writer) {
214             this.netconfTopology = netconfTopology;
215             this.entityOwnershipService = entityOwnershipService;
216             this.writer = writer;
217         }
218
219         @Override
220         public TopologyManagerCallback create(final ActorSystem actorSystem, final String topologyId) {
221             return new NetconfTopologyManagerCallback(actorSystem, topologyId, new NodeCallbackFactory(netconfTopology, entityOwnershipService), new LoggingSalNodeWriter(writer));
222         }
223     }
224
225     private static class NodeCallbackFactory implements NodeManagerCallbackFactory {
226
227         private final NetconfTopology netconfTopology;
228         private final EntityOwnershipService entityOwnershipService;
229
230         NodeCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService) {
231             this.netconfTopology = netconfTopology;
232             this.entityOwnershipService = entityOwnershipService;
233         }
234
235         @Override
236         public NodeManagerCallback create(final String nodeId, final String topologyId, final ActorSystem actorSystem) {
237             return new NetconfNodeManagerCallback(nodeId, topologyId, actorSystem, netconfTopology, new NodeRoleChangeStrategy(entityOwnershipService, "netconf-node", nodeId));
238         }
239     }
240 }