2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status.Failure;
13 import akka.actor.Status.Success;
14 import akka.pattern.AskTimeoutException;
15 import akka.util.Timeout;
16 import com.google.common.base.Throwables;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.stream.Collectors;
24 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
25 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
26 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
27 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
28 import org.opendaylight.mdsal.dom.api.DOMActionResult;
29 import org.opendaylight.mdsal.dom.api.DOMActionService;
30 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
35 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
36 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
37 import org.opendaylight.mdsal.dom.api.DOMRpcService;
38 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
39 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService;
40 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
41 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
42 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
43 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
44 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
45 import org.opendaylight.netconf.topology.singleton.messages.ContainerNodeMessage;
46 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
47 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
48 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
49 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
50 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
51 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
52 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
53 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
54 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
55 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessage;
56 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessageReply;
57 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
58 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
59 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
60 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
61 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
62 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
64 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
65 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
66 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
67 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
68 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
69 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
70 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
71 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
72 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
73 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
74 import scala.concurrent.duration.Duration;
/**
 * Actor that serves both the master-side and the slave-side messages for a NETCONF device; the
 * message handler marks which message types belong to which role. Its configuration fields are
 * non-final because refresh messages replace them at runtime.
 */
76 public class NetconfNodeActor extends AbstractUntypedActor {
// Idle timeout passed to every write and read-write transaction actor; taken from setup.getIdleTimeout().
77 private final Duration writeTxIdleTimeout;
// Mount point service passed to each SlaveSalFacade this actor creates.
78 private final DOMMountPointService mountPointService;
// Registry where remote schema sources are registered; replaced by RefreshSlaveActor.
80 private SchemaSourceRegistry schemaRegistry;
// Supplies YANG text sources and the effective model context factory; replaced by RefreshSlaveActor.
81 private SchemaRepository schemaRepository;
// Timeout passed to the proxy services and the slave facade; replaced by RefreshSlaveActor.
82 private Timeout actorResponseWaitTime;
// Device identifier, used as the prefix of log messages; replaced by the refresh messages.
83 private RemoteDeviceId id;
// Topology setup (provides the actor system); replaced by the refresh messages.
84 private NetconfTopologySetup setup;
// Schema sources of the device; set from CreateInitialMasterActorData or RegisterMountPoint.
85 private List<SourceIdentifier> sourceIdentifiers;
// Device RPC service; set from CreateInitialMasterActorData.
86 private DOMRpcService deviceRpc;
// Device action service; set from CreateInitialMasterActorData.
87 private DOMActionService deviceAction;
// Facade for the slave mount point; created on registration, closed and nulled on unregistration.
88 private SlaveSalFacade slaveSalManager;
// Device data broker; set from CreateInitialMasterActorData and tested for null to detect the master role.
89 private DOMDataBroker deviceDataBroker;
// The same child actor reference is returned for every NewReadTransactionRequest.
90 //readTxActor can be shared
91 private ActorRef readTxActor;
// Registrations made while building the schema context factory; closed by closeSchemaSourceRegistrations().
92 private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
94 public static Props props(final NetconfTopologySetup setup,
95 final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
96 final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
97 final DOMMountPointService mountPointService) {
98 return Props.create(NetconfNodeActor.class, () ->
99 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime,
/**
 * Creates the actor and stores its collaborators.
 *
 * @param setup topology setup of the device; also the source of the write transaction idle timeout
 * @param id identifier of the remote device, used as the prefix of log messages
 * @param schemaRegistry registry used to register schema sources
 * @param schemaRepository repository used to look up schema sources
 * @param actorResponseWaitTime timeout used when waiting for other actors
 * @param mountPointService mount point service used for the slave mount point
 */
protected NetconfNodeActor(final NetconfTopologySetup setup,
        final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
        final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
        final DOMMountPointService mountPointService) {
    // Keep setup and id: they are read by logging and by the slave mount point registration
    // before any refresh message has had a chance to set them.
    this.setup = setup;
    this.id = id;
    this.schemaRegistry = schemaRegistry;
    this.schemaRepository = schemaRepository;
    this.actorResponseWaitTime = actorResponseWaitTime;
    this.writeTxIdleTimeout = setup.getIdleTimeout();
    this.mountPointService = mountPointService;
}
/**
 * Dispatches an incoming message on its runtime type. The trailing "master" / "slave" markers on
 * the branches are the original author's indication of which role handles each message type.
 *
 * @param message the message delivered to this actor
 */
116 @SuppressWarnings("checkstyle:IllegalCatch")
118 public void handleReceive(final Object message) {
119 LOG.debug("{}: received message {}", id, message);
// Master initialisation: store the device services carried by the message, open one read-only
// transaction, wrap it in a child actor, and confirm to the sender.
121 if (message instanceof CreateInitialMasterActorData) { // master
123 final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
124 sourceIdentifiers = masterActorData.getSourceIndentifiers();
125 this.deviceDataBroker = masterActorData.getDeviceDataBroker();
126 final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction();
127 readTxActor = context().actorOf(ReadTransactionActor.props(tx));
128 this.deviceRpc = masterActorData.getDeviceRpc();
129 this.deviceAction = masterActorData.getDeviceAction();
131 sender().tell(new MasterActorDataInitialized(), self());
133 LOG.debug("{}: Master is ready.", id);
// Replace setup and id with the values from the message, then confirm with the same reply type as above.
135 } else if (message instanceof RefreshSetupMasterActorData) {
136 setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
137 id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
138 sender().tell(new MasterActorDataInitialized(), self());
// A slave asks for the master mount point: with a data broker present, a RegisterMountPoint message
// carrying the source identifiers and this actor's reference goes to the slave actor named in the request.
// NOTE(review): the keyword separating the two outcomes is not visible in this view; the warn and the
// NotMasterException failure reply presumably form the alternative arm.
139 } else if (message instanceof AskForMasterMountPoint) { // master
140 AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message;
142 // only master contains reference to deviceDataBroker
143 if (deviceDataBroker != null) {
144 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
145 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
148 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
149 sender().tell(new Failure(new NotMasterException(self())), self());
// Look up the requested YANG source and send the result to the sender.
152 } else if (message instanceof YangTextSchemaSourceRequest) { // master
154 final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
155 sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
// Every read request receives the same read transaction actor.
157 } else if (message instanceof NewReadTransactionRequest) { // master
158 sender().tell(new Success(readTxActor), self());
// Each write request gets a new transaction and a new child actor; an exception caught below is
// returned to the sender as a Failure.
159 } else if (message instanceof NewWriteTransactionRequest) { // master
161 final DOMDataTreeWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
162 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
163 sender().tell(new Success(txActor), self());
164 } catch (final Exception t) {
165 sender().tell(new Failure(t), self());
// Same pattern as the write request, for a read-write transaction.
168 } else if (message instanceof NewReadWriteTransactionRequest) {
170 final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
171 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
172 sender().tell(new Success(txActor), self());
173 } catch (final Exception t) {
174 sender().tell(new Failure(t), self());
// Run the RPC against the device RPC service; the result is sent to the sender from a future callback.
176 } else if (message instanceof InvokeRpcMessage) { // master
177 final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
178 invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
// Run the action against the device action service; the result is sent to the sender from a future callback.
180 } else if (message instanceof InvokeActionMessage) { // master
181 final InvokeActionMessage invokeActionMessage = (InvokeActionMessage) message;
182 LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString());
183 invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(),
184 invokeActionMessage.getDOMDataTreeIdentifier(), sender());
// Slave side: remember the source identifiers, start the mount point registration, and reply
// Success(null) straight away. Schema context resolution finishes later in a future callback.
185 } else if (message instanceof RegisterMountPoint) { //slaves
186 RegisterMountPoint registerMountPoint = (RegisterMountPoint)message;
187 sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
188 registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
189 sender().tell(new Success(null), self());
// Slave side: tear down the mount point; no reply is sent.
190 } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
191 unregisterSlaveMountPoint();
// Slave side: replace the configuration fields with the values from the message; no reply is sent.
192 } else if (message instanceof RefreshSlaveActor) { //slave
193 actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
194 id = ((RefreshSlaveActor) message).getId();
195 schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
196 setup = ((RefreshSlaveActor) message).getSetup();
197 schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
/**
 * Actor shutdown hook. The visible part unregisters the slave mount point, which closes the slave
 * facade (if one exists) and the schema source registrations.
 *
 * @throws Exception declared by the signature; NOTE(review): the statement that can raise it is not
 *         visible in this view
 */
203 public void postStop() throws Exception {
207 unregisterSlaveMountPoint();
211 private void unregisterSlaveMountPoint() {
212 if (slaveSalManager != null) {
213 slaveSalManager.close();
214 slaveSalManager = null;
217 closeSchemaSourceRegistrations();
220 private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
221 final ListenableFuture<YangTextSchemaSource> schemaSourceFuture =
222 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
224 Futures.addCallback(schemaSourceFuture, new FutureCallback<YangTextSchemaSource>() {
226 public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
228 LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
229 sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
230 } catch (IOException e) {
231 sender.tell(new Failure(e), getSelf());
236 public void onFailure(final Throwable throwable) {
237 LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
238 sender.tell(new Failure(throwable), getSelf());
240 }, MoreExecutors.directExecutor());
243 private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
244 final ActorRef recipient) {
246 LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, schemaPath, normalizedNodeMessage,
249 final ListenableFuture<DOMRpcResult> rpcResult = deviceRpc.invokeRpc(schemaPath,
250 normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
252 Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
254 public void onSuccess(final DOMRpcResult domRpcResult) {
255 LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, schemaPath, domRpcResult);
257 if (domRpcResult == null) {
258 recipient.tell(new EmptyResultResponse(), getSender());
261 NormalizedNodeMessage nodeMessageReply = null;
262 if (domRpcResult.getResult() != null) {
263 nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
264 domRpcResult.getResult());
266 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
270 public void onFailure(final Throwable throwable) {
271 recipient.tell(new Failure(throwable), getSelf());
273 }, MoreExecutors.directExecutor());
277 * Invoking Action on Slave Node in Odl Cluster Environment.
279 * @param schemaPath {@link SchemaPath}
280 * @param containerNodeMessage {@link ContainerNodeMessage}
281 * @param domDataTreeIdentifier {@link DOMDataTreeIdentifier}
282 * @param recipient {@link ActorRef}
284 private void invokeSlaveAction(final SchemaPath schemaPath, final ContainerNodeMessage containerNodeMessage,
285 final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) {
286 LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath,
287 containerNodeMessage, domDataTreeIdentifier, deviceAction);
289 final ListenableFuture<? extends DOMActionResult> actionResult = deviceAction.invokeAction(schemaPath,
290 domDataTreeIdentifier, containerNodeMessage != null ? containerNodeMessage.getNode() : null);
292 Futures.addCallback(actionResult, new FutureCallback<DOMActionResult>() {
295 public void onSuccess(final DOMActionResult domActionResult) {
296 LOG.debug("{}: invokeSlaveAction for {}, domActionResult: {}", id, schemaPath, domActionResult);
297 if (domActionResult == null) {
298 recipient.tell(new EmptyResultResponse(), getSender());
302 //Check DomActionResult containing Ok onSuccess pass empty nodeMessageReply
303 ContainerNodeMessage nodeMessageReply = domActionResult.getOutput().map(ContainerNodeMessage::new)
305 recipient.tell(new InvokeActionMessageReply(nodeMessageReply, domActionResult.getErrors()), getSelf());
309 public void onFailure(final Throwable throwable) {
310 recipient.tell(new Failure(throwable), getSelf());
312 }, MoreExecutors.directExecutor());
315 private void registerSlaveMountPoint(final ActorRef masterReference) {
316 unregisterSlaveMountPoint();
318 slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
320 resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
323 private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
324 return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
327 private DOMActionService getDOMActionService(final ActorRef masterReference) {
328 return new ProxyDOMActionService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
331 private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) {
332 final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
333 new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
334 final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
335 getContext().dispatcher());
337 registeredSchemas = sourceIdentifiers.stream()
339 schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
340 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
341 .collect(Collectors.toList());
343 return schemaRepository.createEffectiveModelContextFactory();
/**
 * Asynchronously builds the effective model context for the current source identifiers and, once
 * it is available, registers the slave mount point. A failure whose root cause is an
 * {@link AskTimeoutException} triggers another attempt with {@code tries + 1}; no upper bound on
 * attempts is visible here. Any other failure is logged as an error and the schema source
 * registrations are closed.
 *
 * @param schemaContextFactory factory that creates the effective model context
 * @param localSlaveSalManager the facade that was current when resolution started; results are
 *        ignored if the actor's facade has changed since
 * @param masterReference reference to the master actor, used for the proxy services and for retries
 * @param tries attempt counter; also throttles the retry warning
 */
346 private void resolveSchemaContext(final EffectiveModelContextFactory schemaContextFactory,
347 final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) {
348 final ListenableFuture<EffectiveModelContext> schemaContextFuture =
349 schemaContextFactory.createEffectiveModelContext(sourceIdentifiers);
350 Futures.addCallback(schemaContextFuture, new FutureCallback<SchemaContext>() {
352 public void onSuccess(final SchemaContext result) {
// NOTE(review): executeInSelf is not defined in this view; presumably it runs the task in this
// actor's own context so that the field accesses below are safe - confirm.
353 executeInSelf(() -> {
354 // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
356 if (slaveSalManager == localSlaveSalManager) {
357 LOG.info("{}: Schema context resolved: {} - registering slave mount point",
358 id, result.getModules());
359 slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
360 getDOMActionService(masterReference), masterReference);
366 public void onFailure(final Throwable throwable) {
367 executeInSelf(() -> {
// Same staleness check as on success: a replaced or removed facade means this result is dropped.
368 if (slaveSalManager == localSlaveSalManager) {
369 final Throwable cause = Throwables.getRootCause(throwable);
370 if (cause instanceof AskTimeoutException) {
// Warn on the first five attempts and then on every tenth, to limit log volume while retrying.
371 if (tries <= 5 || tries % 10 == 0) {
372 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
375 resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
376 masterReference, tries + 1);
378 LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
380 closeSchemaSourceRegistrations();
385 }, MoreExecutors.directExecutor());
/**
 * Closes every remembered schema source registration and clears the list. Does nothing when no
 * registrations are remembered, so it can be called repeatedly.
 */
388 private void closeSchemaSourceRegistrations() {
389 if (registeredSchemas != null) {
390 registeredSchemas.forEach(SchemaSourceRegistration::close);
// Forget the closed registrations so that a later call does not close them again.
391 registeredSchemas = null;