/* * Copyright (c) 2024 PANTHEON.tech s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.akka.segjournal; import static org.opendaylight.controller.akka.segjournal.SegmentedFileJournal.STORAGE_MAX_ENTRY_SIZE; import static org.opendaylight.controller.akka.segjournal.SegmentedFileJournal.STORAGE_MAX_ENTRY_SIZE_DEFAULT; import static org.opendaylight.controller.akka.segjournal.SegmentedFileJournal.STORAGE_MAX_SEGMENT_SIZE; import static org.opendaylight.controller.akka.segjournal.SegmentedFileJournal.STORAGE_MAX_SEGMENT_SIZE_DEFAULT; import static org.opendaylight.controller.akka.segjournal.SegmentedFileJournal.STORAGE_MAX_UNFLUSHED_BYTES; import static org.opendaylight.controller.akka.segjournal.SegmentedFileJournal.STORAGE_MEMORY_MAPPED; import com.google.common.base.Stopwatch; import com.google.common.base.Ticker; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import io.atomix.storage.journal.StorageLevel; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.HashMap; import java.util.Map; import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.impl.Arguments; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParserException; @SuppressWarnings("RegexpSinglelineJava") final class BenchmarkUtils { static final String PROG_NAME = "segjourlan-benchmark"; static final String BENCHMARK_USE_CURRENT = "current"; static final String BENCHMARK_NUMBER_OF_MESSAGES = "messages-num"; static final String BENCHMARK_PAYLOAD_SIZE = "payload-size"; static final String BENCHMARK_PAYLOAD_SIZE_DEFAULT = "10K"; static final String CURRENT_CONFIG_RESOURCE = "/initial/factory-akka.conf"; static final String CURRENT_CONFIG_PATH = "odl-cluster-data.akka.persistence.journal.segmented-file"; private static final String[] BYTE_SFX = {"G", "M", "K"}; private static final int[] BYTE_THRESH = {1024 * 1024 * 1024, 1024 * 1024, 1024}; record BenchmarkConfig(StorageLevel storage, File workingDir, int maxEntrySize, int maxSegmentSize, int maxUnflushedBytes, int payloadSize, int messagesNum) { } private BenchmarkUtils() { // utility class } static BenchmarkConfig buildConfig(final String[] args) { final var parser = getArgumentParser(); final var paramsMap = new HashMap(); try { parser.parseArgs(args, paramsMap); } catch (ArgumentParserException e) { parser.handleError(e); System.exit(1); return null; } return toConfig(paramsMap); } private static ArgumentParser getArgumentParser() { final var parser = ArgumentParsers.newArgumentParser(PROG_NAME).defaultHelp(true); parser.description("Performs asynchronous write to segmented journal, collects and prints variety of metrics"); parser.addArgument("--current") .type(Boolean.class).setDefault(Boolean.FALSE) .action(Arguments.storeConst()).setConst(Boolean.TRUE) .dest(BENCHMARK_USE_CURRENT) .help("indicates base configuration to be taken from current cluster configuration, " + "all other arguments excepting 'requests' and 'payload size' will be ignored"); parser.addArgument("--memory-mapped") .type(Boolean.class).setDefault(Boolean.FALSE) .action(Arguments.storeConst()).setConst(Boolean.TRUE) .dest(STORAGE_MEMORY_MAPPED) .help("indicates mapping journal segments to memory, otherwise file system is used"); parser.addArgument("-e", "--max-entry-size") .type(String.class).setDefault(formatBytes(STORAGE_MAX_ENTRY_SIZE_DEFAULT)) .dest(STORAGE_MAX_ENTRY_SIZE) .help("max entry size, bytes format"); parser.addArgument("-s", "--max-segment-size") .type(String.class).setDefault(formatBytes(STORAGE_MAX_SEGMENT_SIZE_DEFAULT)) .dest(STORAGE_MAX_SEGMENT_SIZE) .help("max segment size, bytes "); parser.addArgument("-u", "--max-unflushed-bytes") .type(String.class) .dest(STORAGE_MAX_UNFLUSHED_BYTES) .help("max unflushed bytes, bytes format, " + "if not defined the value is taken from 'max-entry-size'"); parser.addArgument("-n", "--messages-num") .type(Integer.class).required(true) .dest(BENCHMARK_NUMBER_OF_MESSAGES) .setDefault(10_000) .help("number of messages to write"); parser.addArgument("-p", "--payload-size") .type(String.class).setDefault(BENCHMARK_PAYLOAD_SIZE_DEFAULT) .dest(BENCHMARK_PAYLOAD_SIZE) .help("median for request payload size, bytes format supported, " + "actual size is variable 80% to 120% from defined median value"); return parser; } static BenchmarkConfig toConfig(final Map paramsMap) { final var inputConfig = ConfigFactory.parseMap(paramsMap); final var finalConfig = (Boolean) paramsMap.get(BENCHMARK_USE_CURRENT) ? currentConfig().withFallback(inputConfig) : inputConfig; final var benchmarkConfig = new BenchmarkConfig( finalConfig.getBoolean(STORAGE_MEMORY_MAPPED) ? StorageLevel.MAPPED : StorageLevel.DISK, createTempDirectory(), bytes(finalConfig, STORAGE_MAX_ENTRY_SIZE), bytes(finalConfig, STORAGE_MAX_SEGMENT_SIZE), finalConfig.hasPath(STORAGE_MAX_UNFLUSHED_BYTES) ? bytes(finalConfig, STORAGE_MAX_UNFLUSHED_BYTES) : bytes(finalConfig, STORAGE_MAX_ENTRY_SIZE), bytes(finalConfig, BENCHMARK_PAYLOAD_SIZE), finalConfig.getInt(BENCHMARK_NUMBER_OF_MESSAGES) ); // validate if (benchmarkConfig.payloadSize > benchmarkConfig.maxEntrySize) { printAndExit("payloadSize should be less than maxEntrySize"); } return benchmarkConfig; } private static int bytes(final Config config, final String key) { final var bytesLong = config.getBytes(key); if (bytesLong <= 0 || bytesLong > Integer.MAX_VALUE) { printAndExit( key + " value (" + bytesLong + ") is invalid, expected in range 1 .. " + Integer.MAX_VALUE); } return bytesLong.intValue(); } static Config currentConfig() { try (var in = BenchmarkUtils.class.getResourceAsStream(CURRENT_CONFIG_RESOURCE)) { final var content = new String(in.readAllBytes(), StandardCharsets.UTF_8); final var globalConfig = ConfigFactory.parseString(content); final var currentConfig = globalConfig.getConfig(CURRENT_CONFIG_PATH); System.out.println("Current configuration loaded from " + CURRENT_CONFIG_RESOURCE); return currentConfig; } catch (IOException e) { printAndExit("Error loading current configuration from resource " + CURRENT_CONFIG_RESOURCE, e); return null; } } private static File createTempDirectory() { try { return Files.createTempDirectory(PROG_NAME).toFile(); } catch (IOException e) { printAndExit("Cannot create temp directory", e); } return null; } private static void printAndExit(final String message) { printAndExit(message, null); } private static void printAndExit(final String message, final Exception exception) { System.err.println(message); if (exception != null) { exception.printStackTrace(System.err); } System.exit(1); } static String formatBytes(int bytes) { for (int i = 0; i < 3; i++) { if (bytes > BYTE_THRESH[i]) { return bytes / BYTE_THRESH[i] + BYTE_SFX[i]; } } return String.valueOf(bytes); } static String formatNanos(final double nanos) { return formatNanos(Math.round(nanos)); } static String formatNanos(final long nanos) { return Stopwatch.createStarted(new Ticker() { boolean started; @Override public long read() { if (started) { return nanos; } started = true; return 0; } }).toString(); } static String toMetricId(final String metricKey) { return metricKey.substring(metricKey.lastIndexOf('.') + 1); } }