Bug 8032 - Initialization in sal failed, disconnecting from device
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
index 75c7e62e8a3f6d1cbc56346d861575755e0bf2ea..f04430f6660d02ad9898b9097b97856d4862158d 100644 (file)
@@ -11,39 +11,48 @@ package org.opendaylight.netconf.topology.singleton.impl.actors;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
+import akka.util.Timeout;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
-import org.opendaylight.netconf.topology.singleton.impl.RemoteOperationTxProcessorImpl;
 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
@@ -52,47 +61,65 @@ import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
 
 public class NetconfNodeActor extends UntypedActor {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
 
-    private NetconfTopologySetup setup;
-    private RemoteDeviceId id;
     private final SchemaSourceRegistry schemaRegistry;
     private final SchemaRepository schemaRepository;
+    private final Timeout actorResponseWaitTime;
+    private final Duration writeTxIdleTimeout;
+    private final DOMMountPointService mountPointService;
 
-    private RemoteOperationTxProcessor operationsProcessor;
+    private RemoteDeviceId id;
+    private NetconfTopologySetup setup;
     private List<SourceIdentifier> sourceIdentifiers;
+    private DOMRpcService deviceRpc;
     private SlaveSalFacade slaveSalManager;
+    private DOMDataBroker deviceDataBroker;
+    //readTxActor can be shared
+    private ActorRef readTxActor;
+    private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
 
     public static Props props(final NetconfTopologySetup setup,
                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
-                              final SchemaRepository schemaRepository) {
+                              final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
+                              final DOMMountPointService mountPointService) {
         return Props.create(NetconfNodeActor.class, () ->
-                new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository));
+                new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime,
+                        mountPointService));
     }
 
     private NetconfNodeActor(final NetconfTopologySetup setup,
-                             final RemoteDeviceId id, SchemaSourceRegistry schemaRegistry,
-                             final SchemaRepository schemaRepository) {
+                             final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
+                             final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
+                             final DOMMountPointService mountPointService) {
         this.setup = setup;
         this.id = id;
         this.schemaRegistry = schemaRegistry;
         this.schemaRepository = schemaRepository;
+        this.actorResponseWaitTime = actorResponseWaitTime;
+        this.writeTxIdleTimeout = setup.getIdleTimeout();
+        this.mountPointService = mountPointService;
     }
 
     @Override
     public void onReceive(final Object message) throws Exception {
         if (message instanceof CreateInitialMasterActorData) { // master
 
-            sourceIdentifiers = ((CreateInitialMasterActorData) message).getSourceIndentifiers();
-            operationsProcessor =
-                    new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
-                            id);
+            final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
+            sourceIdentifiers = masterActorData.getSourceIndentifiers();
+            this.deviceDataBroker = masterActorData.getDeviceDataBroker();
+            final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
+            readTxActor = context().actorOf(ReadTransactionActor.props(tx));
+            this.deviceRpc = masterActorData.getDeviceRpc();
+
             sender().tell(new MasterActorDataInitialized(), self());
 
             LOG.debug("{}: Master is ready.", id);
@@ -102,20 +129,34 @@ public class NetconfNodeActor extends UntypedActor {
             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
             sender().tell(new MasterActorDataInitialized(), self());
         } else if (message instanceof AskForMasterMountPoint) { // master
-            // only master contains reference to operations processor
-            if (operationsProcessor != null) {
+            // only master contains reference to deviceDataBroker
+            if (deviceDataBroker != null) {
                 getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
             }
 
-        } else if (message instanceof TransactionRequest) { // master
-
-            resolveProxyCalls(message, sender(), getSelf());
-
         } else if (message instanceof YangTextSchemaSourceRequest) { // master
 
             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
 
+        } else if (message instanceof NewReadTransactionRequest) { // master
+
+            sender().tell(new NewReadTransactionReply(readTxActor), self());
+
+        } else if (message instanceof NewWriteTransactionRequest) { // master
+            try {
+                final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
+                final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
+                sender().tell(new NewWriteTransactionReply(txActor), self());
+            } catch (final Throwable t) {
+                sender().tell(t, self());
+            }
+
+        } else if (message instanceof InvokeRpcMessage) { // master
+
+            final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
+            invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
+
         } else if (message instanceof RegisterMountPoint) { //slaves
 
             sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
@@ -130,40 +171,10 @@ public class NetconfNodeActor extends UntypedActor {
         }
     }
 
-    private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) {
-        if (message instanceof ReadRequest) {
-
-            final ReadRequest readRequest = (ReadRequest) message;
-            operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
-
-        } else if (message instanceof ExistsRequest) {
-
-            final ExistsRequest readRequest = (ExistsRequest) message;
-            operationsProcessor.doExists(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
-
-        } else if (message instanceof MergeRequest) {
-
-            final MergeRequest mergeRequest = (MergeRequest) message;
-            operationsProcessor.doMerge(mergeRequest.getStore(), mergeRequest.getNormalizedNodeMessage());
-
-        } else if (message instanceof PutRequest) {
-
-            final PutRequest putRequest = (PutRequest) message;
-            operationsProcessor.doPut(putRequest.getStore(), putRequest.getNormalizedNodeMessage());
-
-        } else if (message instanceof DeleteRequest) {
-
-            final DeleteRequest deleteRequest = (DeleteRequest) message;
-            operationsProcessor.doDelete(deleteRequest.getStore(), deleteRequest.getPath());
-
-        } else if (message instanceof CancelRequest) {
-
-            operationsProcessor.doCancel(recipient, futureSender);
-
-        } else if (message instanceof SubmitRequest) {
-
-            operationsProcessor.doSubmit(recipient, futureSender);
-        }
+    @Override
+    public void postStop() throws Exception {
+        super.postStop();
+        closeSchemaSourceRegistrations();
     }
 
     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
@@ -175,7 +186,7 @@ public class NetconfNodeActor extends UntypedActor {
             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
                 try {
                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
-                } catch (IOException exception) {
+                } catch (final IOException exception) {
                     sender.tell(exception.getCause(), getSelf());
                 }
             }
@@ -187,15 +198,45 @@ public class NetconfNodeActor extends UntypedActor {
         });
     }
 
+    private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
+                                final ActorRef recipient) {
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
+                deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
+
+        Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
+            @Override
+            public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
+                if (domRpcResult == null) {
+                    recipient.tell(new EmptyResultResponse(), getSender());
+                    return;
+                }
+                NormalizedNodeMessage nodeMessageReply = null;
+                if (domRpcResult.getResult() != null) {
+                    nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
+                            domRpcResult.getResult());
+                }
+                recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                recipient.tell(throwable, getSelf());
+            }
+        });
+    }
+
     private void registerSlaveMountPoint(final ActorRef masterReference) {
         if (this.slaveSalManager != null) {
             slaveSalManager.close();
         }
-        slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem());
+        closeSchemaSourceRegistrations();
+        slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime,
+                mountPointService);
 
         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
                 getSchemaContext(masterReference);
