X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2Factors%2FNetconfNodeActor.java;h=05ae2a05824b35eb114a3a9b8b29a66eedb21d3d;hb=7d83179b4d81e63eeb5c1c1adf4794c6e74c5709;hp=3a3fad9ce46a4628a6b8d8b6d717462e02dd2399;hpb=a3022c29beea055ec8e7cacaa30dd64d5a80887c;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java index 3a3fad9ce4..05ae2a0582 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java @@ -5,77 +5,77 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.topology.singleton.impl.actors; import akka.actor.ActorRef; import akka.actor.Props; -import akka.actor.UntypedActor; +import akka.actor.Status.Failure; +import akka.actor.Status.Success; +import akka.pattern.AskTimeoutException; import akka.util.Timeout; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; +import java.time.Duration; import java.util.List; import java.util.stream.Collectors; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider; import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider; import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy; -import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; -import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.mdsal.dom.api.DOMActionResult; +import org.opendaylight.mdsal.dom.api.DOMActionService; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMRpcService; +import org.opendaylight.netconf.dom.api.NetconfDataTreeService; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService; import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService; import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider; import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint; +import org.opendaylight.netconf.topology.singleton.messages.ContainerNodeMessage; import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.NotMasterException; import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor; import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint; import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint; import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest; +import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessage; +import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessageReply; +import org.opendaylight.netconf.topology.singleton.messages.netconf.NetconfDataTreeServiceRequest; import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage; import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse; -import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest; -import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; -import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; +import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; +import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory; import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; -import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter; import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.duration.Duration; - -public final class NetconfNodeActor extends UntypedActor { - - private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class); +public class NetconfNodeActor extends AbstractUntypedActor { private final Duration writeTxIdleTimeout; private final DOMMountPointService mountPointService; @@ -86,29 +86,27 @@ public final class NetconfNodeActor extends UntypedActor { private NetconfTopologySetup setup; private List sourceIdentifiers; private DOMRpcService deviceRpc; + private DOMActionService deviceAction; private SlaveSalFacade slaveSalManager; private DOMDataBroker deviceDataBroker; + private NetconfDataTreeService netconfService; //readTxActor can be shared private ActorRef readTxActor; private List> registeredSchemas; - public static Props props(final NetconfTopologySetup setup, - final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry, - final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime, - final DOMMountPointService mountPointService) { + public static Props props(final NetconfTopologySetup setup, final RemoteDeviceId id, + final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) { return Props.create(NetconfNodeActor.class, () -> - new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime, - mountPointService)); + new NetconfNodeActor(setup, id, actorResponseWaitTime, mountPointService)); } - private NetconfNodeActor(final NetconfTopologySetup setup, - final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry, - final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime, - final DOMMountPointService mountPointService) { + protected NetconfNodeActor(final NetconfTopologySetup setup, + final RemoteDeviceId id, final Timeout actorResponseWaitTime, + final DOMMountPointService mountPointService) { this.setup = setup; this.id = id; - this.schemaRegistry = schemaRegistry; - this.schemaRepository = schemaRepository; + this.schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry(); + this.schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository(); this.actorResponseWaitTime = actorResponseWaitTime; this.writeTxIdleTimeout = setup.getIdleTimeout(); this.mountPointService = mountPointService; @@ -116,169 +114,218 @@ public final class NetconfNodeActor extends UntypedActor { @SuppressWarnings("checkstyle:IllegalCatch") @Override - public void onReceive(final Object message) throws Exception { - if (message instanceof CreateInitialMasterActorData) { // master + public void handleReceive(final Object message) { + LOG.debug("{}: received message {}", id, message); + if (message instanceof CreateInitialMasterActorData) { // master final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message; sourceIdentifiers = masterActorData.getSourceIndentifiers(); this.deviceDataBroker = masterActorData.getDeviceDataBroker(); - final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction(); + this.netconfService = masterActorData.getNetconfDataTreeService(); + final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction(); readTxActor = context().actorOf(ReadTransactionActor.props(tx)); this.deviceRpc = masterActorData.getDeviceRpc(); + this.deviceAction = masterActorData.getDeviceAction(); sender().tell(new MasterActorDataInitialized(), self()); - LOG.debug("{}: Master is ready.", id); - - } else if (message instanceof RefreshSetupMasterActorData) { + } else if (message instanceof RefreshSetupMasterActorData) { setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup(); id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId(); sender().tell(new MasterActorDataInitialized(), self()); } else if (message instanceof AskForMasterMountPoint) { // master + AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint) message; // only master contains reference to deviceDataBroker if (deviceDataBroker != null) { - getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf()); + LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef()); + askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()), + sender()); + } else { + LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint); + sender().tell(new Failure(new NotMasterException(self())), self()); } - } else if (message instanceof YangTextSchemaSourceRequest) { // master - final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message; sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender()); - } else if (message instanceof NewReadTransactionRequest) { // master - - sender().tell(new NewReadTransactionReply(readTxActor), self()); - + sender().tell(new Success(readTxActor), self()); } else if (message instanceof NewWriteTransactionRequest) { // master try { - final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction(); + final DOMDataTreeWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction(); final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout)); - sender().tell(new NewWriteTransactionReply(txActor), self()); + sender().tell(new Success(txActor), self()); } catch (final Exception t) { - sender().tell(t, self()); + sender().tell(new Failure(t), self()); } - } else if (message instanceof NewReadWriteTransactionRequest) { try { - final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction(); + final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction(); final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout)); - sender().tell(new NewReadWriteTransactionReply(txActor), self()); + sender().tell(new Success(txActor), self()); } catch (final Exception t) { - sender().tell(t, self()); + sender().tell(new Failure(t), self()); } } else if (message instanceof InvokeRpcMessage) { // master - final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message; - invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender()); - + invokeSlaveRpc(invokeRpcMessage.getSchemaPath().lastNodeIdentifier(), + invokeRpcMessage.getNormalizedNodeMessage(), sender()); + } else if (message instanceof InvokeActionMessage) { // master + final InvokeActionMessage invokeActionMessage = (InvokeActionMessage) message; + LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString()); + invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(), + invokeActionMessage.getDOMDataTreeIdentifier(), sender()); } else if (message instanceof RegisterMountPoint) { //slaves - - sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers(); - registerSlaveMountPoint(getSender()); - + RegisterMountPoint registerMountPoint = (RegisterMountPoint) message; + sourceIdentifiers = registerMountPoint.getSourceIndentifiers(); + registerSlaveMountPoint(registerMountPoint.getMasterActorRef()); + sender().tell(new Success(null), self()); } else if (message instanceof UnregisterSlaveMountPoint) { //slaves - if (slaveSalManager != null) { - slaveSalManager.close(); - slaveSalManager = null; - } + unregisterSlaveMountPoint(); } else if (message instanceof RefreshSlaveActor) { //slave actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime(); id = ((RefreshSlaveActor) message).getId(); schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry(); setup = ((RefreshSlaveActor) message).getSetup(); schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository(); + } else if (message instanceof NetconfDataTreeServiceRequest) { + ActorRef netconfActor = context() + .actorOf(NetconfDataTreeServiceActor.props(netconfService, writeTxIdleTimeout)); + sender().tell(new Success(netconfActor), self()); } - } @Override public void postStop() throws Exception { - super.postStop(); + try { + super.postStop(); + } finally { + unregisterSlaveMountPoint(); + } + } + + private void unregisterSlaveMountPoint() { + if (slaveSalManager != null) { + slaveSalManager.close(); + slaveSalManager = null; + } + closeSchemaSourceRegistrations(); } private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) { - final ListenableFuture<@NonNull YangTextSchemaSource> yangTextSchemaSource = + final ListenableFuture schemaSourceFuture = schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class); - Futures.addCallback(yangTextSchemaSource, new FutureCallback() { + Futures.addCallback(schemaSourceFuture, new FutureCallback() { @Override public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) { try { + LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier); sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf()); - } catch (final IOException exception) { - sender.tell(exception.getCause(), getSelf()); + } catch (IOException e) { + sender.tell(new Failure(e), getSelf()); } } @Override - public void onFailure(@Nonnull final Throwable throwable) { - sender.tell(throwable, getSelf()); + public void onFailure(final Throwable throwable) { + LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable); + sender.tell(new Failure(throwable), getSelf()); } }, MoreExecutors.directExecutor()); } - private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage, + private void invokeSlaveRpc(final QName qname, final NormalizedNodeMessage normalizedNodeMessage, final ActorRef recipient) { - final CheckedFuture rpcResult = - deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode()); + LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, qname, normalizedNodeMessage, + deviceRpc); + + final ListenableFuture rpcResult = deviceRpc.invokeRpc(qname, + normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null); Futures.addCallback(rpcResult, new FutureCallback() { @Override - public void onSuccess(@Nullable final DOMRpcResult domRpcResult) { + public void onSuccess(final DOMRpcResult domRpcResult) { + LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, qname, domRpcResult); + if (domRpcResult == null) { recipient.tell(new EmptyResultResponse(), getSender()); return; } NormalizedNodeMessage nodeMessageReply = null; if (domRpcResult.getResult() != null) { - nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY, + nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), domRpcResult.getResult()); } recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf()); } @Override - public void onFailure(@Nonnull final Throwable throwable) { - recipient.tell(throwable, getSelf()); + public void onFailure(final Throwable throwable) { + recipient.tell(new Failure(throwable), getSelf()); } }, MoreExecutors.directExecutor()); } - private void registerSlaveMountPoint(final ActorRef masterReference) { - if (this.slaveSalManager != null) { - slaveSalManager.close(); - } - closeSchemaSourceRegistrations(); - slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, - mountPointService); + /** + * Invoking Action on Slave Node in Odl Cluster Environment. + * + * @param schemaPath {@link Absolute} + * @param containerNodeMessage {@link ContainerNodeMessage} + * @param domDataTreeIdentifier {@link DOMDataTreeIdentifier} + * @param recipient {@link ActorRef} + */ + private void invokeSlaveAction(final Absolute schemaPath, final ContainerNodeMessage containerNodeMessage, + final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) { + LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath, + containerNodeMessage, domDataTreeIdentifier, deviceAction); + + final ListenableFuture actionResult = deviceAction.invokeAction(schemaPath, + domDataTreeIdentifier, containerNodeMessage != null ? containerNodeMessage.getNode() : null); - final ListenableFuture remoteSchemaContext = getSchemaContext(masterReference); - final DOMRpcService deviceRpcService = getDOMRpcService(masterReference); + Futures.addCallback(actionResult, new FutureCallback() { - Futures.addCallback(remoteSchemaContext, new FutureCallback() { @Override - public void onSuccess(@Nonnull final SchemaContext result) { - LOG.info("{}: Schema context resolved: {}", id, result.getModules()); - slaveSalManager.registerSlaveMountPoint(result, deviceRpcService, masterReference); + public void onSuccess(final DOMActionResult domActionResult) { + LOG.debug("{}: invokeSlaveAction for {}, domActionResult: {}", id, schemaPath, domActionResult); + if (domActionResult == null) { + recipient.tell(new EmptyResultResponse(), getSender()); + return; + } + + //Check DomActionResult containing Ok onSuccess pass empty nodeMessageReply + ContainerNodeMessage nodeMessageReply = domActionResult.getOutput().map(ContainerNodeMessage::new) + .orElse(null); + recipient.tell(new InvokeActionMessageReply(nodeMessageReply, domActionResult.getErrors()), getSelf()); } @Override - public void onFailure(@Nonnull final Throwable throwable) { - LOG.error("{}: Failed to register mount point: {}", id, throwable); + public void onFailure(final Throwable throwable) { + recipient.tell(new Failure(throwable), getSelf()); } }, MoreExecutors.directExecutor()); } + private void registerSlaveMountPoint(final ActorRef masterReference) { + unregisterSlaveMountPoint(); + + slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService); + + resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1); + } + private DOMRpcService getDOMRpcService(final ActorRef masterReference) { return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime); } - private ListenableFuture getSchemaContext(final ActorRef masterReference) { + private DOMActionService getDOMActionService(final ActorRef masterReference) { + return new ProxyDOMActionService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime); + } + private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) { final RemoteYangTextSourceProvider remoteYangTextSourceProvider = - new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime); + new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime); final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, getContext().dispatcher()); @@ -288,10 +335,49 @@ public final class NetconfNodeActor extends UntypedActor { YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue()))) .collect(Collectors.toList()); - final SchemaContextFactory schemaContextFactory - = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT); + return schemaRepository.createEffectiveModelContextFactory(); + } + + private void resolveSchemaContext(final EffectiveModelContextFactory schemaContextFactory, + final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) { + final ListenableFuture schemaContextFuture = + schemaContextFactory.createEffectiveModelContext(sourceIdentifiers); + Futures.addCallback(schemaContextFuture, new FutureCallback() { + @Override + public void onSuccess(final EffectiveModelContext result) { + executeInSelf(() -> { + // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context + // resolution. + if (slaveSalManager == localSlaveSalManager) { + LOG.info("{}: Schema context resolved: {} - registering slave mount point", + id, result.getModules()); + slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference), + getDOMActionService(masterReference), masterReference); + } + }); + } - return schemaContextFactory.createSchemaContext(sourceIdentifiers); + @Override + public void onFailure(final Throwable throwable) { + executeInSelf(() -> { + if (slaveSalManager == localSlaveSalManager) { + final Throwable cause = Throwables.getRootCause(throwable); + if (cause instanceof AskTimeoutException) { + if (tries <= 5 || tries % 10 == 0) { + LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable); + } + + resolveSchemaContext(schemaContextFactory, localSlaveSalManager, + masterReference, tries + 1); + } else { + LOG.error("{}: Failed to resolve schema context - unable to register slave mount point", + id, throwable); + closeSchemaSourceRegistrations(); + } + } + }); + } + }, MoreExecutors.directExecutor()); } private void closeSchemaSourceRegistrations() { @@ -300,5 +386,4 @@ public final class NetconfNodeActor extends UntypedActor { registeredSchemas = null; } } - }