Merge "Fix error reporting for PUT/POST"
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import java.io.IOException;
18 import java.util.List;
19 import javax.annotation.Nonnull;
20 import javax.annotation.Nullable;
21 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
22 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
23 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
24 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
26 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
27 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
28 import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
29 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
30 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
31 import org.opendaylight.netconf.topology.singleton.impl.RemoteOperationTxProcessorImpl;
32 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
33 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
34 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
35 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
36 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
37 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
38 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
39 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
40 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
41 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
42 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
43 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
44 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
45 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
46 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
47 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
48 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
49 import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
50 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
51 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
52 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
53 import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
57 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
58 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
59 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
60 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
61 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
62 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
63 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
64 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
65 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68
69 public class NetconfNodeActor extends UntypedActor {
70
71     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
72
73     private NetconfTopologySetup setup;
74     private RemoteDeviceId id;
75     private final SchemaSourceRegistry schemaRegistry;
76     private final SchemaRepository schemaRepository;
77
78     private RemoteOperationTxProcessor operationsProcessor;
79     private List<SourceIdentifier> sourceIdentifiers;
80     private DOMRpcService deviceRpc;
81     private SlaveSalFacade slaveSalManager;
82
83     public static Props props(final NetconfTopologySetup setup,
84                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
85                               final SchemaRepository schemaRepository) {
86         return Props.create(NetconfNodeActor.class, () ->
87                 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository));
88     }
89
90     private NetconfNodeActor(final NetconfTopologySetup setup,
91                              final RemoteDeviceId id, SchemaSourceRegistry schemaRegistry,
92                              final SchemaRepository schemaRepository) {
93         this.setup = setup;
94         this.id = id;
95         this.schemaRegistry = schemaRegistry;
96         this.schemaRepository = schemaRepository;
97     }
98
99     @Override
100     public void onReceive(final Object message) throws Exception {
101         if (message instanceof CreateInitialMasterActorData) { // master
102
103             sourceIdentifiers = ((CreateInitialMasterActorData) message).getSourceIndentifiers();
104             operationsProcessor =
105                     new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
106                             id);
107             this.deviceRpc = ((CreateInitialMasterActorData) message).getDeviceRpc();
108
109             sender().tell(new MasterActorDataInitialized(), self());
110
111             LOG.debug("{}: Master is ready.", id);
112
113         } else if (message instanceof  RefreshSetupMasterActorData) {
114             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
115             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
116             sender().tell(new MasterActorDataInitialized(), self());
117         } else if (message instanceof AskForMasterMountPoint) { // master
118             // only master contains reference to operations processor
119             if (operationsProcessor != null) {
120                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
121             }
122
123         } else if (message instanceof TransactionRequest) { // master
124
125             resolveProxyCalls(message, sender(), getSelf());
126
127         } else if (message instanceof YangTextSchemaSourceRequest) { // master
128
129             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
130             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
131
132         } else if (message instanceof InvokeRpcMessage) {
133
134             final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
135             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
136
137         } else if (message instanceof RegisterMountPoint) { //slaves
138
139             sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
140             registerSlaveMountPoint(getSender());
141
142         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
143             if (slaveSalManager != null) {
144                 slaveSalManager.close();
145                 slaveSalManager = null;
146             }
147
148         }
149     }
150
151     private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) {
152         if (message instanceof OpenTransaction) {
153             operationsProcessor.doOpenTransaction(recipient, futureSender);
154         } else if (message instanceof ReadRequest) {
155
156             final ReadRequest readRequest = (ReadRequest) message;
157             operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
158
159         } else if (message instanceof ExistsRequest) {
160
161             final ExistsRequest readRequest = (ExistsRequest) message;
162             operationsProcessor.doExists(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
163
164         } else if (message instanceof MergeRequest) {
165
166             final MergeRequest mergeRequest = (MergeRequest) message;
167             operationsProcessor.doMerge(mergeRequest.getStore(), mergeRequest.getNormalizedNodeMessage());
168
169         } else if (message instanceof PutRequest) {
170
171             final PutRequest putRequest = (PutRequest) message;
172             operationsProcessor.doPut(putRequest.getStore(), putRequest.getNormalizedNodeMessage());
173
174         } else if (message instanceof DeleteRequest) {
175
176             final DeleteRequest deleteRequest = (DeleteRequest) message;
177             operationsProcessor.doDelete(deleteRequest.getStore(), deleteRequest.getPath());
178
179         } else if (message instanceof CancelRequest) {
180
181             operationsProcessor.doCancel(recipient, futureSender);
182
183         } else if (message instanceof SubmitRequest) {
184
185             operationsProcessor.doSubmit(recipient, futureSender);
186         }
187     }
188
189     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
190         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
191                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
192
193         Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
194             @Override
195             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
196                 try {
197                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
198                 } catch (IOException exception) {
199                     sender.tell(exception.getCause(), getSelf());
200                 }
201             }
202
203             @Override
204             public void onFailure(@Nonnull final Throwable throwable) {
205                 sender.tell(throwable, getSelf());
206             }
207         });
208     }
209
210     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
211                                 final ActorRef recipient) {
212
213         final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
214                 deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
215
216         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
217             @Override
218             public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
219                 if (domRpcResult == null) {
220                     recipient.tell(new EmptyResultResponse(), getSender());
221                     return;
222                 }
223                 NormalizedNodeMessage nodeMessageReply = null;
224                 if (domRpcResult.getResult() != null) {
225                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
226                             domRpcResult.getResult());
227                 }
228                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
229             }
230
231             @Override
232             public void onFailure(@Nonnull final Throwable throwable) {
233                 recipient.tell(throwable, getSelf());
234             }
235         });
236     }
237
238     private void registerSlaveMountPoint(ActorRef masterReference) {
239         if (this.slaveSalManager != null) {
240             slaveSalManager.close();
241         }
242         slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem());
243
244         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
245                 getSchemaContext(masterReference);
246         final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
247
248         Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
249             @Override
250             public void onSuccess(final SchemaContext result) {
251                 LOG.info("{}: Schema context resolved: {}", id, result.getModules());
252                 slaveSalManager.registerSlaveMountPoint(result, deviceRpc, masterReference);
253             }
254
255             @Override
256             public void onFailure(@Nonnull final Throwable throwable) {
257                 LOG.error("{}: Failed to register mount point: {}", id, throwable);
258             }
259         });
260     }
261
262     private DOMRpcService getDOMRpcService(ActorRef masterReference) {
263         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id);
264     }
265
266     private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(ActorRef masterReference) {
267
268         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
269                 new ProxyYangTextSourceProvider(masterReference, getContext());
270         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
271                 getContext().dispatcher());
272
273         sourceIdentifiers.forEach(sourceId ->
274                 schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
275                         YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
276
277         final SchemaContextFactory schemaContextFactory
278                 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
279
280         return schemaContextFactory.createSchemaContext(sourceIdentifiers);
281     }
282
283 }