import static java.util.Objects.requireNonNullElseGet;
import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.local.LocalAddress;
-import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import java.io.Closeable;
import java.io.IOException;
-import java.net.BindException;
-import java.net.Inet4Address;
-import java.net.InetSocketAddress;
+import java.net.InetAddress;
+import java.net.ServerSocket;
import java.net.UnknownHostException;
-import java.nio.channels.AsynchronousChannelGroup;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.IntStream;
import org.opendaylight.netconf.api.CapabilityURN;
-import org.opendaylight.netconf.northbound.ssh.SshProxyServer;
-import org.opendaylight.netconf.northbound.ssh.SshProxyServerConfiguration;
-import org.opendaylight.netconf.northbound.ssh.SshProxyServerConfigurationBuilder;
-import org.opendaylight.netconf.server.NetconfServerDispatcherImpl;
+import org.opendaylight.netconf.server.NetconfServerFactoryImpl;
import org.opendaylight.netconf.server.NetconfServerSessionNegotiatorFactory;
import org.opendaylight.netconf.server.ServerChannelInitializer;
import org.opendaylight.netconf.server.api.SessionIdProvider;
import org.opendaylight.netconf.server.api.operations.NetconfOperationServiceFactory;
import org.opendaylight.netconf.server.impl.DefaultSessionIdProvider;
import org.opendaylight.netconf.server.osgi.AggregatedNetconfOperationServiceFactory;
-import org.opendaylight.netconf.shaded.sshd.common.keyprovider.KeyPairProvider;
-import org.opendaylight.netconf.shaded.sshd.common.util.threads.ThreadUtils;
+import org.opendaylight.netconf.shaded.sshd.server.auth.UserAuthFactory;
+import org.opendaylight.netconf.shaded.sshd.server.auth.password.UserAuthPasswordFactory;
+import org.opendaylight.netconf.shaded.sshd.server.auth.pubkey.UserAuthPublicKeyFactory;
import org.opendaylight.netconf.test.tool.config.Configuration;
import org.opendaylight.netconf.test.tool.customrpc.SettableOperationProvider;
import org.opendaylight.netconf.test.tool.monitoring.NetconfMonitoringOperationServiceFactory;
import org.opendaylight.netconf.test.tool.operations.OperationsProvider;
import org.opendaylight.netconf.test.tool.rpchandler.SettableOperationRpcProvider;
import org.opendaylight.netconf.test.tool.schemacache.SchemaSourceCache;
+import org.opendaylight.netconf.transport.api.TransportStack;
+import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
+import org.opendaylight.netconf.transport.ssh.SSHTransportStackFactory;
+import org.opendaylight.netconf.transport.ssh.ServerFactoryManagerConfigurator;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IetfInetUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.server.rev230417.netconf.server.listen.stack.grouping.transport.ssh.ssh.TcpServerParametersBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.server.rev230417.TcpServerGrouping;
import org.opendaylight.yangtools.yang.common.Revision;
+import org.opendaylight.yangtools.yang.common.Uint16;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.ModuleLike;
public class NetconfDeviceSimulator implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceSimulator.class);
- private final NioEventLoopGroup nettyThreadgroup;
private final HashedWheelTimer hashedWheelTimer;
- private final List<Channel> devicesChannels = new ArrayList<>();
- private final List<SshProxyServer> sshWrappers = new ArrayList<>();
- private final ScheduledExecutorService minaTimerExecutor;
- private final ExecutorService nioExecutor;
private final Configuration configuration;
+ private final List<TransportStack> servers;
+ private final SSHTransportStackFactory sshTransportStackFactory;
private EffectiveModelContext schemaContext;
private boolean sendFakeSchema = false;
public NetconfDeviceSimulator(final Configuration configuration) {
this.configuration = configuration;
- nettyThreadgroup = new NioEventLoopGroup();
+ this.servers = new ArrayList<>(configuration.getDeviceCount());
+ this.sshTransportStackFactory = new SSHTransportStackFactory("netconf-device-simulator-threads",
+ configuration.getThreadPoolSize());
hashedWheelTimer = new HashedWheelTimer();
- minaTimerExecutor = Executors.newScheduledThreadPool(configuration.getThreadPoolSize(),
- new ThreadFactoryBuilder().setNameFormat("netconf-ssh-server-mina-timers-%d").build());
- nioExecutor = ThreadUtils.newFixedThreadPool("netconf-ssh-server-nio-group", configuration.getThreadPoolSize());
}
- private NetconfServerDispatcherImpl createDispatcher(final Set<Capability> capabilities,
+ private ServerChannelInitializer createServerChannelInitializer(final Set<Capability> capabilities,
final SchemaSourceProvider<YangTextSchemaSource> sourceProvider) {
final Set<Capability> transformedCapabilities = new HashSet<>(Collections2.transform(capabilities, input -> {
configuration.getGenerateConfigsTimeout(),
monitoringService1, serverCapabilities);
- final ServerChannelInitializer serverChannelInitializer =
- new ServerChannelInitializer(serverNegotiatorFactory);
- return new NetconfServerDispatcherImpl(serverChannelInitializer, nettyThreadgroup, nettyThreadgroup);
+ return new ServerChannelInitializer(serverNegotiatorFactory);
}
private NetconfOperationServiceFactory createOperationServiceFactory(
public List<Integer> start() {
final var proto = configuration.isSsh() ? "SSH" : "TCP";
LOG.info("Starting {}, {} simulated devices starting on port {}",
- configuration.getDeviceCount(), proto, configuration.getStartingPort());
+ configuration.getDeviceCount(), proto, configuration.getStartingPort());
- final SharedSchemaRepository schemaRepo = new SharedSchemaRepository("netconf-simulator");
- final Set<Capability> capabilities = parseSchemasToModuleCapabilities(schemaRepo);
-
- final NetconfServerDispatcherImpl dispatcher = createDispatcher(capabilities,
+ final var schemaRepo = new SharedSchemaRepository("netconf-simulator");
+ final var capabilities = parseSchemasToModuleCapabilities(schemaRepo);
+ final var serverChannelInitializer = createServerChannelInitializer(capabilities,
sourceIdentifier -> schemaRepo.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class));
+ final var serverFactory = new NetconfServerFactoryImpl(serverChannelInitializer, sshTransportStackFactory);
- int currentPort = configuration.getStartingPort();
-
- final List<Integer> openDevices = new ArrayList<>();
+ final var ipAddress = getIpAddress(configuration);
+ final var startingPort = getStartingPort(configuration);
+ final var deviceCount = configuration.getDeviceCount();
+ final var ports = IntStream.range(startingPort, Math.min(startingPort + deviceCount, 65536))
+ .mapToObj(Integer::new).toList();
- // Generate key to temp folder
- final KeyPairProvider keyPairProvider = new VirtualKeyPairProvider();
+ final var openDevices = new ArrayList<Integer>(ports.size());
+ final var configurator = configuration.isSsh() ? createServerFactoryManagerConfigurator(configuration) : null;
- final AsynchronousChannelGroup group;
- try {
- group = AsynchronousChannelGroup.withThreadPool(nioExecutor);
- } catch (final IOException e) {
- throw new IllegalStateException("Failed to create group", e);
- }
+ LOG.debug("Ports: {}", ports);
- for (int i = 0; i < configuration.getDeviceCount(); i++) {
- if (currentPort > 65535) {
- LOG.warn("Port cannot be greater than 65535, stopping further attempts.");
+ for (final int port : ports) {
+ try {
+ final var connectParams = connectionParams(ipAddress, port);
+ final var serverFuture = configuration.isSsh()
+ ? serverFactory.createSshServer(connectParams, null, configurator)
+ : serverFactory.createTcpServer(connectParams);
+ servers.add(serverFuture.get());
+ openDevices.add(port);
+ } catch (UnsupportedConfigurationException | InterruptedException | ExecutionException e) {
+ LOG.error("Could not start {} simulated device on port {}", proto, port, e);
break;
}
- final InetSocketAddress address = getAddress(configuration.getIp(), currentPort);
-
- final ChannelFuture server;
- if (configuration.isSsh()) {
- final InetSocketAddress bindingAddress = InetSocketAddress.createUnresolved("0.0.0.0", currentPort);
- final LocalAddress tcpLocalAddress = new LocalAddress(address.toString());
-
- server = dispatcher.createLocalServer(tcpLocalAddress);
- try {
- final SshProxyServer sshServer = new SshProxyServer(
- minaTimerExecutor, nettyThreadgroup, group);
- sshServer.bind(getSshConfiguration(bindingAddress, tcpLocalAddress, keyPairProvider));
- sshWrappers.add(sshServer);
- } catch (final BindException e) {
- LOG.warn("Cannot start simulated device on {}, port already in use. Skipping.", address);
- // Close local server and continue
- server.cancel(true);
- if (server.isDone()) {
- server.channel().close();
- }
- continue;
- } catch (final IOException e) {
- LOG.warn("Cannot start simulated device on {} due to IOException.", address, e);
- break;
- } finally {
- currentPort++;
- }
-
- try {
- server.get();
- } catch (final InterruptedException e) {
- throw new IllegalStateException("Interrupted while waiting for server", e);
- } catch (final ExecutionException e) {
- LOG.warn("Cannot start ssh simulated device on {}, skipping", address, e);
- continue;
- }
-
- LOG.debug("Simulated SSH device started on {}", address);
-
- } else {
- server = dispatcher.createServer(address);
- currentPort++;
-
- try {
- server.get();
- } catch (final InterruptedException e) {
- throw new IllegalStateException("Interrupted while waiting for server", e);
- } catch (final ExecutionException e) {
- LOG.warn("Cannot start tcp simulated device on {}, skipping", address, e);
- continue;
- }
-
- LOG.debug("Simulated TCP device started on {}", server.channel().localAddress());
- }
-
- devicesChannels.add(server.channel());
- openDevices.add(currentPort - 1);
}
+ final var first = openDevices.get(0);
+ final var last = openDevices.isEmpty() ? null : openDevices.get(openDevices.size() - 1);
if (openDevices.size() == configuration.getDeviceCount()) {
- LOG.info("All simulated devices started successfully from port {} to {}",
- configuration.getStartingPort(), currentPort - 1);
- } else if (openDevices.size() == 0) {
+ LOG.info("All simulated devices started successfully from port {} to {}", first, last);
+ } else if (openDevices.isEmpty()) {
LOG.warn("No simulated devices started.");
} else {
- LOG.warn("Not all simulated devices started successfully. Started devices ar on ports {}", openDevices);
+ LOG.warn("Not all simulated devices started successfully. Started devices are on ports {} to {}",
+ first, last);
}
-
return openDevices;
}
- private SshProxyServerConfiguration getSshConfiguration(final InetSocketAddress bindingAddress,
- final LocalAddress tcpLocalAddress, final KeyPairProvider keyPairProvider) {
- return new SshProxyServerConfigurationBuilder()
- .setBindingAddress(bindingAddress)
- .setLocalAddress(tcpLocalAddress)
- .setAuthenticator(configuration.getAuthProvider())
- .setPublickeyAuthenticator(configuration.getPublickeyAuthenticator())
- .setKeyPairProvider(keyPairProvider)
- .setIdleTimeout(Integer.MAX_VALUE)
- .createSshProxyServerConfiguration();
+ private static ServerFactoryManagerConfigurator createServerFactoryManagerConfigurator(
+ final Configuration configuration) {
+ final var authProvider = configuration.getAuthProvider();
+ final var publicKeyAuthenticator = configuration.getPublickeyAuthenticator();
+ return factoryManager -> {
+ final var authFactoriesListBuilder = ImmutableList.<UserAuthFactory>builder();
+ authFactoriesListBuilder.add(new UserAuthPasswordFactory());
+ factoryManager.setPasswordAuthenticator(
+ (usr, pass, session) -> authProvider.authenticated(usr, pass));
+ if (publicKeyAuthenticator != null) {
+ final var factory = new UserAuthPublicKeyFactory();
+ factory.setSignatureFactories(factoryManager.getSignatureFactories());
+ authFactoriesListBuilder.add(factory);
+ factoryManager.setPublickeyAuthenticator(publicKeyAuthenticator);
+ }
+ factoryManager.setUserAuthFactories(authFactoriesListBuilder.build());
+ factoryManager.setKeyPairProvider(new VirtualKeyPairProvider());
+ };
}
private Set<Capability> parseSchemasToModuleCapabilities(final SharedSchemaRepository consumer) {
PotentialSchemaSource.Costs.IMMEDIATE.getValue()));
}
- private static InetSocketAddress getAddress(final String ip, final int port) {
+ private static IpAddress getIpAddress(final Configuration configuration) {
try {
- return new InetSocketAddress(Inet4Address.getByName(ip), port);
+ return IetfInetUtil.ipAddressFor(InetAddress.getByName(configuration.getIp()));
} catch (final UnknownHostException e) {
- throw new IllegalArgumentException("Cannot resolve address " + ip, e);
+ throw new IllegalArgumentException("Cannot resolve address " + configuration.getIp(), e);
+ }
+ }
+
+ private static int getStartingPort(final Configuration configuration) {
+ final int startingPort = configuration.getStartingPort();
+ if (startingPort > 0 && startingPort < 65536) {
+ return startingPort;
+ }
+ // find available port
+ try {
+ final var socket = new ServerSocket(0);
+ final int port = socket.getLocalPort();
+ socket.close();
+ return port;
+ } catch (IOException e) {
+ throw new IllegalStateException("Cannot find available port", e);
}
}
+ private static TcpServerGrouping connectionParams(final IpAddress address, final int port) {
+ return new TcpServerParametersBuilder().setLocalAddress(address)
+ .setLocalPort(new PortNumber(Uint16.valueOf(port))).build();
+ }
+
@Override
public void close() {
- for (final SshProxyServer sshWrapper : sshWrappers) {
+ for (final var server : servers) {
try {
- sshWrapper.close();
- } catch (final IOException e) {
- LOG.debug("Wrapper {} failed to close", sshWrapper, e);
+ server.shutdown().get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.debug("Exception on simulated device shutdown", e);
}
}
- for (final Channel deviceCh : devicesChannels) {
- deviceCh.close();
- }
- nettyThreadgroup.shutdownGracefully();
- minaTimerExecutor.shutdownNow();
- nioExecutor.shutdownNow();
+ sshTransportStackFactory.close();
}
}