Prevent NPEs on failures
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / ClusteredNetconfTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.impl;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.japi.Creator;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import io.netty.util.concurrent.EventExecutor;
21 import java.util.Collection;
22 import java.util.Collections;
23 import javassist.ClassPool;
24 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
25 import org.opendaylight.controller.config.threadpool.ThreadPool;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
28 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
30 import org.opendaylight.controller.sal.core.api.Broker;
31 import org.opendaylight.netconf.client.NetconfClientDispatcher;
32 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
33 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
34 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
35 import org.opendaylight.netconf.topology.AbstractNetconfTopology;
36 import org.opendaylight.netconf.topology.NetconfTopology;
37 import org.opendaylight.netconf.topology.NodeManagerCallback;
38 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
39 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
40 import org.opendaylight.netconf.topology.TopologyManager;
41 import org.opendaylight.netconf.topology.TopologyManagerCallback;
42 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
43 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
44 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
45 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
46 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
47 import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
48 import org.opendaylight.netconf.topology.util.NodeWriter;
49 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
52 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
53 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
54 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
55 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
56 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
57 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 public class ClusteredNetconfTopology extends AbstractNetconfTopology implements AutoCloseable {
62
63     private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfTopology.class);
64
65     private final BindingNormalizedNodeCodecRegistry codecRegistry;
66
67     private final ActorSystem actorSystem;
68     private final EntityOwnershipService entityOwnershipService;
69     private TopologyManager topologyManager;
70
71     public ClusteredNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
72                                final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
73                                final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
74                                final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
75                                final ActorSystem actorSystem, final EntityOwnershipService entityOwnershipService) {
76         super(topologyId, clientDispatcher,
77                 bindingAwareBroker, domBroker, eventExecutor,
78                 keepaliveExecutor, processingExecutor, schemaRepositoryProvider);
79
80         final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
81         moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
82         final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
83         Preconditions.checkState(schemaContextOptional.isPresent());
84         final SchemaContext topologySchemaCtx = schemaContextOptional.get();
85
86         final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
87         codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
88         codecRegistry.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));
89
90         this.actorSystem = actorSystem;
91         this.entityOwnershipService = entityOwnershipService;
92         registerToSal(this, this);
93         LOG.warn("Clustered netconf topo started");
94     }
95
96     @Override
97     public void onSessionInitiated(final ProviderContext session) {
98         dataBroker = session.getSALService(DataBroker.class);
99         final NodeWriter writer = new TopologyNodeWriter(topologyId, dataBroker);
100         TypedActorExtension typedActorExtension = TypedActor.get(this.actorSystem);
101         LOG.warn("Registering actor on path {}", actorSystem.name() + "/user/" + topologyId);
102         topologyManager = typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
103             @Override
104             public BaseTopologyManager create() throws Exception {
105                 return new BaseTopologyManager(actorSystem,
106                         codecRegistry,
107                         dataBroker,
108                         topologyId,
109                         new TopologyCallbackFactory(ClusteredNetconfTopology.this, entityOwnershipService, writer),
110                         new NetconfNodeOperationalDataAggregator(),
111                         new LoggingSalNodeWriter(writer),
112                         new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "topology-netconf", "topology-manager"));
113             }
114         }), topologyId);
115     }
116
117     @Override
118     public void close() throws Exception {
119         // close all existing connectors, delete whole topology in datastore?
120         for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
121             connectorDTO.getCommunicator().close();
122         }
123         activeConnectors.clear();
124     }
125
126     @Override
127     protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
128         return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
129     }
130
131     @Override
132     public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
133         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
134     }
135
136     @Override
137     public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
138         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
139     }
140
141     @Override
142     public void unregisterMountPoint(final NodeId nodeId) {
143         Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
144         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
145     }
146
147     @Override
148     public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
149         Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a connection listener can be registered");
150         return ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
151     }
152
153     @Override
154     public Collection<ProviderFunctionality> getProviderFunctionality() {
155         return Collections.emptySet();
156     }
157
158     static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
159
160         private final NetconfTopology netconfTopology;
161         private final EntityOwnershipService entityOwnershipService;
162         private final NodeWriter writer;
163
164         TopologyCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService, final NodeWriter writer) {
165             this.netconfTopology = netconfTopology;
166             this.entityOwnershipService = entityOwnershipService;
167             this.writer = writer;
168         }
169
170         @Override
171         public TopologyManagerCallback create(final ActorSystem actorSystem, final String topologyId) {
172             return new NetconfTopologyManagerCallback(actorSystem, topologyId, new NodeCallbackFactory(netconfTopology, entityOwnershipService), new LoggingSalNodeWriter(writer));
173         }
174     }
175
176     private static class NodeCallbackFactory implements NodeManagerCallbackFactory {
177
178         private final NetconfTopology netconfTopology;
179         private final EntityOwnershipService entityOwnershipService;
180
181         NodeCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService) {
182             this.netconfTopology = netconfTopology;
183             this.entityOwnershipService = entityOwnershipService;
184         }
185
186         @Override
187         public NodeManagerCallback create(final String nodeId, final String topologyId, final ActorSystem actorSystem) {
188             return new NetconfNodeManagerCallback(nodeId, topologyId, actorSystem, netconfTopology, new NodeRoleChangeStrategy(entityOwnershipService, "netconf-node", nodeId));
189         }
190     }
191 }