2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import java.io.IOException;
18 import java.util.List;
19 import javax.annotation.Nonnull;
20 import javax.annotation.Nullable;
21 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
22 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
23 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
24 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
26 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
27 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
28 import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
29 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
30 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
31 import org.opendaylight.netconf.topology.singleton.impl.RemoteOperationTxProcessorImpl;
32 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
33 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
34 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
35 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
36 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
37 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
38 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
39 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
40 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
41 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
42 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
43 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
44 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
45 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
46 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
47 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
48 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
49 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
50 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
51 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
52 import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
56 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
57 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
58 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
59 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
60 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
61 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
62 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
63 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
64 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
/**
 * Akka actor that handles NETCONF topology singleton messages for a single remote device.
 *
 * <p>The inline markers in {@code onReceive} tag some messages as master-side (initial data,
 * transaction requests, schema source requests) and others as slave-side (registering and
 * unregistering a mount point).
 */
68 public class NetconfNodeActor extends UntypedActor {
70 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
// Topology setup; replaced when a RefreshSetupMasterActorData message arrives.
72 private NetconfTopologySetup setup;
// Identifier of the remote device; also replaced on RefreshSetupMasterActorData and used as a log prefix.
73 private RemoteDeviceId id;
// Registry in which remote schema sources are registered (see getSchemaContext).
74 private final SchemaSourceRegistry schemaRegistry;
// Repository used to look up YANG text sources and to create the schema context factory.
75 private final SchemaRepository schemaRepository;
// Set when CreateInitialMasterActorData is received; a non-null value is what onReceive treats as "master".
77 private RemoteOperationTxProcessor operationsProcessor;
// Source identifiers received via CreateInitialMasterActorData or RegisterMountPoint.
78 private List<SourceIdentifier> sourceIdentifiers;
// Device RPC service taken from CreateInitialMasterActorData; used by invokeSlaveRpc.
79 private DOMRpcService deviceRpc;
// Created in registerSlaveMountPoint; closed and cleared on UnregisterSlaveMountPoint.
80 private SlaveSalFacade slaveSalManager;
82 public static Props props(final NetconfTopologySetup setup,
83 final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
84 final SchemaRepository schemaRepository) {
85 return Props.create(NetconfNodeActor.class, () ->
86 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository));
// Private constructor; instances are created through the creator passed to Props in props(...).
// Stores the schema registry and schema repository in the corresponding final fields.
// NOTE(review): this listing shows no assignment of the setup and id parameters to their
// fields -- confirm against the complete source that both are stored here.
89 private NetconfNodeActor(final NetconfTopologySetup setup,
90 final RemoteDeviceId id, SchemaSourceRegistry schemaRegistry,
91 final SchemaRepository schemaRepository) {
94 this.schemaRegistry = schemaRegistry;
95 this.schemaRepository = schemaRepository;
// Entry point for every message delivered to this actor; dispatches on the message's runtime type.
// Each branch either updates actor state, replies to the sender, or delegates to a private helper.
99 public void onReceive(final Object message) throws Exception {
// Initial master data: remember the source identifiers, build the operations processor from the
// device data broker, keep the device RPC service, then acknowledge to the sender.
100 if (message instanceof CreateInitialMasterActorData) { // master
102 sourceIdentifiers = ((CreateInitialMasterActorData) message).getSourceIndentifiers();
// NOTE(review): the constructor call below is not closed in this listing; any further
// argument(s) are not visible here -- confirm against the complete source.
103 operationsProcessor =
104 new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
106 this.deviceRpc = ((CreateInitialMasterActorData) message).getDeviceRpc();
108 sender().tell(new MasterActorDataInitialized(), self());
110 LOG.debug("{}: Master is ready.", id);
// Refresh: swap in the new topology setup and device id, then acknowledge with the same reply type.
112 } else if (message instanceof RefreshSetupMasterActorData) {
113 setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
114 id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
115 sender().tell(new MasterActorDataInitialized(), self());
// A peer asks for the mount point: reply with the known source identifiers, but only when the
// operations processor has been set (no reply is visible for the other case).
116 } else if (message instanceof AskForMasterMountPoint) { // master
117 // only master contains reference to operations processor
118 if (operationsProcessor != null) {
119 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
// Transaction requests are routed to resolveProxyCalls with the sender as reply recipient.
122 } else if (message instanceof TransactionRequest) { // master
124 resolveProxyCalls(message, sender(), getSelf());
// Schema source request: look the source up and send it (or the failure) back to the sender.
126 } else if (message instanceof YangTextSchemaSourceRequest) { // master
128 final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
129 sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
// RPC invocation request: invoke it on deviceRpc and reply to the sender with the outcome.
131 } else if (message instanceof InvokeRpcMessage) {
133 final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
134 invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
// Slave side: store the announced source identifiers and register a mount point that
// references the sender of this message.
136 } else if (message instanceof RegisterMountPoint) { //slaves
138 sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
139 registerSlaveMountPoint(getSender());
// Slave side: close the SAL facade if one exists and drop the reference.
141 } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
142 if (slaveSalManager != null) {
143 slaveSalManager.close();
144 slaveSalManager = null;
150 private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) {
151 if (message instanceof ReadRequest) {
153 final ReadRequest readRequest = (ReadRequest) message;
154 operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
156 } else if (message instanceof ExistsRequest) {
158 final ExistsRequest readRequest = (ExistsRequest) message;
159 operationsProcessor.doExists(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
161 } else if (message instanceof MergeRequest) {
163 final MergeRequest mergeRequest = (MergeRequest) message;
164 operationsProcessor.doMerge(mergeRequest.getStore(), mergeRequest.getNormalizedNodeMessage());
166 } else if (message instanceof PutRequest) {
168 final PutRequest putRequest = (PutRequest) message;
169 operationsProcessor.doPut(putRequest.getStore(), putRequest.getNormalizedNodeMessage());
171 } else if (message instanceof DeleteRequest) {
173 final DeleteRequest deleteRequest = (DeleteRequest) message;
174 operationsProcessor.doDelete(deleteRequest.getStore(), deleteRequest.getPath());
176 } else if (message instanceof CancelRequest) {
178 operationsProcessor.doCancel(recipient, futureSender);
180 } else if (message instanceof SubmitRequest) {
182 operationsProcessor.doSubmit(recipient, futureSender);
186 private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
187 final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
188 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
190 Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
192 public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
194 sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
195 } catch (IOException exception) {
196 sender.tell(exception.getCause(), getSelf());
201 public void onFailure(@Nonnull final Throwable throwable) {
202 sender.tell(throwable, getSelf());
207 private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
208 final ActorRef recipient) {
210 final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
211 deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
213 Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
215 public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
216 if (domRpcResult == null) {
217 recipient.tell(new EmptyResultResponse(), getSender());
220 NormalizedNodeMessage nodeMessageReply = null;
221 if (domRpcResult.getResult() != null) {
222 nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
223 domRpcResult.getResult());
225 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
229 public void onFailure(@Nonnull final Throwable throwable) {
230 recipient.tell(throwable, getSelf());
235 private void registerSlaveMountPoint(ActorRef masterReference) {
236 if (this.slaveSalManager != null) {
237 slaveSalManager.close();
239 slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem());
241 final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
242 getSchemaContext(masterReference);
243 final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
245 Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
247 public void onSuccess(final SchemaContext result) {
248 LOG.info("{}: Schema context resolved: {}", id, result.getModules());
249 slaveSalManager.registerSlaveMountPoint(result, deviceRpc, masterReference);
253 public void onFailure(@Nonnull final Throwable throwable) {
254 LOG.error("{}: Failed to register mount point: {}", id, throwable);
259 private DOMRpcService getDOMRpcService(ActorRef masterReference) {
260 return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id);
263 private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(ActorRef masterReference) {
265 final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
266 new ProxyYangTextSourceProvider(masterReference, getContext());
267 final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
268 getContext().dispatcher());
270 sourceIdentifiers.forEach(sourceId ->
271 schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
272 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
274 final SchemaContextFactory schemaContextFactory
275 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
277 return schemaContextFactory.createSchemaContext(sourceIdentifiers);