Custom scheme-cache-directory yang models are not replicated among
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status.Failure;
13 import akka.actor.Status.Success;
14 import akka.pattern.AskTimeoutException;
15 import akka.util.Timeout;
16 import com.google.common.base.Throwables;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.stream.Collectors;
24 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
25 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
26 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
27 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
28 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
32 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
33 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
34 import org.opendaylight.mdsal.dom.api.DOMRpcService;
35 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
36 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
37 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
38 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
39 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
40 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
41 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
42 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
43 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
44 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
45 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
46 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
47 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
48 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
49 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
50 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
51 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
52 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
53 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
54 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
55 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
57 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
59 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
60 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
61 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
62 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
63 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
64 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
65 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
66 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
67 import scala.concurrent.duration.Duration;
68
69 public class NetconfNodeActor extends AbstractUntypedActor {
70     private final Duration writeTxIdleTimeout;
71     private final DOMMountPointService mountPointService;
72
73     private SchemaSourceRegistry schemaRegistry;
74     private SchemaRepository schemaRepository;
75     private Timeout actorResponseWaitTime;
76     private RemoteDeviceId id;
77     private NetconfTopologySetup setup;
78     private List<SourceIdentifier> sourceIdentifiers;
79     private DOMRpcService deviceRpc;
80     private SlaveSalFacade slaveSalManager;
81     private DOMDataBroker deviceDataBroker;
82     //readTxActor can be shared
83     private ActorRef readTxActor;
84     private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
85
86     public static Props props(final NetconfTopologySetup setup, final RemoteDeviceId id,
87             final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
88         return Props.create(NetconfNodeActor.class, () ->
89                 new NetconfNodeActor(setup, id, actorResponseWaitTime, mountPointService));
90     }
91
92     protected NetconfNodeActor(final NetconfTopologySetup setup,
93                                final RemoteDeviceId id, final Timeout actorResponseWaitTime,
94                                final DOMMountPointService mountPointService) {
95         this.setup = setup;
96         this.id = id;
97         this.schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry();
98         this.schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository();
99         this.actorResponseWaitTime = actorResponseWaitTime;
100         this.writeTxIdleTimeout = setup.getIdleTimeout();
101         this.mountPointService = mountPointService;
102     }
103
104     @SuppressWarnings("checkstyle:IllegalCatch")
105     @Override
106     public void handleReceive(final Object message) {
107         LOG.debug("{}:  received message {}", id, message);
108
109         if (message instanceof CreateInitialMasterActorData) { // master
110
111             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
112             sourceIdentifiers = masterActorData.getSourceIndentifiers();
113             this.deviceDataBroker = masterActorData.getDeviceDataBroker();
114             final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction();
115             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
116             this.deviceRpc = masterActorData.getDeviceRpc();
117
118             sender().tell(new MasterActorDataInitialized(), self());
119
120             LOG.debug("{}: Master is ready.", id);
121
122         } else if (message instanceof  RefreshSetupMasterActorData) {
123             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
124             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
125             sender().tell(new MasterActorDataInitialized(), self());
126         } else if (message instanceof AskForMasterMountPoint) { // master
127             AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message;
128
129             // only master contains reference to deviceDataBroker
130             if (deviceDataBroker != null) {
131                 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
132                 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
133                         sender());
134             } else {
135                 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
136                 sender().tell(new Failure(new NotMasterException(self())), self());
137             }
138
139         } else if (message instanceof YangTextSchemaSourceRequest) { // master
140
141             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
142             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
143
144         } else if (message instanceof NewReadTransactionRequest) { // master
145             sender().tell(new Success(readTxActor), self());
146         } else if (message instanceof NewWriteTransactionRequest) { // master
147             try {
148                 final DOMDataTreeWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
149                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
150                 sender().tell(new Success(txActor), self());
151             } catch (final Exception t) {
152                 sender().tell(new Failure(t), self());
153             }
154
155         } else if (message instanceof NewReadWriteTransactionRequest) {
156             try {
157                 final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
158                 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
159                 sender().tell(new Success(txActor), self());
160             } catch (final Exception t) {
161                 sender().tell(new Failure(t), self());
162             }
163         } else if (message instanceof InvokeRpcMessage) { // master
164             final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
165             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
166
167         } else if (message instanceof RegisterMountPoint) { //slaves
168             RegisterMountPoint registerMountPoint = (RegisterMountPoint)message;
169             sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
170             registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
171             sender().tell(new Success(null), self());
172         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
173             unregisterSlaveMountPoint();
174         } else if (message instanceof RefreshSlaveActor) { //slave
175             actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
176             id = ((RefreshSlaveActor) message).getId();
177             schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
178             setup = ((RefreshSlaveActor) message).getSetup();
179             schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
180         }
181
182     }
183
184     @Override
185     public void postStop() throws Exception {
186         try {
187             super.postStop();
188         } finally {
189             unregisterSlaveMountPoint();
190         }
191     }
192
193     private void unregisterSlaveMountPoint() {
194         if (slaveSalManager != null) {
195             slaveSalManager.close();
196             slaveSalManager = null;
197         }
198
199         closeSchemaSourceRegistrations();
200     }
201
202     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
203         final ListenableFuture<YangTextSchemaSource> schemaSourceFuture =
204                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
205
206         Futures.addCallback(schemaSourceFuture, new FutureCallback<YangTextSchemaSource>() {
207             @Override
208             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
209                 try {
210                     LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
211                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
212                 } catch (IOException e) {
213                     sender.tell(new Failure(e), getSelf());
214                 }
215             }
216
217             @Override
218             public void onFailure(final Throwable throwable) {
219                 LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
220                 sender.tell(new Failure(throwable), getSelf());
221             }
222         }, MoreExecutors.directExecutor());
223     }
224
225     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
226                                 final ActorRef recipient) {
227
228         LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, schemaPath, normalizedNodeMessage,
229                 deviceRpc);
230
231         final ListenableFuture<DOMRpcResult> rpcResult = deviceRpc.invokeRpc(schemaPath,
232                 normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
233
234         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
235             @Override
236             public void onSuccess(final DOMRpcResult domRpcResult) {
237                 LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, schemaPath, domRpcResult);
238
239                 if (domRpcResult == null) {
240                     recipient.tell(new EmptyResultResponse(), getSender());
241                     return;
242                 }
243                 NormalizedNodeMessage nodeMessageReply = null;
244                 if (domRpcResult.getResult() != null) {
245                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
246                             domRpcResult.getResult());
247                 }
248                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
249             }
250
251             @Override
252             public void onFailure(final Throwable throwable) {
253                 recipient.tell(new Failure(throwable), getSelf());
254             }
255         }, MoreExecutors.directExecutor());
256     }
257
258     private void registerSlaveMountPoint(final ActorRef masterReference) {
259         unregisterSlaveMountPoint();
260
261         slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
262
263         resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
264     }
265
266     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
267         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
268     }
269
270     private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) {
271         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
272                 new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
273         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
274                 getContext().dispatcher());
275
276         registeredSchemas = sourceIdentifiers.stream()
277                 .map(sourceId ->
278                         schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
279                                 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
280                 .collect(Collectors.toList());
281
282         return schemaRepository.createEffectiveModelContextFactory();
283     }
284
285     private void resolveSchemaContext(final EffectiveModelContextFactory schemaContextFactory,
286             final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) {
287         final ListenableFuture<EffectiveModelContext> schemaContextFuture =
288                 schemaContextFactory.createEffectiveModelContext(sourceIdentifiers);
289         Futures.addCallback(schemaContextFuture, new FutureCallback<SchemaContext>() {
290             @Override
291             public void onSuccess(final SchemaContext result) {
292                 executeInSelf(() -> {
293                     // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
294                     // resolution.
295                     if (slaveSalManager == localSlaveSalManager) {
296                         LOG.info("{}: Schema context resolved: {} - registering slave mount point",
297                                 id, result.getModules());
298                         slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
299                                 masterReference);
300                     }
301                 });
302             }
303
304             @Override
305             public void onFailure(final Throwable throwable) {
306                 executeInSelf(() -> {
307                     if (slaveSalManager == localSlaveSalManager) {
308                         final Throwable cause = Throwables.getRootCause(throwable);
309                         if (cause instanceof AskTimeoutException) {
310                             if (tries <= 5 || tries % 10 == 0) {
311                                 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
312                             }
313
314                             resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
315                                     masterReference, tries + 1);
316                         } else {
317                             LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
318                                     id, throwable);
319                             closeSchemaSourceRegistrations();
320                         }
321                     }
322                 });
323             }
324         }, MoreExecutors.directExecutor());
325     }
326
327     private void closeSchemaSourceRegistrations() {
328         if (registeredSchemas != null) {
329             registeredSchemas.forEach(SchemaSourceRegistration::close);
330             registeredSchemas = null;
331         }
332     }
333 }