2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.util.Timeout;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import java.io.IOException;
21 import java.util.List;
22 import java.util.stream.Collectors;
23 import javax.annotation.Nonnull;
24 import javax.annotation.Nullable;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
27 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
28 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
30 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
31 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
32 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
33 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
34 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
35 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
36 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
37 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
38 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
39 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
40 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
42 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
43 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
44 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
45 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
46 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
47 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
48 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
49 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
50 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
51 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
52 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
53 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
54 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
55 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
56 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
57 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
58 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
59 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
60 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
61 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
62 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
63 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
64 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
65 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
66 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
67 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
68 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
69 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
70 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73 import scala.concurrent.duration.Duration;
/**
 * Akka actor for a single NETCONF node. The message handler serves requests marked "master"
 * (transaction actors, YANG sources, RPC invocation for the device) as well as requests marked
 * "slave" (registering and unregistering a mount point that talks to the master actor).
 */
75 public final class NetconfNodeActor extends UntypedActor {
77 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
// Idle timeout passed to every write and read-write transaction actor this actor creates.
79 private final Duration writeTxIdleTimeout;
// Supplied at construction time.
80 private final DOMMountPointService mountPointService;
// The next five fields are set at construction or by the refresh messages handled in onReceive.
82 private SchemaSourceRegistry schemaRegistry;
83 private SchemaRepository schemaRepository;
84 private Timeout actorResponseWaitTime;
85 private RemoteDeviceId id;
86 private NetconfTopologySetup setup;
// Set from CreateInitialMasterActorData (master) or RegisterMountPoint (slave).
87 private List<SourceIdentifier> sourceIdentifiers;
// Set from CreateInitialMasterActorData.
88 private DOMRpcService deviceRpc;
// Created by registerSlaveMountPoint; closed and cleared on UnregisterSlaveMountPoint.
89 private SlaveSalFacade slaveSalManager;
// Set from CreateInitialMasterActorData; null until then.
90 private DOMDataBroker deviceDataBroker;
91 //readTxActor can be shared
92 private ActorRef readTxActor;
// Registrations created in getSchemaContext; released by closeSchemaSourceRegistrations.
93 private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
95 public static Props props(final NetconfTopologySetup setup,
96 final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
97 final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
98 final DOMMountPointService mountPointService) {
99 return Props.create(NetconfNodeActor.class, () ->
100 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime,
/**
 * Private constructor; instances are created through {@code props(...)}.
 * Stores the schema registry, schema repository, response timeout and mount point service, and
 * takes the write-transaction idle timeout from {@code setup.getIdleTimeout()}.
 */
104 private NetconfNodeActor(final NetconfTopologySetup setup,
105 final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
106 final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
107 final DOMMountPointService mountPointService) {
110 this.schemaRegistry = schemaRegistry;
111 this.schemaRepository = schemaRepository;
112 this.actorResponseWaitTime = actorResponseWaitTime;
// The idle timeout is read once here and reused for every transaction actor.
113 this.writeTxIdleTimeout = setup.getIdleTimeout();
114 this.mountPointService = mountPointService;
/**
 * Dispatches on the runtime type of the incoming message. Branches are tagged in the trailing
 * comments as master-side or slave-side. Replies go to the sender of the current message.
 */
117 @SuppressWarnings("checkstyle:IllegalCatch")
119 public void onReceive(final Object message) throws Exception {
// Master initialisation: remember the device services, open one read-only transaction, wrap it in
// the read transaction actor, then acknowledge with MasterActorDataInitialized.
120 if (message instanceof CreateInitialMasterActorData) { // master
122 final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
123 sourceIdentifiers = masterActorData.getSourceIndentifiers();
124 this.deviceDataBroker = masterActorData.getDeviceDataBroker();
125 final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
126 readTxActor = context().actorOf(ReadTransactionActor.props(tx));
127 this.deviceRpc = masterActorData.getDeviceRpc();
129 sender().tell(new MasterActorDataInitialized(), self());
131 LOG.debug("{}: Master is ready.", id);
// Replaces the setup and device id, and acknowledges with the same reply type as initialisation.
133 } else if (message instanceof RefreshSetupMasterActorData) {
134 setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
135 id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
136 sender().tell(new MasterActorDataInitialized(), self());
// Answers with the known source identifiers, but only once deviceDataBroker has been set.
137 } else if (message instanceof AskForMasterMountPoint) { // master
138 // only master contains reference to deviceDataBroker
139 if (deviceDataBroker != null) {
140 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
// Resolves the requested YANG source asynchronously; the reply is sent from the future callback.
143 } else if (message instanceof YangTextSchemaSourceRequest) { // master
145 final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
146 sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
// Every requester receives the same read transaction actor.
148 } else if (message instanceof NewReadTransactionRequest) { // master
150 sender().tell(new NewReadTransactionReply(readTxActor), self());
// Creates a dedicated write transaction actor per request; any exception is sent to the requester.
152 } else if (message instanceof NewWriteTransactionRequest) { // master
154 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
155 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
156 sender().tell(new NewWriteTransactionReply(txActor), self());
157 } catch (final Exception t) {
158 sender().tell(t, self());
// Same pattern as the write-only case, for read-write transactions.
161 } else if (message instanceof NewReadWriteTransactionRequest) {
163 final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
164 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
165 sender().tell(new NewReadWriteTransactionReply(txActor), self());
166 } catch (final Exception t) {
167 sender().tell(t, self());
// Invokes the RPC on the device RPC service; the result is sent from the future callback.
169 } else if (message instanceof InvokeRpcMessage) { // master
171 final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
172 invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
// Stores the source identifiers and registers the slave mount point, using the sender as the
// master reference.
174 } else if (message instanceof RegisterMountPoint) { //slaves
176 sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
177 registerSlaveMountPoint(getSender());
// Closes and forgets the slave facade if one exists.
179 } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
180 if (slaveSalManager != null) {
181 slaveSalManager.close();
182 slaveSalManager = null;
// Replaces the slave-side configuration held by this actor; no reply is sent.
184 } else if (message instanceof RefreshSlaveActor) { //slave
185 actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
186 id = ((RefreshSlaveActor) message).getId();
187 schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
188 setup = ((RefreshSlaveActor) message).getSetup();
189 schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
// Actor stop hook: closes the schema source registrations held by this actor.
195 public void postStop() throws Exception {
197 closeSchemaSourceRegistrations();
200 private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
201 final ListenableFuture<@NonNull YangTextSchemaSource> yangTextSchemaSource =
202 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
204 Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
206 public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
208 sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
209 } catch (final IOException exception) {
210 sender.tell(exception.getCause(), getSelf());
215 public void onFailure(@Nonnull final Throwable throwable) {
216 sender.tell(throwable, getSelf());
218 }, MoreExecutors.directExecutor());
221 private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
222 final ActorRef recipient) {
224 final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
225 deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
227 Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
229 public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
230 if (domRpcResult == null) {
231 recipient.tell(new EmptyResultResponse(), getSender());
234 NormalizedNodeMessage nodeMessageReply = null;
235 if (domRpcResult.getResult() != null) {
236 nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
237 domRpcResult.getResult());
239 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
243 public void onFailure(@Nonnull final Throwable throwable) {
244 recipient.tell(throwable, getSelf());
246 }, MoreExecutors.directExecutor());
249 private void registerSlaveMountPoint(final ActorRef masterReference) {
250 if (this.slaveSalManager != null) {
251 slaveSalManager.close();
253 closeSchemaSourceRegistrations();
254 slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime,
257 final ListenableFuture<SchemaContext> remoteSchemaContext = getSchemaContext(masterReference);
258 final DOMRpcService deviceRpcService = getDOMRpcService(masterReference);
260 Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
262 public void onSuccess(final SchemaContext result) {
263 LOG.info("{}: Schema context resolved: {}", id, result.getModules());
264 slaveSalManager.registerSlaveMountPoint(result, deviceRpcService, masterReference);
268 public void onFailure(@Nonnull final Throwable throwable) {
269 LOG.error("{}: Failed to register mount point: {}", id, throwable);
271 }, MoreExecutors.directExecutor());
274 private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
275 return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
278 private ListenableFuture<SchemaContext> getSchemaContext(final ActorRef masterReference) {
280 final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
281 new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
282 final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
283 getContext().dispatcher());
285 registeredSchemas = sourceIdentifiers.stream()
287 schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
288 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
289 .collect(Collectors.toList());
291 final SchemaContextFactory schemaContextFactory
292 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
294 return schemaContextFactory.createSchemaContext(sourceIdentifiers);
297 private void closeSchemaSourceRegistrations() {
298 if (registeredSchemas != null) {
299 registeredSchemas.forEach(SchemaSourceRegistration::close);
300 registeredSchemas = null;