Merge "Add unit tests for sal-netconf-connector transactions"
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
10
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
import org.opendaylight.netconf.topology.singleton.impl.RemoteOperationTxProcessorImpl;
import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
58
59 public class NetconfNodeActor extends UntypedActor {
60
61     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
62
63     private NetconfTopologySetup setup;
64     private RemoteDeviceId id;
65     private final SchemaSourceRegistry schemaRegistry;
66     private final SchemaRepository schemaRepository;
67
68     private RemoteOperationTxProcessor operationsProcessor;
69     private List<SourceIdentifier> sourceIdentifiers;
70     private SlaveSalFacade slaveSalManager;
71
72     public static Props props(final NetconfTopologySetup setup,
73                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
74                               final SchemaRepository schemaRepository) {
75         return Props.create(NetconfNodeActor.class, () ->
76                 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository));
77     }
78
79     private NetconfNodeActor(final NetconfTopologySetup setup,
80                              final RemoteDeviceId id, SchemaSourceRegistry schemaRegistry,
81                              final SchemaRepository schemaRepository) {
82         this.setup = setup;
83         this.id = id;
84         this.schemaRegistry = schemaRegistry;
85         this.schemaRepository = schemaRepository;
86     }
87
88     @Override
89     public void onReceive(final Object message) throws Exception {
90         if (message instanceof CreateInitialMasterActorData) { // master
91
92             sourceIdentifiers = ((CreateInitialMasterActorData) message).getSourceIndentifiers();
93             operationsProcessor =
94                     new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
95                             id);
96             sender().tell(new MasterActorDataInitialized(), self());
97
98             LOG.debug("{}: Master is ready.", id);
99
100         } else if (message instanceof  RefreshSetupMasterActorData) {
101             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
102             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
103             sender().tell(new MasterActorDataInitialized(), self());
104         } else if (message instanceof AskForMasterMountPoint) { // master
105             // only master contains reference to operations processor
106             if (operationsProcessor != null) {
107                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
108             }
109
110         } else if (message instanceof TransactionRequest) { // master
111
112             resolveProxyCalls(message, sender(), getSelf());
113
114         } else if (message instanceof YangTextSchemaSourceRequest) { // master
115
116             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
117             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
118
119         } else if (message instanceof RegisterMountPoint) { //slaves
120
121             sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
122             registerSlaveMountPoint(getSender());
123
124         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
125             if (slaveSalManager != null) {
126                 slaveSalManager.close();
127                 slaveSalManager = null;
128             }
129
130         }
131     }
132
133     private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) {
134         if (message instanceof ReadRequest) {
135
136             final ReadRequest readRequest = (ReadRequest) message;
137             operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
138
139         } else if (message instanceof ExistsRequest) {
140
141             final ExistsRequest readRequest = (ExistsRequest) message;
142             operationsProcessor.doExists(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
143
144         } else if (message instanceof MergeRequest) {
145
146             final MergeRequest mergeRequest = (MergeRequest) message;
147             operationsProcessor.doMerge(mergeRequest.getStore(), mergeRequest.getNormalizedNodeMessage());
148
149         } else if (message instanceof PutRequest) {
150
151             final PutRequest putRequest = (PutRequest) message;
152             operationsProcessor.doPut(putRequest.getStore(), putRequest.getNormalizedNodeMessage());
153
154         } else if (message instanceof DeleteRequest) {
155
156             final DeleteRequest deleteRequest = (DeleteRequest) message;
157             operationsProcessor.doDelete(deleteRequest.getStore(), deleteRequest.getPath());
158
159         } else if (message instanceof CancelRequest) {
160
161             operationsProcessor.doCancel(recipient, futureSender);
162
163         } else if (message instanceof SubmitRequest) {
164
165             operationsProcessor.doSubmit(recipient, futureSender);
166         }
167     }
168
169     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
170         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
171                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
172
173         Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
174             @Override
175             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
176                 try {
177                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
178                 } catch (IOException exception) {
179                     sender.tell(exception.getCause(), getSelf());
180                 }
181             }
182
183             @Override
184             public void onFailure(@Nonnull final Throwable throwable) {
185                 sender.tell(throwable, getSelf());
186             }
187         });
188     }
189
190     private void registerSlaveMountPoint(final ActorRef masterReference) {
191         if (this.slaveSalManager != null) {
192             slaveSalManager.close();
193         }
194         slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem());
195
196         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
197                 getSchemaContext(masterReference);
198         final DOMRpcService deviceRpc = getDOMRpcService();
199
200         Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
201             @Override
202             public void onSuccess(final SchemaContext result) {
203                 LOG.info("{}: Schema context resolved: {}", id, result.getModules());
204                 slaveSalManager.registerSlaveMountPoint(result, deviceRpc, masterReference);
205             }
206
207             @Override
208             public void onFailure(@Nonnull final Throwable throwable) {
209                 LOG.error("{}: Failed to register mount point: {}", id, throwable);
210             }
211         });
212     }
213
214     private DOMRpcService getDOMRpcService() {
215         return new ProxyDOMRpcService();
216     }
217
218     private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(ActorRef masterReference) {
219
220         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
221                 new ProxyYangTextSourceProvider(masterReference, getContext());
222         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
223                 getContext().dispatcher());
224
225         sourceIdentifiers.forEach(sourceId ->
226                 schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
227                         YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
228
229         final SchemaContextFactory schemaContextFactory
230                 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
231
232         return schemaContextFactory.createSchemaContext(sourceIdentifiers);
233     }
234
235 }