import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
+import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.mdsal.dom.api.DOMRpcService;
import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService;
import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
-import scala.concurrent.duration.Duration;
public class NetconfNodeActor extends AbstractUntypedActor {
private final Duration writeTxIdleTimeout;
final DOMMountPointService mountPointService) {
this.setup = setup;
this.id = id;
- this.schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry();
- this.schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository();
+ schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry();
+ schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository();
this.actorResponseWaitTime = actorResponseWaitTime;
- this.writeTxIdleTimeout = setup.getIdleTimeout();
+ writeTxIdleTimeout = setup.getIdleTimeout();
this.mountPointService = mountPointService;
}
public void handleReceive(final Object message) {
LOG.debug("{}: received message {}", id, message);
- if (message instanceof CreateInitialMasterActorData) { // master
- final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
+ if (message instanceof CreateInitialMasterActorData masterActorData) { // master
sourceIdentifiers = masterActorData.getSourceIndentifiers();
- this.deviceDataBroker = masterActorData.getDeviceDataBroker();
- this.netconfService = masterActorData.getNetconfDataTreeService();
+ deviceDataBroker = masterActorData.getDeviceDataBroker();
+ netconfService = masterActorData.getNetconfDataTreeService();
final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction();
readTxActor = context().actorOf(ReadTransactionActor.props(tx));
- this.deviceRpc = masterActorData.getDeviceRpc();
- this.deviceAction = masterActorData.getDeviceAction();
+ deviceRpc = masterActorData.getDeviceRpc();
+ deviceAction = masterActorData.getDeviceAction();
sender().tell(new MasterActorDataInitialized(), self());
LOG.debug("{}: Master is ready.", id);
setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
sender().tell(new MasterActorDataInitialized(), self());
- } else if (message instanceof AskForMasterMountPoint) { // master
- AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint) message;
+ } else if (message instanceof AskForMasterMountPoint askForMasterMountPoint) { // master
// only master contains reference to deviceDataBroker
if (deviceDataBroker != null) {
LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
sender().tell(new Failure(new NotMasterException(self())), self());
}
- } else if (message instanceof YangTextSchemaSourceRequest) { // master
- final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
+ } else if (message instanceof YangTextSchemaSourceRequest yangTextSchemaSourceRequest) { // master
sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
} else if (message instanceof NewReadTransactionRequest) { // master
sender().tell(new Success(readTxActor), self());
} catch (final Exception t) {
sender().tell(new Failure(t), self());
}
- } else if (message instanceof InvokeRpcMessage) { // master
- final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
- invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
- } else if (message instanceof InvokeActionMessage) { // master
- final InvokeActionMessage invokeActionMessage = (InvokeActionMessage) message;
+ } else if (message instanceof InvokeRpcMessage invokeRpcMessage) { // master
+ invokeSlaveRpc(invokeRpcMessage.getSchemaPath().lastNodeIdentifier(),
+ invokeRpcMessage.getNormalizedNodeMessage(), sender());
+ } else if (message instanceof InvokeActionMessage invokeActionMessage) { // master
LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString());
invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(),
invokeActionMessage.getDOMDataTreeIdentifier(), sender());
- } else if (message instanceof RegisterMountPoint) { //slaves
- RegisterMountPoint registerMountPoint = (RegisterMountPoint) message;
+ } else if (message instanceof RegisterMountPoint registerMountPoint) { //slaves
sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
sender().tell(new Success(null), self());
}, MoreExecutors.directExecutor());
}
- private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
+ private void invokeSlaveRpc(final QName qname, final NormalizedNodeMessage normalizedNodeMessage,
final ActorRef recipient) {
- LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, schemaPath, normalizedNodeMessage,
+ LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, qname, normalizedNodeMessage,
deviceRpc);
- final ListenableFuture<? extends DOMRpcResult> rpcResult = deviceRpc.invokeRpc(schemaPath,
+ final ListenableFuture<? extends DOMRpcResult> rpcResult = deviceRpc.invokeRpc(qname,
normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
@Override
public void onSuccess(final DOMRpcResult domRpcResult) {
- LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, schemaPath, domRpcResult);
+ LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, qname, domRpcResult);
if (domRpcResult == null) {
recipient.tell(new EmptyResultResponse(), getSender());
/**
* Invoking Action on Slave Node in Odl Cluster Environment.
*
- * @param schemaPath {@link SchemaPath}
+ * @param schemaPath {@link Absolute}
* @param containerNodeMessage {@link ContainerNodeMessage}
* @param domDataTreeIdentifier {@link DOMDataTreeIdentifier}
* @param recipient {@link ActorRef}
*/
- private void invokeSlaveAction(final SchemaPath schemaPath, final ContainerNodeMessage containerNodeMessage,
+ private void invokeSlaveAction(final Absolute schemaPath, final ContainerNodeMessage containerNodeMessage,
final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) {
LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath,
containerNodeMessage, domDataTreeIdentifier, deviceAction);
resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
}
- @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
- justification = "https://github.com/spotbugs/spotbugs/issues/811")
- private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
- return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
- }
-
- @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
- justification = "https://github.com/spotbugs/spotbugs/issues/811")
- private DOMActionService getDOMActionService(final ActorRef masterReference) {
- return new ProxyDOMActionService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
- }
-
private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) {
final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
if (slaveSalManager == localSlaveSalManager) {
LOG.info("{}: Schema context resolved: {} - registering slave mount point",
id, result.getModules());
- slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
- getDOMActionService(masterReference), masterReference);
+ final var actorSystem = setup.getActorSystem();
+ slaveSalManager.registerSlaveMountPoint(result, masterReference, new RemoteDeviceServices(
+ new ProxyDOMRpcService(actorSystem, masterReference, id, actorResponseWaitTime),
+ new ProxyDOMActionService(actorSystem, masterReference, id, actorResponseWaitTime)));
}
});
}
registeredSchemas = null;
}
}
-}
\ No newline at end of file
+}