Bug 8152: Transaction is already opened
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.actors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.util.Timeout;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import java.io.IOException;
19 import java.util.List;
20 import javax.annotation.Nonnull;
21 import javax.annotation.Nullable;
22 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
23 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
24 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
25 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
28 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
29 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
30 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
31 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
32 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
33 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
34 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
35 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
36 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
37 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
38 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
39 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
40 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
41 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
42 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
43 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
44 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
45 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
46 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
47 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
48 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
49 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
50 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
54 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
55 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
56 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
57 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
58 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
59 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
60 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
61 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
62 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65 import scala.concurrent.duration.Duration;
66
67 public class NetconfNodeActor extends UntypedActor {
68
69     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
70
71     private final SchemaSourceRegistry schemaRegistry;
72     private final SchemaRepository schemaRepository;
73     private final Timeout actorResponseWaitTime;
74     private final Duration writeTxIdleTimeout;
75
76     private RemoteDeviceId id;
77     private NetconfTopologySetup setup;
78     private List<SourceIdentifier> sourceIdentifiers;
79     private DOMRpcService deviceRpc;
80     private SlaveSalFacade slaveSalManager;
81     private DOMDataBroker deviceDataBroker;
82     //readTxActor can be shared
83     private ActorRef readTxActor;
84
85     public static Props props(final NetconfTopologySetup setup,
86                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
87                               final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime) {
88         return Props.create(NetconfNodeActor.class, () ->
89                 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime));
90     }
91
92     private NetconfNodeActor(final NetconfTopologySetup setup,
93                              final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
94                              final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime) {
95         this.setup = setup;
96         this.id = id;
97         this.schemaRegistry = schemaRegistry;
98         this.schemaRepository = schemaRepository;
99         this.actorResponseWaitTime = actorResponseWaitTime;
100         this.writeTxIdleTimeout = setup.getIdleTimeout();
101     }
102
103     @Override
104     public void onReceive(final Object message) throws Exception {
105         if (message instanceof CreateInitialMasterActorData) { // master
106
107             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
108             sourceIdentifiers = masterActorData.getSourceIndentifiers();
109             this.deviceDataBroker = masterActorData.getDeviceDataBroker();
110             final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
111             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
112             this.deviceRpc = masterActorData.getDeviceRpc();
113
114             sender().tell(new MasterActorDataInitialized(), self());
115
116             LOG.debug("{}: Master is ready.", id);
117
118         } else if (message instanceof  RefreshSetupMasterActorData) {
119             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
120             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
121             sender().tell(new MasterActorDataInitialized(), self());
122         } else if (message instanceof AskForMasterMountPoint) { // master
123             // only master contains reference to deviceDataBroker
124             if (deviceDataBroker != null) {
125                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
126             }
127
128         } else if (message instanceof YangTextSchemaSourceRequest) { // master
129
130             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
131             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
132
133         } else if (message instanceof NewReadTransactionRequest) { // master
134
135             sender().tell(new NewReadTransactionReply(readTxActor), self());
136
137         } else if (message instanceof NewWriteTransactionRequest) { // master
138             try {
139                 final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
140                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
141                 sender().tell(new NewWriteTransactionReply(txActor), self());
142             } catch (final Throwable t) {
143                 sender().tell(t, self());
144             }
145
146         } else if (message instanceof InvokeRpcMessage) { // master
147
148             final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
149             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
150
151         } else if (message instanceof RegisterMountPoint) { //slaves
152
153             sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
154             registerSlaveMountPoint(getSender());
155
156         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
157             if (slaveSalManager != null) {
158                 slaveSalManager.close();
159                 slaveSalManager = null;
160             }
161
162         }
163     }
164
165
166     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
167         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
168                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
169
170         Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
171             @Override
172             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
173                 try {
174                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
175                 } catch (final IOException exception) {
176                     sender.tell(exception.getCause(), getSelf());
177                 }
178             }
179
180             @Override
181             public void onFailure(@Nonnull final Throwable throwable) {
182                 sender.tell(throwable, getSelf());
183             }
184         });
185     }
186
187     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
188                                 final ActorRef recipient) {
189
190         final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
191                 deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
192
193         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
194             @Override
195             public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
196                 if (domRpcResult == null) {
197                     recipient.tell(new EmptyResultResponse(), getSender());
198                     return;
199                 }
200                 NormalizedNodeMessage nodeMessageReply = null;
201                 if (domRpcResult.getResult() != null) {
202                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
203                             domRpcResult.getResult());
204                 }
205                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
206             }
207
208             @Override
209             public void onFailure(@Nonnull final Throwable throwable) {
210                 recipient.tell(throwable, getSelf());
211             }
212         });
213     }
214
215     private void registerSlaveMountPoint(final ActorRef masterReference) {
216         if (this.slaveSalManager != null) {
217             slaveSalManager.close();
218         }
219         slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem(), actorResponseWaitTime);
220
221         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
222                 getSchemaContext(masterReference);
223         final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
224
225         Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
226             @Override
227             public void onSuccess(final SchemaContext result) {
228                 LOG.info("{}: Schema context resolved: {}", id, result.getModules());
229                 slaveSalManager.registerSlaveMountPoint(result, deviceRpc, masterReference);
230             }
231
232             @Override
233             public void onFailure(@Nonnull final Throwable throwable) {
234                 LOG.error("{}: Failed to register mount point: {}", id, throwable);
235             }
236         });
237     }
238
239     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
240         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
241     }
242
243     private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(final ActorRef masterReference) {
244
245         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
246                 new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
247         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
248                 getContext().dispatcher());
249
250         sourceIdentifiers.forEach(sourceId ->
251                 schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
252                         YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
253
254         final SchemaContextFactory schemaContextFactory
255                 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
256
257         return schemaContextFactory.createSchemaContext(sourceIdentifiers);
258     }
259
260 }