Bug 8197: Deregister schema sources on actor stop
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.util.Timeout;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import java.io.IOException;
19 import java.util.List;
20 import java.util.stream.Collectors;
21 import javax.annotation.Nonnull;
22 import javax.annotation.Nullable;
23 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
24 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
25 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
29 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
30 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
32 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
33 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
34 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
35 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
36 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
37 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
38 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
39 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
40 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
41 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
42 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
43 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
44 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
45 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
46 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
47 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
48 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
49 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
50 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
51 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
54 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
55 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
56 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
57 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
58 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
59 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
60 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
61 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
62 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
63 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
64 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67 import scala.concurrent.duration.Duration;
68
69 public class NetconfNodeActor extends UntypedActor {
70
71     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
72
73     private final SchemaSourceRegistry schemaRegistry;
74     private final SchemaRepository schemaRepository;
75     private final Timeout actorResponseWaitTime;
76     private final Duration writeTxIdleTimeout;
77
78     private RemoteDeviceId id;
79     private NetconfTopologySetup setup;
80     private List<SourceIdentifier> sourceIdentifiers;
81     private DOMRpcService deviceRpc;
82     private SlaveSalFacade slaveSalManager;
83     private DOMDataBroker deviceDataBroker;
84     //readTxActor can be shared
85     private ActorRef readTxActor;
86     private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
87
88     public static Props props(final NetconfTopologySetup setup,
89                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
90                               final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime) {
91         return Props.create(NetconfNodeActor.class, () ->
92                 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime));
93     }
94
95     private NetconfNodeActor(final NetconfTopologySetup setup,
96                              final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
97                              final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime) {
98         this.setup = setup;
99         this.id = id;
100         this.schemaRegistry = schemaRegistry;
101         this.schemaRepository = schemaRepository;
102         this.actorResponseWaitTime = actorResponseWaitTime;
103         this.writeTxIdleTimeout = setup.getIdleTimeout();
104     }
105
106     @Override
107     public void onReceive(final Object message) throws Exception {
108         if (message instanceof CreateInitialMasterActorData) { // master
109
110             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
111             sourceIdentifiers = masterActorData.getSourceIndentifiers();
112             this.deviceDataBroker = masterActorData.getDeviceDataBroker();
113             final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
114             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
115             this.deviceRpc = masterActorData.getDeviceRpc();
116
117             sender().tell(new MasterActorDataInitialized(), self());
118
119             LOG.debug("{}: Master is ready.", id);
120
121         } else if (message instanceof  RefreshSetupMasterActorData) {
122             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
123             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
124             sender().tell(new MasterActorDataInitialized(), self());
125         } else if (message instanceof AskForMasterMountPoint) { // master
126             // only master contains reference to deviceDataBroker
127             if (deviceDataBroker != null) {
128                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
129             }
130
131         } else if (message instanceof YangTextSchemaSourceRequest) { // master
132
133             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
134             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
135
136         } else if (message instanceof NewReadTransactionRequest) { // master
137
138             sender().tell(new NewReadTransactionReply(readTxActor), self());
139
140         } else if (message instanceof NewWriteTransactionRequest) { // master
141             try {
142                 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
143                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
144                 sender().tell(new NewWriteTransactionReply(txActor), self());
145             } catch (final Throwable t) {
146                 sender().tell(t, self());
147             }
148
149         } else if (message instanceof InvokeRpcMessage) { // master
150
151             final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
152             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
153
154         } else if (message instanceof RegisterMountPoint) { //slaves
155
156             sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
157             registerSlaveMountPoint(getSender());
158
159         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
160             if (slaveSalManager != null) {
161                 slaveSalManager.close();
162                 slaveSalManager = null;
163             }
164
165         }
166     }
167
168     @Override
169     public void postStop() throws Exception {
170         super.postStop();
171         closeSchemaSourceRegistrations();
172     }
173
174     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
175         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
176                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
177
178         Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
179             @Override
180             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
181                 try {
182                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
183                 } catch (final IOException exception) {
184                     sender.tell(exception.getCause(), getSelf());
185                 }
186             }
187
188             @Override
189             public void onFailure(@Nonnull final Throwable throwable) {
190                 sender.tell(throwable, getSelf());
191             }
192         });
193     }
194
195     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
196                                 final ActorRef recipient) {
197
198         final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
199                 deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
200
201         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
202             @Override
203             public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
204                 if (domRpcResult == null) {
205                     recipient.tell(new EmptyResultResponse(), getSender());
206                     return;
207                 }
208                 NormalizedNodeMessage nodeMessageReply = null;
209                 if (domRpcResult.getResult() != null) {
210                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
211                             domRpcResult.getResult());
212                 }
213                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
214             }
215
216             @Override
217             public void onFailure(@Nonnull final Throwable throwable) {
218                 recipient.tell(throwable, getSelf());
219             }
220         });
221     }
222
223     private void registerSlaveMountPoint(final ActorRef masterReference) {
224         if (this.slaveSalManager != null) {
225             slaveSalManager.close();
226         }
227         closeSchemaSourceRegistrations();
228         slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem(), actorResponseWaitTime);
229
230         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
231                 getSchemaContext(masterReference);
232         final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
233
234         Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
235             @Override
236             public void onSuccess(final SchemaContext result) {
237                 LOG.info("{}: Schema context resolved: {}", id, result.getModules());
238                 slaveSalManager.registerSlaveMountPoint(result, deviceRpc, masterReference);
239             }
240
241             @Override
242             public void onFailure(@Nonnull final Throwable throwable) {
243                 LOG.error("{}: Failed to register mount point: {}", id, throwable);
244             }
245         });
246     }
247
248     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
249         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
250     }
251
252     private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(final ActorRef masterReference) {
253
254         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
255                 new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
256         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
257                 getContext().dispatcher());
258
259         registeredSchemas = sourceIdentifiers.stream()
260                 .map(sourceId ->
261                         schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
262                                 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
263                 .collect(Collectors.toList());
264
265         final SchemaContextFactory schemaContextFactory
266                 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
267
268         return schemaContextFactory.createSchemaContext(sourceIdentifiers);
269     }
270
271     private void closeSchemaSourceRegistrations() {
272         if (registeredSchemas != null) {
273             registeredSchemas.forEach(SchemaSourceRegistration::close);
274             registeredSchemas = null;
275         }
276     }
277
278 }