X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2Factors%2FNetconfNodeActor.java;h=1cf7b9904eadc11d812550064b564923292b7dad;hb=8e59d67f1b7580c2135cbcc229d4c377c8cc1b09;hp=75c7e62e8a3f6d1cbc56346d861575755e0bf2ea;hpb=dd80556b7d2d71f32cb3eeb4ed1489b520535eaf;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java index 75c7e62e8a..1cf7b9904e 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java @@ -5,94 +5,120 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.topology.singleton.impl.actors; import akka.actor.ActorRef; import akka.actor.Props; -import akka.actor.UntypedActor; -import com.google.common.util.concurrent.CheckedFuture; +import akka.actor.Status.Failure; +import akka.actor.Status.Success; +import akka.pattern.AskTimeoutException; +import akka.util.Timeout; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; import java.util.List; +import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider; import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider; import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMRpcService; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; -import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor; import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService; import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider; -import org.opendaylight.netconf.topology.singleton.impl.RemoteOperationTxProcessorImpl; import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint; import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.NotMasterException; import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData; +import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor; import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint; import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint; import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest; +import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage; +import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply; +import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; -import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException; -import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException; import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter; import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; -public class NetconfNodeActor extends UntypedActor { +public class NetconfNodeActor extends AbstractUntypedActor { - private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class); + private final Duration writeTxIdleTimeout; + private final DOMMountPointService mountPointService; - private NetconfTopologySetup setup; + private SchemaSourceRegistry schemaRegistry; + private SchemaRepository schemaRepository; + private Timeout actorResponseWaitTime; private RemoteDeviceId id; - private final SchemaSourceRegistry schemaRegistry; - private final SchemaRepository schemaRepository; - - private RemoteOperationTxProcessor operationsProcessor; + private NetconfTopologySetup setup; private List sourceIdentifiers; + private DOMRpcService deviceRpc; private SlaveSalFacade slaveSalManager; + private DOMDataBroker deviceDataBroker; + //readTxActor can be shared + private ActorRef readTxActor; + private List> registeredSchemas; - public static Props props(final NetconfTopologySetup setup, - final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry, - final SchemaRepository schemaRepository) { + public static Props props(final NetconfTopologySetup setup, final RemoteDeviceId id, + final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) { return Props.create(NetconfNodeActor.class, () -> - new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository)); + new NetconfNodeActor(setup, id, actorResponseWaitTime, mountPointService)); } - private NetconfNodeActor(final NetconfTopologySetup setup, - final RemoteDeviceId id, SchemaSourceRegistry schemaRegistry, - final SchemaRepository schemaRepository) { + protected NetconfNodeActor(final NetconfTopologySetup setup, + final RemoteDeviceId id, final Timeout actorResponseWaitTime, + final DOMMountPointService mountPointService) { this.setup = setup; this.id = id; - this.schemaRegistry = schemaRegistry; - this.schemaRepository = schemaRepository; + this.schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry(); + this.schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository(); + this.actorResponseWaitTime = actorResponseWaitTime; + this.writeTxIdleTimeout = setup.getIdleTimeout(); + this.mountPointService = mountPointService; } + @SuppressWarnings("checkstyle:IllegalCatch") @Override - public void onReceive(final Object message) throws Exception { + public void handleReceive(final Object message) { + LOG.debug("{}: received message {}", id, message); + if (message instanceof CreateInitialMasterActorData) { // master - sourceIdentifiers = ((CreateInitialMasterActorData) message).getSourceIndentifiers(); - operationsProcessor = - new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(), - id); + final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message; + sourceIdentifiers = masterActorData.getSourceIndentifiers(); + this.deviceDataBroker = masterActorData.getDeviceDataBroker(); + final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction(); + readTxActor = context().actorOf(ReadTransactionActor.props(tx)); + this.deviceRpc = masterActorData.getDeviceRpc(); + sender().tell(new MasterActorDataInitialized(), self()); LOG.debug("{}: Master is ready.", id); @@ -102,134 +128,211 @@ public class NetconfNodeActor extends UntypedActor { id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId(); sender().tell(new MasterActorDataInitialized(), self()); } else if (message instanceof AskForMasterMountPoint) { // master - // only master contains reference to operations processor - if (operationsProcessor != null) { - getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf()); + AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message; + + // only master contains reference to deviceDataBroker + if (deviceDataBroker != null) { + LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef()); + askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()), + sender()); + } else { + LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint); + sender().tell(new Failure(new NotMasterException(self())), self()); } - } else if (message instanceof TransactionRequest) { // master - - resolveProxyCalls(message, sender(), getSelf()); - } else if (message instanceof YangTextSchemaSourceRequest) { // master final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message; sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender()); - } else if (message instanceof RegisterMountPoint) { //slaves - - sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers(); - registerSlaveMountPoint(getSender()); + } else if (message instanceof NewReadTransactionRequest) { // master + sender().tell(new Success(readTxActor), self()); + } else if (message instanceof NewWriteTransactionRequest) { // master + try { + final DOMDataTreeWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction(); + final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout)); + sender().tell(new Success(txActor), self()); + } catch (final Exception t) { + sender().tell(new Failure(t), self()); + } - } else if (message instanceof UnregisterSlaveMountPoint) { //slaves - if (slaveSalManager != null) { - slaveSalManager.close(); - slaveSalManager = null; + } else if (message instanceof NewReadWriteTransactionRequest) { + try { + final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction(); + final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout)); + sender().tell(new Success(txActor), self()); + } catch (final Exception t) { + sender().tell(new Failure(t), self()); } + } else if (message instanceof InvokeRpcMessage) { // master + final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message; + invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender()); + } else if (message instanceof RegisterMountPoint) { //slaves + RegisterMountPoint registerMountPoint = (RegisterMountPoint)message; + sourceIdentifiers = registerMountPoint.getSourceIndentifiers(); + registerSlaveMountPoint(registerMountPoint.getMasterActorRef()); + sender().tell(new Success(null), self()); + } else if (message instanceof UnregisterSlaveMountPoint) { //slaves + unregisterSlaveMountPoint(); + } else if (message instanceof RefreshSlaveActor) { //slave + actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime(); + id = ((RefreshSlaveActor) message).getId(); + schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry(); + setup = ((RefreshSlaveActor) message).getSetup(); + schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository(); } - } - - private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) { - if (message instanceof ReadRequest) { - - final ReadRequest readRequest = (ReadRequest) message; - operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender); - - } else if (message instanceof ExistsRequest) { - - final ExistsRequest readRequest = (ExistsRequest) message; - operationsProcessor.doExists(readRequest.getStore(), readRequest.getPath(), recipient, futureSender); - - } else if (message instanceof MergeRequest) { - - final MergeRequest mergeRequest = (MergeRequest) message; - operationsProcessor.doMerge(mergeRequest.getStore(), mergeRequest.getNormalizedNodeMessage()); - } else if (message instanceof PutRequest) { - - final PutRequest putRequest = (PutRequest) message; - operationsProcessor.doPut(putRequest.getStore(), putRequest.getNormalizedNodeMessage()); - - } else if (message instanceof DeleteRequest) { - - final DeleteRequest deleteRequest = (DeleteRequest) message; - operationsProcessor.doDelete(deleteRequest.getStore(), deleteRequest.getPath()); - - } else if (message instanceof CancelRequest) { - - operationsProcessor.doCancel(recipient, futureSender); + } - } else if (message instanceof SubmitRequest) { + @Override + public void postStop() throws Exception { + try { + super.postStop(); + } finally { + unregisterSlaveMountPoint(); + } + } - operationsProcessor.doSubmit(recipient, futureSender); + private void unregisterSlaveMountPoint() { + if (slaveSalManager != null) { + slaveSalManager.close(); + slaveSalManager = null; } + + closeSchemaSourceRegistrations(); } private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) { - final CheckedFuture yangTextSchemaSource = + final ListenableFuture<@NonNull YangTextSchemaSource> schemaSourceFuture = schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class); - Futures.addCallback(yangTextSchemaSource, new FutureCallback() { + Futures.addCallback(schemaSourceFuture, new FutureCallback() { @Override public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) { try { + LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier); sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf()); - } catch (IOException exception) { - sender.tell(exception.getCause(), getSelf()); + } catch (IOException e) { + sender.tell(new Failure(e), getSelf()); } } @Override public void onFailure(@Nonnull final Throwable throwable) { - sender.tell(throwable, getSelf()); + LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable); + sender.tell(new Failure(throwable), getSelf()); } - }); + }, MoreExecutors.directExecutor()); } - private void registerSlaveMountPoint(final ActorRef masterReference) { - if (this.slaveSalManager != null) { - slaveSalManager.close(); - } - slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem()); + private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage, + final ActorRef recipient) { + + LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, schemaPath, normalizedNodeMessage, + deviceRpc); - final CheckedFuture remoteSchemaContext = - getSchemaContext(masterReference); - final DOMRpcService deviceRpc = getDOMRpcService(); + final ListenableFuture rpcResult = deviceRpc.invokeRpc(schemaPath, + normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null); - Futures.addCallback(remoteSchemaContext, new FutureCallback() { + Futures.addCallback(rpcResult, new FutureCallback() { @Override - public void onSuccess(final SchemaContext result) { - LOG.info("{}: Schema context resolved: {}", id, result.getModules()); - slaveSalManager.registerSlaveMountPoint(result, deviceRpc, masterReference); + public void onSuccess(@Nullable final DOMRpcResult domRpcResult) { + LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, schemaPath, domRpcResult); + + if (domRpcResult == null) { + recipient.tell(new EmptyResultResponse(), getSender()); + return; + } + NormalizedNodeMessage nodeMessageReply = null; + if (domRpcResult.getResult() != null) { + nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY, + domRpcResult.getResult()); + } + recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf()); } @Override public void onFailure(@Nonnull final Throwable throwable) { - LOG.error("{}: Failed to register mount point: {}", id, throwable); + recipient.tell(new Failure(throwable), getSelf()); } - }); + }, MoreExecutors.directExecutor()); } - private DOMRpcService getDOMRpcService() { - return new ProxyDOMRpcService(); + private void registerSlaveMountPoint(final ActorRef masterReference) { + unregisterSlaveMountPoint(); + + slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService); + + resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1); } - private CheckedFuture getSchemaContext(ActorRef masterReference) { + private DOMRpcService getDOMRpcService(final ActorRef masterReference) { + return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime); + } + private SchemaContextFactory createSchemaContextFactory(final ActorRef masterReference) { final RemoteYangTextSourceProvider remoteYangTextSourceProvider = - new ProxyYangTextSourceProvider(masterReference, getContext()); + new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime); final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, getContext().dispatcher()); - sourceIdentifiers.forEach(sourceId -> - schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId, - YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue()))); + registeredSchemas = sourceIdentifiers.stream() + .map(sourceId -> + schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId, + YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue()))) + .collect(Collectors.toList()); - final SchemaContextFactory schemaContextFactory - = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT); + return schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT); + } + + private void resolveSchemaContext(final SchemaContextFactory schemaContextFactory, + final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) { + final ListenableFuture schemaContextFuture = + schemaContextFactory.createSchemaContext(sourceIdentifiers); + Futures.addCallback(schemaContextFuture, new FutureCallback() { + @Override + public void onSuccess(@Nonnull final SchemaContext result) { + executeInSelf(() -> { + // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context + // resolution. + if (slaveSalManager == localSlaveSalManager) { + LOG.info("{}: Schema context resolved: {} - registering slave mount point", + id, result.getModules()); + slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference), + masterReference); + } + }); + } - return schemaContextFactory.createSchemaContext(sourceIdentifiers); + @Override + public void onFailure(@Nonnull final Throwable throwable) { + executeInSelf(() -> { + if (slaveSalManager == localSlaveSalManager) { + final Throwable cause = Throwables.getRootCause(throwable); + if (cause instanceof AskTimeoutException) { + if (tries <= 5 || tries % 10 == 0) { + LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable); + } + + resolveSchemaContext(schemaContextFactory, localSlaveSalManager, + masterReference, tries + 1); + } else { + LOG.error("{}: Failed to resolve schema context - unable to register slave mount point", + id, throwable); + closeSchemaSourceRegistrations(); + } + } + }); + } + }, MoreExecutors.directExecutor()); + } + + private void closeSchemaSourceRegistrations() { + if (registeredSchemas != null) { + registeredSchemas.forEach(SchemaSourceRegistration::close); + registeredSchemas = null; + } } }