2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status.Failure;
13 import akka.actor.Status.Success;
14 import akka.pattern.AskTimeoutException;
15 import akka.util.Timeout;
16 import com.google.common.base.Throwables;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.stream.Collectors;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
28 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
29 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
30 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
31 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
32 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
34 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
35 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
36 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
37 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
38 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
39 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
40 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
41 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
42 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
43 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
44 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
45 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
46 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
47 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
48 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
49 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
50 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
51 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
52 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
53 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
54 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
55 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
56 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
57 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
58 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
59 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
60 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
61 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
62 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
63 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
64 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
65 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
66 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
67 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
68 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
69 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
70 import scala.concurrent.duration.Duration;
72 public class NetconfNodeActor extends AbstractUntypedActor {
74 private final Duration writeTxIdleTimeout;
75 private final DOMMountPointService mountPointService;
77 private SchemaSourceRegistry schemaRegistry;
78 private SchemaRepository schemaRepository;
79 private Timeout actorResponseWaitTime;
80 private RemoteDeviceId id;
81 private NetconfTopologySetup setup;
82 private List<SourceIdentifier> sourceIdentifiers;
83 private DOMRpcService deviceRpc;
84 private SlaveSalFacade slaveSalManager;
85 private DOMDataBroker deviceDataBroker;
86 //readTxActor can be shared
87 private ActorRef readTxActor;
88 private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
90 public static Props props(final NetconfTopologySetup setup, final RemoteDeviceId id,
91 final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
92 return Props.create(NetconfNodeActor.class, () ->
93 new NetconfNodeActor(setup, id, actorResponseWaitTime, mountPointService));
96 protected NetconfNodeActor(final NetconfTopologySetup setup,
97 final RemoteDeviceId id, final Timeout actorResponseWaitTime,
98 final DOMMountPointService mountPointService) {
101 this.schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry();
102 this.schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository();
103 this.actorResponseWaitTime = actorResponseWaitTime;
104 this.writeTxIdleTimeout = setup.getIdleTimeout();
105 this.mountPointService = mountPointService;
108 @SuppressWarnings("checkstyle:IllegalCatch")
110 public void handleReceive(final Object message) {
111 LOG.debug("{}: received message {}", id, message);
113 if (message instanceof CreateInitialMasterActorData) { // master
115 final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
116 sourceIdentifiers = masterActorData.getSourceIndentifiers();
117 this.deviceDataBroker = masterActorData.getDeviceDataBroker();
118 final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
119 readTxActor = context().actorOf(ReadTransactionActor.props(tx));
120 this.deviceRpc = masterActorData.getDeviceRpc();
122 sender().tell(new MasterActorDataInitialized(), self());
124 LOG.debug("{}: Master is ready.", id);
126 } else if (message instanceof RefreshSetupMasterActorData) {
127 setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
128 id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
129 sender().tell(new MasterActorDataInitialized(), self());
130 } else if (message instanceof AskForMasterMountPoint) { // master
131 AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message;
133 // only master contains reference to deviceDataBroker
134 if (deviceDataBroker != null) {
135 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
136 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
139 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
140 sender().tell(new Failure(new NotMasterException(self())), self());
143 } else if (message instanceof YangTextSchemaSourceRequest) { // master
145 final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
146 sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
148 } else if (message instanceof NewReadTransactionRequest) { // master
149 sender().tell(new Success(readTxActor), self());
150 } else if (message instanceof NewWriteTransactionRequest) { // master
152 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
153 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
154 sender().tell(new Success(txActor), self());
155 } catch (final Exception t) {
156 sender().tell(new Failure(t), self());
159 } else if (message instanceof NewReadWriteTransactionRequest) {
161 final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
162 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
163 sender().tell(new Success(txActor), self());
164 } catch (final Exception t) {
165 sender().tell(new Failure(t), self());
167 } else if (message instanceof InvokeRpcMessage) { // master
168 final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
169 invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
171 } else if (message instanceof RegisterMountPoint) { //slaves
172 RegisterMountPoint registerMountPoint = (RegisterMountPoint)message;
173 sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
174 registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
175 sender().tell(new Success(null), self());
176 } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
177 unregisterSlaveMountPoint();
178 } else if (message instanceof RefreshSlaveActor) { //slave
179 actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
180 id = ((RefreshSlaveActor) message).getId();
181 schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
182 setup = ((RefreshSlaveActor) message).getSetup();
183 schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
189 public void postStop() throws Exception {
193 unregisterSlaveMountPoint();
197 private void unregisterSlaveMountPoint() {
198 if (slaveSalManager != null) {
199 slaveSalManager.close();
200 slaveSalManager = null;
203 closeSchemaSourceRegistrations();
206 private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
207 final ListenableFuture<@NonNull YangTextSchemaSource> schemaSourceFuture =
208 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
210 Futures.addCallback(schemaSourceFuture, new FutureCallback<YangTextSchemaSource>() {
212 public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
214 LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
215 sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
216 } catch (IOException e) {
217 sender.tell(new Failure(e), getSelf());
222 public void onFailure(@Nonnull final Throwable throwable) {
223 LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
224 sender.tell(new Failure(throwable), getSelf());
226 }, MoreExecutors.directExecutor());
229 private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
230 final ActorRef recipient) {
232 LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, schemaPath, normalizedNodeMessage,
235 final ListenableFuture<DOMRpcResult> rpcResult = deviceRpc.invokeRpc(schemaPath,
236 normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
238 Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
240 public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
241 LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, schemaPath, domRpcResult);
243 if (domRpcResult == null) {
244 recipient.tell(new EmptyResultResponse(), getSender());
247 NormalizedNodeMessage nodeMessageReply = null;
248 if (domRpcResult.getResult() != null) {
249 nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
250 domRpcResult.getResult());
252 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
256 public void onFailure(@Nonnull final Throwable throwable) {
257 recipient.tell(new Failure(throwable), getSelf());
259 }, MoreExecutors.directExecutor());
262 private void registerSlaveMountPoint(final ActorRef masterReference) {
263 unregisterSlaveMountPoint();
265 slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
267 resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
270 private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
271 return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
274 private SchemaContextFactory createSchemaContextFactory(final ActorRef masterReference) {
275 final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
276 new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
277 final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
278 getContext().dispatcher());
280 registeredSchemas = sourceIdentifiers.stream()
282 schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
283 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
284 .collect(Collectors.toList());
286 return schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
289 private void resolveSchemaContext(final SchemaContextFactory schemaContextFactory,
290 final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, int tries) {
291 final ListenableFuture<SchemaContext> schemaContextFuture =
292 schemaContextFactory.createSchemaContext(sourceIdentifiers);
293 Futures.addCallback(schemaContextFuture, new FutureCallback<SchemaContext>() {
295 public void onSuccess(@Nonnull final SchemaContext result) {
296 executeInSelf(() -> {
297 // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
299 if (slaveSalManager == localSlaveSalManager) {
300 LOG.info("{}: Schema context resolved: {} - registering slave mount point",
301 id, result.getModules());
302 slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
309 public void onFailure(@Nonnull final Throwable throwable) {
310 executeInSelf(() -> {
311 if (slaveSalManager == localSlaveSalManager) {
312 final Throwable cause = Throwables.getRootCause(throwable);
313 if (cause instanceof AskTimeoutException) {
314 if (tries <= 5 || tries % 10 == 0) {
315 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
318 resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
319 masterReference, tries + 1);
321 LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
323 closeSchemaSourceRegistrations();
328 }, MoreExecutors.directExecutor());
331 private void closeSchemaSourceRegistrations() {
332 if (registeredSchemas != null) {
333 registeredSchemas.forEach(SchemaSourceRegistration::close);
334 registeredSchemas = null;