Merge "NECONF-524 : Setting the netconf keepalive logic to be more proactive."
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status.Failure;
13 import akka.actor.Status.Success;
14 import akka.pattern.AskTimeoutException;
15 import akka.util.Timeout;
16 import com.google.common.base.Throwables;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.stream.Collectors;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
28 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
29 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
30 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
31 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
32 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
34 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
35 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
36 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
37 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
38 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
39 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
40 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
41 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
42 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
43 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
44 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
45 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
46 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
47 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
48 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
49 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
50 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
51 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
52 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
53 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
54 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
55 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
56 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
57 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
58 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
59 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
60 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
61 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
62 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
63 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
64 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
65 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
66 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
67 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
68 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
69 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
70 import scala.concurrent.duration.Duration;
71
72 public class NetconfNodeActor extends AbstractUntypedActor {
73
74     private final Duration writeTxIdleTimeout;
75     private final DOMMountPointService mountPointService;
76
77     private SchemaSourceRegistry schemaRegistry;
78     private SchemaRepository schemaRepository;
79     private Timeout actorResponseWaitTime;
80     private RemoteDeviceId id;
81     private NetconfTopologySetup setup;
82     private List<SourceIdentifier> sourceIdentifiers;
83     private DOMRpcService deviceRpc;
84     private SlaveSalFacade slaveSalManager;
85     private DOMDataBroker deviceDataBroker;
86     //readTxActor can be shared
87     private ActorRef readTxActor;
88     private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
89
90     public static Props props(final NetconfTopologySetup setup,
91                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
92                               final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
93                               final DOMMountPointService mountPointService) {
94         return Props.create(NetconfNodeActor.class, () ->
95                 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime,
96                         mountPointService));
97     }
98
99     protected NetconfNodeActor(final NetconfTopologySetup setup,
100                                final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
101                                final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
102                                final DOMMountPointService mountPointService) {
103         this.setup = setup;
104         this.id = id;
105         this.schemaRegistry = schemaRegistry;
106         this.schemaRepository = schemaRepository;
107         this.actorResponseWaitTime = actorResponseWaitTime;
108         this.writeTxIdleTimeout = setup.getIdleTimeout();
109         this.mountPointService = mountPointService;
110     }
111
112     @SuppressWarnings("checkstyle:IllegalCatch")
113     @Override
114     public void handleReceive(final Object message) {
115         LOG.debug("{}:  received message {}", id, message);
116
117         if (message instanceof CreateInitialMasterActorData) { // master
118
119             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
120             sourceIdentifiers = masterActorData.getSourceIndentifiers();
121             this.deviceDataBroker = masterActorData.getDeviceDataBroker();
122             final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
123             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
124             this.deviceRpc = masterActorData.getDeviceRpc();
125
126             sender().tell(new MasterActorDataInitialized(), self());
127
128             LOG.debug("{}: Master is ready.", id);
129
130         } else if (message instanceof  RefreshSetupMasterActorData) {
131             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
132             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
133             sender().tell(new MasterActorDataInitialized(), self());
134         } else if (message instanceof AskForMasterMountPoint) { // master
135             AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message;
136
137             // only master contains reference to deviceDataBroker
138             if (deviceDataBroker != null) {
139                 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
140                 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
141                         sender());
142             } else {
143                 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
144                 sender().tell(new Failure(new NotMasterException(self())), self());
145             }
146
147         } else if (message instanceof YangTextSchemaSourceRequest) { // master
148
149             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
150             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
151
152         } else if (message instanceof NewReadTransactionRequest) { // master
153             sender().tell(new Success(readTxActor), self());
154         } else if (message instanceof NewWriteTransactionRequest) { // master
155             try {
156                 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
157                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
158                 sender().tell(new Success(txActor), self());
159             } catch (final Exception t) {
160                 sender().tell(new Failure(t), self());
161             }
162
163         } else if (message instanceof NewReadWriteTransactionRequest) {
164             try {
165                 final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
166                 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
167                 sender().tell(new Success(txActor), self());
168             } catch (final Exception t) {
169                 sender().tell(new Failure(t), self());
170             }
171         } else if (message instanceof InvokeRpcMessage) { // master
172             final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
173             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
174
175         } else if (message instanceof RegisterMountPoint) { //slaves
176             RegisterMountPoint registerMountPoint = (RegisterMountPoint)message;
177             sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
178             registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
179             sender().tell(new Success(null), self());
180         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
181             unregisterSlaveMountPoint();
182         } else if (message instanceof RefreshSlaveActor) { //slave
183             actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
184             id = ((RefreshSlaveActor) message).getId();
185             schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
186             setup = ((RefreshSlaveActor) message).getSetup();
187             schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
188         }
189
190     }
191
192     @Override
193     public void postStop() throws Exception {
194         try {
195             super.postStop();
196         } finally {
197             unregisterSlaveMountPoint();
198         }
199     }
200
201     private void unregisterSlaveMountPoint() {
202         if (slaveSalManager != null) {
203             slaveSalManager.close();
204             slaveSalManager = null;
205         }
206
207         closeSchemaSourceRegistrations();
208     }
209
210     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
211         final ListenableFuture<@NonNull YangTextSchemaSource> schemaSourceFuture =
212                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
213
214         Futures.addCallback(schemaSourceFuture, new FutureCallback<YangTextSchemaSource>() {
215             @Override
216             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
217                 try {
218                     LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
219                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
220                 } catch (IOException e) {
221                     sender.tell(new Failure(e), getSelf());
222                 }
223             }
224
225             @Override
226             public void onFailure(@Nonnull final Throwable throwable) {
227                 LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
228                 sender.tell(new Failure(throwable), getSelf());
229             }
230         }, MoreExecutors.directExecutor());
231     }
232
233     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
234                                 final ActorRef recipient) {
235
236         LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, schemaPath, normalizedNodeMessage,
237                 deviceRpc);
238
239         final ListenableFuture<DOMRpcResult> rpcResult = deviceRpc.invokeRpc(schemaPath,
240                 normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
241
242         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
243             @Override
244             public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
245                 LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, schemaPath, domRpcResult);
246
247                 if (domRpcResult == null) {
248                     recipient.tell(new EmptyResultResponse(), getSender());
249                     return;
250                 }
251                 NormalizedNodeMessage nodeMessageReply = null;
252                 if (domRpcResult.getResult() != null) {
253                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
254                             domRpcResult.getResult());
255                 }
256                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
257             }
258
259             @Override
260             public void onFailure(@Nonnull final Throwable throwable) {
261                 recipient.tell(new Failure(throwable), getSelf());
262             }
263         }, MoreExecutors.directExecutor());
264     }
265
266     private void registerSlaveMountPoint(final ActorRef masterReference) {
267         unregisterSlaveMountPoint();
268
269         slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
270
271         resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
272     }
273
274     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
275         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
276     }
277
278     private SchemaContextFactory createSchemaContextFactory(final ActorRef masterReference) {
279         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
280                 new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
281         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
282                 getContext().dispatcher());
283
284         registeredSchemas = sourceIdentifiers.stream()
285                 .map(sourceId ->
286                         schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
287                                 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
288                 .collect(Collectors.toList());
289
290         return schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
291     }
292
293     private void resolveSchemaContext(final SchemaContextFactory schemaContextFactory,
294             final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, int tries) {
295         final ListenableFuture<SchemaContext> schemaContextFuture =
296                 schemaContextFactory.createSchemaContext(sourceIdentifiers);
297         Futures.addCallback(schemaContextFuture, new FutureCallback<SchemaContext>() {
298             @Override
299             public void onSuccess(@Nonnull final SchemaContext result) {
300                 executeInSelf(() -> {
301                     // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
302                     // resolution.
303                     if (slaveSalManager == localSlaveSalManager) {
304                         LOG.info("{}: Schema context resolved: {} - registering slave mount point",
305                                 id, result.getModules());
306                         slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
307                                 masterReference);
308                     }
309                 });
310             }
311
312             @Override
313             public void onFailure(@Nonnull final Throwable throwable) {
314                 executeInSelf(() -> {
315                     if (slaveSalManager == localSlaveSalManager) {
316                         final Throwable cause = Throwables.getRootCause(throwable);
317                         if (cause instanceof AskTimeoutException) {
318                             if (tries <= 5 || tries % 10 == 0) {
319                                 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
320                             }
321
322                             resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
323                                     masterReference, tries + 1);
324                         } else {
325                             LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
326                                     id, throwable);
327                             closeSchemaSourceRegistrations();
328                         }
329                     }
330                 });
331             }
332         }, MoreExecutors.directExecutor());
333     }
334
335     private void closeSchemaSourceRegistrations() {
336         if (registeredSchemas != null) {
337             registeredSchemas.forEach(SchemaSourceRegistration::close);
338             registeredSchemas = null;
339         }
340     }
341
342 }