2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status.Failure;
14 import akka.actor.Status.Success;
15 import akka.pattern.AskTimeoutException;
16 import akka.util.Timeout;
17 import com.google.common.base.Throwables;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.stream.Collectors;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
30 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
31 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
32 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
34 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
35 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
36 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
37 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
38 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
39 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
40 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
41 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
42 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
43 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
44 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
45 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
46 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
47 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
48 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
49 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
50 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
51 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
52 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
53 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
54 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
55 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
56 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
57 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
58 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
59 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
60 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
61 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
62 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
63 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
64 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
65 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
68 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
69 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
70 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
71 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
72 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
73 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
74 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
75 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
76 import scala.concurrent.duration.Duration;
/**
 * Actor handling NETCONF topology messages for a single device. Depending on the messages it receives it
 * acts either as the master (it holds the device data broker and RPC service and hands out transaction
 * actors) or as a slave (it registers a mount point backed by a reference to the master actor).
 */
78 public class NetconfNodeActor extends AbstractUntypedActor {
// Idle timeout handed to every write and read-write transaction actor created by this actor.
80 private final Duration writeTxIdleTimeout;
// Passed to the SlaveSalFacade when a slave mount point is registered.
81 private final DOMMountPointService mountPointService;
// Registry in which the remote schema sources are registered; replaced on RefreshSlaveActor.
83 private SchemaSourceRegistry schemaRegistry;
// Repository used to look up YANG text sources and to create the schema context factory.
84 private SchemaRepository schemaRepository;
// Timeout handed to the proxies and to the SlaveSalFacade; replaced on RefreshSlaveActor.
85 private Timeout actorResponseWaitTime;
// Device identifier, used as the prefix of log messages; replaced by the refresh messages.
86 private RemoteDeviceId id;
// Topology setup providing the actor system; replaced by the refresh messages.
87 private NetconfTopologySetup setup;
// Source identifiers received via CreateInitialMasterActorData or RegisterMountPoint.
88 private List<SourceIdentifier> sourceIdentifiers;
// Device RPC service, set from CreateInitialMasterActorData and used by invokeSlaveRpc.
89 private DOMRpcService deviceRpc;
// Facade created by registerSlaveMountPoint and closed by unregisterSlaveMountPoint.
90 private SlaveSalFacade slaveSalManager;
// Set from CreateInitialMasterActorData; a non-null value is used as the "we are master" check.
91 private DOMDataBroker deviceDataBroker;
92 //readTxActor can be shared
// Created once from a read-only transaction and returned for every NewReadTransactionRequest.
93 private ActorRef readTxActor;
// Registrations made by createSchemaContextFactory; closed by closeSchemaSourceRegistrations.
94 private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
/**
 * Creates the Akka {@link Props} for a {@code NetconfNodeActor}. The creator lambda instantiates the actor
 * with the arguments given here.
 */
96 public static Props props(final NetconfTopologySetup setup,
97 final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
98 final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
99 final DOMMountPointService mountPointService) {
100 return Props.create(NetconfNodeActor.class, () ->
101 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime,
/**
 * Stores the collaborators of this actor. The write transaction idle timeout is read once from
 * {@code setup.getIdleTimeout()}.
 */
// NOTE(review): assignments of the setup and id fields are not visible in this view - confirm they are
// initialized here, since both are read by other methods.
105 protected NetconfNodeActor(final NetconfTopologySetup setup,
106 final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
107 final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
108 final DOMMountPointService mountPointService) {
111 this.schemaRegistry = schemaRegistry;
112 this.schemaRepository = schemaRepository;
113 this.actorResponseWaitTime = actorResponseWaitTime;
114 this.writeTxIdleTimeout = setup.getIdleTimeout();
115 this.mountPointService = mountPointService;
/**
 * Dispatches an incoming message by its type. Messages marked "master" rely on state received with
 * {@code CreateInitialMasterActorData}; messages marked "slave" manage the slave mount point or refresh
 * the slave's collaborators.
 *
 * @param message the received message
 */
118 @SuppressWarnings("checkstyle:IllegalCatch")
120 public void handleReceive(final Object message) throws Exception {
121 LOG.debug("{}: received message {}", id, message);
// Master initialization: remember the device services, create the read transaction actor and
// acknowledge to the sender.
123 if (message instanceof CreateInitialMasterActorData) { // master
125 final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
126 sourceIdentifiers = masterActorData.getSourceIndentifiers();
127 this.deviceDataBroker = masterActorData.getDeviceDataBroker();
128 final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
129 readTxActor = context().actorOf(ReadTransactionActor.props(tx));
130 this.deviceRpc = masterActorData.getDeviceRpc();
132 sender().tell(new MasterActorDataInitialized(), self());
134 LOG.debug("{}: Master is ready.", id);
// Refresh of the master's setup and device id; acknowledged with the same reply as initialization.
136 } else if (message instanceof RefreshSetupMasterActorData) {
137 setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
138 id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
139 sender().tell(new MasterActorDataInitialized(), self());
// A slave asks for the master's mount point data.
140 } else if (message instanceof AskForMasterMountPoint) { // master
141 AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message;
143 // only master contains reference to deviceDataBroker
144 if (deviceDataBroker != null) {
145 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
146 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
149 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
150 sender().tell(new Failure(new NotMasterException(self())), self());
// Request for a YANG text source; the reply is sent asynchronously to the current sender.
153 } else if (message instanceof YangTextSchemaSourceRequest) { // master
155 final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
156 sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
// Every read request is answered with the same read transaction actor.
158 } else if (message instanceof NewReadTransactionRequest) { // master
160 sender().tell(new NewReadTransactionReply(readTxActor), self());
// A new write-only transaction and a dedicated child actor are created per request.
162 } else if (message instanceof NewWriteTransactionRequest) { // master
164 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
165 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
166 sender().tell(new NewWriteTransactionReply(txActor), self());
167 } catch (final Exception t) {
// The caught exception itself is sent back as the reply message.
168 sender().tell(t, self());
// A new read-write transaction and a dedicated child actor are created per request.
171 } else if (message instanceof NewReadWriteTransactionRequest) {
173 final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
174 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
175 sender().tell(new NewReadWriteTransactionReply(txActor), self());
176 } catch (final Exception t) {
// The caught exception itself is sent back as the reply message.
177 sender().tell(t, self());
// RPC invocation forwarded to the device RPC service; the reply goes to the current sender.
179 } else if (message instanceof InvokeRpcMessage) { // master
180 final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
181 invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
// The slave takes over the source identifiers, starts mount point registration against the given
// master reference and acknowledges with Success.
183 } else if (message instanceof RegisterMountPoint) { //slaves
184 RegisterMountPoint registerMountPoint = (RegisterMountPoint)message;
185 sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
186 registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
187 sender().tell(new Success(null), self());
188 } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
189 unregisterSlaveMountPoint();
// Replaces the slave's collaborators with the ones carried by the message; no reply is sent here.
190 } else if (message instanceof RefreshSlaveActor) { //slave
191 actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
192 id = ((RefreshSlaveActor) message).getId();
193 schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
194 setup = ((RefreshSlaveActor) message).getSetup();
195 schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
/**
 * Lifecycle hook invoked when the actor stops; it unregisters the slave mount point.
 */
// NOTE(review): the rest of this method's body is not visible in this view.
201 public void postStop() throws Exception {
205 unregisterSlaveMountPoint();
209 private void unregisterSlaveMountPoint() {
210 if (slaveSalManager != null) {
211 slaveSalManager.close();
212 slaveSalManager = null;
215 closeSchemaSourceRegistrations();
218 private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
219 final ListenableFuture<@NonNull YangTextSchemaSource> yangTextSchemaSource =
220 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
222 Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
224 public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
226 sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
227 } catch (final IOException exception) {
228 sender.tell(exception.getCause(), getSelf());
233 public void onFailure(@Nonnull final Throwable throwable) {
234 sender.tell(throwable, getSelf());
236 }, MoreExecutors.directExecutor());
239 private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
240 final ActorRef recipient) {
242 final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
243 deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
245 Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
247 public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
248 if (domRpcResult == null) {
249 recipient.tell(new EmptyResultResponse(), getSender());
252 NormalizedNodeMessage nodeMessageReply = null;
253 if (domRpcResult.getResult() != null) {
254 nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
255 domRpcResult.getResult());
257 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
261 public void onFailure(@Nonnull final Throwable throwable) {
262 recipient.tell(throwable, getSelf());
264 }, MoreExecutors.directExecutor());
267 private void registerSlaveMountPoint(final ActorRef masterReference) {
268 unregisterSlaveMountPoint();
270 slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
272 resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
275 private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
276 return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
/**
 * Registers a remote schema provider, backed by the given master actor, as a YANG text source for the
 * known source identifiers, remembers the resulting registrations and returns a schema context factory
 * created by the schema repository.
 *
 * @param masterReference reference to the master actor that serves the schema sources
 * @return factory created with the {@code ALWAYS_ACCEPT} source filter
 */
279 private SchemaContextFactory createSchemaContextFactory(final ActorRef masterReference) {
280 final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
281 new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
282 final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
283 getContext().dispatcher());
// The registrations are kept so that closeSchemaSourceRegistrations can release them later.
285 registeredSchemas = sourceIdentifiers.stream()
287 schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
288 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
289 .collect(Collectors.toList());
291 return schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
/**
 * Asynchronously creates the schema context for the known source identifiers and, once it is available,
 * registers the slave mount point. Both outcomes are handled via {@code executeInSelf} and are ignored when
 * the slave facade has been replaced in the meantime.
 *
 * <p>A failure whose root cause is an {@link AskTimeoutException} triggers another attempt with an
 * incremented try counter; any other failure is logged as an error.
 *
 * @param schemaContextFactory factory used to create the schema context
 * @param localSlaveSalManager facade that was current when resolution was initiated
 * @param masterReference reference to the master actor
 * @param tries number of the current attempt, used to throttle the retry warning
 */
294 private void resolveSchemaContext(final SchemaContextFactory schemaContextFactory,
295 final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, int tries) {
296 final ListenableFuture<SchemaContext> schemaContextFuture =
297 schemaContextFactory.createSchemaContext(sourceIdentifiers);
298 Futures.addCallback(schemaContextFuture, new FutureCallback<SchemaContext>() {
300 public void onSuccess(@Nonnull final SchemaContext result) {
301 executeInSelf(() -> {
302 // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
304 if (slaveSalManager == localSlaveSalManager) {
305 LOG.info("{}: Schema context resolved: {} - registering slave mount point",
306 id, result.getModules());
307 slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
314 public void onFailure(@Nonnull final Throwable throwable) {
315 executeInSelf(() -> {
// Same staleness check as on success: a replaced facade means this attempt is outdated.
316 if (slaveSalManager == localSlaveSalManager) {
317 final Throwable cause = Throwables.getRootCause(throwable);
318 if (cause instanceof AskTimeoutException) {
// The warning is logged for the first five attempts and then for every tenth attempt only.
319 if (tries <= 5 || tries % 10 == 0) {
320 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
323 resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
324 masterReference, tries + 1);
326 LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
332 }, MoreExecutors.directExecutor());
335 private void closeSchemaSourceRegistrations() {
336 if (registeredSchemas != null) {
337 registeredSchemas.forEach(SchemaSourceRegistration::close);
338 registeredSchemas = null;