X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2Factors%2FNetconfNodeActor.java;h=f04430f6660d02ad9898b9097b97856d4862158d;hb=211fb2e30f8e22dd96acdedcc9a18a964e90f201;hp=75c7e62e8a3f6d1cbc56346d861575755e0bf2ea;hpb=9800d265b6e0747d1295a2fa0bd2b42527b58359;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java index 75c7e62e8a..f04430f666 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java @@ -11,39 +11,48 @@ package org.opendaylight.netconf.topology.singleton.impl.actors; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; +import akka.util.Timeout; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import java.io.IOException; import java.util.List; +import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider; import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider; import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor; import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService; import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider; -import org.opendaylight.netconf.topology.singleton.impl.RemoteOperationTxProcessorImpl; import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint; import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint; import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint; import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest; +import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage; +import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply; +import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException; @@ -52,47 +61,65 @@ import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter; import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; public class NetconfNodeActor extends UntypedActor { private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class); - private NetconfTopologySetup setup; - private RemoteDeviceId id; private final SchemaSourceRegistry schemaRegistry; private final SchemaRepository schemaRepository; + private final Timeout actorResponseWaitTime; + private final Duration writeTxIdleTimeout; + private final DOMMountPointService mountPointService; - private RemoteOperationTxProcessor operationsProcessor; + private RemoteDeviceId id; + private NetconfTopologySetup setup; private List sourceIdentifiers; + private DOMRpcService deviceRpc; private SlaveSalFacade slaveSalManager; + private DOMDataBroker deviceDataBroker; + //readTxActor can be shared + private ActorRef readTxActor; + private List> registeredSchemas; public static Props props(final NetconfTopologySetup setup, final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry, - final SchemaRepository schemaRepository) { + final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime, + final DOMMountPointService mountPointService) { return Props.create(NetconfNodeActor.class, () -> - new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository)); + new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime, + mountPointService)); } private NetconfNodeActor(final NetconfTopologySetup setup, - final RemoteDeviceId id, SchemaSourceRegistry schemaRegistry, - final SchemaRepository schemaRepository) { + final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry, + final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime, + final DOMMountPointService mountPointService) { this.setup = setup; this.id = id; this.schemaRegistry = schemaRegistry; this.schemaRepository = schemaRepository; + this.actorResponseWaitTime = actorResponseWaitTime; + this.writeTxIdleTimeout = setup.getIdleTimeout(); + this.mountPointService = mountPointService; } @Override public void onReceive(final Object message) throws Exception { if (message instanceof CreateInitialMasterActorData) { // master - sourceIdentifiers = ((CreateInitialMasterActorData) message).getSourceIndentifiers(); - operationsProcessor = - new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(), - id); + final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message; + sourceIdentifiers = masterActorData.getSourceIndentifiers(); + this.deviceDataBroker = masterActorData.getDeviceDataBroker(); + final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction(); + readTxActor = context().actorOf(ReadTransactionActor.props(tx)); + this.deviceRpc = masterActorData.getDeviceRpc(); + sender().tell(new MasterActorDataInitialized(), self()); LOG.debug("{}: Master is ready.", id); @@ -102,20 +129,34 @@ public class NetconfNodeActor extends UntypedActor { id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId(); sender().tell(new MasterActorDataInitialized(), self()); } else if (message instanceof AskForMasterMountPoint) { // master - // only master contains reference to operations processor - if (operationsProcessor != null) { + // only master contains reference to deviceDataBroker + if (deviceDataBroker != null) { getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf()); } - } else if (message instanceof TransactionRequest) { // master - - resolveProxyCalls(message, sender(), getSelf()); - } else if (message instanceof YangTextSchemaSourceRequest) { // master final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message; sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender()); + } else if (message instanceof NewReadTransactionRequest) { // master + + sender().tell(new NewReadTransactionReply(readTxActor), self()); + + } else if (message instanceof NewWriteTransactionRequest) { // master + try { + final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction(); + final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout)); + sender().tell(new NewWriteTransactionReply(txActor), self()); + } catch (final Throwable t) { + sender().tell(t, self()); + } + + } else if (message instanceof InvokeRpcMessage) { // master + + final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message); + invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender()); + } else if (message instanceof RegisterMountPoint) { //slaves sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers(); @@ -130,40 +171,10 @@ public class NetconfNodeActor extends UntypedActor { } } - private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) { - if (message instanceof ReadRequest) { - - final ReadRequest readRequest = (ReadRequest) message; - operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender); - - } else if (message instanceof ExistsRequest) { - - final ExistsRequest readRequest = (ExistsRequest) message; - operationsProcessor.doExists(readRequest.getStore(), readRequest.getPath(), recipient, futureSender); - - } else if (message instanceof MergeRequest) { - - final MergeRequest mergeRequest = (MergeRequest) message; - operationsProcessor.doMerge(mergeRequest.getStore(), mergeRequest.getNormalizedNodeMessage()); - - } else if (message instanceof PutRequest) { - - final PutRequest putRequest = (PutRequest) message; - operationsProcessor.doPut(putRequest.getStore(), putRequest.getNormalizedNodeMessage()); - - } else if (message instanceof DeleteRequest) { - - final DeleteRequest deleteRequest = (DeleteRequest) message; - operationsProcessor.doDelete(deleteRequest.getStore(), deleteRequest.getPath()); - - } else if (message instanceof CancelRequest) { - - operationsProcessor.doCancel(recipient, futureSender); - - } else if (message instanceof SubmitRequest) { - - operationsProcessor.doSubmit(recipient, futureSender); - } + @Override + public void postStop() throws Exception { + super.postStop(); + closeSchemaSourceRegistrations(); } private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) { @@ -175,7 +186,7 @@ public class NetconfNodeActor extends UntypedActor { public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) { try { sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf()); - } catch (IOException exception) { + } catch (final IOException exception) { sender.tell(exception.getCause(), getSelf()); } } @@ -187,15 +198,45 @@ public class NetconfNodeActor extends UntypedActor { }); } + private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage, + final ActorRef recipient) { + + final CheckedFuture rpcResult = + deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode()); + + Futures.addCallback(rpcResult, new FutureCallback() { + @Override + public void onSuccess(@Nullable final DOMRpcResult domRpcResult) { + if (domRpcResult == null) { + recipient.tell(new EmptyResultResponse(), getSender()); + return; + } + NormalizedNodeMessage nodeMessageReply = null; + if (domRpcResult.getResult() != null) { + nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY, + domRpcResult.getResult()); + } + recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf()); + } + + @Override + public void onFailure(@Nonnull final Throwable throwable) { + recipient.tell(throwable, getSelf()); + } + }); + } + private void registerSlaveMountPoint(final ActorRef masterReference) { if (this.slaveSalManager != null) { slaveSalManager.close(); } - slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem()); + closeSchemaSourceRegistrations(); + slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, + mountPointService); final CheckedFuture remoteSchemaContext = getSchemaContext(masterReference); - final DOMRpcService deviceRpc = getDOMRpcService(); + final DOMRpcService deviceRpc = getDOMRpcService(masterReference); Futures.addCallback(remoteSchemaContext, new FutureCallback() { @Override @@ -211,20 +252,22 @@ public class NetconfNodeActor extends UntypedActor { }); } - private DOMRpcService getDOMRpcService() { - return new ProxyDOMRpcService(); + private DOMRpcService getDOMRpcService(final ActorRef masterReference) { + return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime); } - private CheckedFuture getSchemaContext(ActorRef masterReference) { + private CheckedFuture getSchemaContext(final ActorRef masterReference) { final RemoteYangTextSourceProvider remoteYangTextSourceProvider = - new ProxyYangTextSourceProvider(masterReference, getContext()); + new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime); final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, getContext().dispatcher()); - sourceIdentifiers.forEach(sourceId -> - schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId, - YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue()))); + registeredSchemas = sourceIdentifiers.stream() + .map(sourceId -> + schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId, + YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue()))) + .collect(Collectors.toList()); final SchemaContextFactory schemaContextFactory = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT); @@ -232,4 +275,11 @@ public class NetconfNodeActor extends UntypedActor { return schemaContextFactory.createSchemaContext(sourceIdentifiers); } + private void closeSchemaSourceRegistrations() { + if (registeredSchemas != null) { + registeredSchemas.forEach(SchemaSourceRegistration::close); + registeredSchemas = null; + } + } + }