571483afa9ba24e64a9c7b6f4529479620bf6a05
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import java.io.IOException;
18 import java.util.List;
19 import javax.annotation.Nonnull;
20 import javax.annotation.Nullable;
21 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
22 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
23 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
24 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
25 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
27 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
28 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
29 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
30 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
31 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
32 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
33 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
34 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
35 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
36 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
37 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
38 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
39 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
40 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
41 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
42 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
43 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
44 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
45 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
46 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
47 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
48 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
49 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
52 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
53 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
54 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
55 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
56 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
57 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
58 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
59 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
60 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
61 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.concurrent.duration.Duration;
65
66 public class NetconfNodeActor extends UntypedActor {
67
68     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
69
70     private final SchemaSourceRegistry schemaRegistry;
71     private final SchemaRepository schemaRepository;
72     private final Duration writeTxIdleTimeout;
73
74     private RemoteDeviceId id;
75     private NetconfTopologySetup setup;
76     private List<SourceIdentifier> sourceIdentifiers;
77     private DOMRpcService deviceRpc;
78     private SlaveSalFacade slaveSalManager;
79     private DOMDataBroker deviceDataBroker;
80     //readTxActor can be shared
81     private ActorRef readTxActor;
82
83     public static Props props(final NetconfTopologySetup setup,
84                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
85                               final SchemaRepository schemaRepository) {
86         return Props.create(NetconfNodeActor.class, () ->
87                 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository));
88     }
89
90     private NetconfNodeActor(final NetconfTopologySetup setup,
91                              final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
92                              final SchemaRepository schemaRepository) {
93         this.setup = setup;
94         this.id = id;
95         this.schemaRegistry = schemaRegistry;
96         this.schemaRepository = schemaRepository;
97         this.writeTxIdleTimeout = setup.getIdleTimeout();
98     }
99
100     @Override
101     public void onReceive(final Object message) throws Exception {
102         if (message instanceof CreateInitialMasterActorData) { // master
103
104             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
105             sourceIdentifiers = masterActorData.getSourceIndentifiers();
106             this.deviceDataBroker = masterActorData.getDeviceDataBroker();
107             final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
108             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
109             this.deviceRpc = masterActorData.getDeviceRpc();
110
111             sender().tell(new MasterActorDataInitialized(), self());
112
113             LOG.debug("{}: Master is ready.", id);
114
115         } else if (message instanceof  RefreshSetupMasterActorData) {
116             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
117             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
118             sender().tell(new MasterActorDataInitialized(), self());
119         } else if (message instanceof AskForMasterMountPoint) { // master
120             // only master contains reference to deviceDataBroker
121             if (deviceDataBroker != null) {
122                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
123             }
124
125         } else if (message instanceof YangTextSchemaSourceRequest) { // master
126
127             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
128             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
129
130         } else if (message instanceof NewReadTransactionRequest) { // master
131
132             sender().tell(new NewReadTransactionReply(readTxActor), self());
133
134         } else if (message instanceof NewWriteTransactionRequest) { // master
135             try {
136                 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
137                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
138                 sender().tell(new NewWriteTransactionReply(txActor), self());
139             } catch (final Throwable t) {
140                 sender().tell(t, self());
141             }
142
143         } else if (message instanceof InvokeRpcMessage) { // master
144
145             final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
146             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
147
148         } else if (message instanceof RegisterMountPoint) { //slaves
149
150             sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
151             registerSlaveMountPoint(getSender());
152
153         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
154             if (slaveSalManager != null) {
155                 slaveSalManager.close();
156                 slaveSalManager = null;
157             }
158
159         }
160     }
161
162
163     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
164         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
165                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
166
167         Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
168             @Override
169             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
170                 try {
171                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
172                 } catch (final IOException exception) {
173                     sender.tell(exception.getCause(), getSelf());
174                 }
175             }
176
177             @Override
178             public void onFailure(@Nonnull final Throwable throwable) {
179                 sender.tell(throwable, getSelf());
180             }
181         });
182     }
183
184     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
185                                 final ActorRef recipient) {
186
187         final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
188                 deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
189
190         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
191             @Override
192             public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
193                 if (domRpcResult == null) {
194                     recipient.tell(new EmptyResultResponse(), getSender());
195                     return;
196                 }
197                 NormalizedNodeMessage nodeMessageReply = null;
198                 if (domRpcResult.getResult() != null) {
199                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
200                             domRpcResult.getResult());
201                 }
202                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
203             }
204
205             @Override
206             public void onFailure(@Nonnull final Throwable throwable) {
207                 recipient.tell(throwable, getSelf());
208             }
209         });
210     }
211
212     private void registerSlaveMountPoint(final ActorRef masterReference) {
213         if (this.slaveSalManager != null) {
214             slaveSalManager.close();
215         }
216         slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem());
217
218         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
219                 getSchemaContext(masterReference);
220         final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
221
222         Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
223             @Override
224             public void onSuccess(final SchemaContext result) {
225                 LOG.info("{}: Schema context resolved: {}", id, result.getModules());
226                 slaveSalManager.registerSlaveMountPoint(result, deviceRpc, masterReference);
227             }
228
229             @Override
230             public void onFailure(@Nonnull final Throwable throwable) {
231                 LOG.error("{}: Failed to register mount point: {}", id, throwable);
232             }
233         });
234     }
235
236     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
237         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id);
238     }
239
240     private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(final ActorRef masterReference) {
241
242         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
243                 new ProxyYangTextSourceProvider(masterReference, getContext());
244         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
245                 getContext().dispatcher());
246
247         sourceIdentifiers.forEach(sourceId ->
248                 schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
249                         YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
250
251         final SchemaContextFactory schemaContextFactory
252                 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
253
254         return schemaContextFactory.createSchemaContext(sourceIdentifiers);
255     }
256
257 }