Merge "BUG 9112: NPE in karaf cli when device is still connecting"
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.util.Timeout;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.io.IOException;
20 import java.util.List;
21 import java.util.stream.Collectors;
22 import javax.annotation.Nonnull;
23 import javax.annotation.Nullable;
24 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
25 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
26 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
28 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
31 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
33 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
34 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
35 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
36 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
37 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
38 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
39 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
40 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
41 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
42 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
43 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
44 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
45 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
46 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
47 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
48 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
49 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
50 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
51 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
52 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
53 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
54 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
55 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
56 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
57 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
59 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
60 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
61 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
62 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
63 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
64 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
65 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
66 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
67 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
68 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
69 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
70 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73 import scala.concurrent.duration.Duration;
74
75 public class NetconfNodeActor extends UntypedActor {
76
77     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
78
79     private final Duration writeTxIdleTimeout;
80     private final DOMMountPointService mountPointService;
81
82     private SchemaSourceRegistry schemaRegistry;
83     private SchemaRepository schemaRepository;
84     private Timeout actorResponseWaitTime;
85     private RemoteDeviceId id;
86     private NetconfTopologySetup setup;
87     private List<SourceIdentifier> sourceIdentifiers;
88     private DOMRpcService deviceRpc;
89     private SlaveSalFacade slaveSalManager;
90     private DOMDataBroker deviceDataBroker;
91     //readTxActor can be shared
92     private ActorRef readTxActor;
93     private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
94
95     public static Props props(final NetconfTopologySetup setup,
96                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
97                               final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
98                               final DOMMountPointService mountPointService) {
99         return Props.create(NetconfNodeActor.class, () ->
100                 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime,
101                         mountPointService));
102     }
103
104     private NetconfNodeActor(final NetconfTopologySetup setup,
105                              final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
106                              final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
107                              final DOMMountPointService mountPointService) {
108         this.setup = setup;
109         this.id = id;
110         this.schemaRegistry = schemaRegistry;
111         this.schemaRepository = schemaRepository;
112         this.actorResponseWaitTime = actorResponseWaitTime;
113         this.writeTxIdleTimeout = setup.getIdleTimeout();
114         this.mountPointService = mountPointService;
115     }
116
117     @SuppressWarnings("checkstyle:IllegalCatch")
118     @Override
119     public void onReceive(final Object message) throws Exception {
120         if (message instanceof CreateInitialMasterActorData) { // master
121
122             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
123             sourceIdentifiers = masterActorData.getSourceIndentifiers();
124             this.deviceDataBroker = masterActorData.getDeviceDataBroker();
125             final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
126             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
127             this.deviceRpc = masterActorData.getDeviceRpc();
128
129             sender().tell(new MasterActorDataInitialized(), self());
130
131             LOG.debug("{}: Master is ready.", id);
132
133         } else if (message instanceof  RefreshSetupMasterActorData) {
134             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
135             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
136             sender().tell(new MasterActorDataInitialized(), self());
137         } else if (message instanceof AskForMasterMountPoint) { // master
138             // only master contains reference to deviceDataBroker
139             if (deviceDataBroker != null) {
140                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
141             }
142
143         } else if (message instanceof YangTextSchemaSourceRequest) { // master
144
145             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
146             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
147
148         } else if (message instanceof NewReadTransactionRequest) { // master
149
150             sender().tell(new NewReadTransactionReply(readTxActor), self());
151
152         } else if (message instanceof NewWriteTransactionRequest) { // master
153             try {
154                 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
155                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
156                 sender().tell(new NewWriteTransactionReply(txActor), self());
157             } catch (final Exception t) {
158                 sender().tell(t, self());
159             }
160
161         } else if (message instanceof NewReadWriteTransactionRequest) {
162             try {
163                 final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
164                 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
165                 sender().tell(new NewReadWriteTransactionReply(txActor), self());
166             } catch (final Exception t) {
167                 sender().tell(t, self());
168             }
169         } else if (message instanceof InvokeRpcMessage) { // master
170
171             final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
172             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
173
174         } else if (message instanceof RegisterMountPoint) { //slaves
175
176             sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
177             registerSlaveMountPoint(getSender());
178
179         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
180             if (slaveSalManager != null) {
181                 slaveSalManager.close();
182                 slaveSalManager = null;
183             }
184         } else if (message instanceof RefreshSlaveActor) { //slave
185             actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
186             id = ((RefreshSlaveActor) message).getId();
187             schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
188             setup = ((RefreshSlaveActor) message).getSetup();
189             schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
190         }
191
192     }
193
194     @Override
195     public void postStop() throws Exception {
196         super.postStop();
197         closeSchemaSourceRegistrations();
198     }
199
200     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
201         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
202                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
203
204         Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
205             @Override
206             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
207                 try {
208                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
209                 } catch (final IOException exception) {
210                     sender.tell(exception.getCause(), getSelf());
211                 }
212             }
213
214             @Override
215             public void onFailure(@Nonnull final Throwable throwable) {
216                 sender.tell(throwable, getSelf());
217             }
218         }, MoreExecutors.directExecutor());
219     }
220
221     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
222                                 final ActorRef recipient) {
223
224         final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
225                 deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
226
227         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
228             @Override
229             public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
230                 if (domRpcResult == null) {
231                     recipient.tell(new EmptyResultResponse(), getSender());
232                     return;
233                 }
234                 NormalizedNodeMessage nodeMessageReply = null;
235                 if (domRpcResult.getResult() != null) {
236                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
237                             domRpcResult.getResult());
238                 }
239                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
240             }
241
242             @Override
243             public void onFailure(@Nonnull final Throwable throwable) {
244                 recipient.tell(throwable, getSelf());
245             }
246         }, MoreExecutors.directExecutor());
247     }
248
249     private void registerSlaveMountPoint(final ActorRef masterReference) {
250         if (this.slaveSalManager != null) {
251             slaveSalManager.close();
252         }
253         closeSchemaSourceRegistrations();
254         slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime,
255                 mountPointService);
256
257         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
258                 getSchemaContext(masterReference);
259         final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
260
261         Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
262             @Override
263             public void onSuccess(final SchemaContext result) {
264                 LOG.info("{}: Schema context resolved: {}", id, result.getModules());
265                 slaveSalManager.registerSlaveMountPoint(result, deviceRpc, masterReference);
266             }
267
268             @Override
269             public void onFailure(@Nonnull final Throwable throwable) {
270                 LOG.error("{}: Failed to register mount point: {}", id, throwable);
271             }
272         }, MoreExecutors.directExecutor());
273     }
274
275     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
276         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
277     }
278
279     private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(final ActorRef masterReference) {
280
281         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
282                 new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
283         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
284                 getContext().dispatcher());
285
286         registeredSchemas = sourceIdentifiers.stream()
287                 .map(sourceId ->
288                         schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
289                                 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
290                 .collect(Collectors.toList());
291
292         final SchemaContextFactory schemaContextFactory
293                 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
294
295         return schemaContextFactory.createSchemaContext(sourceIdentifiers);
296     }
297
298     private void closeSchemaSourceRegistrations() {
299         if (registeredSchemas != null) {
300             registeredSchemas.forEach(SchemaSourceRegistration::close);
301             registeredSchemas = null;
302         }
303     }
304
305 }