import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
+import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.mdsal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService;
import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessage;
import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessageReply;
+import org.opendaylight.netconf.topology.singleton.messages.netconf.NetconfDataTreeServiceRequest;
import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
-import scala.concurrent.duration.Duration;
public class NetconfNodeActor extends AbstractUntypedActor {
private final Duration writeTxIdleTimeout;
private DOMActionService deviceAction;
private SlaveSalFacade slaveSalManager;
private DOMDataBroker deviceDataBroker;
+ private NetconfDataTreeService netconfService;
//readTxActor can be shared
private ActorRef readTxActor;
private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
final DOMMountPointService mountPointService) {
this.setup = setup;
this.id = id;
- this.schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry();
- this.schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository();
+ schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry();
+ schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository();
this.actorResponseWaitTime = actorResponseWaitTime;
- this.writeTxIdleTimeout = setup.getIdleTimeout();
+ writeTxIdleTimeout = setup.getIdleTimeout();
this.mountPointService = mountPointService;
}
public void handleReceive(final Object message) {
LOG.debug("{}: received message {}", id, message);
- if (message instanceof CreateInitialMasterActorData) { // master
-
- final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
+ if (message instanceof CreateInitialMasterActorData masterActorData) { // master
sourceIdentifiers = masterActorData.getSourceIndentifiers();
- this.deviceDataBroker = masterActorData.getDeviceDataBroker();
+ deviceDataBroker = masterActorData.getDeviceDataBroker();
+ netconfService = masterActorData.getNetconfDataTreeService();
final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction();
readTxActor = context().actorOf(ReadTransactionActor.props(tx));
- this.deviceRpc = masterActorData.getDeviceRpc();
- this.deviceAction = masterActorData.getDeviceAction();
+ deviceRpc = masterActorData.getDeviceRpc();
+ deviceAction = masterActorData.getDeviceAction();
sender().tell(new MasterActorDataInitialized(), self());
-
LOG.debug("{}: Master is ready.", id);
-
} else if (message instanceof RefreshSetupMasterActorData) {
setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
sender().tell(new MasterActorDataInitialized(), self());
- } else if (message instanceof AskForMasterMountPoint) { // master
- AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint) message;
-
+ } else if (message instanceof AskForMasterMountPoint askForMasterMountPoint) { // master
// only master contains reference to deviceDataBroker
if (deviceDataBroker != null) {
LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
sender().tell(new Failure(new NotMasterException(self())), self());
}
-
- } else if (message instanceof YangTextSchemaSourceRequest) { // master
-
- final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
+ } else if (message instanceof YangTextSchemaSourceRequest yangTextSchemaSourceRequest) { // master
sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
-
} else if (message instanceof NewReadTransactionRequest) { // master
sender().tell(new Success(readTxActor), self());
} else if (message instanceof NewWriteTransactionRequest) { // master
} catch (final Exception t) {
sender().tell(new Failure(t), self());
}
-
} else if (message instanceof NewReadWriteTransactionRequest) {
try {
final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
} catch (final Exception t) {
sender().tell(new Failure(t), self());
}
- } else if (message instanceof InvokeRpcMessage) { // master
- final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
- invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
- } else if (message instanceof InvokeActionMessage) { // master
- final InvokeActionMessage invokeActionMessage = (InvokeActionMessage) message;
+ } else if (message instanceof InvokeRpcMessage invokeRpcMessage) { // master
+ invokeSlaveRpc(invokeRpcMessage.getSchemaPath().lastNodeIdentifier(),
+ invokeRpcMessage.getNormalizedNodeMessage(), sender());
+ } else if (message instanceof InvokeActionMessage invokeActionMessage) { // master
LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString());
invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(),
invokeActionMessage.getDOMDataTreeIdentifier(), sender());
- } else if (message instanceof RegisterMountPoint) { //slaves
- RegisterMountPoint registerMountPoint = (RegisterMountPoint) message;
+ } else if (message instanceof RegisterMountPoint registerMountPoint) { //slaves
sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
sender().tell(new Success(null), self());
schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
setup = ((RefreshSlaveActor) message).getSetup();
schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
+ } else if (message instanceof NetconfDataTreeServiceRequest) {
+ ActorRef netconfActor = context()
+ .actorOf(NetconfDataTreeServiceActor.props(netconfService, writeTxIdleTimeout));
+ sender().tell(new Success(netconfActor), self());
}
}
}, MoreExecutors.directExecutor());
}
- private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
+ private void invokeSlaveRpc(final QName qname, final NormalizedNodeMessage normalizedNodeMessage,
final ActorRef recipient) {
- LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, schemaPath, normalizedNodeMessage,
+ LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, qname, normalizedNodeMessage,
deviceRpc);
- final ListenableFuture<DOMRpcResult> rpcResult = deviceRpc.invokeRpc(schemaPath,
+ final ListenableFuture<? extends DOMRpcResult> rpcResult = deviceRpc.invokeRpc(qname,
normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
@Override
public void onSuccess(final DOMRpcResult domRpcResult) {
- LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, schemaPath, domRpcResult);
+ LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, qname, domRpcResult);
if (domRpcResult == null) {
recipient.tell(new EmptyResultResponse(), getSender());
/**
* Invoking Action on Slave Node in Odl Cluster Environment.
*
- * @param schemaPath {@link SchemaPath}
+ * @param schemaPath {@link Absolute}
* @param containerNodeMessage {@link ContainerNodeMessage}
* @param domDataTreeIdentifier {@link DOMDataTreeIdentifier}
* @param recipient {@link ActorRef}
*/
- private void invokeSlaveAction(final SchemaPath schemaPath, final ContainerNodeMessage containerNodeMessage,
+ private void invokeSlaveAction(final Absolute schemaPath, final ContainerNodeMessage containerNodeMessage,
final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) {
LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath,
containerNodeMessage, domDataTreeIdentifier, deviceAction);
resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
}
- private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
- return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
- }
-
- private DOMActionService getDOMActionService(final ActorRef masterReference) {
- return new ProxyDOMActionService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
- }
-
private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) {
final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) {
final ListenableFuture<EffectiveModelContext> schemaContextFuture =
schemaContextFactory.createEffectiveModelContext(sourceIdentifiers);
- Futures.addCallback(schemaContextFuture, new FutureCallback<SchemaContext>() {
+ Futures.addCallback(schemaContextFuture, new FutureCallback<EffectiveModelContext>() {
@Override
- public void onSuccess(final SchemaContext result) {
+ public void onSuccess(final EffectiveModelContext result) {
executeInSelf(() -> {
// Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
// resolution.
if (slaveSalManager == localSlaveSalManager) {
LOG.info("{}: Schema context resolved: {} - registering slave mount point",
id, result.getModules());
- slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
- getDOMActionService(masterReference), masterReference);
+ final var actorSystem = setup.getActorSystem();
+ slaveSalManager.registerSlaveMountPoint(result, masterReference, new RemoteDeviceServices(
+ new ProxyDOMRpcService(actorSystem, masterReference, id, actorResponseWaitTime),
+ new ProxyDOMActionService(actorSystem, masterReference, id, actorResponseWaitTime)));
}
});
}
registeredSchemas = null;
}
}
-}
\ No newline at end of file
+}