Bug 8032 - Initialization in sal failed, disconnecting from device
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.util.Timeout;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import java.io.IOException;
19 import java.util.List;
20 import java.util.stream.Collectors;
21 import javax.annotation.Nonnull;
22 import javax.annotation.Nullable;
23 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
24 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
25 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
30 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
33 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
34 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
35 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
36 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
37 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
38 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
39 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
40 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
41 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
42 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
43 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
44 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
45 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
46 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
47 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
48 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
49 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
50 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
51 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
52 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
53 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
54 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
55 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
57 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
58 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
59 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
60 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
61 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
62 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
63 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
64 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
65 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
66 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
67 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
68 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71 import scala.concurrent.duration.Duration;
72
73 public class NetconfNodeActor extends UntypedActor {
74
75     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
76
77     private final SchemaSourceRegistry schemaRegistry;
78     private final SchemaRepository schemaRepository;
79     private final DOMMountPointService mountPointService;
80     private final Timeout actorResponseWaitTime;
81     private final Duration writeTxIdleTimeout;
82
83     private RemoteDeviceId id;
84     private NetconfTopologySetup setup;
85     private List<SourceIdentifier> sourceIdentifiers;
86     private DOMRpcService deviceRpc;
87     private SlaveSalFacade slaveSalManager;
88     private DOMDataBroker deviceDataBroker;
89     //readTxActor can be shared
90     private ActorRef readTxActor;
91     private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
92
93     public static Props props(final NetconfTopologySetup setup,
94                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
95                               final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
96                               final DOMMountPointService mountPointService) {
97         return Props.create(NetconfNodeActor.class, () ->
98                 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime,
99                         mountPointService));
100     }
101
102     private NetconfNodeActor(final NetconfTopologySetup setup,
103                              final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
104                              final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
105                              final DOMMountPointService mountPointService) {
106         this.setup = setup;
107         this.id = id;
108         this.schemaRegistry = schemaRegistry;
109         this.schemaRepository = schemaRepository;
110         this.actorResponseWaitTime = actorResponseWaitTime;
111         this.writeTxIdleTimeout = setup.getIdleTimeout();
112         this.mountPointService = mountPointService;
113     }
114
115     @Override
116     public void onReceive(final Object message) throws Exception {
117         if (message instanceof CreateInitialMasterActorData) { // master
118
119             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
120             sourceIdentifiers = masterActorData.getSourceIndentifiers();
121             this.deviceDataBroker = masterActorData.getDeviceDataBroker();
122             final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
123             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
124             this.deviceRpc = masterActorData.getDeviceRpc();
125
126             sender().tell(new MasterActorDataInitialized(), self());
127
128             LOG.debug("{}: Master is ready.", id);
129
130         } else if (message instanceof  RefreshSetupMasterActorData) {
131             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
132             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
133             sender().tell(new MasterActorDataInitialized(), self());
134         } else if (message instanceof AskForMasterMountPoint) { // master
135             // only master contains reference to deviceDataBroker
136             if (deviceDataBroker != null) {
137                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
138             }
139
140         } else if (message instanceof YangTextSchemaSourceRequest) { // master
141
142             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
143             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
144
145         } else if (message instanceof NewReadTransactionRequest) { // master
146
147             sender().tell(new NewReadTransactionReply(readTxActor), self());
148
149         } else if (message instanceof NewWriteTransactionRequest) { // master
150             try {
151                 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
152                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
153                 sender().tell(new NewWriteTransactionReply(txActor), self());
154             } catch (final Throwable t) {
155                 sender().tell(t, self());
156             }
157
158         } else if (message instanceof NewReadWriteTransactionRequest) {
159             try {
160                 final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
161                 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
162                 sender().tell(new NewReadWriteTransactionReply(txActor), self());
163             } catch (final Throwable t) {
164                 sender().tell(t, self());
165             }
166         } else if (message instanceof InvokeRpcMessage) { // master
167
168             final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
169             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
170
171         } else if (message instanceof RegisterMountPoint) { //slaves
172
173             sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
174             registerSlaveMountPoint(getSender());
175
176         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
177             if (slaveSalManager != null) {
178                 slaveSalManager.close();
179                 slaveSalManager = null;
180             }
181
182         }
183     }
184
185     @Override
186     public void postStop() throws Exception {
187         super.postStop();
188         closeSchemaSourceRegistrations();
189     }
190
191     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
192         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
193                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
194
195         Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
196             @Override
197             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
198                 try {
199                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
200                 } catch (final IOException exception) {
201                     sender.tell(exception.getCause(), getSelf());
202                 }
203             }
204
205             @Override
206             public void onFailure(@Nonnull final Throwable throwable) {
207                 sender.tell(throwable, getSelf());
208             }
209         });
210     }
211
212     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
213                                 final ActorRef recipient) {
214
215         final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
216                 deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
217
218         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
219             @Override
220             public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
221                 if (domRpcResult == null) {
222                     recipient.tell(new EmptyResultResponse(), getSender());
223                     return;
224                 }
225                 NormalizedNodeMessage nodeMessageReply = null;
226                 if (domRpcResult.getResult() != null) {
227                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
228                             domRpcResult.getResult());
229                 }
230                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
231             }
232
233             @Override
234             public void onFailure(@Nonnull final Throwable throwable) {
235                 recipient.tell(throwable, getSelf());
236             }
237         });
238     }
239
240     private void registerSlaveMountPoint(final ActorRef masterReference) {
241         if (this.slaveSalManager != null) {
242             slaveSalManager.close();
243         }
244         closeSchemaSourceRegistrations();
245         slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime,
246                 mountPointService);
247
248         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
249                 getSchemaContext(masterReference);
250         final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
251
252         Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
253             @Override
254             public void onSuccess(final SchemaContext result) {
255                 LOG.info("{}: Schema context resolved: {}", id, result.getModules());
256                 slaveSalManager.registerSlaveMountPoint(result, deviceRpc, masterReference);
257             }
258
259             @Override
260             public void onFailure(@Nonnull final Throwable throwable) {
261                 LOG.error("{}: Failed to register mount point: {}", id, throwable);
262             }
263         });
264     }
265
266     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
267         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
268     }
269
270     private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(final ActorRef masterReference) {
271
272         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
273                 new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
274         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
275                 getContext().dispatcher());
276
277         registeredSchemas = sourceIdentifiers.stream()
278                 .map(sourceId ->
279                         schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
280                                 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
281                 .collect(Collectors.toList());
282
283         final SchemaContextFactory schemaContextFactory
284                 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
285
286         return schemaContextFactory.createSchemaContext(sourceIdentifiers);
287     }
288
289     private void closeSchemaSourceRegistrations() {
290         if (registeredSchemas != null) {
291             registeredSchemas.forEach(SchemaSourceRegistration::close);
292             registeredSchemas = null;
293         }
294     }
295
296 }