X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2Factors%2FNetconfNodeActor.java;h=05ae2a05824b35eb114a3a9b8b29a66eedb21d3d;hb=7d83179b4d81e63eeb5c1c1adf4794c6e74c5709;hp=2af981783e4e0882dae9bf28e55c5c44adc4e625;hpb=3eac4fad8be9aae42fd76037e1c0a8ef5dc7b608;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java index 2af981783e..05ae2a0582 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.topology.singleton.impl.actors; import akka.actor.ActorRef; @@ -15,35 +14,37 @@ import akka.actor.Status.Success; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.google.common.base.Throwables; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; +import java.time.Duration; import java.util.List; import java.util.stream.Collectors; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider; import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider; import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy; -import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; -import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.mdsal.dom.api.DOMActionResult; +import org.opendaylight.mdsal.dom.api.DOMActionService; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMRpcService; +import org.opendaylight.netconf.dom.api.NetconfDataTreeService; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService; import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService; import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider; import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint; +import org.opendaylight.netconf.topology.singleton.messages.ContainerNodeMessage; import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; @@ -53,30 +54,28 @@ import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor; import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint; import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint; import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest; +import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessage; +import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessageReply; +import org.opendaylight.netconf.topology.singleton.messages.netconf.NetconfDataTreeServiceRequest; import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage; import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse; -import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; -import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; +import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; +import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory; import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; -import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter; import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; -import scala.concurrent.duration.Duration; public class NetconfNodeActor extends AbstractUntypedActor { - private final Duration writeTxIdleTimeout; private final DOMMountPointService mountPointService; @@ -87,29 +86,27 @@ public class NetconfNodeActor extends AbstractUntypedActor { private NetconfTopologySetup setup; private List sourceIdentifiers; private DOMRpcService deviceRpc; + private DOMActionService deviceAction; private SlaveSalFacade slaveSalManager; private DOMDataBroker deviceDataBroker; + private NetconfDataTreeService netconfService; //readTxActor can be shared private ActorRef readTxActor; private List> registeredSchemas; - public static Props props(final NetconfTopologySetup setup, - final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry, - final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime, - final DOMMountPointService mountPointService) { + public static Props props(final NetconfTopologySetup setup, final RemoteDeviceId id, + final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) { return Props.create(NetconfNodeActor.class, () -> - new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime, - mountPointService)); + new NetconfNodeActor(setup, id, actorResponseWaitTime, mountPointService)); } protected NetconfNodeActor(final NetconfTopologySetup setup, - final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry, - final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime, + final RemoteDeviceId id, final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) { this.setup = setup; this.id = id; - this.schemaRegistry = schemaRegistry; - this.schemaRepository = schemaRepository; + this.schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry(); + this.schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository(); this.actorResponseWaitTime = actorResponseWaitTime; this.writeTxIdleTimeout = setup.getIdleTimeout(); this.mountPointService = mountPointService; @@ -117,71 +114,68 @@ public class NetconfNodeActor extends AbstractUntypedActor { @SuppressWarnings("checkstyle:IllegalCatch") @Override - public void handleReceive(final Object message) throws Exception { + public void handleReceive(final Object message) { LOG.debug("{}: received message {}", id, message); if (message instanceof CreateInitialMasterActorData) { // master - final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message; sourceIdentifiers = masterActorData.getSourceIndentifiers(); this.deviceDataBroker = masterActorData.getDeviceDataBroker(); - final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction(); + this.netconfService = masterActorData.getNetconfDataTreeService(); + final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction(); readTxActor = context().actorOf(ReadTransactionActor.props(tx)); this.deviceRpc = masterActorData.getDeviceRpc(); + this.deviceAction = masterActorData.getDeviceAction(); sender().tell(new MasterActorDataInitialized(), self()); - LOG.debug("{}: Master is ready.", id); - - } else if (message instanceof RefreshSetupMasterActorData) { + } else if (message instanceof RefreshSetupMasterActorData) { setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup(); id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId(); sender().tell(new MasterActorDataInitialized(), self()); } else if (message instanceof AskForMasterMountPoint) { // master - AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message; - + AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint) message; // only master contains reference to deviceDataBroker if (deviceDataBroker != null) { LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef()); askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()), - sender()); + sender()); } else { LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint); sender().tell(new Failure(new NotMasterException(self())), self()); } - } else if (message instanceof YangTextSchemaSourceRequest) { // master - final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message; sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender()); - } else if (message instanceof NewReadTransactionRequest) { // master - - sender().tell(new NewReadTransactionReply(readTxActor), self()); - + sender().tell(new Success(readTxActor), self()); } else if (message instanceof NewWriteTransactionRequest) { // master try { - final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction(); + final DOMDataTreeWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction(); final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout)); - sender().tell(new NewWriteTransactionReply(txActor), self()); + sender().tell(new Success(txActor), self()); } catch (final Exception t) { - sender().tell(t, self()); + sender().tell(new Failure(t), self()); } - } else if (message instanceof NewReadWriteTransactionRequest) { try { - final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction(); + final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction(); final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout)); - sender().tell(new NewReadWriteTransactionReply(txActor), self()); + sender().tell(new Success(txActor), self()); } catch (final Exception t) { - sender().tell(t, self()); + sender().tell(new Failure(t), self()); } } else if (message instanceof InvokeRpcMessage) { // master final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message; - invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender()); - + invokeSlaveRpc(invokeRpcMessage.getSchemaPath().lastNodeIdentifier(), + invokeRpcMessage.getNormalizedNodeMessage(), sender()); + } else if (message instanceof InvokeActionMessage) { // master + final InvokeActionMessage invokeActionMessage = (InvokeActionMessage) message; + LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString()); + invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(), + invokeActionMessage.getDOMDataTreeIdentifier(), sender()); } else if (message instanceof RegisterMountPoint) { //slaves - RegisterMountPoint registerMountPoint = (RegisterMountPoint)message; + RegisterMountPoint registerMountPoint = (RegisterMountPoint) message; sourceIdentifiers = registerMountPoint.getSourceIndentifiers(); registerSlaveMountPoint(registerMountPoint.getMasterActorRef()); sender().tell(new Success(null), self()); @@ -193,8 +187,11 @@ public class NetconfNodeActor extends AbstractUntypedActor { schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry(); setup = ((RefreshSlaveActor) message).getSetup(); schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository(); + } else if (message instanceof NetconfDataTreeServiceRequest) { + ActorRef netconfActor = context() + .actorOf(NetconfDataTreeServiceActor.props(netconfService, writeTxIdleTimeout)); + sender().tell(new Success(netconfActor), self()); } - } @Override @@ -216,13 +213,14 @@ public class NetconfNodeActor extends AbstractUntypedActor { } private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) { - final ListenableFuture<@NonNull YangTextSchemaSource> schemaSourceFuture = + final ListenableFuture schemaSourceFuture = schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class); Futures.addCallback(schemaSourceFuture, new FutureCallback() { @Override public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) { try { + LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier); sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf()); } catch (IOException e) { sender.tell(new Failure(e), getSelf()); @@ -230,35 +228,80 @@ public class NetconfNodeActor extends AbstractUntypedActor { } @Override - public void onFailure(@Nonnull final Throwable throwable) { + public void onFailure(final Throwable throwable) { + LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable); sender.tell(new Failure(throwable), getSelf()); } }, MoreExecutors.directExecutor()); } - private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage, + private void invokeSlaveRpc(final QName qname, final NormalizedNodeMessage normalizedNodeMessage, final ActorRef recipient) { - final CheckedFuture rpcResult = - deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode()); + LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, qname, normalizedNodeMessage, + deviceRpc); + + final ListenableFuture rpcResult = deviceRpc.invokeRpc(qname, + normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null); Futures.addCallback(rpcResult, new FutureCallback() { @Override - public void onSuccess(@Nullable final DOMRpcResult domRpcResult) { + public void onSuccess(final DOMRpcResult domRpcResult) { + LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, qname, domRpcResult); + if (domRpcResult == null) { recipient.tell(new EmptyResultResponse(), getSender()); return; } NormalizedNodeMessage nodeMessageReply = null; if (domRpcResult.getResult() != null) { - nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY, + nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), domRpcResult.getResult()); } recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf()); } @Override - public void onFailure(@Nonnull final Throwable throwable) { + public void onFailure(final Throwable throwable) { + recipient.tell(new Failure(throwable), getSelf()); + } + }, MoreExecutors.directExecutor()); + } + + /** + * Invoking Action on Slave Node in Odl Cluster Environment. + * + * @param schemaPath {@link Absolute} + * @param containerNodeMessage {@link ContainerNodeMessage} + * @param domDataTreeIdentifier {@link DOMDataTreeIdentifier} + * @param recipient {@link ActorRef} + */ + private void invokeSlaveAction(final Absolute schemaPath, final ContainerNodeMessage containerNodeMessage, + final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) { + LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath, + containerNodeMessage, domDataTreeIdentifier, deviceAction); + + final ListenableFuture actionResult = deviceAction.invokeAction(schemaPath, + domDataTreeIdentifier, containerNodeMessage != null ? containerNodeMessage.getNode() : null); + + Futures.addCallback(actionResult, new FutureCallback() { + + @Override + public void onSuccess(final DOMActionResult domActionResult) { + LOG.debug("{}: invokeSlaveAction for {}, domActionResult: {}", id, schemaPath, domActionResult); + if (domActionResult == null) { + recipient.tell(new EmptyResultResponse(), getSender()); + return; + } + + //Check DomActionResult containing Ok onSuccess pass empty nodeMessageReply + ContainerNodeMessage nodeMessageReply = domActionResult.getOutput().map(ContainerNodeMessage::new) + .orElse(null); + recipient.tell(new InvokeActionMessageReply(nodeMessageReply, domActionResult.getErrors()), getSelf()); + } + + @Override + public void onFailure(final Throwable throwable) { recipient.tell(new Failure(throwable), getSelf()); } }, MoreExecutors.directExecutor()); @@ -276,7 +319,11 @@ public class NetconfNodeActor extends AbstractUntypedActor { return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime); } - private SchemaContextFactory createSchemaContextFactory(final ActorRef masterReference) { + private DOMActionService getDOMActionService(final ActorRef masterReference) { + return new ProxyDOMActionService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime); + } + + private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) { final RemoteYangTextSourceProvider remoteYangTextSourceProvider = new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime); final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, @@ -288,16 +335,16 @@ public class NetconfNodeActor extends AbstractUntypedActor { YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue()))) .collect(Collectors.toList()); - return schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT); + return schemaRepository.createEffectiveModelContextFactory(); } - private void resolveSchemaContext(final SchemaContextFactory schemaContextFactory, - final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, int tries) { - final ListenableFuture schemaContextFuture = - schemaContextFactory.createSchemaContext(sourceIdentifiers); - Futures.addCallback(schemaContextFuture, new FutureCallback() { + private void resolveSchemaContext(final EffectiveModelContextFactory schemaContextFactory, + final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) { + final ListenableFuture schemaContextFuture = + schemaContextFactory.createEffectiveModelContext(sourceIdentifiers); + Futures.addCallback(schemaContextFuture, new FutureCallback() { @Override - public void onSuccess(@Nonnull final SchemaContext result) { + public void onSuccess(final EffectiveModelContext result) { executeInSelf(() -> { // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context // resolution. @@ -305,13 +352,13 @@ public class NetconfNodeActor extends AbstractUntypedActor { LOG.info("{}: Schema context resolved: {} - registering slave mount point", id, result.getModules()); slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference), - masterReference); + getDOMActionService(masterReference), masterReference); } }); } @Override - public void onFailure(@Nonnull final Throwable throwable) { + public void onFailure(final Throwable throwable) { executeInSelf(() -> { if (slaveSalManager == localSlaveSalManager) { final Throwable cause = Throwables.getRootCause(throwable); @@ -339,5 +386,4 @@ public class NetconfNodeActor extends AbstractUntypedActor { registeredSchemas = null; } } - }