/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status.Failure;
13 import akka.actor.Status.Success;
14 import akka.pattern.AskTimeoutException;
15 import akka.util.Timeout;
16 import com.google.common.base.Throwables;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.io.IOException;
22 import java.time.Duration;
23 import java.util.List;
24 import java.util.stream.Collectors;
25 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
26 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
27 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
28 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
29 import org.opendaylight.mdsal.dom.api.DOMActionResult;
30 import org.opendaylight.mdsal.dom.api.DOMActionService;
31 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
35 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
36 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
37 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
38 import org.opendaylight.mdsal.dom.api.DOMRpcService;
39 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
40 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
41 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices;
42 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices.Actions;
43 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices.Rpcs;
44 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService;
45 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
46 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
47 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
48 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
49 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
50 import org.opendaylight.netconf.topology.singleton.messages.ContainerNodeMessage;
51 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
52 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
53 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
54 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
55 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
56 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
57 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
58 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
59 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
60 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessage;
61 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessageReply;
62 import org.opendaylight.netconf.topology.singleton.messages.netconf.NetconfDataTreeServiceRequest;
63 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
64 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
65 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
66 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
67 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
68 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
69 import org.opendaylight.yangtools.yang.common.QName;
70 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
71 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
72 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
73 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
74 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
75 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
76 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
77 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
78 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
79 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
80 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
82 public class NetconfNodeActor extends AbstractUntypedActor {
83 private final Duration writeTxIdleTimeout;
84 private final DOMMountPointService mountPointService;
86 private SchemaSourceRegistry schemaRegistry;
87 private SchemaRepository schemaRepository;
88 private Timeout actorResponseWaitTime;
89 private RemoteDeviceId id;
90 private NetconfTopologySetup setup;
91 private List<SourceIdentifier> sourceIdentifiers;
92 private DOMRpcService deviceRpc;
93 private DOMActionService deviceAction;
94 private SlaveSalFacade slaveSalManager;
95 private DOMDataBroker deviceDataBroker;
96 private NetconfDataTreeService netconfService;
97 //readTxActor can be shared
98 private ActorRef readTxActor;
99 private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
101 public static Props props(final NetconfTopologySetup setup, final RemoteDeviceId id,
102 final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
103 return Props.create(NetconfNodeActor.class, () ->
104 new NetconfNodeActor(setup, id, actorResponseWaitTime, mountPointService));
/**
 * Creates the actor and captures its initial configuration.
 *
 * @param setup topology setup; also the source of the schema registry, schema repository and the
 *              write-transaction idle timeout
 * @param id identifier of the remote device, used as the prefix of log messages
 * @param actorResponseWaitTime timeout applied to requests sent to the master actor
 * @param mountPointService service passed to the slave SAL facade when a slave mount point is registered
 */
protected NetconfNodeActor(final NetconfTopologySetup setup,
        final RemoteDeviceId id, final Timeout actorResponseWaitTime,
        final DOMMountPointService mountPointService) {
    // Both fields are read before any Refresh* message may arrive (log prefixes, registerSlaveMountPoint),
    // so they have to be initialized here rather than left null.
    this.setup = setup;
    this.id = id;
    schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry();
    schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository();
    this.actorResponseWaitTime = actorResponseWaitTime;
    writeTxIdleTimeout = setup.getIdleTimeout();
    this.mountPointService = mountPointService;
}
119 @SuppressWarnings("checkstyle:IllegalCatch")
121 public void handleReceive(final Object message) {
122 LOG.debug("{}: received message {}", id, message);
124 if (message instanceof CreateInitialMasterActorData masterActorData) { // master
125 sourceIdentifiers = masterActorData.getSourceIndentifiers();
126 deviceDataBroker = masterActorData.getDeviceDataBroker();
127 netconfService = masterActorData.getNetconfDataTreeService();
128 final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction();
129 readTxActor = context().actorOf(ReadTransactionActor.props(tx));
131 final var deviceServices = masterActorData.getDeviceServices();
132 deviceRpc = deviceServices.rpcs() instanceof Rpcs.Normalized normalized ? normalized : null;
133 deviceAction = deviceServices.actions() instanceof Actions.Normalized normalized ? normalized : null;
135 sender().tell(new MasterActorDataInitialized(), self());
136 LOG.debug("{}: Master is ready.", id);
137 } else if (message instanceof RefreshSetupMasterActorData) {
138 setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
139 id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
140 sender().tell(new MasterActorDataInitialized(), self());
141 } else if (message instanceof AskForMasterMountPoint askForMasterMountPoint) { // master
142 // only master contains reference to deviceDataBroker
143 if (deviceDataBroker != null) {
144 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
145 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
148 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
149 sender().tell(new Failure(new NotMasterException(self())), self());
151 } else if (message instanceof YangTextSchemaSourceRequest yangTextSchemaSourceRequest) { // master
152 sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
153 } else if (message instanceof NewReadTransactionRequest) { // master
154 sender().tell(new Success(readTxActor), self());
155 } else if (message instanceof NewWriteTransactionRequest) { // master
157 final DOMDataTreeWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
158 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
159 sender().tell(new Success(txActor), self());
160 } catch (final Exception t) {
161 sender().tell(new Failure(t), self());
163 } else if (message instanceof NewReadWriteTransactionRequest) {
165 final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
166 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
167 sender().tell(new Success(txActor), self());
168 } catch (final Exception t) {
169 sender().tell(new Failure(t), self());
171 } else if (message instanceof InvokeRpcMessage invokeRpcMessage) { // master
172 invokeSlaveRpc(invokeRpcMessage.getSchemaPath().lastNodeIdentifier(),
173 invokeRpcMessage.getNormalizedNodeMessage(), sender());
174 } else if (message instanceof InvokeActionMessage invokeActionMessage) { // master
175 LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString());
176 invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(),
177 invokeActionMessage.getDOMDataTreeIdentifier(), sender());
178 } else if (message instanceof RegisterMountPoint registerMountPoint) { //slaves
179 sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
180 registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
181 sender().tell(new Success(null), self());
182 } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
183 unregisterSlaveMountPoint();
184 } else if (message instanceof RefreshSlaveActor) { //slave
185 actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
186 id = ((RefreshSlaveActor) message).getId();
187 schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
188 setup = ((RefreshSlaveActor) message).getSetup();
189 schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
190 } else if (message instanceof NetconfDataTreeServiceRequest) {
191 ActorRef netconfActor = context()
192 .actorOf(NetconfDataTreeServiceActor.props(netconfService, writeTxIdleTimeout));
193 sender().tell(new Success(netconfActor), self());
198 public void postStop() throws Exception {
202 unregisterSlaveMountPoint();
206 private void unregisterSlaveMountPoint() {
207 if (slaveSalManager != null) {
208 slaveSalManager.close();
209 slaveSalManager = null;
212 closeSchemaSourceRegistrations();
215 private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
216 final ListenableFuture<YangTextSchemaSource> schemaSourceFuture =
217 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
219 Futures.addCallback(schemaSourceFuture, new FutureCallback<YangTextSchemaSource>() {
221 public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
223 LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
224 sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
225 } catch (IOException e) {
226 sender.tell(new Failure(e), getSelf());
231 public void onFailure(final Throwable throwable) {
232 LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
233 sender.tell(new Failure(throwable), getSelf());
235 }, MoreExecutors.directExecutor());
238 private void invokeSlaveRpc(final QName qname, final NormalizedNodeMessage normalizedNodeMessage,
239 final ActorRef recipient) {
240 LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, qname, normalizedNodeMessage,
243 final ListenableFuture<? extends DOMRpcResult> rpcResult = deviceRpc.invokeRpc(qname,
244 normalizedNodeMessage != null ? (ContainerNode) normalizedNodeMessage.getNode() : null);
246 Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
248 public void onSuccess(final DOMRpcResult domRpcResult) {
249 LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, qname, domRpcResult);
251 if (domRpcResult == null) {
252 recipient.tell(new EmptyResultResponse(), getSender());
255 NormalizedNodeMessage nodeMessageReply = null;
256 if (domRpcResult.value() != null) {
257 nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), domRpcResult.value());
259 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.errors()), getSelf());
263 public void onFailure(final Throwable throwable) {
264 recipient.tell(new Failure(throwable), getSelf());
266 }, MoreExecutors.directExecutor());
270 * Invoking Action on Slave Node in Odl Cluster Environment.
272 * @param schemaPath {@link Absolute}
273 * @param containerNodeMessage {@link ContainerNodeMessage}
274 * @param domDataTreeIdentifier {@link DOMDataTreeIdentifier}
275 * @param recipient {@link ActorRef}
277 private void invokeSlaveAction(final Absolute schemaPath, final ContainerNodeMessage containerNodeMessage,
278 final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) {
279 LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath,
280 containerNodeMessage, domDataTreeIdentifier, deviceAction);
282 final ListenableFuture<? extends DOMActionResult> actionResult = deviceAction.invokeAction(schemaPath,
283 domDataTreeIdentifier, containerNodeMessage != null ? containerNodeMessage.getNode() : null);
285 Futures.addCallback(actionResult, new FutureCallback<DOMActionResult>() {
288 public void onSuccess(final DOMActionResult domActionResult) {
289 LOG.debug("{}: invokeSlaveAction for {}, domActionResult: {}", id, schemaPath, domActionResult);
290 if (domActionResult == null) {
291 recipient.tell(new EmptyResultResponse(), getSender());
295 //Check DomActionResult containing Ok onSuccess pass empty nodeMessageReply
296 ContainerNodeMessage nodeMessageReply = domActionResult.getOutput().map(ContainerNodeMessage::new)
298 recipient.tell(new InvokeActionMessageReply(nodeMessageReply, domActionResult.getErrors()), getSelf());
302 public void onFailure(final Throwable throwable) {
303 recipient.tell(new Failure(throwable), getSelf());
305 }, MoreExecutors.directExecutor());
308 private void registerSlaveMountPoint(final ActorRef masterReference) {
309 unregisterSlaveMountPoint();
311 slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
313 resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
316 private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) {
317 final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
318 new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
319 final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
320 getContext().dispatcher());
322 registeredSchemas = sourceIdentifiers.stream()
324 schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
325 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
326 .collect(Collectors.toList());
328 return schemaRepository.createEffectiveModelContextFactory();
331 private void resolveSchemaContext(final EffectiveModelContextFactory schemaContextFactory,
332 final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) {
333 final ListenableFuture<EffectiveModelContext> schemaContextFuture =
334 schemaContextFactory.createEffectiveModelContext(sourceIdentifiers);
335 Futures.addCallback(schemaContextFuture, new FutureCallback<EffectiveModelContext>() {
337 public void onSuccess(final EffectiveModelContext result) {
338 executeInSelf(() -> {
339 // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
341 if (slaveSalManager == localSlaveSalManager) {
342 LOG.info("{}: Schema context resolved: {} - registering slave mount point",
343 id, result.getModules());
344 final var actorSystem = setup.getActorSystem();
345 slaveSalManager.registerSlaveMountPoint(result, masterReference, new RemoteDeviceServices(
346 new ProxyDOMRpcService(actorSystem, masterReference, id, actorResponseWaitTime),
347 new ProxyDOMActionService(actorSystem, masterReference, id, actorResponseWaitTime)));
353 public void onFailure(final Throwable throwable) {
354 executeInSelf(() -> {
355 if (slaveSalManager == localSlaveSalManager) {
356 final Throwable cause = Throwables.getRootCause(throwable);
357 if (cause instanceof AskTimeoutException) {
358 if (tries <= 5 || tries % 10 == 0) {
359 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
362 resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
363 masterReference, tries + 1);
365 LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
367 closeSchemaSourceRegistrations();
372 }, MoreExecutors.directExecutor());
375 private void closeSchemaSourceRegistrations() {
376 if (registeredSchemas != null) {
377 registeredSchemas.forEach(SchemaSourceRegistration::close);
378 registeredSchemas = null;