Bug 6581 - Make timeout for ask configurable
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.util.Timeout;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import java.io.IOException;
19 import java.util.List;
20 import javax.annotation.Nonnull;
21 import javax.annotation.Nullable;
22 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
23 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
24 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
26 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
27 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
28 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
29 import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
30 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
31 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
32 import org.opendaylight.netconf.topology.singleton.impl.RemoteOperationTxProcessorImpl;
33 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
34 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
35 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
36 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
37 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
38 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
39 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
40 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
41 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
42 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
43 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
44 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
45 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
46 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
47 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
48 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
49 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
50 import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
51 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
52 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
53 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
54 import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
55 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
58 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
59 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
60 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
61 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
62 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
63 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
64 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
65 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
66 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 public class NetconfNodeActor extends UntypedActor {
71
72     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
73
74     private NetconfTopologySetup setup;
75     private RemoteDeviceId id;
76     private final SchemaSourceRegistry schemaRegistry;
77     private final SchemaRepository schemaRepository;
78
79     private RemoteOperationTxProcessor operationsProcessor;
80     private List<SourceIdentifier> sourceIdentifiers;
81     private DOMRpcService deviceRpc;
82     private SlaveSalFacade slaveSalManager;
83     private final Timeout actorResponseWaitTime;
84
85     public static Props props(final NetconfTopologySetup setup,
86                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
87                               final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime) {
88         return Props.create(NetconfNodeActor.class, () ->
89                 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime));
90     }
91
92     private NetconfNodeActor(final NetconfTopologySetup setup,
93                              final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
94                              final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime) {
95         this.setup = setup;
96         this.id = id;
97         this.schemaRegistry = schemaRegistry;
98         this.schemaRepository = schemaRepository;
99         this.actorResponseWaitTime = actorResponseWaitTime;
100     }
101
102     @Override
103     public void onReceive(final Object message) throws Exception {
104         if (message instanceof CreateInitialMasterActorData) { // master
105
106             sourceIdentifiers = ((CreateInitialMasterActorData) message).getSourceIndentifiers();
107             operationsProcessor =
108                     new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
109                             id);
110             this.deviceRpc = ((CreateInitialMasterActorData) message).getDeviceRpc();
111
112             sender().tell(new MasterActorDataInitialized(), self());
113
114             LOG.debug("{}: Master is ready.", id);
115
116         } else if (message instanceof  RefreshSetupMasterActorData) {
117             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
118             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
119             sender().tell(new MasterActorDataInitialized(), self());
120         } else if (message instanceof AskForMasterMountPoint) { // master
121             // only master contains reference to operations processor
122             if (operationsProcessor != null) {
123                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
124             }
125
126         } else if (message instanceof TransactionRequest) { // master
127
128             resolveProxyCalls(message, sender(), getSelf());
129
130         } else if (message instanceof YangTextSchemaSourceRequest) { // master
131
132             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
133             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
134
135         } else if (message instanceof InvokeRpcMessage) {
136
137             final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
138             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
139
140         } else if (message instanceof RegisterMountPoint) { //slaves
141
142             sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
143             registerSlaveMountPoint(getSender());
144
145         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
146             if (slaveSalManager != null) {
147                 slaveSalManager.close();
148                 slaveSalManager = null;
149             }
150
151         }
152     }
153
154     private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) {
155         if (message instanceof OpenTransaction) {
156             operationsProcessor.doOpenTransaction(recipient, futureSender);
157         } else if (message instanceof ReadRequest) {
158
159             final ReadRequest readRequest = (ReadRequest) message;
160             operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
161
162         } else if (message instanceof ExistsRequest) {
163
164             final ExistsRequest readRequest = (ExistsRequest) message;
165             operationsProcessor.doExists(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
166
167         } else if (message instanceof MergeRequest) {
168
169             final MergeRequest mergeRequest = (MergeRequest) message;
170             operationsProcessor.doMerge(mergeRequest.getStore(), mergeRequest.getNormalizedNodeMessage());
171
172         } else if (message instanceof PutRequest) {
173
174             final PutRequest putRequest = (PutRequest) message;
175             operationsProcessor.doPut(putRequest.getStore(), putRequest.getNormalizedNodeMessage());
176
177         } else if (message instanceof DeleteRequest) {
178
179             final DeleteRequest deleteRequest = (DeleteRequest) message;
180             operationsProcessor.doDelete(deleteRequest.getStore(), deleteRequest.getPath());
181
182         } else if (message instanceof CancelRequest) {
183
184             operationsProcessor.doCancel(recipient, futureSender);
185
186         } else if (message instanceof SubmitRequest) {
187
188             operationsProcessor.doSubmit(recipient, futureSender);
189         }
190     }
191
192     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
193         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
194                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
195
196         Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
197             @Override
198             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
199                 try {
200                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
201                 } catch (final IOException exception) {
202                     sender.tell(exception.getCause(), getSelf());
203                 }
204             }
205
206             @Override
207             public void onFailure(@Nonnull final Throwable throwable) {
208                 sender.tell(throwable, getSelf());
209             }
210         });
211     }
212
213     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
214                                 final ActorRef recipient) {
215
216         final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
217                 deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
218
219         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
220             @Override
221             public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
222                 if (domRpcResult == null) {
223                     recipient.tell(new EmptyResultResponse(), getSender());
224                     return;
225                 }
226                 NormalizedNodeMessage nodeMessageReply = null;
227                 if (domRpcResult.getResult() != null) {
228                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
229                             domRpcResult.getResult());
230                 }
231                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
232             }
233
234             @Override
235             public void onFailure(@Nonnull final Throwable throwable) {
236                 recipient.tell(throwable, getSelf());
237             }
238         });
239     }
240
241     private void registerSlaveMountPoint(final ActorRef masterReference) {
242         if (this.slaveSalManager != null) {
243             slaveSalManager.close();
244         }
245         slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem(), actorResponseWaitTime);
246
247         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
248                 getSchemaContext(masterReference);
249         final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
250
251         Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
252             @Override
253             public void onSuccess(final SchemaContext result) {
254                 LOG.info("{}: Schema context resolved: {}", id, result.getModules());
255                 slaveSalManager.registerSlaveMountPoint(result, deviceRpc, masterReference);
256             }
257
258             @Override
259             public void onFailure(@Nonnull final Throwable throwable) {
260                 LOG.error("{}: Failed to register mount point: {}", id, throwable);
261             }
262         });
263     }
264
265     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
266         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
267     }
268
269     private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(final ActorRef masterReference) {
270
271         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
272                 new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
273         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
274                 getContext().dispatcher());
275
276         sourceIdentifiers.forEach(sourceId ->
277                 schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
278                         YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
279
280         final SchemaContextFactory schemaContextFactory
281                 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
282
283         return schemaContextFactory.createSchemaContext(sourceIdentifiers);
284     }
285
286 }