Add slave/master end-to-end test
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status.Failure;
14 import akka.actor.Status.Success;
15 import akka.pattern.AskTimeoutException;
16 import akka.util.Timeout;
17 import com.google.common.base.Throwables;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.stream.Collectors;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
30 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
31 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
32 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
34 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
35 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
36 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
37 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
38 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
39 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
40 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
41 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
42 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
43 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
44 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
45 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
46 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
47 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
48 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
49 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
50 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
51 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
52 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
53 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
54 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
55 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
56 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
57 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
58 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
59 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
60 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
61 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
62 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
63 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
64 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
65 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
68 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
69 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
70 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
71 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
72 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
73 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
74 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
75 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
76 import scala.concurrent.duration.Duration;
77
78 public class NetconfNodeActor extends AbstractUntypedActor {
79
80     private final Duration writeTxIdleTimeout;
81     private final DOMMountPointService mountPointService;
82
83     private SchemaSourceRegistry schemaRegistry;
84     private SchemaRepository schemaRepository;
85     private Timeout actorResponseWaitTime;
86     private RemoteDeviceId id;
87     private NetconfTopologySetup setup;
88     private List<SourceIdentifier> sourceIdentifiers;
89     private DOMRpcService deviceRpc;
90     private SlaveSalFacade slaveSalManager;
91     private DOMDataBroker deviceDataBroker;
92     //readTxActor can be shared
93     private ActorRef readTxActor;
94     private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
95
96     public static Props props(final NetconfTopologySetup setup,
97                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
98                               final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
99                               final DOMMountPointService mountPointService) {
100         return Props.create(NetconfNodeActor.class, () ->
101                 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime,
102                         mountPointService));
103     }
104
105     protected NetconfNodeActor(final NetconfTopologySetup setup,
106                                final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
107                                final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
108                                final DOMMountPointService mountPointService) {
109         this.setup = setup;
110         this.id = id;
111         this.schemaRegistry = schemaRegistry;
112         this.schemaRepository = schemaRepository;
113         this.actorResponseWaitTime = actorResponseWaitTime;
114         this.writeTxIdleTimeout = setup.getIdleTimeout();
115         this.mountPointService = mountPointService;
116     }
117
118     @SuppressWarnings("checkstyle:IllegalCatch")
119     @Override
120     public void handleReceive(final Object message) throws Exception {
121         LOG.debug("{}:  received message {}", id, message);
122
123         if (message instanceof CreateInitialMasterActorData) { // master
124
125             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
126             sourceIdentifiers = masterActorData.getSourceIndentifiers();
127             this.deviceDataBroker = masterActorData.getDeviceDataBroker();
128             final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
129             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
130             this.deviceRpc = masterActorData.getDeviceRpc();
131
132             sender().tell(new MasterActorDataInitialized(), self());
133
134             LOG.debug("{}: Master is ready.", id);
135
136         } else if (message instanceof  RefreshSetupMasterActorData) {
137             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
138             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
139             sender().tell(new MasterActorDataInitialized(), self());
140         } else if (message instanceof AskForMasterMountPoint) { // master
141             AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message;
142
143             // only master contains reference to deviceDataBroker
144             if (deviceDataBroker != null) {
145                 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
146                 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
147                         sender());
148             } else {
149                 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
150                 sender().tell(new Failure(new NotMasterException(self())), self());
151             }
152
153         } else if (message instanceof YangTextSchemaSourceRequest) { // master
154
155             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
156             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
157
158         } else if (message instanceof NewReadTransactionRequest) { // master
159
160             sender().tell(new NewReadTransactionReply(readTxActor), self());
161
162         } else if (message instanceof NewWriteTransactionRequest) { // master
163             try {
164                 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
165                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
166                 sender().tell(new NewWriteTransactionReply(txActor), self());
167             } catch (final Exception t) {
168                 sender().tell(t, self());
169             }
170
171         } else if (message instanceof NewReadWriteTransactionRequest) {
172             try {
173                 final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
174                 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
175                 sender().tell(new NewReadWriteTransactionReply(txActor), self());
176             } catch (final Exception t) {
177                 sender().tell(t, self());
178             }
179         } else if (message instanceof InvokeRpcMessage) { // master
180             final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
181             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
182
183         } else if (message instanceof RegisterMountPoint) { //slaves
184             RegisterMountPoint registerMountPoint = (RegisterMountPoint)message;
185             sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
186             registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
187             sender().tell(new Success(null), self());
188         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
189             unregisterSlaveMountPoint();
190         } else if (message instanceof RefreshSlaveActor) { //slave
191             actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
192             id = ((RefreshSlaveActor) message).getId();
193             schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
194             setup = ((RefreshSlaveActor) message).getSetup();
195             schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
196         }
197
198     }
199
200     @Override
201     public void postStop() throws Exception {
202         try {
203             super.postStop();
204         } finally {
205             unregisterSlaveMountPoint();
206         }
207     }
208
209     private void unregisterSlaveMountPoint() {
210         if (slaveSalManager != null) {
211             slaveSalManager.close();
212             slaveSalManager = null;
213         }
214
215         closeSchemaSourceRegistrations();
216     }
217
218     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
219         final ListenableFuture<@NonNull YangTextSchemaSource> schemaSourceFuture =
220                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
221
222         Futures.addCallback(schemaSourceFuture, new FutureCallback<YangTextSchemaSource>() {
223             @Override
224             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
225                 try {
226                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
227                 } catch (IOException e) {
228                     sender.tell(new Failure(e), getSelf());
229                 }
230             }
231
232             @Override
233             public void onFailure(@Nonnull final Throwable throwable) {
234                 sender.tell(new Failure(throwable), getSelf());
235             }
236         }, MoreExecutors.directExecutor());
237     }
238
239     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
240                                 final ActorRef recipient) {
241
242         LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, schemaPath, normalizedNodeMessage,
243                 deviceRpc);
244
245         final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult = deviceRpc.invokeRpc(schemaPath,
246                 normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
247
248         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
249             @Override
250             public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
251                 LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, schemaPath, domRpcResult);
252
253                 if (domRpcResult == null) {
254                     recipient.tell(new EmptyResultResponse(), getSender());
255                     return;
256                 }
257                 NormalizedNodeMessage nodeMessageReply = null;
258                 if (domRpcResult.getResult() != null) {
259                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
260                             domRpcResult.getResult());
261                 }
262                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
263             }
264
265             @Override
266             public void onFailure(@Nonnull final Throwable throwable) {
267                 recipient.tell(new Failure(throwable), getSelf());
268             }
269         }, MoreExecutors.directExecutor());
270     }
271
272     private void registerSlaveMountPoint(final ActorRef masterReference) {
273         unregisterSlaveMountPoint();
274
275         slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
276
277         resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
278     }
279
280     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
281         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
282     }
283
284     private SchemaContextFactory createSchemaContextFactory(final ActorRef masterReference) {
285         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
286                 new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
287         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
288                 getContext().dispatcher());
289
290         registeredSchemas = sourceIdentifiers.stream()
291                 .map(sourceId ->
292                         schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
293                                 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
294                 .collect(Collectors.toList());
295
296         return schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
297     }
298
299     private void resolveSchemaContext(final SchemaContextFactory schemaContextFactory,
300             final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, int tries) {
301         final ListenableFuture<SchemaContext> schemaContextFuture =
302                 schemaContextFactory.createSchemaContext(sourceIdentifiers);
303         Futures.addCallback(schemaContextFuture, new FutureCallback<SchemaContext>() {
304             @Override
305             public void onSuccess(@Nonnull final SchemaContext result) {
306                 executeInSelf(() -> {
307                     // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
308                     // resolution.
309                     if (slaveSalManager == localSlaveSalManager) {
310                         LOG.info("{}: Schema context resolved: {} - registering slave mount point",
311                                 id, result.getModules());
312                         slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
313                                 masterReference);
314                     }
315                 });
316             }
317
318             @Override
319             public void onFailure(@Nonnull final Throwable throwable) {
320                 executeInSelf(() -> {
321                     if (slaveSalManager == localSlaveSalManager) {
322                         final Throwable cause = Throwables.getRootCause(throwable);
323                         if (cause instanceof AskTimeoutException) {
324                             if (tries <= 5 || tries % 10 == 0) {
325                                 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
326                             }
327
328                             resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
329                                     masterReference, tries + 1);
330                         } else {
331                             LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
332                                     id, throwable);
333                             closeSchemaSourceRegistrations();
334                         }
335                     }
336                 });
337             }
338         }, MoreExecutors.directExecutor());
339     }
340
341     private void closeSchemaSourceRegistrations() {
342         if (registeredSchemas != null) {
343             registeredSchemas.forEach(SchemaSourceRegistration::close);
344             registeredSchemas = null;
345         }
346     }
347
348 }