Merge "Bug 8153: Enforce check-style rules for netconf - netconf-notification-impl"
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.util.Timeout;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import java.io.IOException;
19 import java.util.List;
20 import java.util.stream.Collectors;
21 import javax.annotation.Nonnull;
22 import javax.annotation.Nullable;
23 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
24 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
25 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
29 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
30 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
33 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
34 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
35 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
36 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
37 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
38 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
39 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
40 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
41 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
42 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
43 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
44 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
45 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
46 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
47 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
48 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
49 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
50 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
51 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
52 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
56 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
57 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
58 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
59 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
60 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
61 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
62 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
63 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
64 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
65 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68 import scala.concurrent.duration.Duration;
69
70 public class NetconfNodeActor extends UntypedActor {
71
72     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
73
74     private final SchemaSourceRegistry schemaRegistry;
75     private final SchemaRepository schemaRepository;
76     private final Timeout actorResponseWaitTime;
77     private final Duration writeTxIdleTimeout;
78     private final DOMMountPointService mountPointService;
79
80     private RemoteDeviceId id;
81     private NetconfTopologySetup setup;
82     private List<SourceIdentifier> sourceIdentifiers;
83     private DOMRpcService deviceRpc;
84     private SlaveSalFacade slaveSalManager;
85     private DOMDataBroker deviceDataBroker;
86     //readTxActor can be shared
87     private ActorRef readTxActor;
88     private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
89
90     public static Props props(final NetconfTopologySetup setup,
91                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
92                               final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
93                               final DOMMountPointService mountPointService) {
94         return Props.create(NetconfNodeActor.class, () ->
95                 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime,
96                         mountPointService));
97     }
98
99     private NetconfNodeActor(final NetconfTopologySetup setup,
100                              final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
101                              final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
102                              final DOMMountPointService mountPointService) {
103         this.setup = setup;
104         this.id = id;
105         this.schemaRegistry = schemaRegistry;
106         this.schemaRepository = schemaRepository;
107         this.actorResponseWaitTime = actorResponseWaitTime;
108         this.writeTxIdleTimeout = setup.getIdleTimeout();
109         this.mountPointService = mountPointService;
110     }
111
112     @Override
113     public void onReceive(final Object message) throws Exception {
114         if (message instanceof CreateInitialMasterActorData) { // master
115
116             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
117             sourceIdentifiers = masterActorData.getSourceIndentifiers();
118             this.deviceDataBroker = masterActorData.getDeviceDataBroker();
119             final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
120             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
121             this.deviceRpc = masterActorData.getDeviceRpc();
122
123             sender().tell(new MasterActorDataInitialized(), self());
124
125             LOG.debug("{}: Master is ready.", id);
126
127         } else if (message instanceof  RefreshSetupMasterActorData) {
128             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
129             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
130             sender().tell(new MasterActorDataInitialized(), self());
131         } else if (message instanceof AskForMasterMountPoint) { // master
132             // only master contains reference to deviceDataBroker
133             if (deviceDataBroker != null) {
134                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
135             }
136
137         } else if (message instanceof YangTextSchemaSourceRequest) { // master
138
139             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
140             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
141
142         } else if (message instanceof NewReadTransactionRequest) { // master
143
144             sender().tell(new NewReadTransactionReply(readTxActor), self());
145
146         } else if (message instanceof NewWriteTransactionRequest) { // master
147             try {
148                 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
149                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
150                 sender().tell(new NewWriteTransactionReply(txActor), self());
151             } catch (final Throwable t) {
152                 sender().tell(t, self());
153             }
154
155         } else if (message instanceof InvokeRpcMessage) { // master
156
157             final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
158             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
159
160         } else if (message instanceof RegisterMountPoint) { //slaves
161
162             sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
163             registerSlaveMountPoint(getSender());
164
165         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
166             if (slaveSalManager != null) {
167                 slaveSalManager.close();
168                 slaveSalManager = null;
169             }
170
171         }
172     }
173
174     @Override
175     public void postStop() throws Exception {
176         super.postStop();
177         closeSchemaSourceRegistrations();
178     }
179
180     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
181         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
182                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
183
184         Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
185             @Override
186             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
187                 try {
188                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
189                 } catch (final IOException exception) {
190                     sender.tell(exception.getCause(), getSelf());
191                 }
192             }
193
194             @Override
195             public void onFailure(@Nonnull final Throwable throwable) {
196                 sender.tell(throwable, getSelf());
197             }
198         });
199     }
200
201     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
202                                 final ActorRef recipient) {
203
204         final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
205                 deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
206
207         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
208             @Override
209             public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
210                 if (domRpcResult == null) {
211                     recipient.tell(new EmptyResultResponse(), getSender());
212                     return;
213                 }
214                 NormalizedNodeMessage nodeMessageReply = null;
215                 if (domRpcResult.getResult() != null) {
216                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
217                             domRpcResult.getResult());
218                 }
219                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
220             }
221
222             @Override
223             public void onFailure(@Nonnull final Throwable throwable) {
224                 recipient.tell(throwable, getSelf());
225             }
226         });
227     }
228
229     private void registerSlaveMountPoint(final ActorRef masterReference) {
230         if (this.slaveSalManager != null) {
231             slaveSalManager.close();
232         }
233         closeSchemaSourceRegistrations();
234         slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime,
235                 mountPointService);
236
237         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
238                 getSchemaContext(masterReference);
239         final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
240
241         Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
242             @Override
243             public void onSuccess(final SchemaContext result) {
244                 LOG.info("{}: Schema context resolved: {}", id, result.getModules());
245                 slaveSalManager.registerSlaveMountPoint(result, deviceRpc, masterReference);
246             }
247
248             @Override
249             public void onFailure(@Nonnull final Throwable throwable) {
250                 LOG.error("{}: Failed to register mount point: {}", id, throwable);
251             }
252         });
253     }
254
255     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
256         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
257     }
258
259     private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(final ActorRef masterReference) {
260
261         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
262                 new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
263         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
264                 getContext().dispatcher());
265
266         registeredSchemas = sourceIdentifiers.stream()
267                 .map(sourceId ->
268                         schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
269                                 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
270                 .collect(Collectors.toList());
271
272         final SchemaContextFactory schemaContextFactory
273                 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
274
275         return schemaContextFactory.createSchemaContext(sourceIdentifiers);
276     }
277
278     private void closeSchemaSourceRegistrations() {
279         if (registeredSchemas != null) {
280             registeredSchemas.forEach(SchemaSourceRegistration::close);
281             registeredSchemas = null;
282         }
283     }
284
285 }