-        final DOMRpcService deviceRpc = getDOMRpcService();
+        final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
 
         Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
             @Override
@@ -211,20 +252,22 @@ public class NetconfNodeActor extends UntypedActor {
         });
     }
 
-    private DOMRpcService getDOMRpcService() {
-        return new ProxyDOMRpcService();
+    private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
+        return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
     }
 
-    private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(ActorRef masterReference) {
+    private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(final ActorRef masterReference) {
 
         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
-                new ProxyYangTextSourceProvider(masterReference, getContext());
+                new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
                 getContext().dispatcher());
 
-        sourceIdentifiers.forEach(sourceId ->
-                schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
-                        YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+        registeredSchemas = sourceIdentifiers.stream()
+                .map(sourceId ->
+                        schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
+                                YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
+                .collect(Collectors.toList());
 
         final SchemaContextFactory schemaContextFactory
                 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
@@ -232,4 +275,11 @@ public class NetconfNodeActor extends UntypedActor {
         return schemaContextFactory.createSchemaContext(sourceIdentifiers);
     }
 
+    private void closeSchemaSourceRegistrations() {
+        if (registeredSchemas != null) {
+            registeredSchemas.forEach(SchemaSourceRegistration::close);
+            registeredSchemas = null;
+        }
+    }
+
 }