From: Ruslan Kashapov
Date: Wed, 27 Mar 2024 08:52:41 +0000 (+0200)
Subject: Improve segmented journal actor metrics
X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=HEAD;hp=ae6c61499e2c7c76e0406ce397657cd31ddd4d3f

Improve segmented journal actor metrics

Mark the write time on the actual flush, not on the flush request.

JIRA: CONTROLLER-2108
Change-Id: I92a66ae775cbae6aeea69bddf654df741f473dbd
Signed-off-by: Ruslan Kashapov
Signed-off-by: Robert Varga
---

diff --git a/.readthedocs.yml b/.readthedocs.yml
new file mode 100644
index 0000000000..48b1206393
--- /dev/null
+++ b/.readthedocs.yml
@@ -0,0 +1,21 @@
+# .readthedocs.yml
+# Read the Docs configuration file
+# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
+
+# Required
+version: 2
+
+build:
+  os: ubuntu-22.04
+  tools:
+    python: "3.11"
+  jobs:
+    post_checkout:
+      - git fetch --unshallow || true
+
+sphinx:
+  configuration: docs/conf.py
+
+python:
+  install:
+    - requirements: docs/requirements.txt
diff --git a/INFO.yaml b/INFO.yaml
new file mode 100644
index 0000000000..229dd9db4a
--- /dev/null
+++ b/INFO.yaml
@@ -0,0 +1,68 @@
+---
+project: 'controller'
+project_creation_date: '2015-01-08'
+project_category: ''
+lifecycle_state: 'Incubation'
+project_lead: &odl_controller_ptl
+    name: 'Robert Varga'
+    email: 'nite@hq.sk'
+    company: 'hq'
+    id: 'rovarga'
+    timezone: 'Unknown/Unknown'
+primary_contact: *odl_controller_ptl
+issue_tracking:
+    type: 'jira'
+    url: 'https://jira.opendaylight.org/projects/'
+    key: 'controller'
+mailing_list:
+    type: 'groups.io'
+    url: 'kernel-dev@lists.opendaylight.org'
+    tag: '[]'
+realtime_discussion:
+    type: 'irc'
+    server: 'freenode.net'
+    channel: '#opendaylight'
+meetings:
+    - type: 'gotomeeting+irc'
+      agenda: 'https://wiki.opendaylight.org'
+      url: ''
+      server: 'freenode.net'
+      channel: '#opendaylight'
+      repeats: ''
+      time: ''
+repositories:
+    - controller
+committers:
+    - <<: *odl_controller_ptl
+    - name: 'Stephen Kitt'
+      email: 'skitt@redhat.com'
+      company: 'Redhat'
+      id: 'skitt'
+      timezone: 'Unknown/Unknown'
+    - name: 'Tom Pantelis'
+      email: 'tompantelis@gmail.com'
+      company: ''
+      id: 'tpantelis'
+      timezone: 'Unknown/Unknown'
+    - name: 'Ed Warnicke'
+      email: 'hagbard@gmail.com'
+      company: ''
+      id: 'hagbard'
+      timezone: 'Unknown/Unknown'
+    - name: 'Michael Vorburger'
+      email: 'mike@vorburger.ch'
+      company: 'vorburger'
+      id: 'vorburger'
+      timezone: 'Unknown/Unknown'
+    - name: 'Anil Vishnoi'
+      email: 'avishnoi@redhat.com'
+      company: 'Redhat'
+      id: 'Avishnoi'
+      timezone: 'Unknown/Unknown'
+tsc:
+    # yamllint disable rule:line-length
+    approval: 'https://meetings.opendaylight.org/opendaylight-meeting/2015/tsc/opendaylight-meeting-tsc.2015-01-08-18.00.txt'
+    changes:
+        - type: ''
+          name: ''
+          link: ''
diff --git a/features/config-netty/pom.xml b/akka/pom.xml
similarity index 56%
rename from features/config-netty/pom.xml
rename to akka/pom.xml
index 12cd79ea66..afd11d7410 100644
--- a/features/config-netty/pom.xml
+++ b/akka/pom.xml
@@ -1,34 +1,32 @@
+-->
 4.0.0
 org.opendaylight.odlparent
 odlparent-lite
-6.0.1
+13.0.11
 org.opendaylight.controller
-features-config-netty-aggregator
-0.12.0-SNAPSHOT
+akka-aggregator
+9.0.3-SNAPSHOT
 pom
+
+true
+true
+
+
-features-config-netty
-odl-config-netty
+repackaged-akka-jar
+repackaged-akka
-
-scm:git:http://git.opendaylight.org/gerrit/controller.git
-scm:git:ssh://git.opendaylight.org:29418/controller.git
-HEAD
-https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL
-
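The metrics change itself is not visible in this stale-base file listing, so as a rough illustration only: the commit message says the write time should be marked once the journal flush actually completes, not when the flush is merely requested. The sketch below assumes a Dropwizard Metrics Timer and uses purely illustrative names (WriteTimingSketch, writeTime, appendToSegment, flushJournal) that are not taken from the actual SegmentedJournalActor code.

// Illustrative sketch only -- not the actual SegmentedJournalActor implementation.
// Shows the intended timing: the write-time sample is stopped once the journal
// flush has actually completed, not when the flush is requested.
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

final class WriteTimingSketch {
    private final Timer writeTime;

    WriteTimingSketch(final MetricRegistry registry) {
        // Hypothetical metric name; the real registry and metric names may differ.
        writeTime = registry.timer("write-time");
    }

    void writeAndFlush(final Runnable appendToSegment, final Runnable flushJournal) {
        final Timer.Context ctx = writeTime.time(); // start when the write request arrives
        appendToSegment.run();                      // stage the entry in the segmented journal
        // Stopping ctx here would only measure up to the flush *request*.
        flushJournal.run();                         // the actual flush to durable storage
        ctx.stop();                                 // mark write time once the flush has completed
    }
}

Timed this way, the metric reflects end-to-end durability latency rather than just the time taken to enqueue a flush request.

diff --git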
a/akka/repackaged-akka-jar/pom.xml b/akka/repackaged-akka-jar/pom.xml new file mode 100644 index 0000000000..6c62c5d247 --- /dev/null +++ b/akka/repackaged-akka-jar/pom.xml @@ -0,0 +1,139 @@ + + + + + 4.0.0 + + + org.opendaylight.odlparent + odlparent + 13.0.11 + + + + org.opendaylight.controller + repackaged-akka-jar + jar + 9.0.3-SNAPSHOT + ${project.artifactId} + + + + true + + + + + + com.typesafe.akka + akka-actor_2.13 + 2.6.21 + + + com.typesafe.akka + akka-actor-typed_2.13 + 2.6.21 + + + com.typesafe.akka + akka-cluster_2.13 + 2.6.21 + + + com.typesafe.akka + akka-cluster-typed_2.13 + 2.6.21 + + + com.typesafe.akka + akka-osgi_2.13 + 2.6.21 + + + com.typesafe.akka + akka-persistence_2.13 + 2.6.21 + + + com.typesafe.akka + akka-protobuf_2.13 + 2.6.21 + + + com.typesafe.akka + akka-remote_2.13 + 2.6.21 + + + com.typesafe.akka + akka-slf4j_2.13 + 2.6.21 + + + com.typesafe.akka + akka-stream_2.13 + 2.6.21 + + + + + + + maven-dependency-plugin + + + unpack-license + + + true + + + + + + maven-shade-plugin + + + package + + shade + + + false + true + true + true + + + com.typesafe.akka + + + + + com.typesafe.akka:* + + META-INF/MANIFEST.MF + reference.conf + + + + + + + + + maven-source-plugin + + + true + + + + + diff --git a/akka/repackaged-akka-jar/src/main/resources/LICENSE b/akka/repackaged-akka-jar/src/main/resources/LICENSE new file mode 100644 index 0000000000..c7d5a563cc --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/LICENSE @@ -0,0 +1,212 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. 
For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--------------- + +Licenses for dependency projects can be found here: +[http://akka.io/docs/akka/snapshot/project/licenses.html] + +--------------- + +akka-protobuf contains the sources of Google protobuf 2.5.0 runtime support, +moved into the source package `akka.protobuf` so as to avoid version conflicts. +For license information see COPYING.protobuf diff --git a/akka/repackaged-akka-jar/src/main/resources/actor_reference.conf b/akka/repackaged-akka-jar/src/main/resources/actor_reference.conf new file mode 100644 index 0000000000..d41cb39ae4 --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/actor_reference.conf @@ -0,0 +1,1351 @@ +#################################### +# Akka Actor Reference Config File # +#################################### + +# This is the reference config file that contains all the default settings. +# Make your edits/overrides in your application.conf. + +# Akka version, checked against the runtime version of Akka. Loaded from generated conf file. 
+include "version" + +akka { + # Home directory of Akka, modules in the deploy directory will be loaded + home = "" + + # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs + # to STDOUT) + loggers = ["akka.event.Logging$DefaultLogger"] + + # Filter of log events that is used by the LoggingAdapter before + # publishing log events to the eventStream. It can perform + # fine grained filtering based on the log source. The default + # implementation filters on the `loglevel`. + # FQCN of the LoggingFilter. The Class of the FQCN must implement + # akka.event.LoggingFilter and have a public constructor with + # (akka.actor.ActorSystem.Settings, akka.event.EventStream) parameters. + logging-filter = "akka.event.DefaultLoggingFilter" + + # Specifies the default loggers dispatcher + loggers-dispatcher = "akka.actor.default-dispatcher" + + # Loggers are created and registered synchronously during ActorSystem + # start-up, and since they are actors, this timeout is used to bound the + # waiting time + logger-startup-timeout = 5s + + # Log level used by the configured loggers (see "loggers") as soon + # as they have been started; before that, see "stdout-loglevel" + # Options: OFF, ERROR, WARNING, INFO, DEBUG + loglevel = "INFO" + + # Log level for the very basic logger activated during ActorSystem startup. + # This logger prints the log messages to stdout (System.out). + # Options: OFF, ERROR, WARNING, INFO, DEBUG + stdout-loglevel = "WARNING" + + # Log the complete configuration at INFO level when the actor system is started. + # This is useful when you are uncertain of what configuration is used. + log-config-on-start = off + + # Log at info level when messages are sent to dead letters, or published to + # eventStream as `DeadLetter`, `Dropped` or `UnhandledMessage`. + # Possible values: + # on: all dead letters are logged + # off: no logging of dead letters + # n: positive integer, number of dead letters that will be logged + log-dead-letters = 10 + + # Possibility to turn off logging of dead letters while the actor system + # is shutting down. Logging is only done when enabled by 'log-dead-letters' + # setting. + log-dead-letters-during-shutdown = off + + # When log-dead-letters is enabled, this will re-enable the logging after configured duration. + # infinite: suspend the logging forever; + # or a duration (eg: 5 minutes), after which the logging will be re-enabled. + log-dead-letters-suspend-duration = 5 minutes + + # List FQCN of extensions which shall be loaded at actor system startup. + # Library extensions are regular extensions that are loaded at startup and are + # available for third party library authors to enable auto-loading of extensions when + # present on the classpath. This is done by appending entries: + # 'library-extensions += "Extension"' in the library `reference.conf`. + # + # Should not be set by end user applications in 'application.conf', use the extensions property for that + # + library-extensions = ${?akka.library-extensions} ["akka.serialization.SerializationExtension$"] + + # List FQCN of extensions which shall be loaded at actor system startup. + # Should be on the format: 'extensions = ["foo", "bar"]' etc. 
+ # See the Akka Documentation for more info about Extensions + extensions = [] + + # Toggles whether threads created by this ActorSystem should be daemons or not + daemonic = off + + # JVM shutdown, System.exit(-1), in case of a fatal error, + # such as OutOfMemoryError + jvm-exit-on-fatal-error = on + + # Akka installs JVM shutdown hooks by default, e.g. in CoordinatedShutdown and Artery. This property will + # not disable user-provided hooks registered using `CoordinatedShutdown#addCancellableJvmShutdownHook`. + # This property is related to `akka.coordinated-shutdown.run-by-jvm-shutdown-hook` below. + # This property makes it possible to disable all such hooks if the application itself + # or a higher level framework such as Play prefers to install the JVM shutdown hook and + # terminate the ActorSystem itself, with or without using CoordinatedShutdown. + jvm-shutdown-hooks = on + + # Version must be the same across all modules and if they are different the startup + # will fail. It's possible but not recommended, to disable this check, and only log a warning, + # by setting this property to `off`. + fail-mixed-versions = on + + # Some modules (remoting only right now) can emit custom events to the Java Flight Recorder if running + # on JDK 11 or later. If you for some reason do not want that, it can be disabled and switched to no-ops + # with this toggle. + java-flight-recorder { + enabled = true + } + + actor { + + # Either one of "local", "remote" or "cluster" or the + # FQCN of the ActorRefProvider to be used; the below is the built-in default, + # note that "remote" and "cluster" requires the akka-remote and akka-cluster + # artifacts to be on the classpath. + provider = "local" + + # The guardian "/user" will use this class to obtain its supervisorStrategy. + # It needs to be a subclass of akka.actor.SupervisorStrategyConfigurator. + # In addition to the default there is akka.actor.StoppingSupervisorStrategy. + guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy" + + # Timeout for Extension creation and a few other potentially blocking + # initialization tasks. + creation-timeout = 20s + + # Serializes and deserializes (non-primitive) messages to ensure immutability, + # this is only intended for testing. + serialize-messages = off + + # Serializes and deserializes creators (in Props) to ensure that they can be + # sent over the network, this is only intended for testing. Purely local deployments + # as marked with deploy.scope == LocalScope are exempt from verification. + serialize-creators = off + + # If serialize-messages or serialize-creators are enabled classes that starts with + # a prefix listed here are not verified. + no-serialization-verification-needed-class-prefix = ["akka."] + + # Timeout for send operations to top-level actors which are in the process + # of being started. This is only relevant if using a bounded mailbox or the + # CallingThreadDispatcher for a top-level actor. + unstarted-push-timeout = 10s + + # TypedActor deprecated since 2.6.0. + typed { + # Default timeout for the deprecated TypedActor (not the new actor APIs in 2.6) + # methods with non-void return type. 
+ timeout = 5s + } + + # Mapping between ´deployment.router' short names to fully qualified class names + router.type-mapping { + from-code = "akka.routing.NoRouter" + round-robin-pool = "akka.routing.RoundRobinPool" + round-robin-group = "akka.routing.RoundRobinGroup" + random-pool = "akka.routing.RandomPool" + random-group = "akka.routing.RandomGroup" + balancing-pool = "akka.routing.BalancingPool" + smallest-mailbox-pool = "akka.routing.SmallestMailboxPool" + broadcast-pool = "akka.routing.BroadcastPool" + broadcast-group = "akka.routing.BroadcastGroup" + scatter-gather-pool = "akka.routing.ScatterGatherFirstCompletedPool" + scatter-gather-group = "akka.routing.ScatterGatherFirstCompletedGroup" + tail-chopping-pool = "akka.routing.TailChoppingPool" + tail-chopping-group = "akka.routing.TailChoppingGroup" + consistent-hashing-pool = "akka.routing.ConsistentHashingPool" + consistent-hashing-group = "akka.routing.ConsistentHashingGroup" + } + + deployment { + + # deployment id pattern - on the format: /parent/child etc. + default { + + # The id of the dispatcher to use for this actor. + # If undefined or empty the dispatcher specified in code + # (Props.withDispatcher) is used, or default-dispatcher if not + # specified at all. + dispatcher = "" + + # The id of the mailbox to use for this actor. + # If undefined or empty the default mailbox of the configured dispatcher + # is used or if there is no mailbox configuration the mailbox specified + # in code (Props.withMailbox) is used. + # If there is a mailbox defined in the configured dispatcher then that + # overrides this setting. + mailbox = "" + + # routing (load-balance) scheme to use + # - available: "from-code", "round-robin", "random", "smallest-mailbox", + # "scatter-gather", "broadcast" + # - or: Fully qualified class name of the router class. + # The class must extend akka.routing.CustomRouterConfig and + # have a public constructor with com.typesafe.config.Config + # and optional akka.actor.DynamicAccess parameter. + # - default is "from-code"; + # Whether or not an actor is transformed to a Router is decided in code + # only (Props.withRouter). The type of router can be overridden in the + # configuration; specifying "from-code" means that the values specified + # in the code shall be used. + # In case of routing, the actors to be routed to can be specified + # in several ways: + # - nr-of-instances: will create that many children + # - routees.paths: will route messages to these paths using ActorSelection, + # i.e. will not create children + # - resizer: dynamically resizable number of routees as specified in + # resizer below + router = "from-code" + + # number of children to create in case of a router; + # this setting is ignored if routees.paths is given + nr-of-instances = 1 + + # within is the timeout used for routers containing future calls + within = 5 seconds + + # number of virtual nodes per node for consistent-hashing router + virtual-nodes-factor = 10 + + tail-chopping-router { + # interval is duration between sending message to next routee + interval = 10 milliseconds + } + + routees { + # Alternatively to giving nr-of-instances you can specify the full + # paths of those actors which should be routed to. This setting takes + # precedence over nr-of-instances + paths = [] + } + + # To use a dedicated dispatcher for the routees of the pool you can + # define the dispatcher configuration inline with the property name + # 'pool-dispatcher' in the deployment section of the router. 
+ # For example: + # pool-dispatcher { + # fork-join-executor.parallelism-min = 5 + # fork-join-executor.parallelism-max = 5 + # } + + # Routers with dynamically resizable number of routees; this feature is + # enabled by including (parts of) this section in the deployment + resizer { + + enabled = off + + # The fewest number of routees the router should ever have. + lower-bound = 1 + + # The most number of routees the router should ever have. + # Must be greater than or equal to lower-bound. + upper-bound = 10 + + # Threshold used to evaluate if a routee is considered to be busy + # (under pressure). Implementation depends on this value (default is 1). + # 0: number of routees currently processing a message. + # 1: number of routees currently processing a message has + # some messages in mailbox. + # > 1: number of routees with at least the configured pressure-threshold + # messages in their mailbox. Note that estimating mailbox size of + # default UnboundedMailbox is O(N) operation. + pressure-threshold = 1 + + # Percentage to increase capacity whenever all routees are busy. + # For example, 0.2 would increase 20% (rounded up), i.e. if current + # capacity is 6 it will request an increase of 2 more routees. + rampup-rate = 0.2 + + # Minimum fraction of busy routees before backing off. + # For example, if this is 0.3, then we'll remove some routees only when + # less than 30% of routees are busy, i.e. if current capacity is 10 and + # 3 are busy then the capacity is unchanged, but if 2 or less are busy + # the capacity is decreased. + # Use 0.0 or negative to avoid removal of routees. + backoff-threshold = 0.3 + + # Fraction of routees to be removed when the resizer reaches the + # backoffThreshold. + # For example, 0.1 would decrease 10% (rounded up), i.e. if current + # capacity is 9 it will request an decrease of 1 routee. + backoff-rate = 0.1 + + # Number of messages between resize operation. + # Use 1 to resize before each message. + messages-per-resize = 10 + } + + # Routers with dynamically resizable number of routees based on + # performance metrics. + # This feature is enabled by including (parts of) this section in + # the deployment, cannot be enabled together with default resizer. + optimal-size-exploring-resizer { + + enabled = off + + # The fewest number of routees the router should ever have. + lower-bound = 1 + + # The most number of routees the router should ever have. + # Must be greater than or equal to lower-bound. + upper-bound = 10 + + # probability of doing a ramping down when all routees are busy + # during exploration. + chance-of-ramping-down-when-full = 0.2 + + # Interval between each resize attempt + action-interval = 5s + + # If the routees have not been fully utilized (i.e. all routees busy) + # for such length, the resizer will downsize the pool. + downsize-after-underutilized-for = 72h + + # Duration exploration, the ratio between the largest step size and + # current pool size. E.g. if the current pool size is 50, and the + # explore-step-size is 0.1, the maximum pool size change during + # exploration will be +- 5 + explore-step-size = 0.1 + + # Probability of doing an exploration v.s. optimization. + chance-of-exploration = 0.4 + + # When downsizing after a long streak of underutilization, the resizer + # will downsize the pool to the highest utiliziation multiplied by a + # a downsize ratio. This downsize ratio determines the new pools size + # in comparison to the highest utilization. + # E.g. 
if the highest utilization is 10, and the down size ratio + # is 0.8, the pool will be downsized to 8 + downsize-ratio = 0.8 + + # When optimizing, the resizer only considers the sizes adjacent to the + # current size. This number indicates how many adjacent sizes to consider. + optimization-range = 16 + + # The weight of the latest metric over old metrics when collecting + # performance metrics. + # E.g. if the last processing speed is 10 millis per message at pool + # size 5, and if the new processing speed collected is 6 millis per + # message at pool size 5. Given a weight of 0.3, the metrics + # representing pool size 5 will be 6 * 0.3 + 10 * 0.7, i.e. 8.8 millis + # Obviously, this number should be between 0 and 1. + weight-of-latest-metric = 0.5 + } + } + + "/IO-DNS/inet-address" { + mailbox = "unbounded" + router = "consistent-hashing-pool" + nr-of-instances = 4 + } + + "/IO-DNS/inet-address/*" { + dispatcher = "akka.actor.default-blocking-io-dispatcher" + } + + "/IO-DNS/async-dns" { + mailbox = "unbounded" + router = "round-robin-pool" + nr-of-instances = 1 + } + } + + default-dispatcher { + # Must be one of the following + # Dispatcher, PinnedDispatcher, or a FQCN to a class inheriting + # MessageDispatcherConfigurator with a public constructor with + # both com.typesafe.config.Config parameter and + # akka.dispatch.DispatcherPrerequisites parameters. + # PinnedDispatcher must be used together with executor=thread-pool-executor. + type = "Dispatcher" + + # Which kind of ExecutorService to use for this dispatcher + # Valid options: + # - "default-executor" requires a "default-executor" section + # - "fork-join-executor" requires a "fork-join-executor" section + # - "thread-pool-executor" requires a "thread-pool-executor" section + # - "affinity-pool-executor" requires an "affinity-pool-executor" section + # - A FQCN of a class extending ExecutorServiceConfigurator + executor = "default-executor" + + # This will be used if you have set "executor = "default-executor"". + # If an ActorSystem is created with a given ExecutionContext, this + # ExecutionContext will be used as the default executor for all + # dispatchers in the ActorSystem configured with + # executor = "default-executor". Note that "default-executor" + # is the default value for executor, and therefore used if not + # specified otherwise. If no ExecutionContext is given, + # the executor configured in "fallback" will be used. + default-executor { + fallback = "fork-join-executor" + } + + # This will be used if you have set "executor = "affinity-pool-executor"" + # Underlying thread pool implementation is akka.dispatch.affinity.AffinityPool. + # This executor is classified as "ApiMayChange". + affinity-pool-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 4 + + # The parallelism factor is used to determine thread pool size using the + # following formula: ceil(available processors * factor). Resulting size + # is then bounded by the parallelism-min and parallelism-max values. + parallelism-factor = 0.8 + + # Max number of threads to cap factor-based parallelism number to. + parallelism-max = 64 + + # Each worker in the pool uses a separate bounded MPSC queue. This value + # indicates the upper bound of the queue. Whenever an attempt to enqueue + # a task is made and the queue does not have capacity to accommodate + # the task, the rejection handler created by the rejection handler specified + # in "rejection-handler" is invoked. 
+ task-queue-size = 512 + + # FQCN of the Rejection handler used in the pool. + # Must have an empty public constructor and must + # implement akka.actor.affinity.RejectionHandlerFactory. + rejection-handler = "akka.dispatch.affinity.ThrowOnOverflowRejectionHandler" + + # Level of CPU time used, on a scale between 1 and 10, during backoff/idle. + # The tradeoff is that to have low latency more CPU time must be used to be + # able to react quickly on incoming messages or send as fast as possible after + # backoff backpressure. + # Level 1 strongly prefer low CPU consumption over low latency. + # Level 10 strongly prefer low latency over low CPU consumption. + idle-cpu-level = 5 + + # FQCN of the akka.dispatch.affinity.QueueSelectorFactory. + # The Class of the FQCN must have a public constructor with a + # (com.typesafe.config.Config) parameter. + # A QueueSelectorFactory create instances of akka.dispatch.affinity.QueueSelector, + # that is responsible for determining which task queue a Runnable should be enqueued in. + queue-selector = "akka.dispatch.affinity.FairDistributionHashCache" + + # When using the "akka.dispatch.affinity.FairDistributionHashCache" queue selector + # internally the AffinityPool uses two methods to determine which task + # queue to allocate a Runnable to: + # - map based - maintains a round robin counter and a map of Runnable + # hashcodes to queues that they have been associated with. This ensures + # maximum fairness in terms of work distribution, meaning that each worker + # will get approximately equal amount of mailboxes to execute. This is suitable + # in cases where we have a small number of actors that will be scheduled on + # the pool and we want to ensure the maximum possible utilization of the + # available threads. + # - hash based - the task - queue in which the runnable should go is determined + # by using an uniformly distributed int to int hash function which uses the + # hash code of the Runnable as an input. This is preferred in situations where we + # have enough number of distinct actors to ensure statistically uniform + # distribution of work across threads or we are ready to sacrifice the + # former for the added benefit of avoiding map look-ups. + fair-work-distribution { + # The value serves as a threshold which determines the point at which the + # pool switches from the first to the second work distribution schemes. + # For example, if the value is set to 128, the pool can observe up to + # 128 unique actors and schedule their mailboxes using the map based + # approach. Once this number is reached the pool switches to hash based + # task distribution mode. If the value is set to 0, the map based + # work distribution approach is disabled and only the hash based is + # used irrespective of the number of unique actors. Valid range is + # 0 to 2048 (inclusive) + threshold = 128 + } + } + + # This will be used if you have set "executor = "fork-join-executor"" + # Underlying thread pool implementation is java.util.concurrent.ForkJoinPool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 8 + + # The parallelism factor is used to determine thread pool size using the + # following formula: ceil(available processors * factor). Resulting size + # is then bounded by the parallelism-min and parallelism-max values. 
+ parallelism-factor = 1.0 + + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 64 + + # Setting to "FIFO" to use queue like peeking mode which "poll" or "LIFO" to use stack + # like peeking mode which "pop". + task-peeking-mode = "FIFO" + } + + # This will be used if you have set "executor = "thread-pool-executor"" + # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor + thread-pool-executor { + # Keep alive time for threads + keep-alive-time = 60s + + # Define a fixed thread pool size with this property. The corePoolSize + # and the maximumPoolSize of the ThreadPoolExecutor will be set to this + # value, if it is defined. Then the other pool-size properties will not + # be used. + # + # Valid values are: `off` or a positive integer. + fixed-pool-size = off + + # Min number of threads to cap factor-based corePoolSize number to + core-pool-size-min = 8 + + # The core-pool-size-factor is used to determine corePoolSize of the + # ThreadPoolExecutor using the following formula: + # ceil(available processors * factor). + # Resulting size is then bounded by the core-pool-size-min and + # core-pool-size-max values. + core-pool-size-factor = 3.0 + + # Max number of threads to cap factor-based corePoolSize number to + core-pool-size-max = 64 + + # Minimum number of threads to cap factor-based maximumPoolSize number to + max-pool-size-min = 8 + + # The max-pool-size-factor is used to determine maximumPoolSize of the + # ThreadPoolExecutor using the following formula: + # ceil(available processors * factor) + # The maximumPoolSize will not be less than corePoolSize. + # It is only used if using a bounded task queue. + max-pool-size-factor = 3.0 + + # Max number of threads to cap factor-based maximumPoolSize number to + max-pool-size-max = 64 + + # Specifies the bounded capacity of the task queue (< 1 == unbounded) + task-queue-size = -1 + + # Specifies which type of task queue will be used, can be "array" or + # "linked" (default) + task-queue-type = "linked" + + # Allow core threads to time out + allow-core-timeout = on + } + + # How long time the dispatcher will wait for new actors until it shuts down + shutdown-timeout = 1s + + # Throughput defines the number of messages that are processed in a batch + # before the thread is returned to the pool. Set to 1 for as fair as possible. + throughput = 5 + + # Throughput deadline for Dispatcher, set to 0 or negative for no deadline + throughput-deadline-time = 0ms + + # For BalancingDispatcher: If the balancing dispatcher should attempt to + # schedule idle actors using the same dispatcher when a message comes in, + # and the dispatchers ExecutorService is not fully busy already. + attempt-teamwork = on + + # If this dispatcher requires a specific type of mailbox, specify the + # fully-qualified class name here; the actually created mailbox will + # be a subtype of this type. The empty string signifies no requirement. 
+ mailbox-requirement = "" + } + + # Default separate internal dispatcher to run Akka internal tasks and actors on + # protecting them against starvation because of accidental blocking in user actors (which run on the + # default dispatcher) + internal-dispatcher { + type = "Dispatcher" + executor = "fork-join-executor" + throughput = 5 + fork-join-executor { + parallelism-min = 4 + parallelism-factor = 1.0 + parallelism-max = 64 + } + } + + default-blocking-io-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + throughput = 1 + + thread-pool-executor { + fixed-pool-size = 16 + } + } + + default-mailbox { + # FQCN of the MailboxType. The Class of the FQCN must have a public + # constructor with + # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.UnboundedMailbox" + + # If the mailbox is bounded then it uses this setting to determine its + # capacity. The provided value must be positive. + # NOTICE: + # Up to version 2.1 the mailbox type was determined based on this setting; + # this is no longer the case, the type must explicitly be a bounded mailbox. + mailbox-capacity = 1000 + + # If the mailbox is bounded then this is the timeout for enqueueing + # in case the mailbox is full. Negative values signify infinite + # timeout, which should be avoided as it bears the risk of dead-lock. + mailbox-push-timeout-time = 10s + + # For Actor with Stash: The default capacity of the stash. + # If negative (or zero) then an unbounded stash is used (default) + # If positive then a bounded stash is used and the capacity is set using + # the property + stash-capacity = -1 + } + + mailbox { + # Mapping between message queue semantics and mailbox configurations. + # Used by akka.dispatch.RequiresMessageQueue[T] to enforce different + # mailbox types on actors. + # If your Actor implements RequiresMessageQueue[T], then when you create + # an instance of that actor its mailbox type will be decided by looking + # up a mailbox configuration via T in this mapping + requirements { + "akka.dispatch.UnboundedMessageQueueSemantics" = + akka.actor.mailbox.unbounded-queue-based + "akka.dispatch.BoundedMessageQueueSemantics" = + akka.actor.mailbox.bounded-queue-based + "akka.dispatch.DequeBasedMessageQueueSemantics" = + akka.actor.mailbox.unbounded-deque-based + "akka.dispatch.UnboundedDequeBasedMessageQueueSemantics" = + akka.actor.mailbox.unbounded-deque-based + "akka.dispatch.BoundedDequeBasedMessageQueueSemantics" = + akka.actor.mailbox.bounded-deque-based + "akka.dispatch.MultipleConsumerSemantics" = + akka.actor.mailbox.unbounded-queue-based + "akka.dispatch.ControlAwareMessageQueueSemantics" = + akka.actor.mailbox.unbounded-control-aware-queue-based + "akka.dispatch.UnboundedControlAwareMessageQueueSemantics" = + akka.actor.mailbox.unbounded-control-aware-queue-based + "akka.dispatch.BoundedControlAwareMessageQueueSemantics" = + akka.actor.mailbox.bounded-control-aware-queue-based + "akka.event.LoggerMessageQueueSemantics" = + akka.actor.mailbox.logger-queue + } + + unbounded-queue-based { + # FQCN of the MailboxType, The Class of the FQCN must have a public + # constructor with (akka.actor.ActorSystem.Settings, + # com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.UnboundedMailbox" + } + + bounded-queue-based { + # FQCN of the MailboxType, The Class of the FQCN must have a public + # constructor with (akka.actor.ActorSystem.Settings, + # com.typesafe.config.Config) parameters. 
+ mailbox-type = "akka.dispatch.BoundedMailbox" + } + + unbounded-deque-based { + # FQCN of the MailboxType, The Class of the FQCN must have a public + # constructor with (akka.actor.ActorSystem.Settings, + # com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" + } + + bounded-deque-based { + # FQCN of the MailboxType, The Class of the FQCN must have a public + # constructor with (akka.actor.ActorSystem.Settings, + # com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.BoundedDequeBasedMailbox" + } + + unbounded-control-aware-queue-based { + # FQCN of the MailboxType, The Class of the FQCN must have a public + # constructor with (akka.actor.ActorSystem.Settings, + # com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox" + } + + bounded-control-aware-queue-based { + # FQCN of the MailboxType, The Class of the FQCN must have a public + # constructor with (akka.actor.ActorSystem.Settings, + # com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.BoundedControlAwareMailbox" + } + + # The LoggerMailbox will drain all messages in the mailbox + # when the system is shutdown and deliver them to the StandardOutLogger. + # Do not change this unless you know what you are doing. + logger-queue { + mailbox-type = "akka.event.LoggerMailboxType" + } + } + + debug { + # enable function of Actor.loggable(), which is to log any received message + # at DEBUG level, see the “Testing Actor Systems” section of the Akka + # Documentation at https://akka.io/docs + receive = off + + # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill etc.) + autoreceive = off + + # enable DEBUG logging of actor lifecycle changes + lifecycle = off + + # enable DEBUG logging of all LoggingFSMs for events, transitions and timers + fsm = off + + # enable DEBUG logging of subscription changes on the eventStream + event-stream = off + + # enable DEBUG logging of unhandled messages + unhandled = off + + # enable WARN logging of misconfigured routers + router-misconfiguration = off + } + + # SECURITY BEST-PRACTICE is to disable java serialization for its multiple + # known attack surfaces. + # + # This setting is a short-cut to + # - using DisabledJavaSerializer instead of JavaSerializer + # + # Completely disable the use of `akka.serialization.JavaSerialization` by the + # Akka Serialization extension, instead DisabledJavaSerializer will + # be inserted which will fail explicitly if attempts to use java serialization are made. + # + # The log messages emitted by such serializer SHOULD be treated as potential + # attacks which the serializer prevented, as they MAY indicate an external operator + # attempting to send malicious messages intending to use java serialization as attack vector. + # The attempts are logged with the SECURITY marker. + # + # Please note that this option does not stop you from manually invoking java serialization + # + allow-java-serialization = on + + # Log warnings when the Java serialization is used to serialize messages. + # Java serialization is not very performant and should not be used in production + # environments unless you don't care about performance and security. In that case + # you can turn this off. 
+ warn-about-java-serializer-usage = on + + # To be used with the above warn-about-java-serializer-usage + # When warn-about-java-serializer-usage = on, and this warn-on-no-serialization-verification = off, + # warnings are suppressed for classes extending NoSerializationVerificationNeeded + # to reduce noise. + warn-on-no-serialization-verification = on + + # Entries for pluggable serializers and their bindings. + serializers { + java = "akka.serialization.JavaSerializer" + bytes = "akka.serialization.ByteArraySerializer" + primitive-long = "akka.serialization.LongSerializer" + primitive-int = "akka.serialization.IntSerializer" + primitive-string = "akka.serialization.StringSerializer" + primitive-bytestring = "akka.serialization.ByteStringSerializer" + primitive-boolean = "akka.serialization.BooleanSerializer" + } + + # Class to Serializer binding. You only need to specify the name of an + # interface or abstract base class of the messages. In case of ambiguity it + # is using the most specific configured class, or giving a warning and + # choosing the “first” one. + # + # To disable one of the default serializers, assign its class to "none", like + # "java.io.Serializable" = none + serialization-bindings { + "[B" = bytes + "java.io.Serializable" = java + + "java.lang.String" = primitive-string + "akka.util.ByteString$ByteString1C" = primitive-bytestring + "akka.util.ByteString$ByteString1" = primitive-bytestring + "akka.util.ByteString$ByteStrings" = primitive-bytestring + "java.lang.Long" = primitive-long + "scala.Long" = primitive-long + "java.lang.Integer" = primitive-int + "scala.Int" = primitive-int + "java.lang.Boolean" = primitive-boolean + "scala.Boolean" = primitive-boolean + } + + # Configuration namespace of serialization identifiers. + # Each serializer implementation must have an entry in the following format: + # `akka.actor.serialization-identifiers."FQCN" = ID` + # where `FQCN` is fully qualified class name of the serializer implementation + # and `ID` is globally unique serializer identifier number. + # Identifier values from 0 to 40 are reserved for Akka internal usage. + serialization-identifiers { + "akka.serialization.JavaSerializer" = 1 + "akka.serialization.ByteArraySerializer" = 4 + + primitive-long = 18 + primitive-int = 19 + primitive-string = 20 + primitive-bytestring = 21 + primitive-boolean = 35 + } + + } + + serialization.protobuf { + # deprecated, use `allowed-classes` instead + whitelist-class = [ + "com.google.protobuf.GeneratedMessage", + "com.google.protobuf.GeneratedMessageV3", + "scalapb.GeneratedMessageCompanion", + "akka.protobuf.GeneratedMessage", + "akka.protobufv3.internal.GeneratedMessageV3" + ] + + # Additional classes that are allowed even if they are not defined in `serialization-bindings`. + # It can be exact class name or name of super class or interfaces (one level). + # This is useful when a class is not used for serialization any more and therefore removed + # from `serialization-bindings`, but should still be possible to deserialize. + allowed-classes = ${akka.serialization.protobuf.whitelist-class} + + } + + # Used to set the behavior of the scheduler. + # Changing the default values may change the system behavior drastically so make + # sure you know what you're doing! See the Scheduler section of the Akka + # Documentation for more details. + scheduler { + # The LightArrayRevolverScheduler is used as the default scheduler in the + # system. 
It does not execute the scheduled tasks on exact time, but on every + # tick, it will run everything that is (over)due. You can increase or decrease + # the accuracy of the execution timing by specifying smaller or larger tick + # duration. If you are scheduling a lot of tasks you should consider increasing + # the ticks per wheel. + # Note that it might take up to 1 tick to stop the Timer, so setting the + # tick-duration to a high value will make shutting down the actor system + # take longer. + tick-duration = 10ms + + # The timer uses a circular wheel of buckets to store the timer tasks. + # This should be set such that the majority of scheduled timeouts (for high + # scheduling frequency) will be shorter than one rotation of the wheel + # (ticks-per-wheel * ticks-duration) + # THIS MUST BE A POWER OF TWO! + ticks-per-wheel = 512 + + # This setting selects the timer implementation which shall be loaded at + # system start-up. + # The class given here must implement the akka.actor.Scheduler interface + # and offer a public constructor which takes three arguments: + # 1) com.typesafe.config.Config + # 2) akka.event.LoggingAdapter + # 3) java.util.concurrent.ThreadFactory + implementation = akka.actor.LightArrayRevolverScheduler + + # When shutting down the scheduler, there will typically be a thread which + # needs to be stopped, and this timeout determines how long to wait for + # that to happen. In case of timeout the shutdown of the actor system will + # proceed without running possibly still enqueued tasks. + shutdown-timeout = 5s + } + + io { + + # By default the select loops run on dedicated threads, hence using a + # PinnedDispatcher + pinned-dispatcher { + type = "PinnedDispatcher" + executor = "thread-pool-executor" + thread-pool-executor.allow-core-timeout = off + } + + tcp { + + # The number of selectors to stripe the served channels over; each of + # these will use one select loop on the selector-dispatcher. + nr-of-selectors = 1 + + # Maximum number of open channels supported by this TCP module; there is + # no intrinsic general limit, this setting is meant to enable DoS + # protection by limiting the number of concurrently connected clients. + # Also note that this is a "soft" limit; in certain cases the implementation + # will accept a few connections more or a few less than the number configured + # here. Must be an integer > 0 or "unlimited". + max-channels = 256000 + + # When trying to assign a new connection to a selector and the chosen + # selector is at full capacity, retry selector choosing and assignment + # this many times before giving up + selector-association-retries = 10 + + # The maximum number of connection that are accepted in one go, + # higher numbers decrease latency, lower numbers increase fairness on + # the worker-dispatcher + batch-accept-limit = 10 + + # The number of bytes per direct buffer in the pool used to read or write + # network data from the kernel. + direct-buffer-size = 128 KiB + + # The maximal number of direct buffers kept in the direct buffer pool for + # reuse. + direct-buffer-pool-limit = 1000 + + # The duration a connection actor waits for a `Register` message from + # its commander before aborting the connection. + register-timeout = 5s + + # The maximum number of bytes delivered by a `Received` message. Before + # more data is read from the network the connection actor will try to + # do other work. + # The purpose of this setting is to impose a smaller limit than the + # configured receive buffer size. 
When using value 'unlimited' it will + # try to read all from the receive buffer. + max-received-message-size = unlimited + + # Enable fine grained logging of what goes on inside the implementation. + # Be aware that this may log more than once per message sent to the actors + # of the tcp implementation. + trace-logging = off + + # Fully qualified config path which holds the dispatcher configuration + # to be used for running the select() calls in the selectors + selector-dispatcher = "akka.io.pinned-dispatcher" + + # Fully qualified config path which holds the dispatcher configuration + # for the read/write worker actors + worker-dispatcher = "akka.actor.internal-dispatcher" + + # Fully qualified config path which holds the dispatcher configuration + # for the selector management actors + management-dispatcher = "akka.actor.internal-dispatcher" + + # Fully qualified config path which holds the dispatcher configuration + # on which file IO tasks are scheduled + file-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" + + # The maximum number of bytes (or "unlimited") to transfer in one batch + # when using `WriteFile` command which uses `FileChannel.transferTo` to + # pipe files to a TCP socket. On some OS like Linux `FileChannel.transferTo` + # may block for a long time when network IO is faster than file IO. + # Decreasing the value may improve fairness while increasing may improve + # throughput. + file-io-transferTo-limit = 512 KiB + + # The number of times to retry the `finishConnect` call after being notified about + # OP_CONNECT. Retries are needed if the OP_CONNECT notification doesn't imply that + # `finishConnect` will succeed, which is the case on Android. + finish-connect-retries = 5 + + # On Windows connection aborts are not reliably detected unless an OP_READ is + # registered on the selector _after_ the connection has been reset. This + # workaround enables an OP_CONNECT which forces the abort to be visible on Windows. + # Enabling this setting on other platforms than Windows will cause various failures + # and undefined behavior. + # Possible values of this key are on, off and auto where auto will enable the + # workaround if Windows is detected automatically. + windows-connection-abort-workaround-enabled = off + } + + udp { + + # The number of selectors to stripe the served channels over; each of + # these will use one select loop on the selector-dispatcher. + nr-of-selectors = 1 + + # Maximum number of open channels supported by this UDP module Generally + # UDP does not require a large number of channels, therefore it is + # recommended to keep this setting low. + max-channels = 4096 + + # The select loop can be used in two modes: + # - setting "infinite" will select without a timeout, hogging a thread + # - setting a positive timeout will do a bounded select call, + # enabling sharing of a single thread between multiple selectors + # (in this case you will have to use a different configuration for the + # selector-dispatcher, e.g. using "type=Dispatcher" with size 1) + # - setting it to zero means polling, i.e. 
calling selectNow() + select-timeout = infinite + + # When trying to assign a new connection to a selector and the chosen + # selector is at full capacity, retry selector choosing and assignment + # this many times before giving up + selector-association-retries = 10 + + # The maximum number of datagrams that are read in one go, + # higher numbers decrease latency, lower numbers increase fairness on + # the worker-dispatcher + receive-throughput = 3 + + # The number of bytes per direct buffer in the pool used to read or write + # network data from the kernel. + direct-buffer-size = 128 KiB + + # The maximal number of direct buffers kept in the direct buffer pool for + # reuse. + direct-buffer-pool-limit = 1000 + + # Enable fine grained logging of what goes on inside the implementation. + # Be aware that this may log more than once per message sent to the actors + # of the tcp implementation. + trace-logging = off + + # Fully qualified config path which holds the dispatcher configuration + # to be used for running the select() calls in the selectors + selector-dispatcher = "akka.io.pinned-dispatcher" + + # Fully qualified config path which holds the dispatcher configuration + # for the read/write worker actors + worker-dispatcher = "akka.actor.internal-dispatcher" + + # Fully qualified config path which holds the dispatcher configuration + # for the selector management actors + management-dispatcher = "akka.actor.internal-dispatcher" + } + + udp-connected { + + # The number of selectors to stripe the served channels over; each of + # these will use one select loop on the selector-dispatcher. + nr-of-selectors = 1 + + # Maximum number of open channels supported by this UDP module Generally + # UDP does not require a large number of channels, therefore it is + # recommended to keep this setting low. + max-channels = 4096 + + # The select loop can be used in two modes: + # - setting "infinite" will select without a timeout, hogging a thread + # - setting a positive timeout will do a bounded select call, + # enabling sharing of a single thread between multiple selectors + # (in this case you will have to use a different configuration for the + # selector-dispatcher, e.g. using "type=Dispatcher" with size 1) + # - setting it to zero means polling, i.e. calling selectNow() + select-timeout = infinite + + # When trying to assign a new connection to a selector and the chosen + # selector is at full capacity, retry selector choosing and assignment + # this many times before giving up + selector-association-retries = 10 + + # The maximum number of datagrams that are read in one go, + # higher numbers decrease latency, lower numbers increase fairness on + # the worker-dispatcher + receive-throughput = 3 + + # The number of bytes per direct buffer in the pool used to read or write + # network data from the kernel. + direct-buffer-size = 128 KiB + + # The maximal number of direct buffers kept in the direct buffer pool for + # reuse. + direct-buffer-pool-limit = 1000 + + # Enable fine grained logging of what goes on inside the implementation. + # Be aware that this may log more than once per message sent to the actors + # of the tcp implementation. 
+ trace-logging = off + + # Fully qualified config path which holds the dispatcher configuration + # to be used for running the select() calls in the selectors + selector-dispatcher = "akka.io.pinned-dispatcher" + + # Fully qualified config path which holds the dispatcher configuration + # for the read/write worker actors + worker-dispatcher = "akka.actor.internal-dispatcher" + + # Fully qualified config path which holds the dispatcher configuration + # for the selector management actors + management-dispatcher = "akka.actor.internal-dispatcher" + } + + dns { + # Fully qualified config path which holds the dispatcher configuration + # for the manager and resolver router actors. + # For actual router configuration see akka.actor.deployment./IO-DNS/* + dispatcher = "akka.actor.internal-dispatcher" + + # Name of the subconfig at path akka.io.dns, see inet-address below + # + # Change to `async-dns` to use the new "native" DNS resolver, + # which is also capable of resolving SRV records. + resolver = "inet-address" + + # To-be-deprecated DNS resolver implementation which uses the Java InetAddress to resolve DNS records. + # To be replaced by `akka.io.dns.async` which implements the DNS protocol natively and without blocking (which InetAddress does) + inet-address { + # Must implement akka.io.DnsProvider + provider-object = "akka.io.InetAddressDnsProvider" + + # To set the time to cache name resolutions + # Possible values: + # default: sun.net.InetAddressCachePolicy.get() and getNegative() + # forever: cache forever + # never: no caching + # n [time unit]: positive timeout with unit, for example 30s + positive-ttl = default + negative-ttl = default + + # How often to sweep out expired cache entries. + # Note that this interval has nothing to do with TTLs + cache-cleanup-interval = 120s + } + + async-dns { + provider-object = "akka.io.dns.internal.AsyncDnsProvider" + + # Set upper bound for caching successfully resolved dns entries + # if the DNS record has a smaller TTL value than the setting that + # will be used. Default is to use the record TTL with no cap. + # Possible values: + # forever: always use the minimum TTL from the found records + # never: never cache + # n [time unit] = cap the caching to this value + positive-ttl = forever + + # Set how long the fact that a DNS record could not be found is + # cached. If a new resolution is done while the fact is cached it will + # be failed and not result in an actual DNS resolution. Default is + # to never cache. + # Possible values: + # never: never cache + # forever: cache a missing DNS record forever (you probably will not want to do this) + # n [time unit] = cache for this long + negative-ttl = never + + # Configures nameservers to query during DNS resolution. + # Defaults to the nameservers that would be used by the JVM by default. + # Set to a list of IPs to override the servers, e.g. [ "8.8.8.8", "8.8.4.4" ] for Google's servers + # If multiple are defined then they are tried in order until one responds + nameservers = default + + # The time that a request is allowed to live before being discarded + # given no reply. The lower bound of this should always be the amount + # of time to reasonably expect a DNS server to reply within. + # If multiple name servers are provided then each gets this long to response before trying + # the next one + resolve-timeout = 5s + + # How often to sweep out expired cache entries. 
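For illustration only, an application.conf override that switches to the async-dns resolver described above and caps its caching behaviour might look like the sketch below; the nameserver addresses are hypothetical.

akka.io.dns {
  resolver = "async-dns"
  async-dns {
    nameservers = ["10.0.0.53", "10.0.1.53"]   # hypothetical resolver addresses
    positive-ttl = 30s                          # cap caching of successful lookups
    negative-ttl = never
    resolve-timeout = 5s
  }
}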
+ # Note that this interval has nothing to do with TTLs + cache-cleanup-interval = 120s + + # Configures the list of search domains. + # Defaults to a system dependent lookup (on Unix like OSes, will attempt to parse /etc/resolv.conf, on + # other platforms, will not make any attempt to lookup the search domains). Set to a single domain, or + # a list of domains, eg, [ "example.com", "example.net" ]. + search-domains = default + + # Any hosts that have a number of dots less than this will not be looked up directly, instead, a search on + # the search domains will be tried first. This corresponds to the ndots option in /etc/resolv.conf, see + # https://linux.die.net/man/5/resolver for more info. + # Defaults to a system dependent lookup (on Unix like OSes, will attempt to parse /etc/resolv.conf, on + # other platforms, will default to 1). + ndots = default + } + } + } + + + # CoordinatedShutdown is an extension that will perform registered + # tasks in the order that is defined by the phases. It is started + # by calling CoordinatedShutdown(system).run(). This can be triggered + # by different things, for example: + # - JVM shutdown hook will by default run CoordinatedShutdown + # - Cluster node will automatically run CoordinatedShutdown when it + # sees itself as Exiting + # - A management console or other application specific command can + # run CoordinatedShutdown + coordinated-shutdown { + # The timeout that will be used for a phase if not specified with + # 'timeout' in the phase + default-phase-timeout = 5 s + + # Terminate the ActorSystem in the last phase actor-system-terminate. + terminate-actor-system = on + + # Exit the JVM (System.exit(0)) in the last phase actor-system-terminate + # if this is set to 'on'. It is done after termination of the + # ActorSystem if terminate-actor-system=on, otherwise it is done + # immediately when the last phase is reached. + exit-jvm = off + + # Exit status to use on System.exit(int) when 'exit-jvm' is 'on'. + exit-code = 0 + + # Run the coordinated shutdown when the JVM process exits, e.g. + # via kill SIGTERM signal (SIGINT ctrl-c doesn't work). + # This property is related to `akka.jvm-shutdown-hooks` above. + run-by-jvm-shutdown-hook = on + + # Run the coordinated shutdown when ActorSystem.terminate is called. + # Enabling this and disabling terminate-actor-system is not a supported + # combination (will throw ConfigurationException at startup). + run-by-actor-system-terminate = on + + # When Coordinated Shutdown is triggered an instance of `Reason` is + # required. That value can be used to override the default settings. + # Only 'exit-jvm', 'exit-code' and 'terminate-actor-system' may be + # overridden depending on the reason. + reason-overrides { + # Overrides are applied using the `reason.getClass.getName`. + # Overrides the `exit-code` when the `Reason` is a cluster + # Downing or a Cluster Join Unsuccessful event + "akka.actor.CoordinatedShutdown$ClusterDowningReason$" { + exit-code = -1 + } + "akka.actor.CoordinatedShutdown$ClusterJoinUnsuccessfulReason$" { + exit-code = -1 + } + } + + #//#coordinated-shutdown-phases + # CoordinatedShutdown is enabled by default and will run the tasks that + # are added to these phases by individual Akka modules and user logic. + # + # The phases are ordered as a DAG by defining the dependencies between the phases + # to make sure shutdown tasks are run in the right order. 
+ # + # In general user tasks belong in the first few phases, but there may be use + # cases where you would want to hook in new phases or register tasks later in + # the DAG. + # + # Each phase is defined as a named config section with the + # following optional properties: + # - timeout=15s: Override the default-phase-timeout for this phase. + # - recover=off: If the phase fails the shutdown is aborted + # and depending phases will not be executed. + # - enabled=off: Skip all tasks registered in this phase. DO NOT use + # this to disable phases unless you are absolutely sure what the + # consequences are. Many of the built in tasks depend on other tasks + # having been executed in earlier phases and may break if those are disabled. + # depends-on=[]: Run the phase after the given phases + phases { + + # The first pre-defined phase that applications can add tasks to. + # Note that more phases can be added in the application's + # configuration by overriding this phase with an additional + # depends-on. + before-service-unbind { + } + + # Stop accepting new incoming connections. + # This is where you can register tasks that makes a server stop accepting new connections. Already + # established connections should be allowed to continue and complete if possible. + service-unbind { + depends-on = [before-service-unbind] + } + + # Wait for requests that are in progress to be completed. + # This is where you register tasks that will wait for already established connections to complete, potentially + # also first telling them that it is time to close down. + service-requests-done { + depends-on = [service-unbind] + } + + # Final shutdown of service endpoints. + # This is where you would add tasks that forcefully kill connections that are still around. + service-stop { + depends-on = [service-requests-done] + } + + # Phase for custom application tasks that are to be run + # after service shutdown and before cluster shutdown. + before-cluster-shutdown { + depends-on = [service-stop] + } + + # Graceful shutdown of the Cluster Sharding regions. + # This phase is not meant for users to add tasks to. + cluster-sharding-shutdown-region { + timeout = 10 s + depends-on = [before-cluster-shutdown] + } + + # Emit the leave command for the node that is shutting down. + # This phase is not meant for users to add tasks to. + cluster-leave { + depends-on = [cluster-sharding-shutdown-region] + } + + # Shutdown cluster singletons + # This is done as late as possible to allow the shard region shutdown triggered in + # the "cluster-sharding-shutdown-region" phase to complete before the shard coordinator is shut down. + # This phase is not meant for users to add tasks to. + cluster-exiting { + timeout = 10 s + depends-on = [cluster-leave] + } + + # Wait until exiting has been completed + # This phase is not meant for users to add tasks to. + cluster-exiting-done { + depends-on = [cluster-exiting] + } + + # Shutdown the cluster extension + # This phase is not meant for users to add tasks to. + cluster-shutdown { + depends-on = [cluster-exiting-done] + } + + # Phase for custom application tasks that are to be run + # after cluster shutdown and before ActorSystem termination. + before-actor-system-terminate { + depends-on = [cluster-shutdown] + } + + # Last phase. See terminate-actor-system and exit-jvm above. + # Don't add phases that depends on this phase because the + # dispatcher and scheduler of the ActorSystem have been shutdown. + # This phase is not meant for users to add tasks to. 
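As a sketch of the per-phase overrides mentioned above (the timeout values are arbitrary), an application.conf could lengthen individual phases without redefining the whole DAG:

akka.coordinated-shutdown {
  default-phase-timeout = 10 s
  phases {
    cluster-sharding-shutdown-region.timeout = 20 s
    cluster-exiting.timeout = 30 s
  }
}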
+ actor-system-terminate { + timeout = 10 s + depends-on = [before-actor-system-terminate] + } + } + #//#coordinated-shutdown-phases + } + + #//#circuit-breaker-default + # Configuration for circuit breakers created with the APIs accepting an id to + # identify or look up the circuit breaker. + # Note: Circuit breakers created without ids are not affected by this configuration. + # A child configuration section with the same name as the circuit breaker identifier + # will be used, with fallback to the `akka.circuit-breaker.default` section. + circuit-breaker { + + # Default configuration that is used if a configuration section + # with the circuit breaker identifier is not defined. + default { + # Number of failures before opening the circuit. + max-failures = 10 + + # Duration of time after which to consider a call a failure. + call-timeout = 10s + + # Duration of time in open state after which to attempt to close + # the circuit, by first entering the half-open state. + reset-timeout = 15s + + # The upper bound of reset-timeout + max-reset-timeout = 36500d + + # Exponential backoff + # For details see https://en.wikipedia.org/wiki/Exponential_backoff + exponential-backoff = 1.0 + + # Additional random delay based on this factor is added to backoff + # For example 0.2 adds up to 20% delay + # In order to skip this additional delay set as 0 + random-factor = 0.0 + + # A allowlist of fqcn of Exceptions that the CircuitBreaker + # should not consider failures. By default all exceptions are + # considered failures. + exception-allowlist = [] + } + } + #//#circuit-breaker-default + +} diff --git a/akka/repackaged-akka-jar/src/main/resources/actor_typed_reference.conf b/akka/repackaged-akka-jar/src/main/resources/actor_typed_reference.conf new file mode 100644 index 0000000000..d34d52aeef --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/actor_typed_reference.conf @@ -0,0 +1,129 @@ +akka.actor.typed { + + # List FQCN of `akka.actor.typed.ExtensionId`s which shall be loaded at actor system startup. + # Should be on the format: 'extensions = ["com.example.MyExtId1", "com.example.MyExtId2"]' etc. + # See the Akka Documentation for more info about Extensions + extensions = [] + + # List FQCN of extensions which shall be loaded at actor system startup. + # Library extensions are regular extensions that are loaded at startup and are + # available for third party library authors to enable auto-loading of extensions when + # present on the classpath. This is done by appending entries: + # 'library-extensions += "Extension"' in the library `reference.conf`. + # + # Should not be set by end user applications in 'application.conf', use the extensions property for that + # + library-extensions = ${?akka.actor.typed.library-extensions} [] + + # Receptionist is started eagerly to allow clustered receptionist to gather remote registrations early on. + library-extensions += "akka.actor.typed.receptionist.Receptionist$" + + # While an actor is restarted (waiting for backoff to expire and children to stop) + # incoming messages and signals are stashed, and delivered later to the newly restarted + # behavior. This property defines the capacity in number of messages of the stash + # buffer. If the capacity is exceed then additional incoming messages are dropped. 
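For example, a breaker created with the hypothetical identifier "data-access" could be tuned through a child section of akka.circuit-breaker, with anything left unset falling back to the default section above:

akka.circuit-breaker {
  data-access {            # identifier is illustrative
    max-failures = 5
    call-timeout = 2s
    reset-timeout = 30s
  }
}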
+ restart-stash-capacity = 1000 + + # Typed mailbox defaults to the single consumber mailbox as balancing dispatcher is not supported + default-mailbox { + mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + } +} + +# Load typed extensions by a classic extension. +akka.library-extensions += "akka.actor.typed.internal.adapter.ActorSystemAdapter$LoadTypedExtensions" + +akka.actor { + serializers { + typed-misc = "akka.actor.typed.internal.MiscMessageSerializer" + service-key = "akka.actor.typed.internal.receptionist.ServiceKeySerializer" + } + + serialization-identifiers { + "akka.actor.typed.internal.MiscMessageSerializer" = 24 + "akka.actor.typed.internal.receptionist.ServiceKeySerializer" = 26 + } + + serialization-bindings { + "akka.actor.typed.ActorRef" = typed-misc + "akka.actor.typed.internal.adapter.ActorRefAdapter" = typed-misc + "akka.actor.typed.internal.receptionist.DefaultServiceKey" = service-key + } +} + +# When using Akka Typed (having akka-actor-typed in classpath) the +# akka.event.slf4j.Slf4jLogger is enabled instead of the DefaultLogger +# even though it has not been explicitly defined in `akka.loggers` +# configuration. +# +# Slf4jLogger will be used for all Akka classic logging via eventStream, +# including logging from Akka internals. The Slf4jLogger is then using +# an ordinary org.slf4j.Logger to emit the log events. +# +# The Slf4jLoggingFilter is also enabled automatically. +# +# This behavior can be disabled by setting this property to `off`. +akka.use-slf4j = on + +akka.reliable-delivery { + producer-controller { + + # To avoid head of line blocking from serialization and transfer + # of large messages this can be enabled. + # Large messages are chunked into pieces of the given size in bytes. The + # chunked messages are sent separatetely and assembled on the consumer side. + # Serialization and deserialization is performed by the ProducerController and + # ConsumerController respectively instead of in the remote transport layer. + chunk-large-messages = off + + durable-queue { + # The ProducerController uses this timeout for the requests to + # the durable queue. If there is no reply within the timeout it + # will be retried. + request-timeout = 3s + + # The ProducerController retries requests to the durable queue this + # number of times before failing. + retry-attempts = 10 + + # The ProducerController retries sending the first message with this interval + # until it has been confirmed. + resend-first-interval = 1s + } + } + + consumer-controller { + # Number of messages in flight between ProducerController and + # ConsumerController. The ConsumerController requests for more messages + # when half of the window has been used. + flow-control-window = 50 + + # The ConsumerController resends flow control messages to the + # ProducerController with the resend-interval-min, and increasing + # it gradually to resend-interval-max when idle. + resend-interval-min = 2s + resend-interval-max = 30s + + # If this is enabled lost messages will not be resent, but flow control is used. + # This can be more efficient since messages don't have to be + # kept in memory in the `ProducerController` until they have been + # confirmed, but the drawback is that lost messages will not be delivered. + only-flow-control = false + } + + work-pulling { + producer-controller = ${akka.reliable-delivery.producer-controller} + producer-controller { + # Limit of how many messages that can be buffered when there + # is no demand from the consumer side. 
+ buffer-size = 1000 + + # Ask timeout for sending message to worker until receiving Ack from worker + internal-ask-timeout = 60s + + # Chunked messages not implemented for work-pulling yet. Override to not + # propagate property from akka.reliable-delivery.producer-controller. + chunk-large-messages = off + } + } +} diff --git a/akka/repackaged-akka-jar/src/main/resources/cluster_reference.conf b/akka/repackaged-akka-jar/src/main/resources/cluster_reference.conf new file mode 100644 index 0000000000..6fa0e94586 --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/cluster_reference.conf @@ -0,0 +1,504 @@ +###################################### +# Akka Cluster Reference Config File # +###################################### + +# This is the reference config file that contains all the default settings. +# Make your edits/overrides in your application.conf. + +akka { + + cluster { + # Initial contact points of the cluster. + # The nodes to join automatically at startup. + # Comma separated full URIs defined by a string on the form of + # "akka://system@hostname:port" + # Leave as empty if the node is supposed to be joined manually. + seed-nodes = [] + + # How long to wait for one of the seed nodes to reply to initial join request. + # When this is the first seed node and there is no positive reply from the other + # seed nodes within this timeout it will join itself to bootstrap the cluster. + # When this is not the first seed node the join attempts will be performed with + # this interval. + seed-node-timeout = 5s + + # If a join request fails it will be retried after this period. + # Disable join retry by specifying "off". + retry-unsuccessful-join-after = 10s + + # The joining of given seed nodes will by default be retried indefinitely until + # a successful join. That process can be aborted if unsuccessful by defining this + # timeout. When aborted it will run CoordinatedShutdown, which by default will + # terminate the ActorSystem. CoordinatedShutdown can also be configured to exit + # the JVM. It is useful to define this timeout if the seed-nodes are assembled + # dynamically and a restart with new seed-nodes should be tried after unsuccessful + # attempts. + shutdown-after-unsuccessful-join-seed-nodes = off + + # Time margin after which shards or singletons that belonged to a downed/removed + # partition are created in surviving partition. The purpose of this margin is that + # in case of a network partition the persistent actors in the non-surviving partitions + # must be stopped before corresponding persistent actors are started somewhere else. + # This is useful if you implement downing strategies that handle network partitions, + # e.g. by keeping the larger side of the partition and shutting down the smaller side. + # Disable with "off" or specify a duration to enable. + # + # When using the `akka.cluster.sbr.SplitBrainResolver` as downing provider it will use + # the akka.cluster.split-brain-resolver.stable-after as the default down-removal-margin + # if this down-removal-margin is undefined. + down-removal-margin = off + + # Pluggable support for downing of nodes in the cluster. + # If this setting is left empty the `NoDowning` provider is used and no automatic downing will be performed. 
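An illustrative static seed-node setup following the URI form described above; the actor system name, host names and port are made up:

akka.cluster {
  seed-nodes = [
    "akka://opendaylight-cluster-data@member-1.example.org:2550",
    "akka://opendaylight-cluster-data@member-2.example.org:2550"
  ]
  seed-node-timeout = 12s
  shutdown-after-unsuccessful-join-seed-nodes = 5m
}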
+ # + # If specified the value must be the fully qualified class name of a subclass of + # `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem` + downing-provider-class = "" + + # Artery only setting + # When a node has been gracefully removed, let this time pass (to allow for example + # cluster singleton handover to complete) and then quarantine the removed node. + quarantine-removed-node-after = 5s + + # If this is set to "off", the leader will not move 'Joining' members to 'Up' during a network + # split. This feature allows the leader to accept 'Joining' members to be 'WeaklyUp' + # so they become part of the cluster even during a network split. The leader will + # move `Joining` members to 'WeaklyUp' after this configured duration without convergence. + # The leader will move 'WeaklyUp' members to 'Up' status once convergence has been reached. + allow-weakly-up-members = 7s + + # The roles of this member. List of strings, e.g. roles = ["A", "B"]. + # The roles are part of the membership information and can be used by + # routers or other services to distribute work to certain member types, + # e.g. front-end and back-end nodes. + # Roles are not allowed to start with "dc-" as that is reserved for the + # special role assigned from the data-center a node belongs to (see the + # multi-data-center section below) + roles = [] + + # Run the coordinated shutdown from phase 'cluster-shutdown' when the cluster + # is shutdown for other reasons than when leaving, e.g. when downing. This + # will terminate the ActorSystem when the cluster extension is shutdown. + run-coordinated-shutdown-when-down = on + + role { + # Minimum required number of members of a certain role before the leader + # changes member status of 'Joining' members to 'Up'. Typically used together + # with 'Cluster.registerOnMemberUp' to defer some action, such as starting + # actors, until the cluster has reached a certain size. + # E.g. to require 2 nodes with role 'frontend' and 3 nodes with role 'backend': + # frontend.min-nr-of-members = 2 + # backend.min-nr-of-members = 3 + #.min-nr-of-members = 1 + } + + # Application version of the deployment. Used by rolling update features + # to distinguish between old and new nodes. The typical convention is to use + # 3 digit version numbers `major.minor.patch`, but 1 or two digits are also + # supported. + # + # If no `.` is used it is interpreted as a single digit version number or as + # plain alphanumeric if it couldn't be parsed as a number. + # + # It may also have a qualifier at the end for 2 or 3 digit version numbers such + # as "1.2-RC1". + # For 1 digit with qualifier, 1-RC1, it is interpreted as plain alphanumeric. + # + # It has support for https://github.com/dwijnand/sbt-dynver format with `+` or + # `-` separator. The number of commits from the tag is handled as a numeric part. + # For example `1.0.0+3-73475dce26` is less than `1.0.10+10-ed316bd024` (3 < 10). + app-version = "0.0.0" + + # Minimum required number of members before the leader changes member status + # of 'Joining' members to 'Up'. Typically used together with + # 'Cluster.registerOnMemberUp' to defer some action, such as starting actors, + # until the cluster has reached a certain size. + min-nr-of-members = 1 + + # Enable/disable info level logging of cluster events. + # These are logged with logger name `akka.cluster.Cluster`. + log-info = on + + # Enable/disable verbose info-level logging of cluster events + # for temporary troubleshooting. 
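The role-based gating above could be used, for instance, as follows; the role names and member counts mirror the example in the comment and are purely illustrative:

akka.cluster {
  roles = ["frontend"]
  role {
    frontend.min-nr-of-members = 2
    backend.min-nr-of-members = 3
  }
  min-nr-of-members = 3
}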
Defaults to 'off'. + # These are logged with logger name `akka.cluster.Cluster`. + log-info-verbose = off + + # Enable or disable JMX MBeans for management of the cluster + jmx.enabled = on + + # Enable or disable multiple JMX MBeans in the same JVM + # If this is disabled, the MBean Object name is "akka:type=Cluster" + # If this is enabled, them MBean Object names become "akka:type=Cluster,port=$clusterPortNumber" + jmx.multi-mbeans-in-same-jvm = off + + # how long should the node wait before starting the periodic tasks + # maintenance tasks? + periodic-tasks-initial-delay = 1s + + # how often should the node send out gossip information? + gossip-interval = 1s + + # discard incoming gossip messages if not handled within this duration + gossip-time-to-live = 2s + + # how often should the leader perform maintenance tasks? + leader-actions-interval = 1s + + # how often should the node move nodes, marked as unreachable by the failure + # detector, out of the membership ring? + unreachable-nodes-reaper-interval = 1s + + # How often the current internal stats should be published. + # A value of 0s can be used to always publish the stats, when it happens. + # Disable with "off". + publish-stats-interval = off + + # The id of the dispatcher to use for cluster actors. + # If specified you need to define the settings of the actual dispatcher. + use-dispatcher = "akka.actor.internal-dispatcher" + + # Gossip to random node with newer or older state information, if any with + # this probability. Otherwise Gossip to any random live node. + # Probability value is between 0.0 and 1.0. 0.0 means never, 1.0 means always. + gossip-different-view-probability = 0.8 + + # Reduced the above probability when the number of nodes in the cluster + # greater than this value. + reduce-gossip-different-view-probability = 400 + + # When a node is removed the removal is marked with a tombstone + # which is kept at least this long, after which it is pruned, if there is a partition + # longer than this it could lead to removed nodes being re-added to the cluster + prune-gossip-tombstones-after = 24h + + # Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf + # [Hayashibara et al]) used by the cluster subsystem to detect unreachable + # members. + # The default PhiAccrualFailureDetector will trigger if there are no heartbeats within + # the duration heartbeat-interval + acceptable-heartbeat-pause + threshold_adjustment, + # i.e. around 5.5 seconds with default settings. + failure-detector { + + # FQCN of the failure detector implementation. + # It must implement akka.remote.FailureDetector and have + # a public constructor with a com.typesafe.config.Config and + # akka.actor.EventStream parameter. + implementation-class = "akka.remote.PhiAccrualFailureDetector" + + # How often keep-alive heartbeat messages should be sent to each connection. + heartbeat-interval = 1 s + + # Defines the failure detector threshold. + # A low threshold is prone to generate many wrong suspicions but ensures + # a quick detection in the event of a real crash. Conversely, a high + # threshold generates fewer mistakes but needs more time to detect + # actual crashes. + threshold = 8.0 + + # Number of the samples of inter-heartbeat arrival times to adaptively + # calculate the failure timeout for connections. + max-sample-size = 1000 + + # Minimum standard deviation to use for the normal distribution in + # AccrualFailureDetector. 
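A hedged example of loosening the Phi accrual failure detector for environments with long GC pauses or slow links; the numbers are illustrative, not recommendations:

akka.cluster.failure-detector {
  threshold = 12.0
  heartbeat-interval = 1 s
  acceptable-heartbeat-pause = 6 s
}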
Too low standard deviation might result in + # too much sensitivity for sudden, but normal, deviations in heartbeat + # inter arrival times. + min-std-deviation = 100 ms + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # This margin is important to be able to survive sudden, occasional, + # pauses in heartbeat arrivals, due to for example garbage collect or + # network drop. + acceptable-heartbeat-pause = 3 s + + # Number of member nodes that each member will send heartbeat messages to, + # i.e. each node will be monitored by this number of other nodes. + monitored-by-nr-of-members = 9 + + # After the heartbeat request has been sent the first failure detection + # will start after this period, even though no heartbeat message has + # been received. + expected-response-after = 1 s + + } + + # Configures multi-dc specific heartbeating and other mechanisms, + # many of them have a direct counter-part in "one datacenter mode", + # in which case these settings would not be used at all - they only apply, + # if your cluster nodes are configured with at-least 2 different `akka.cluster.data-center` values. + multi-data-center { + + # Defines which data center this node belongs to. It is typically used to make islands of the + # cluster that are colocated. This can be used to make the cluster aware that it is running + # across multiple availability zones or regions. It can also be used for other logical + # grouping of nodes. + self-data-center = "default" + + + # Try to limit the number of connections between data centers. Used for gossip and heartbeating. + # This will not limit connections created for the messaging of the application. + # If the cluster does not span multiple data centers, this value has no effect. + cross-data-center-connections = 5 + + # The n oldest nodes in a data center will choose to gossip to another data center with + # this probability. Must be a value between 0.0 and 1.0 where 0.0 means never, 1.0 means always. + # When a data center is first started (nodes < 5) a higher probability is used so other data + # centers find out about the new nodes more quickly + cross-data-center-gossip-probability = 0.2 + + failure-detector { + # FQCN of the failure detector implementation. + # It must implement akka.remote.FailureDetector and have + # a public constructor with a com.typesafe.config.Config and + # akka.actor.EventStream parameter. + implementation-class = "akka.remote.DeadlineFailureDetector" + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # This margin is important to be able to survive sudden, occasional, + # pauses in heartbeat arrivals, due to for example garbage collect or + # network drop. + acceptable-heartbeat-pause = 10 s + + # How often keep-alive heartbeat messages should be sent to each connection. + heartbeat-interval = 3 s + + # After the heartbeat request has been sent the first failure detection + # will start after this period, even though no heartbeat message has + # been received. + expected-response-after = 1 s + } + } + + # If the tick-duration of the default scheduler is longer than the + # tick-duration configured here a dedicated scheduler will be used for + # periodic tasks of the cluster, otherwise the default scheduler is used. + # See akka.scheduler settings for more details. 
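An illustrative two-data-center override of the settings above; the data center name is an assumption:

akka.cluster.multi-data-center {
  self-data-center = "dc-east"
  cross-data-center-connections = 5
  cross-data-center-gossip-probability = 0.2
}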
+ scheduler { + tick-duration = 33ms + ticks-per-wheel = 512 + } + + debug { + # Log heartbeat events (very verbose, useful mostly when debugging heartbeating issues). + # These are logged with logger name `akka.cluster.ClusterHeartbeat`. + verbose-heartbeat-logging = off + + # log verbose details about gossip + verbose-gossip-logging = off + } + + configuration-compatibility-check { + + # Enforce configuration compatibility checks when joining a cluster. + # Set to off to allow joining nodes to join a cluster even when configuration incompatibilities are detected or + # when the cluster does not support this feature. Compatibility checks are always performed and warning and + # error messages are logged. + # + # This is particularly useful for rolling updates on clusters that do not support that feature. Since the old + # cluster won't be able to send the compatibility confirmation to the joining node, the joining node won't be able + # to 'know' if its allowed to join. + enforce-on-join = on + + # Add named entry to this section with fully qualified class name of the JoinConfigCompatChecker + # to enable. + # Checkers defined in reference.conf can be disabled by application by using empty string value + # for the named entry. + checkers { + akka-cluster = "akka.cluster.JoinConfigCompatCheckCluster" + } + + # Some configuration properties might not be appropriate to transfer between nodes + # and such properties can be excluded from the configuration compatibility check by adding + # the paths of the properties to this list. Sensitive paths are grouped by key. Modules and third-party libraries + # can define their own set of sensitive paths without clashing with each other (as long they use unique keys). + # + # All properties starting with the paths defined here are excluded, i.e. you can add the path of a whole + # section here to skip everything inside that section. + sensitive-config-paths { + akka = [ + "user.home", "user.name", "user.dir", + "socksNonProxyHosts", "http.nonProxyHosts", "ftp.nonProxyHosts", + "akka.remote.secure-cookie", + "akka.remote.classic.netty.ssl.security", + # Pre 2.6 path, keep around to avoid sending things misconfigured with old paths + "akka.remote.netty.ssl.security", + "akka.remote.artery.ssl" + ] + } + + } + } + + actor.deployment.default.cluster { + # enable cluster aware router that deploys to nodes in the cluster + enabled = off + + # Maximum number of routees that will be deployed on each cluster + # member node. + # Note that max-total-nr-of-instances defines total number of routees, but + # number of routees per node will not be exceeded, i.e. if you + # define max-total-nr-of-instances = 50 and max-nr-of-instances-per-node = 2 + # it will deploy 2 routees per new member in the cluster, up to + # 25 members. + max-nr-of-instances-per-node = 1 + + # Maximum number of routees that will be deployed, in total + # on all nodes. See also description of max-nr-of-instances-per-node. + # For backwards compatibility reasons, nr-of-instances + # has the same purpose as max-total-nr-of-instances for cluster + # aware routers and nr-of-instances (if defined by user) takes + # precedence over max-total-nr-of-instances. + max-total-nr-of-instances = 10000 + + # Defines if routees are allowed to be located on the same node as + # the head router actor, or only on remote nodes. + # Useful for master-worker scenario where all routees are remote. + allow-local-routees = on + + # Use members with all specified roles, or all members if undefined or empty. 
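A sketch of a cluster-aware router deployment built on these defaults; the actor path, router type and role are assumptions for illustration only:

akka.actor.deployment {
  /workerRouter {                        # hypothetical router path
    router = round-robin-pool
    cluster {
      enabled = on
      max-nr-of-instances-per-node = 2
      max-total-nr-of-instances = 20
      allow-local-routees = off
      use-roles = ["backend"]            # hypothetical role
    }
  }
}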
+ use-roles = [] + + # Deprecated, since Akka 2.5.4, replaced by use-roles + # Use members with specified role, or all members if undefined or empty. + use-role = "" + } + + # Protobuf serializer for cluster messages + actor { + serializers { + akka-cluster = "akka.cluster.protobuf.ClusterMessageSerializer" + } + + serialization-bindings { + "akka.cluster.ClusterMessage" = akka-cluster + "akka.cluster.routing.ClusterRouterPool" = akka-cluster + } + + serialization-identifiers { + "akka.cluster.protobuf.ClusterMessageSerializer" = 5 + } + + } + +} + +#//#split-brain-resolver + +# To enable the split brain resolver you first need to enable the provider in your application.conf: +# akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + +akka.cluster.split-brain-resolver { + # Select one of the available strategies (see descriptions below): + # static-quorum, keep-majority, keep-oldest, down-all, lease-majority + active-strategy = keep-majority + + #//#stable-after + # Time margin after which shards or singletons that belonged to a downed/removed + # partition are created in surviving partition. The purpose of this margin is that + # in case of a network partition the persistent actors in the non-surviving partitions + # must be stopped before corresponding persistent actors are started somewhere else. + # This is useful if you implement downing strategies that handle network partitions, + # e.g. by keeping the larger side of the partition and shutting down the smaller side. + # Decision is taken by the strategy when there has been no membership or + # reachability changes for this duration, i.e. the cluster state is stable. + stable-after = 20s + #//#stable-after + + # When reachability observations by the failure detector are changed the SBR decisions + # are deferred until there are no changes within the 'stable-after' duration. + # If this continues for too long it might be an indication of an unstable system/network + # and it could result in delayed or conflicting decisions on separate sides of a network + # partition. + # As a precaution for that scenario all nodes are downed if no decision is made within + # `stable-after + down-all-when-unstable` from the first unreachability event. + # The measurement is reset if all unreachable have been healed, downed or removed, or + # if there are no changes within `stable-after * 2`. + # The value can be on, off, or a duration. + # By default it is 'on' and then it is derived to be 3/4 of stable-after, but not less than + # 4 seconds. + down-all-when-unstable = on + +} +#//#split-brain-resolver + +# Down the unreachable nodes if the number of remaining nodes are greater than or equal to +# the given 'quorum-size'. Otherwise down the reachable nodes, i.e. it will shut down that +# side of the partition. In other words, the 'size' defines the minimum number of nodes +# that the cluster must have to be operational. If there are unreachable nodes when starting +# up the cluster, before reaching this limit, the cluster may shutdown itself immediately. +# This is not an issue if you start all nodes at approximately the same time. +# +# Note that you must not add more members to the cluster than 'quorum-size * 2 - 1', because +# then both sides may down each other and thereby form two separate clusters. For example, +# quorum-size configured to 3 in a 6 node cluster may result in a split where each side +# consists of 3 nodes each, i.e. each side thinks it has enough nodes to continue by +# itself. 
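Enabling the split brain resolver combines the downing-provider-class shown above with one of the listed strategies; a minimal sketch assuming a five-node cluster:

akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
akka.cluster.split-brain-resolver {
  active-strategy = static-quorum
  static-quorum.quorum-size = 3          # majority of an assumed 5-node cluster
  stable-after = 20s
}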
A warning is logged if this recommendation is violated. +#//#static-quorum +akka.cluster.split-brain-resolver.static-quorum { + # minimum number of nodes that the cluster must have + quorum-size = undefined + + # if the 'role' is defined the decision is based only on members with that 'role' + role = "" +} +#//#static-quorum + +# Down the unreachable nodes if the current node is in the majority part based the last known +# membership information. Otherwise down the reachable nodes, i.e. the own part. If the +# the parts are of equal size the part containing the node with the lowest address is kept. +# Note that if there are more than two partitions and none is in majority each part +# will shutdown itself, terminating the whole cluster. +#//#keep-majority +akka.cluster.split-brain-resolver.keep-majority { + # if the 'role' is defined the decision is based only on members with that 'role' + role = "" +} +#//#keep-majority + +# Down the part that does not contain the oldest member (current singleton). +# +# There is one exception to this rule if 'down-if-alone' is defined to 'on'. +# Then, if the oldest node has partitioned from all other nodes the oldest +# will down itself and keep all other nodes running. The strategy will not +# down the single oldest node when it is the only remaining node in the cluster. +# +# Note that if the oldest node crashes the others will remove it from the cluster +# when 'down-if-alone' is 'on', otherwise they will down themselves if the +# oldest node crashes, i.e. shutdown the whole cluster together with the oldest node. +#//#keep-oldest +akka.cluster.split-brain-resolver.keep-oldest { + # Enable downing of the oldest node when it is partitioned from all other nodes + down-if-alone = on + + # if the 'role' is defined the decision is based only on members with that 'role', + # i.e. using the oldest member (singleton) within the nodes with that role + role = "" +} +#//#keep-oldest + +# Keep the part that can acquire the lease, and down the other part. +# Best effort is to keep the side that has most nodes, i.e. the majority side. +# This is achieved by adding a delay before trying to acquire the lease on the +# minority side. +#//#lease-majority +akka.cluster.split-brain-resolver.lease-majority { + lease-implementation = "" + + # The recommended format for the lease name is "-akka-sbr". + # When lease-name is not defined, the name will be set to "-akka-sbr" + lease-name = "" + + # This delay is used on the minority side before trying to acquire the lease, + # as an best effort to try to keep the majority side. + acquire-lease-delay-for-minority = 2s + + # Release the lease after this duration. + release-after = 40s + + # If the 'role' is defined the majority/minority is based only on members with that 'role'. + role = "" +} +#//#lease-majority diff --git a/akka/repackaged-akka-jar/src/main/resources/cluster_tools_reference.conf b/akka/repackaged-akka-jar/src/main/resources/cluster_tools_reference.conf new file mode 100644 index 0000000000..783326f185 --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/cluster_tools_reference.conf @@ -0,0 +1,231 @@ +############################################ +# Akka Cluster Tools Reference Config File # +############################################ + +# This is the reference config file that contains all the default settings. +# Make your edits/overrides in your application.conf. 
+ +# //#pub-sub-ext-config +# Settings for the DistributedPubSub extension +akka.cluster.pub-sub { + # Actor name of the mediator actor, /system/distributedPubSubMediator + name = distributedPubSubMediator + + # Start the mediator on members tagged with this role. + # All members are used if undefined or empty. + role = "" + + # The routing logic to use for 'Send' + # Possible values: random, round-robin, broadcast + routing-logic = random + + # How often the DistributedPubSubMediator should send out gossip information + gossip-interval = 1s + + # Removed entries are pruned after this duration + removed-time-to-live = 120s + + # Maximum number of elements to transfer in one message when synchronizing the registries. + # Next chunk will be transferred in next round of gossip. + max-delta-elements = 3000 + + # When a message is published to a topic with no subscribers send it to the dead letters. + send-to-dead-letters-when-no-subscribers = on + + # The id of the dispatcher to use for DistributedPubSubMediator actors. + # If specified you need to define the settings of the actual dispatcher. + use-dispatcher = "akka.actor.internal-dispatcher" +} +# //#pub-sub-ext-config + +# Protobuf serializer for cluster DistributedPubSubMeditor messages +akka.actor { + serializers { + akka-pubsub = "akka.cluster.pubsub.protobuf.DistributedPubSubMessageSerializer" + } + serialization-bindings { + "akka.cluster.pubsub.DistributedPubSubMessage" = akka-pubsub + "akka.cluster.pubsub.DistributedPubSubMediator$Internal$SendToOneSubscriber" = akka-pubsub + } + serialization-identifiers { + "akka.cluster.pubsub.protobuf.DistributedPubSubMessageSerializer" = 9 + } +} + + +# //#receptionist-ext-config +# Settings for the ClusterClientReceptionist extension +akka.cluster.client.receptionist { + # Actor name of the ClusterReceptionist actor, /system/receptionist + name = receptionist + + # Start the receptionist on members tagged with this role. + # All members are used if undefined or empty. + role = "" + + # The receptionist will send this number of contact points to the client + number-of-contacts = 3 + + # The actor that tunnel response messages to the client will be stopped + # after this time of inactivity. + response-tunnel-receive-timeout = 30s + + # The id of the dispatcher to use for ClusterReceptionist actors. + # If specified you need to define the settings of the actual dispatcher. + use-dispatcher = "akka.actor.internal-dispatcher" + + # How often failure detection heartbeat messages should be received for + # each ClusterClient + heartbeat-interval = 2s + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # The ClusterReceptionist is using the akka.remote.DeadlineFailureDetector, which + # will trigger if there are no heartbeats within the duration + # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with + # the default settings. + acceptable-heartbeat-pause = 13s + + # Failure detection checking interval for checking all ClusterClients + failure-detection-interval = 2s +} +# //#receptionist-ext-config + +# //#cluster-client-config +# Settings for the ClusterClient +akka.cluster.client { + # Actor paths of the ClusterReceptionist actors on the servers (cluster nodes) + # that the client will try to contact initially. It is mandatory to specify + # at least one initial contact. 
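For illustration, the mediator can be restricted to a subset of members and adjusted through the settings above; the role name and values are arbitrary:

akka.cluster.pub-sub {
  role = "backend"
  routing-logic = broadcast
  gossip-interval = 2s
  send-to-dead-letters-when-no-subscribers = off
}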
+ # Comma separated full actor paths defined by a string on the form of + # "akka://system@hostname:port/system/receptionist" + initial-contacts = [] + + # Interval at which the client retries to establish contact with one of + # ClusterReceptionist on the servers (cluster nodes) + establishing-get-contacts-interval = 3s + + # Interval at which the client will ask the ClusterReceptionist for + # new contact points to be used for next reconnect. + refresh-contacts-interval = 60s + + # How often failure detection heartbeat messages should be sent + heartbeat-interval = 2s + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # The ClusterClient is using the akka.remote.DeadlineFailureDetector, which + # will trigger if there are no heartbeats within the duration + # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with + # the default settings. + acceptable-heartbeat-pause = 13s + + # If connection to the receptionist is not established the client will buffer + # this number of messages and deliver them the connection is established. + # When the buffer is full old messages will be dropped when new messages are sent + # via the client. Use 0 to disable buffering, i.e. messages will be dropped + # immediately if the location of the singleton is unknown. + # Maximum allowed buffer size is 10000. + buffer-size = 1000 + + # If connection to the receiptionist is lost and the client has not been + # able to acquire a new connection for this long the client will stop itself. + # This duration makes it possible to watch the cluster client and react on a more permanent + # loss of connection with the cluster, for example by accessing some kind of + # service registry for an updated set of initial contacts to start a new cluster client with. + # If this is not wanted it can be set to "off" to disable the timeout and retry + # forever. + reconnect-timeout = off +} +# //#cluster-client-config + +# Protobuf serializer for ClusterClient messages +akka.actor { + serializers { + akka-cluster-client = "akka.cluster.client.protobuf.ClusterClientMessageSerializer" + } + serialization-bindings { + "akka.cluster.client.ClusterClientMessage" = akka-cluster-client + } + serialization-identifiers { + "akka.cluster.client.protobuf.ClusterClientMessageSerializer" = 15 + } +} + +# //#singleton-config +akka.cluster.singleton { + # The actor name of the child singleton actor. + singleton-name = "singleton" + + # Singleton among the nodes tagged with specified role. + # If the role is not specified it's a singleton among all nodes in the cluster. + role = "" + + # When a node is becoming oldest it sends hand-over request to previous oldest, + # that might be leaving the cluster. This is retried with this interval until + # the previous oldest confirms that the hand over has started or the previous + # oldest member is removed from the cluster (+ akka.cluster.down-removal-margin). + hand-over-retry-interval = 1s + + # The number of retries are derived from hand-over-retry-interval and + # akka.cluster.down-removal-margin (or ClusterSingletonManagerSettings.removalMargin), + # but it will never be less than this property. + # After the hand over retries and it's still not able to exchange the hand over messages + # with the previous oldest it will restart itself by throwing ClusterSingletonManagerIsStuck, + # to start from a clean state. 
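An illustrative client configuration pointing at two receptionists, using the actor-path form described above; the system name, hosts and port are hypothetical:

akka.cluster.client {
  initial-contacts = [
    "akka://opendaylight-cluster-data@member-1.example.org:2550/system/receptionist",
    "akka://opendaylight-cluster-data@member-2.example.org:2550/system/receptionist"
  ]
  reconnect-timeout = 30s
  buffer-size = 1000
}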
After that it will still not start the singleton instance + # until the previous oldest node has been removed from the cluster. + # On the other side, on the previous oldest node, the same number of retries - 3 are used + # and after that the singleton instance is stopped. + # For large clusters it might be necessary to increase this to avoid too early timeouts while + # gossip dissemination of the Leaving to Exiting phase occurs. For normal leaving scenarios + # it will not be a quicker hand over by reducing this value, but in extreme failure scenarios + # the recovery might be faster. + min-number-of-hand-over-retries = 15 + + # Config path of the lease to be taken before creating the singleton actor + # if the lease is lost then the actor is restarted and it will need to re-acquire the lease + # the default is no lease + use-lease = "" + + # The interval between retries for acquiring the lease + lease-retry-interval = 5s +} +# //#singleton-config + +# //#singleton-proxy-config +akka.cluster.singleton-proxy { + # The actor name of the singleton actor that is started by the ClusterSingletonManager + singleton-name = ${akka.cluster.singleton.singleton-name} + + # The role of the cluster nodes where the singleton can be deployed. + # Corresponding to the role used by the `ClusterSingletonManager`. If the role is not + # specified it's a singleton among all nodes in the cluster, and the `ClusterSingletonManager` + # must then also be configured in same way. + role = "" + + # Interval at which the proxy will try to resolve the singleton instance. + singleton-identification-interval = 1s + + # If the location of the singleton is unknown the proxy will buffer this + # number of messages and deliver them when the singleton is identified. + # When the buffer is full old messages will be dropped when new messages are + # sent via the proxy. + # Use 0 to disable buffering, i.e. messages will be dropped immediately if + # the location of the singleton is unknown. + # Maximum allowed buffer size is 10000. + buffer-size = 1000 +} +# //#singleton-proxy-config + +# Serializer for cluster ClusterSingleton messages +akka.actor { + serializers { + akka-singleton = "akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer" + } + serialization-bindings { + "akka.cluster.singleton.ClusterSingletonMessage" = akka-singleton + } + serialization-identifiers { + "akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer" = 14 + } +} diff --git a/akka/repackaged-akka-jar/src/main/resources/cluster_typed_reference.conf b/akka/repackaged-akka-jar/src/main/resources/cluster_typed_reference.conf new file mode 100644 index 0000000000..4cd45a5d24 --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/cluster_typed_reference.conf @@ -0,0 +1,66 @@ +############################################ +# Akka Cluster Typed Reference Config File # +############################################ + +# This is the reference config file that contains all the default settings. +# Make your edits/overrides in your application.conf. + +akka.cluster.typed.receptionist { + # Updates with Distributed Data are done with this consistency level. + # Possible values: local, majority, all, 2, 3, 4 (n) + write-consistency = local + + # Period task to remove actor references that are hosted by removed nodes, + # in case of abrupt termination. + pruning-interval = 3 s + + # The periodic task to remove actor references that are hosted by removed nodes + # will only remove entries older than this duration. 
The reason for this + # is to avoid removing entries of nodes that haven't been visible as joining. + prune-removed-older-than = 60 s + + # Shard the services over this many Distributed Data keys, with large amounts of different + # service keys storing all of them in the same Distributed Data entry would lead to large updates + # etc. instead the keys are sharded across this number of keys. This must be the same on all nodes + # in a cluster, changing it requires a full cluster restart (stopping all nodes before starting them again) + distributed-key-count = 5 + + # Settings for the Distributed Data replicator used by Receptionist. + # Same layout as akka.cluster.distributed-data. + distributed-data = ${akka.cluster.distributed-data} + # make sure that by default it's for all roles (Play loads config in different way) + distributed-data.role = "" +} + +akka.cluster.ddata.typed { + # The timeout to use for ask operations in ReplicatorMessageAdapter. + # This should be longer than the timeout given in Replicator.WriteConsistency and + # Replicator.ReadConsistency. The replicator will always send a reply within those + # timeouts so the unexpected ask timeout should not occur, but for cleanup in a + # failure situation it must still exist. + # If askUpdate, askGet or askDelete takes longer then this timeout a + # java.util.concurrent.TimeoutException will be thrown by the requesting actor and + # may be handled by supervision. + replicator-message-adapter-unexpected-ask-timeout = 20 s +} + +akka { + actor { + serialization-identifiers { + "akka.cluster.typed.internal.AkkaClusterTypedSerializer" = 28 + "akka.cluster.typed.internal.delivery.ReliableDeliverySerializer" = 36 + } + serializers { + typed-cluster = "akka.cluster.typed.internal.AkkaClusterTypedSerializer" + reliable-delivery = "akka.cluster.typed.internal.delivery.ReliableDeliverySerializer" + } + serialization-bindings { + "akka.cluster.typed.internal.receptionist.ClusterReceptionist$Entry" = typed-cluster + "akka.actor.typed.internal.pubsub.TopicImpl$MessagePublished" = typed-cluster + "akka.actor.typed.delivery.internal.DeliverySerializable" = reliable-delivery + } + } + cluster.configuration-compatibility-check.checkers { + receptionist = "akka.cluster.typed.internal.receptionist.ClusterReceptionistConfigCompatChecker" + } +} diff --git a/akka/repackaged-akka-jar/src/main/resources/distributed_data_reference.conf b/akka/repackaged-akka-jar/src/main/resources/distributed_data_reference.conf new file mode 100644 index 0000000000..f716157bd5 --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/distributed_data_reference.conf @@ -0,0 +1,159 @@ +############################################## +# Akka Distributed DataReference Config File # +############################################## + +# This is the reference config file that contains all the default settings. +# Make your edits/overrides in your application.conf. + + +#//#distributed-data +# Settings for the DistributedData extension +akka.cluster.distributed-data { + # Actor name of the Replicator actor, /system/ddataReplicator + name = ddataReplicator + + # Replicas are running on members tagged with this role. + # All members are used if undefined or empty. + role = "" + + # How often the Replicator should send out gossip information + gossip-interval = 2 s + + # How often the subscribers will be notified of changes, if any + notify-subscribers-interval = 500 ms + + # Logging of data with payload size in bytes larger than + # this value. 
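A sketch of tightening the typed receptionist's replication guarantees with the values listed above; choosing 'majority' here is illustrative, not a recommendation:

akka.cluster.typed.receptionist {
  write-consistency = majority
  pruning-interval = 5 s
  distributed-key-count = 5
}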
Maximum detected size per key is logged once, + # with an increase threshold of 10%. + # It can be disabled by setting the property to off. + log-data-size-exceeding = 10 KiB + + # Maximum number of entries to transfer in one round of gossip exchange when + # synchronizing the replicas. Next chunk will be transferred in next round of gossip. + # The actual number of data entries in each Gossip message is dynamically + # adjusted to not exceed the maximum remote message size (maximum-frame-size). + max-delta-elements = 500 + + # The id of the dispatcher to use for Replicator actors. + # If specified you need to define the settings of the actual dispatcher. + use-dispatcher = "akka.actor.internal-dispatcher" + + # How often the Replicator checks for pruning of data associated with + # removed cluster nodes. If this is set to 'off' the pruning feature will + # be completely disabled. + pruning-interval = 120 s + + # How long time it takes to spread the data to all other replica nodes. + # This is used when initiating and completing the pruning process of data associated + # with removed cluster nodes. The time measurement is stopped when any replica is + # unreachable, but it's still recommended to configure this with certain margin. + # It should be in the magnitude of minutes even though typical dissemination time + # is shorter (grows logarithmic with number of nodes). There is no advantage of + # setting this too low. Setting it to large value will delay the pruning process. + max-pruning-dissemination = 300 s + + # The markers of that pruning has been performed for a removed node are kept for this + # time and thereafter removed. If and old data entry that was never pruned is somehow + # injected and merged with existing data after this time the value will not be correct. + # This would be possible (although unlikely) in the case of a long network partition. + # It should be in the magnitude of hours. For durable data it is configured by + # 'akka.cluster.distributed-data.durable.pruning-marker-time-to-live'. + pruning-marker-time-to-live = 6 h + + # Serialized Write and Read messages are cached when they are sent to + # several nodes. If no further activity they are removed from the cache + # after this duration. + serializer-cache-time-to-live = 10s + + # Update and Get operations are sent to oldest nodes first. + # This is useful together with Cluster Singleton, which is running on oldest nodes. + prefer-oldest = off + + # Settings for delta-CRDT + delta-crdt { + # enable or disable delta-CRDT replication + enabled = on + + # Some complex deltas grow in size for each update and above this + # threshold such deltas are discarded and sent as full state instead. + # This is number of elements or similar size hint, not size in bytes. + max-delta-size = 50 + } + + durable { + # List of keys that are durable. Prefix matching is supported by using * at the + # end of a key. + keys = [] + + # The markers of that pruning has been performed for a removed node are kept for this + # time and thereafter removed. If and old data entry that was never pruned is + # injected and merged with existing data after this time the value will not be correct. + # This would be possible if replica with durable data didn't participate in the pruning + # (e.g. it was shutdown) and later started after this time. A durable replica should not + # be stopped for longer time than this duration and if it is joining again after this + # duration its data should first be manually removed (from the lmdb directory). 
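An illustrative durable-data selection using the prefix matching described above; the key patterns and the LMDB directory are assumptions:

akka.cluster.distributed-data.durable {
  keys = ["shard-*", "settings"]
  lmdb.dir = "/var/lib/controller/ddata"   # hypothetical absolute path
}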
+ # It should be in the magnitude of days. Note that there is a corresponding setting + # for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'. + pruning-marker-time-to-live = 10 d + + # Fully qualified class name of the durable store actor. It must be a subclass + # of akka.actor.Actor and handle the protocol defined in + # akka.cluster.ddata.DurableStore. The class must have a constructor with + # com.typesafe.config.Config parameter. + store-actor-class = akka.cluster.ddata.LmdbDurableStore + + use-dispatcher = akka.cluster.distributed-data.durable.pinned-store + + pinned-store { + executor = thread-pool-executor + type = PinnedDispatcher + } + + # Config for the LmdbDurableStore + lmdb { + # Directory of LMDB file. There are two options: + # 1. A relative or absolute path to a directory that ends with 'ddata' + # the full name of the directory will contain name of the ActorSystem + # and its remote port. + # 2. Otherwise the path is used as is, as a relative or absolute path to + # a directory. + # + # When running in production you may want to configure this to a specific + # path (alt 2), since the default directory contains the remote port of the + # actor system to make the name unique. If using a dynamically assigned + # port (0) it will be different each time and the previously stored data + # will not be loaded. + dir = "ddata" + + # Size in bytes of the memory mapped file. + map-size = 100 MiB + + # Accumulate changes before storing improves performance with the + # risk of losing the last writes if the JVM crashes. + # The interval is by default set to 'off' to write each update immediately. + # Enabling write behind by specifying a duration, e.g. 200ms, is especially + # efficient when performing many writes to the same key, because it is only + # the last value for each key that will be serialized and stored. + # write-behind-interval = 200 ms + write-behind-interval = off + } + } + +} +#//#distributed-data + +# Protobuf serializer for cluster DistributedData messages +akka.actor { + serializers { + akka-data-replication = "akka.cluster.ddata.protobuf.ReplicatorMessageSerializer" + akka-replicated-data = "akka.cluster.ddata.protobuf.ReplicatedDataSerializer" + } + serialization-bindings { + "akka.cluster.ddata.Replicator$ReplicatorMessage" = akka-data-replication + "akka.cluster.ddata.ReplicatedDataSerialization" = akka-replicated-data + } + serialization-identifiers { + "akka.cluster.ddata.protobuf.ReplicatedDataSerializer" = 11 + "akka.cluster.ddata.protobuf.ReplicatorMessageSerializer" = 12 + } +} diff --git a/akka/repackaged-akka-jar/src/main/resources/persistence_reference.conf b/akka/repackaged-akka-jar/src/main/resources/persistence_reference.conf new file mode 100644 index 0000000000..db9ae1ecbe --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/persistence_reference.conf @@ -0,0 +1,355 @@ +########################################################### +# Akka Persistence Extension Reference Configuration File # +########################################################### + +# This is the reference config file that contains all the default settings. +# Make your edits in your application.conf in order to override these settings. + +# Directory of persistence journal and snapshot store plugins is available at the +# Akka Community Projects page https://akka.io/community/ + +# Default persistence extension settings. 
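+# As a minimal, illustrative override (not part of the defaults in this file),
+# an application typically selects concrete plugins in its own application.conf,
+# for example pointing at the LevelDB journal and local snapshot store that are
+# configured further down in this file:
+#
+#   akka.persistence {
+#     journal.plugin = "akka.persistence.journal.leveldb"
+#     snapshot-store.plugin = "akka.persistence.snapshot-store.local"
+#   }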
+akka.persistence { + + # When starting many persistent actors at the same time the journal + # and its data store is protected from being overloaded by limiting number + # of recoveries that can be in progress at the same time. When + # exceeding the limit the actors will wait until other recoveries have + # been completed. + max-concurrent-recoveries = 50 + + # Fully qualified class name providing a default internal stash overflow strategy. + # It needs to be a subclass of akka.persistence.StashOverflowStrategyConfigurator. + # The default strategy throws StashOverflowException. + internal-stash-overflow-strategy = "akka.persistence.ThrowExceptionConfigurator" + journal { + # Absolute path to the journal plugin configuration entry used by + # persistent actor by default. + # Persistent actor can override `journalPluginId` method + # in order to rely on a different journal plugin. + plugin = "" + # List of journal plugins to start automatically. Use "" for the default journal plugin. + auto-start-journals = [] + } + snapshot-store { + # Absolute path to the snapshot plugin configuration entry used by + # persistent actor by default. + # Persistent actor can override `snapshotPluginId` method + # in order to rely on a different snapshot plugin. + # It is not mandatory to specify a snapshot store plugin. + # If you don't use snapshots you don't have to configure it. + # Note that Cluster Sharding is using snapshots, so if you + # use Cluster Sharding you need to define a snapshot store plugin. + plugin = "" + # List of snapshot stores to start automatically. Use "" for the default snapshot store. + auto-start-snapshot-stores = [] + } + # used as default-snapshot store if no plugin configured + # (see `akka.persistence.snapshot-store`) + no-snapshot-store { + class = "akka.persistence.snapshot.NoSnapshotStore" + } + # Default reliable delivery settings. + at-least-once-delivery { + # Interval between re-delivery attempts. + redeliver-interval = 5s + # Maximum number of unconfirmed messages that will be sent in one + # re-delivery burst. + redelivery-burst-limit = 10000 + # After this number of delivery attempts a + # `ReliableRedelivery.UnconfirmedWarning`, message will be sent to the actor. + warn-after-number-of-unconfirmed-attempts = 5 + # Maximum number of unconfirmed messages that an actor with + # AtLeastOnceDelivery is allowed to hold in memory. + max-unconfirmed-messages = 100000 + } + # Default persistent extension thread pools. + dispatchers { + # Dispatcher used by every plugin which does not declare explicit + # `plugin-dispatcher` field. + default-plugin-dispatcher { + type = PinnedDispatcher + executor = "thread-pool-executor" + } + # Default dispatcher for message replay. + default-replay-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 2 + parallelism-max = 8 + } + } + # Default dispatcher for streaming snapshot IO + default-stream-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 2 + parallelism-max = 8 + } + } + } + + # Fallback settings for journal plugin configurations. + # These settings are used if they are not defined in plugin config section. + journal-plugin-fallback { + + # Fully qualified class name providing journal plugin api implementation. + # It is mandatory to specify this property. + # The class must have a constructor without parameters or constructor with + # one `com.typesafe.config.Config` parameter. 
+ class = "" + + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + + # Dispatcher for message replay. + replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" + + # Removed: used to be the Maximum size of a persistent message batch written to the journal. + # Now this setting is without function, PersistentActor will write as many messages + # as it has accumulated since the last write. + max-message-batch-size = 200 + + # If there is more time in between individual events gotten from the journal + # recovery than this the recovery will fail. + # Note that it also affects reading the snapshot before replaying events on + # top of it, even though it is configured for the journal. + recovery-event-timeout = 30s + + circuit-breaker { + max-failures = 10 + call-timeout = 10s + reset-timeout = 30s + } + + # The replay filter can detect a corrupt event stream by inspecting + # sequence numbers and writerUuid when replaying events. + replay-filter { + # What the filter should do when detecting invalid events. + # Supported values: + # `repair-by-discard-old` : discard events from old writers, + # warning is logged + # `fail` : fail the replay, error is logged + # `warn` : log warning but emit events untouched + # `off` : disable this feature completely + mode = repair-by-discard-old + + # It uses a look ahead buffer for analyzing the events. + # This defines the size (in number of events) of the buffer. + window-size = 100 + + # How many old writerUuid to remember + max-old-writers = 10 + + # Set this to `on` to enable detailed debug logging of each + # replayed event. + debug = off + } + } + + # Fallback settings for snapshot store plugin configurations + # These settings are used if they are not defined in plugin config section. + snapshot-store-plugin-fallback { + + # Fully qualified class name providing snapshot store plugin api + # implementation. It is mandatory to specify this property if + # snapshot store is enabled. + # The class must have a constructor without parameters or constructor with + # one `com.typesafe.config.Config` parameter. + class = "" + + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + + circuit-breaker { + max-failures = 5 + call-timeout = 20s + reset-timeout = 60s + } + + # Set this to true if successful loading of snapshot is not necessary. + # This can be useful when it is alright to ignore snapshot in case of + # for example deserialization errors. When snapshot loading fails it will instead + # recover by replaying all events. + # Don't set to true if events are deleted because that would + # result in wrong recovered state if snapshot load fails. + snapshot-is-optional = false + + } + + fsm { + # PersistentFSM saves snapshots after this number of persistent + # events. Snapshots are used to reduce recovery times. + # When you disable this feature, specify snapshot-after = off. + # To enable the feature, specify a number like snapshot-after = 1000 + # which means a snapshot is taken after persisting every 1000 events. + snapshot-after = off + } + + # DurableStateStore settings + state { + # Absolute path to the KeyValueStore plugin configuration entry used by + # DurableStateBehavior actors by default. + # DurableStateBehavior can override `durableStateStorePluginId` method (`withDurableStateStorePluginId`) + # in order to rely on a different plugin. 
+ plugin = "" + } + + # Fallback settings for DurableStateStore plugin configurations + # These settings are used if they are not defined in plugin config section. + state-plugin-fallback { + recovery-timeout = 30s + } +} + +# Protobuf serialization for the persistent extension messages. +akka.actor { + serializers { + akka-persistence-message = "akka.persistence.serialization.MessageSerializer" + akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer" + } + serialization-bindings { + "akka.persistence.serialization.Message" = akka-persistence-message + "akka.persistence.serialization.Snapshot" = akka-persistence-snapshot + } + serialization-identifiers { + "akka.persistence.serialization.MessageSerializer" = 7 + "akka.persistence.serialization.SnapshotSerializer" = 8 + } +} + + +################################################### +# Persistence plugins included with the extension # +################################################### + +# In-memory journal plugin. +akka.persistence.journal.inmem { + # Class name of the plugin. + class = "akka.persistence.journal.inmem.InmemJournal" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.actor.default-dispatcher" + + # Turn this on to test serialization of the events + test-serialization = off +} + +# Local file system snapshot store plugin. +akka.persistence.snapshot-store.local { + # Class name of the plugin. + class = "akka.persistence.snapshot.local.LocalSnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + # Dispatcher for streaming snapshot IO. + stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher" + # Storage location of snapshot files. + dir = "snapshots" + # Number load attempts when recovering from the latest snapshot fails + # yet older snapshot files are available. Each recovery attempt will try + # to recover using an older than previously failed-on snapshot file + # (if any are present). If all attempts fail the recovery will fail and + # the persistent actor will be stopped. + max-load-attempts = 3 +} + +# LevelDB journal plugin. +# Note: this plugin requires explicit LevelDB dependency, see below. +akka.persistence.journal.leveldb { + # Class name of the plugin. + class = "akka.persistence.journal.leveldb.LeveldbJournal" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + # Dispatcher for message replay. + replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" + # Storage location of LevelDB files. + dir = "journal" + # Use fsync on write. + fsync = on + # Verify checksum on read. + checksum = off + # Native LevelDB (via JNI) or LevelDB Java port. + native = on + # Number of deleted messages per persistence id that will trigger journal compaction + compaction-intervals { + } +} + +# Shared LevelDB journal plugin (for testing only). +# Note: this plugin requires explicit LevelDB dependency, see below. +akka.persistence.journal.leveldb-shared { + # Class name of the plugin. + class = "akka.persistence.journal.leveldb.SharedLeveldbJournal" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.actor.default-dispatcher" + # Timeout for async journal operations. + timeout = 10s + store { + # Dispatcher for shared store actor. + store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + # Dispatcher for message replay. 
+ replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" + # Storage location of LevelDB files. + dir = "journal" + # Use fsync on write. + fsync = on + # Verify checksum on read. + checksum = off + # Native LevelDB (via JNI) or LevelDB Java port. + native = on + # Number of deleted messages per persistence id that will trigger journal compaction + compaction-intervals { + } + } +} + +akka.persistence.journal.proxy { + # Class name of the plugin. + class = "akka.persistence.journal.PersistencePluginProxy" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.actor.default-dispatcher" + # Set this to on in the configuration of the ActorSystem + # that will host the target journal + start-target-journal = off + # The journal plugin config path to use for the target journal + target-journal-plugin = "" + # The address of the proxy to connect to from other nodes. Optional setting. + target-journal-address = "" + # Initialization timeout of target lookup + init-timeout = 10s +} + +akka.persistence.snapshot-store.proxy { + # Class name of the plugin. + class = "akka.persistence.journal.PersistencePluginProxy" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.actor.default-dispatcher" + # Set this to on in the configuration of the ActorSystem + # that will host the target snapshot-store + start-target-snapshot-store = off + # The journal plugin config path to use for the target snapshot-store + target-snapshot-store-plugin = "" + # The address of the proxy to connect to from other nodes. Optional setting. + target-snapshot-store-address = "" + # Initialization timeout of target lookup + init-timeout = 10s +} + +# LevelDB persistence requires the following dependency declarations: +# +# SBT: +# "org.iq80.leveldb" % "leveldb" % "0.7" +# "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8" +# +# Maven: +# +# org.iq80.leveldb +# leveldb +# 0.7 +# +# +# org.fusesource.leveldbjni +# leveldbjni-all +# 1.8 +# diff --git a/akka/repackaged-akka-jar/src/main/resources/reference.conf b/akka/repackaged-akka-jar/src/main/resources/reference.conf new file mode 100644 index 0000000000..7e60da8488 --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/reference.conf @@ -0,0 +1,9 @@ +include "actor_reference.conf" +include "actor_typed_reference.conf" +include "cluster_reference.conf" +include "cluster_tools_reference.conf" +include "cluster_typed_reference.conf" +include "distributed_data_reference.conf" +include "persistence_reference.conf" +include "remote_reference.conf" +include "stream_reference.conf" diff --git a/akka/repackaged-akka-jar/src/main/resources/remote_reference.conf b/akka/repackaged-akka-jar/src/main/resources/remote_reference.conf new file mode 100644 index 0000000000..a30bce7190 --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/remote_reference.conf @@ -0,0 +1,1261 @@ +#//#shared +##################################### +# Akka Remote Reference Config File # +##################################### + +# This is the reference config file that contains all the default settings. +# Make your edits/overrides in your application.conf. + +# comments about akka.actor settings left out where they are already in akka- +# actor.jar, because otherwise they would be repeated in config rendering. +# +# For the configuration of the new remoting implementation (Artery) please look +# at the bottom section of this file as it is listed separately. 
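+# As an illustrative override (not part of the defaults in this file), an
+# application using the Artery implementation described at the bottom of this
+# file usually only needs to supply the canonical address in application.conf:
+#
+#   akka.remote.artery {
+#     transport = tcp
+#     canonical.hostname = "127.0.0.1"
+#     canonical.port = 25520
+#   }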
+ +akka { + + actor { + + serializers { + akka-containers = "akka.remote.serialization.MessageContainerSerializer" + akka-misc = "akka.remote.serialization.MiscMessageSerializer" + artery = "akka.remote.serialization.ArteryMessageSerializer" + proto = "akka.remote.serialization.ProtobufSerializer" + daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer" + akka-system-msg = "akka.remote.serialization.SystemMessageSerializer" + } + + serialization-bindings { + "akka.actor.ActorSelectionMessage" = akka-containers + + "akka.remote.DaemonMsgCreate" = daemon-create + + "akka.remote.artery.ArteryMessage" = artery + + # Since akka.protobuf.Message does not extend Serializable but + # GeneratedMessage does, need to use the more specific one here in order + # to avoid ambiguity. + # This is only loaded if akka-protobuf is on the classpath + # It should not be used and users should migrate to using the protobuf classes + # directly + # Remove in 2.7 + "akka.protobuf.GeneratedMessage" = proto + + "akka.protobufv3.internal.GeneratedMessageV3" = proto + + # Since com.google.protobuf.Message does not extend Serializable but + # GeneratedMessage does, need to use the more specific one here in order + # to avoid ambiguity. + # This com.google.protobuf serialization binding is only used if the class can be loaded, + # i.e. com.google.protobuf dependency has been added in the application project. + "com.google.protobuf.GeneratedMessage" = proto + "com.google.protobuf.GeneratedMessageV3" = proto + + "akka.actor.Identify" = akka-misc + "akka.actor.ActorIdentity" = akka-misc + "scala.Some" = akka-misc + "scala.None$" = akka-misc + "java.util.Optional" = akka-misc + "akka.actor.Status$Success" = akka-misc + "akka.actor.Status$Failure" = akka-misc + "akka.actor.ActorRef" = akka-misc + "akka.actor.PoisonPill$" = akka-misc + "akka.actor.Kill$" = akka-misc + "akka.remote.RemoteWatcher$Heartbeat$" = akka-misc + "akka.remote.RemoteWatcher$HeartbeatRsp" = akka-misc + "akka.Done" = akka-misc + "akka.NotUsed" = akka-misc + "akka.actor.Address" = akka-misc + "akka.remote.UniqueAddress" = akka-misc + + "akka.actor.ActorInitializationException" = akka-misc + "akka.actor.IllegalActorStateException" = akka-misc + "akka.actor.ActorKilledException" = akka-misc + "akka.actor.InvalidActorNameException" = akka-misc + "akka.actor.InvalidMessageException" = akka-misc + "java.util.concurrent.TimeoutException" = akka-misc + "akka.remote.serialization.ThrowableNotSerializableException" = akka-misc + + "akka.actor.LocalScope$" = akka-misc + "akka.remote.RemoteScope" = akka-misc + + "com.typesafe.config.impl.SimpleConfig" = akka-misc + "com.typesafe.config.Config" = akka-misc + + "akka.routing.FromConfig" = akka-misc + "akka.routing.DefaultResizer" = akka-misc + "akka.routing.BalancingPool" = akka-misc + "akka.routing.BroadcastGroup" = akka-misc + "akka.routing.BroadcastPool" = akka-misc + "akka.routing.RandomGroup" = akka-misc + "akka.routing.RandomPool" = akka-misc + "akka.routing.RoundRobinGroup" = akka-misc + "akka.routing.RoundRobinPool" = akka-misc + "akka.routing.ScatterGatherFirstCompletedGroup" = akka-misc + "akka.routing.ScatterGatherFirstCompletedPool" = akka-misc + "akka.routing.SmallestMailboxPool" = akka-misc + "akka.routing.TailChoppingGroup" = akka-misc + "akka.routing.TailChoppingPool" = akka-misc + "akka.remote.routing.RemoteRouterConfig" = akka-misc + + "akka.pattern.StatusReply" = akka-misc + + "akka.dispatch.sysmsg.SystemMessage" = akka-system-msg + + # Java Serializer is by default used for exceptions 
and will by default + # not be allowed to be serialized, but in certain cases they are replaced + # by `akka.remote.serialization.ThrowableNotSerializableException` if + # no specific serializer has been defined: + # - when wrapped in `akka.actor.Status.Failure` for ask replies + # - when wrapped in system messages for exceptions from remote deployed child actors + # + # It's recommended that you implement custom serializer for exceptions that are + # sent remotely, You can add binding to akka-misc (MiscMessageSerializer) for the + # exceptions that have a constructor with single message String or constructor with + # message String as first parameter and cause Throwable as second parameter. Note that it's not + # safe to add this binding for general exceptions such as IllegalArgumentException + # because it may have a subclass without required constructor. + "java.lang.Throwable" = java + } + + serialization-identifiers { + "akka.remote.serialization.ProtobufSerializer" = 2 + "akka.remote.serialization.DaemonMsgCreateSerializer" = 3 + "akka.remote.serialization.MessageContainerSerializer" = 6 + "akka.remote.serialization.MiscMessageSerializer" = 16 + "akka.remote.serialization.ArteryMessageSerializer" = 17 + + "akka.remote.serialization.SystemMessageSerializer" = 22 + + # deprecated in 2.6.0, moved to akka-actor + "akka.remote.serialization.LongSerializer" = 18 + # deprecated in 2.6.0, moved to akka-actor + "akka.remote.serialization.IntSerializer" = 19 + # deprecated in 2.6.0, moved to akka-actor + "akka.remote.serialization.StringSerializer" = 20 + # deprecated in 2.6.0, moved to akka-actor + "akka.remote.serialization.ByteStringSerializer" = 21 + } + + deployment { + + default { + + # if this is set to a valid remote address, the named actor will be + # deployed at that node e.g. "akka://sys@host:port" + remote = "" + + target { + + # A list of hostnames and ports for instantiating the children of a + # router + # The format should be on "akka://sys@host:port", where: + # - sys is the remote actor system name + # - hostname can be either hostname or IP address the remote actor + # should connect to + # - port should be the port for the remote server on the other node + # The number of actor instances to be spawned is still taken from the + # nr-of-instances setting as for local routers; the instances will be + # distributed round-robin among the given nodes. + nodes = [] + + } + } + } + } + + remote { + ### Settings shared by classic remoting and Artery (the new implementation of remoting) + + # Using remoting directly is typically not desirable, so a warning will + # be shown to make this clear. Set this setting to 'off' to suppress that + # warning. + warn-about-direct-use = on + + + # If Cluster is not used, remote watch and deployment are disabled. + # To optionally use them while not using Cluster, set to 'on'. + use-unsafe-remote-features-outside-cluster = off + + # A warning will be logged on remote watch attempts if Cluster + # is not in use and 'use-unsafe-remote-features-outside-cluster' + # is 'off'. Set this to 'off' to suppress these. + warn-unsafe-watch-outside-cluster = on + + # Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf + # [Hayashibara et al]) used for remote death watch. + # The default PhiAccrualFailureDetector will trigger if there are no heartbeats within + # the duration heartbeat-interval + acceptable-heartbeat-pause + threshold_adjustment, + # i.e. around 12.5 seconds with default settings. 
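+    # As an illustrative override (not a default), deployments that see long GC or
+    # network pauses sometimes widen that detection window by raising only the pause
+    # tolerance in application.conf, e.g.:
+    #
+    #   akka.remote.watch-failure-detector {
+    #     acceptable-heartbeat-pause = 30 s
+    #   }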
+ watch-failure-detector { + + # FQCN of the failure detector implementation. + # It must implement akka.remote.FailureDetector and have + # a public constructor with a com.typesafe.config.Config and + # akka.actor.EventStream parameter. + implementation-class = "akka.remote.PhiAccrualFailureDetector" + + # How often keep-alive heartbeat messages should be sent to each connection. + heartbeat-interval = 1 s + + # Defines the failure detector threshold. + # A low threshold is prone to generate many wrong suspicions but ensures + # a quick detection in the event of a real crash. Conversely, a high + # threshold generates fewer mistakes but needs more time to detect + # actual crashes. + threshold = 10.0 + + # Number of the samples of inter-heartbeat arrival times to adaptively + # calculate the failure timeout for connections. + max-sample-size = 200 + + # Minimum standard deviation to use for the normal distribution in + # AccrualFailureDetector. Too low standard deviation might result in + # too much sensitivity for sudden, but normal, deviations in heartbeat + # inter arrival times. + min-std-deviation = 100 ms + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # This margin is important to be able to survive sudden, occasional, + # pauses in heartbeat arrivals, due to for example garbage collect or + # network drop. + acceptable-heartbeat-pause = 10 s + + + # How often to check for nodes marked as unreachable by the failure + # detector + unreachable-nodes-reaper-interval = 1s + + # After the heartbeat request has been sent the first failure detection + # will start after this period, even though no heartbeat mesage has + # been received. + expected-response-after = 1 s + + } + + # remote deployment configuration section + deployment { + # deprecated, use `enable-allow-list` + enable-whitelist = off + + # If true, will only allow specific classes listed in `allowed-actor-classes` to be instanciated on this + # system via remote deployment + enable-allow-list = ${akka.remote.deployment.enable-whitelist} + + + # deprecated, use `allowed-actor-classes` + whitelist = [] + + allowed-actor-classes = ${akka.remote.deployment.whitelist} + } + + ### Default dispatcher for the remoting subsystem + default-remote-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 0.5 + parallelism-max = 16 + } + throughput = 10 + } + #//#shared + } + +} + +akka { + + remote { + #//#classic + classic { + + ### Configuration for classic remoting. Classic remoting is deprecated, use artery. + + + # If set to a nonempty string remoting will use the given dispatcher for + # its internal actors otherwise the default dispatcher is used. Please note + # that since remoting can load arbitrary 3rd party drivers (see + # "enabled-transport" and "adapters" entries) it is not guaranteed that + # every module will respect this setting. + use-dispatcher = "akka.remote.default-remote-dispatcher" + + # Settings for the failure detector to monitor connections. + # For TCP it is not important to have fast failure detection, since + # most connection failures are captured by TCP itself. + # The default DeadlineFailureDetector will trigger if there are no heartbeats within + # the duration heartbeat-interval + acceptable-heartbeat-pause, i.e. 124 seconds + # with the default settings. + transport-failure-detector { + + # FQCN of the failure detector implementation. 
+ # It must implement akka.remote.FailureDetector and have + # a public constructor with a com.typesafe.config.Config and + # akka.actor.EventStream parameter. + implementation-class = "akka.remote.DeadlineFailureDetector" + + # How often keep-alive heartbeat messages should be sent to each connection. + heartbeat-interval = 4 s + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # A margin to the `heartbeat-interval` is important to be able to survive sudden, + # occasional, pauses in heartbeat arrivals, due to for example garbage collect or + # network drop. + acceptable-heartbeat-pause = 120 s + } + + + # Timeout after which the startup of the remoting subsystem is considered + # to be failed. Increase this value if your transport drivers (see the + # enabled-transports section) need longer time to be loaded. + startup-timeout = 10 s + + # Timout after which the graceful shutdown of the remoting subsystem is + # considered to be failed. After the timeout the remoting system is + # forcefully shut down. Increase this value if your transport drivers + # (see the enabled-transports section) need longer time to stop properly. + shutdown-timeout = 10 s + + # Before shutting down the drivers, the remoting subsystem attempts to flush + # all pending writes. This setting controls the maximum time the remoting is + # willing to wait before moving on to shut down the drivers. + flush-wait-on-shutdown = 2 s + + # Reuse inbound connections for outbound messages + use-passive-connections = on + + # Controls the backoff interval after a refused write is reattempted. + # (Transports may refuse writes if their internal buffer is full) + backoff-interval = 5 ms + + # Acknowledgment timeout of management commands sent to the transport stack. + command-ack-timeout = 30 s + + # The timeout for outbound associations to perform the handshake. + # If the transport is akka.remote.classic.netty.tcp or akka.remote.classic.netty.ssl + # the configured connection-timeout for the transport will be used instead. + handshake-timeout = 15 s + + ### Security settings + + # Enable untrusted mode for full security of server managed actors, prevents + # system messages to be send by clients, e.g. messages like 'Create', + # 'Suspend', 'Resume', 'Terminate', 'Supervise', 'Link' etc. + untrusted-mode = off + + # When 'untrusted-mode=on' inbound actor selections are by default discarded. + # Actors with paths defined in this list are granted permission to receive actor + # selections messages. + # E.g. trusted-selection-paths = ["/user/receptionist", "/user/namingService"] + trusted-selection-paths = [] + + ### Logging + + # If this is "on", Akka will log all inbound messages at DEBUG level, + # if off then they are not logged + log-received-messages = off + + # If this is "on", Akka will log all outbound messages at DEBUG level, + # if off then they are not logged + log-sent-messages = off + + # Sets the log granularity level at which Akka logs remoting events. This setting + # can take the values OFF, ERROR, WARNING, INFO, DEBUG, or ON. For compatibility + # reasons the setting "on" will default to "debug" level. Please note that the effective + # logging level is still determined by the global logging level of the actor system: + # for example debug level remoting events will be only logged if the system + # is running with debug level logging. + # Failures to deserialize received messages also fall under this flag. 
+ log-remote-lifecycle-events = on + + # Logging of message types with payload size in bytes larger than + # this value. Maximum detected size per message type is logged once, + # with an increase threshold of 10%. + # By default this feature is turned off. Activate it by setting the property to + # a value in bytes, such as 1000b. Note that for all messages larger than this + # limit there will be extra performance and scalability cost. + log-frame-size-exceeding = off + + # Log warning if the number of messages in the backoff buffer in the endpoint + # writer exceeds this limit. It can be disabled by setting the value to off. + log-buffer-size-exceeding = 50000 + + # After failed to establish an outbound connection, the remoting will mark the + # address as failed. This configuration option controls how much time should + # be elapsed before reattempting a new connection. While the address is + # gated, all messages sent to the address are delivered to dead-letters. + # Since this setting limits the rate of reconnects setting it to a + # very short interval (i.e. less than a second) may result in a storm of + # reconnect attempts. + retry-gate-closed-for = 5 s + + # After catastrophic communication failures that result in the loss of system + # messages or after the remote DeathWatch triggers the remote system gets + # quarantined to prevent inconsistent behavior. + # This setting controls how long the Quarantine marker will be kept around + # before being removed to avoid long-term memory leaks. + # WARNING: DO NOT change this to a small value to re-enable communication with + # quarantined nodes. Such feature is not supported and any behavior between + # the affected systems after lifting the quarantine is undefined. + prune-quarantine-marker-after = 5 d + + # If system messages have been exchanged between two systems (i.e. remote death + # watch or remote deployment has been used) a remote system will be marked as + # quarantined after the two system has no active association, and no + # communication happens during the time configured here. + # The only purpose of this setting is to avoid storing system message redelivery + # data (sequence number state, etc.) for an undefined amount of time leading to long + # term memory leak. Instead, if a system has been gone for this period, + # or more exactly + # - there is no association between the two systems (TCP connection, if TCP transport is used) + # - neither side has been attempting to communicate with the other + # - there are no pending system messages to deliver + # for the amount of time configured here, the remote system will be quarantined and all state + # associated with it will be dropped. + # + # Maximum value depends on the scheduler's max limit (default 248 days) and if configured + # to a longer duration this feature will effectively be disabled. Setting the value to + # 'off' will also disable the feature. Note that if disabled there is a risk of a long + # term memory leak. + quarantine-after-silence = 2 d + + # This setting defines the maximum number of unacknowledged system messages + # allowed for a remote system. If this limit is reached the remote system is + # declared to be dead and its UID marked as tainted. + system-message-buffer-size = 20000 + + # This setting defines the maximum idle time after an individual + # acknowledgement for system messages is sent. System message delivery + # is guaranteed by explicit acknowledgement messages. These acks are + # piggybacked on ordinary traffic messages. 
If no traffic is detected + # during the time period configured here, the remoting will send out + # an individual ack. + system-message-ack-piggyback-timeout = 0.3 s + + # This setting defines the time after internal management signals + # between actors (used for DeathWatch and supervision) that have not been + # explicitly acknowledged or negatively acknowledged are resent. + # Messages that were negatively acknowledged are always immediately + # resent. + resend-interval = 2 s + + # Maximum number of unacknowledged system messages that will be resent + # each 'resend-interval'. If you watch many (> 1000) remote actors you can + # increase this value to for example 600, but a too large limit (e.g. 10000) + # may flood the connection and might cause false failure detection to trigger. + # Test such a configuration by watching all actors at the same time and stop + # all watched actors at the same time. + resend-limit = 200 + + # WARNING: this setting should not be not changed unless all of its consequences + # are properly understood which assumes experience with remoting internals + # or expert advice. + # This setting defines the time after redelivery attempts of internal management + # signals are stopped to a remote system that has been not confirmed to be alive by + # this system before. + initial-system-message-delivery-timeout = 3 m + + ### Transports and adapters + + # List of the transport drivers that will be loaded by the remoting. + # A list of fully qualified config paths must be provided where + # the given configuration path contains a transport-class key + # pointing to an implementation class of the Transport interface. + # If multiple transports are provided, the address of the first + # one will be used as a default address. + enabled-transports = ["akka.remote.classic.netty.tcp"] + + # Transport drivers can be augmented with adapters by adding their + # name to the applied-adapters setting in the configuration of a + # transport. The available adapters should be configured in this + # section by providing a name, and the fully qualified name of + # their corresponding implementation. The class given here + # must implement akka.akka.remote.transport.TransportAdapterProvider + # and have public constructor without parameters. + adapters { + gremlin = "akka.remote.transport.FailureInjectorProvider" + trttl = "akka.remote.transport.ThrottlerProvider" + } + + ### Default configuration for the Netty based transport drivers + + netty.tcp { + # The class given here must implement the akka.remote.transport.Transport + # interface and offer a public constructor which takes two arguments: + # 1) akka.actor.ExtendedActorSystem + # 2) com.typesafe.config.Config + transport-class = "akka.remote.transport.netty.NettyTransport" + + # Transport drivers can be augmented with adapters by adding their + # name to the applied-adapters list. The last adapter in the + # list is the adapter immediately above the driver, while + # the first one is the top of the stack below the standard + # Akka protocol + applied-adapters = [] + + # The default remote server port clients should connect to. + # Default is 2552 (AKKA), use 0 if you want a random available port + # This port needs to be unique for each actor system on the same machine. + port = 2552 + + # The hostname or ip clients should connect to. + # InetAddress.getLocalHost.getHostAddress is used if empty + hostname = "" + + # Use this setting to bind a network interface to a different port + # than remoting protocol expects messages at. 
This may be used + # when running akka nodes in a separated networks (under NATs or docker containers). + # Use 0 if you want a random available port. Examples: + # + # akka.remote.classic.netty.tcp.port = 2552 + # akka.remote.classic.netty.tcp.bind-port = 2553 + # Network interface will be bound to the 2553 port, but remoting protocol will + # expect messages sent to port 2552. + # + # akka.remote.classic.netty.tcp.port = 0 + # akka.remote.classic.netty.tcp.bind-port = 0 + # Network interface will be bound to a random port, and remoting protocol will + # expect messages sent to the bound port. + # + # akka.remote.classic.netty.tcp.port = 2552 + # akka.remote.classic.netty.tcp.bind-port = 0 + # Network interface will be bound to a random port, but remoting protocol will + # expect messages sent to port 2552. + # + # akka.remote.classic.netty.tcp.port = 0 + # akka.remote.classic.netty.tcp.bind-port = 2553 + # Network interface will be bound to the 2553 port, and remoting protocol will + # expect messages sent to the bound port. + # + # akka.remote.classic.netty.tcp.port = 2552 + # akka.remote.classic.netty.tcp.bind-port = "" + # Network interface will be bound to the 2552 port, and remoting protocol will + # expect messages sent to the bound port. + # + # akka.remote.classic.netty.tcp.port if empty + bind-port = "" + + # Use this setting to bind a network interface to a different hostname or ip + # than remoting protocol expects messages at. + # Use "0.0.0.0" to bind to all interfaces. + # akka.remote.classic.netty.tcp.hostname if empty + bind-hostname = "" + + # Enables SSL support on this transport + enable-ssl = false + + # Sets the connectTimeoutMillis of all outbound connections, + # i.e. how long a connect may take until it is timed out + connection-timeout = 15 s + + # If set to "" then the specified dispatcher + # will be used to accept inbound connections, and perform IO. If "" then + # dedicated threads will be used. + # Please note that the Netty driver only uses this configuration and does + # not read the "akka.remote.use-dispatcher" entry. Instead it has to be + # configured manually to point to the same dispatcher if needed. + use-dispatcher-for-io = "" + + # Sets the high water mark for the in and outbound sockets, + # set to 0b for platform default + write-buffer-high-water-mark = 0b + + # Sets the low water mark for the in and outbound sockets, + # set to 0b for platform default + write-buffer-low-water-mark = 0b + + # Sets the send buffer size of the Sockets, + # set to 0b for platform default + send-buffer-size = 256000b + + # Sets the receive buffer size of the Sockets, + # set to 0b for platform default + receive-buffer-size = 256000b + + # Maximum message size the transport will accept, but at least + # 32000 bytes. + # Please note that UDP does not support arbitrary large datagrams, + # so this setting has to be chosen carefully when using UDP. + # Both send-buffer-size and receive-buffer-size settings has to + # be adjusted to be able to buffer messages of maximum size. + maximum-frame-size = 128000b + + # Sets the size of the connection backlog + backlog = 4096 + + # Enables the TCP_NODELAY flag, i.e. 
disables Nagle’s algorithm + tcp-nodelay = on + + # Enables TCP Keepalive, subject to the O/S kernel’s configuration + tcp-keepalive = on + + # Enables SO_REUSEADDR, which determines when an ActorSystem can open + # the specified listen port (the meaning differs between *nix and Windows) + # Valid values are "on", "off" and "off-for-windows" + # due to the following Windows bug: https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4476378 + # "off-for-windows" of course means that it's "on" for all other platforms + tcp-reuse-addr = off-for-windows + + # Used to configure the number of I/O worker threads on server sockets + server-socket-worker-pool { + # Min number of threads to cap factor-based number to + pool-size-min = 2 + + # The pool size factor is used to determine thread pool size + # using the following formula: ceil(available processors * factor). + # Resulting size is then bounded by the pool-size-min and + # pool-size-max values. + pool-size-factor = 1.0 + + # Max number of threads to cap factor-based number to + pool-size-max = 2 + } + + # Used to configure the number of I/O worker threads on client sockets + client-socket-worker-pool { + # Min number of threads to cap factor-based number to + pool-size-min = 2 + + # The pool size factor is used to determine thread pool size + # using the following formula: ceil(available processors * factor). + # Resulting size is then bounded by the pool-size-min and + # pool-size-max values. + pool-size-factor = 1.0 + + # Max number of threads to cap factor-based number to + pool-size-max = 2 + } + + + } + + netty.ssl = ${akka.remote.classic.netty.tcp} + netty.ssl = { + # Enable SSL/TLS encryption. + # This must be enabled on both the client and server to work. + enable-ssl = true + + # Factory of SSLEngine. + # Must implement akka.remote.transport.netty.SSLEngineProvider and have a public + # constructor with an ActorSystem parameter. + # The default ConfigSSLEngineProvider is configured by properties in section + # akka.remote.classic.netty.ssl.security + # + # The SSLEngineProvider can also be defined via ActorSystemSetup with + # SSLEngineProviderSetup when starting the ActorSystem. That is useful when + # the SSLEngineProvider implementation requires other external constructor + # parameters or is created before the ActorSystem is created. + # If such SSLEngineProviderSetup is defined this config property is not used. + ssl-engine-provider = akka.remote.transport.netty.ConfigSSLEngineProvider + + security { + # This is the Java Key Store used by the server connection + key-store = "keystore" + + # This password is used for decrypting the key store + key-store-password = "changeme" + + # This password is used for decrypting the key + key-password = "changeme" + + # This is the Java Key Store used by the client connection + trust-store = "truststore" + + # This password is used for decrypting the trust store + trust-store-password = "changeme" + + # Protocol to use for SSL encryption. + protocol = "TLSv1.2" + + # Example: ["TLS_DHE_RSA_WITH_AES_128_GCM_SHA256", + # "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", + # "TLS_DHE_RSA_WITH_AES_256_GCM_SHA384", + # "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"] + # When doing rolling upgrades, make sure to include both the algorithm used + # by old nodes and the preferred algorithm. + # If you use a JDK 8 prior to 8u161 you need to install + # the JCE Unlimited Strength Jurisdiction Policy Files to use AES 256. 
+ # More info here: + # https://www.oracle.com/java/technologies/javase-jce-all-downloads.html + enabled-algorithms = ["TLS_DHE_RSA_WITH_AES_256_GCM_SHA384", + "TLS_RSA_WITH_AES_128_CBC_SHA"] + + # There are two options, and the default SecureRandom is recommended: + # "" or "SecureRandom" => (default) + # "SHA1PRNG" => Can be slow because of blocking issues on Linux + # + # Setting a value here may require you to supply the appropriate cipher + # suite (see enabled-algorithms section above) + random-number-generator = "" + + # Require mutual authentication between TLS peers + # + # Without mutual authentication only the peer that actively establishes a connection (TLS client side) + # checks if the passive side (TLS server side) sends over a trusted certificate. With the flag turned on, + # the passive side will also request and verify a certificate from the connecting peer. + # + # To prevent man-in-the-middle attacks this setting is enabled by default. + # + # Note: Nodes that are configured with this setting to 'on' might not be able to receive messages from nodes that + # run on older versions of akka-remote. This is because in versions of Akka < 2.4.12 the active side of the remoting + # connection will not send over certificates even if asked. + # + # However, starting with Akka 2.4.12, even with this setting "off", the active side (TLS client side) + # will use the given key-store to send over a certificate if asked. A rolling upgrade from versions of + # Akka < 2.4.12 can therefore work like this: + # - upgrade all nodes to an Akka version >= 2.4.12, in the best case the latest version, but keep this setting at "off" + # - then switch this flag to "on" and do again a rolling upgrade of all nodes + # The first step ensures that all nodes will send over a certificate when asked to. The second + # step will ensure that all nodes finally enforce the secure checking of client certificates. + require-mutual-authentication = on + } + } + + ### Default configuration for the failure injector transport adapter + + gremlin { + # Enable debug logging of the failure injector transport adapter + debug = off + } + + backoff-remote-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + parallelism-max = 2 + } + } + } + } +} +#//#classic + +#//#artery +akka { + + remote { + + ### Configuration for Artery, the new implementation of remoting + artery { + + # Disable artery with this flag + enabled = on + + # Select the underlying transport implementation. + # + # Possible values: aeron-udp, tcp, tls-tcp + # See https://doc.akka.io/docs/akka/current/remoting-artery.html#selecting-a-transport for the tradeoffs + # for each transport + transport = tcp + + # Canonical address is the address other clients should connect to. + # Artery transport will expect messages to this address. + canonical { + + # The default remote server port clients should connect to. + # Default is 25520, use 0 if you want a random available port + # This port needs to be unique for each actor system on the same machine. + port = 25520 + + # Hostname clients should connect to. Can be set to an ip, hostname + # or one of the following special values: + # "" InetAddress.getLocalHost.getHostAddress + # "" InetAddress.getLocalHost.getHostName + # + hostname = "" + } + + # Use these settings to bind a network interface to a different address + # than artery expects messages at. 
This may be used when running Akka + # nodes in a separated networks (under NATs or in containers). If canonical + # and bind addresses are different, then network configuration that relays + # communications from canonical to bind addresses is expected. + bind { + + # Port to bind a network interface to. Can be set to a port number + # of one of the following special values: + # 0 random available port + # "" akka.remote.artery.canonical.port + # + port = "" + + # Hostname to bind a network interface to. Can be set to an ip, hostname + # or one of the following special values: + # "0.0.0.0" all interfaces + # "" akka.remote.artery.canonical.hostname + # "" InetAddress.getLocalHost.getHostAddress + # "" InetAddress.getLocalHost.getHostName + # + hostname = "" + + # Time to wait for Aeron/TCP to bind + bind-timeout = 3s + } + + + # Actor paths to use the large message stream for when a message + # is sent to them over remoting. The large message stream dedicated + # is separate from "normal" and system messages so that sending a + # large message does not interfere with them. + # Entries should be the full path to the actor. Wildcards in the form of "*" + # can be supplied at any place and matches any name at that segment - + # "/user/supervisor/actor/*" will match any direct child to actor, + # while "/supervisor/*/child" will match any grandchild to "supervisor" that + # has the name "child" + # Entries have to be specified on both the sending and receiving side. + # Messages sent to ActorSelections will not be passed through the large message + # stream, to pass such messages through the large message stream the selections + # but must be resolved to ActorRefs first. + large-message-destinations = [] + + # Enable untrusted mode, which discards inbound system messages, PossiblyHarmful and + # ActorSelection messages. E.g. remote watch and remote deployment will not work. + # ActorSelection messages can be enabled for specific paths with the trusted-selection-paths + untrusted-mode = off + + # When 'untrusted-mode=on' inbound actor selections are by default discarded. + # Actors with paths defined in this list are granted permission to receive actor + # selections messages. + # E.g. trusted-selection-paths = ["/user/receptionist", "/user/namingService"] + trusted-selection-paths = [] + + # If this is "on", all inbound remote messages will be logged at DEBUG level, + # if off then they are not logged + log-received-messages = off + + # If this is "on", all outbound remote messages will be logged at DEBUG level, + # if off then they are not logged + log-sent-messages = off + + # Logging of message types with payload size in bytes larger than + # this value. Maximum detected size per message type is logged once, + # with an increase threshold of 10%. + # By default this feature is turned off. Activate it by setting the property to + # a value in bytes, such as 1000b. Note that for all messages larger than this + # limit there will be extra performance and scalability cost. + log-frame-size-exceeding = off + + advanced { + + # Maximum serialized message size, including header data. + maximum-frame-size = 256 KiB + + # Direct byte buffers are reused in a pool with this maximum size. + # Each buffer has the size of 'maximum-frame-size'. + # This is not a hard upper limit on number of created buffers. Additional + # buffers will be created if needed, e.g. when using many outbound + # associations at the same time. 
Such additional buffers will be garbage + # collected, which is not as efficient as reusing buffers in the pool. + buffer-pool-size = 128 + + # Maximum serialized message size for the large messages, including header data. + # If the value of akka.remote.artery.transport is set to aeron-udp, it is currently + # restricted to 1/8th the size of a term buffer that can be configured by setting the + # 'aeron.term.buffer.length' system property. + # See 'large-message-destinations'. + maximum-large-frame-size = 2 MiB + + # Direct byte buffers for the large messages are reused in a pool with this maximum size. + # Each buffer has the size of 'maximum-large-frame-size'. + # See 'large-message-destinations'. + # This is not a hard upper limit on number of created buffers. Additional + # buffers will be created if needed, e.g. when using many outbound + # associations at the same time. Such additional buffers will be garbage + # collected, which is not as efficient as reusing buffers in the pool. + large-buffer-pool-size = 32 + + # For enabling testing features, such as blackhole in akka-remote-testkit. + test-mode = off + + # Settings for the materializer that is used for the remote streams. + materializer = ${akka.stream.materializer} + + # Remoting will use the given dispatcher for the ordinary and large message + # streams. + use-dispatcher = "akka.remote.default-remote-dispatcher" + + # Remoting will use the given dispatcher for the control stream. + # It can be good to not use the same dispatcher for the control stream as + # the dispatcher for the ordinary message stream so that heartbeat messages + # are not disturbed. + use-control-stream-dispatcher = "akka.actor.internal-dispatcher" + + + # Total number of inbound lanes, shared among all inbound associations. A value + # greater than 1 means that deserialization can be performed in parallel for + # different destination actors. The selection of lane is based on consistent + # hashing of the recipient ActorRef to preserve message ordering per receiver. + # Lowest latency can be achieved with inbound-lanes=1 because of one less + # asynchronous boundary. + inbound-lanes = 4 + + # Number of outbound lanes for each outbound association. A value greater than 1 + # means that serialization and other work can be performed in parallel for different + # destination actors. The selection of lane is based on consistent hashing of the + # recipient ActorRef to preserve message ordering per receiver. Note that messages + # for different destination systems (hosts) are handled by different streams also + # when outbound-lanes=1. Lowest latency can be achieved with outbound-lanes=1 + # because of one less asynchronous boundary. + outbound-lanes = 1 + + # Size of the send queue for outgoing messages. Messages will be dropped if + # the queue becomes full. This may happen if you send a burst of many messages + # without end-to-end flow control. Note that there is one such queue per + # outbound association. The trade-off of using a larger queue size is that + # it consumes more memory, since the queue is based on preallocated array with + # fixed size. + outbound-message-queue-size = 3072 + + # Size of the send queue for outgoing control messages, such as system messages. + # If this limit is reached the remote system is declared to be dead and its UID + # marked as quarantined. Note that there is one such queue per outbound association. 
+ # It is a linked queue so it will not use more memory than needed but by increasing + # too much you may risk OutOfMemoryError in the worst case. + outbound-control-queue-size = 20000 + + # Size of the send queue for outgoing large messages. Messages will be dropped if + # the queue becomes full. This may happen if you send a burst of many messages + # without end-to-end flow control. Note that there is one such queue per + # outbound association. + # It is a linked queue so it will not use more memory than needed but by increasing + # too much you may risk OutOfMemoryError, especially since the message payload + # of these messages may be large. + outbound-large-message-queue-size = 256 + + # This setting defines the maximum number of unacknowledged system messages + # allowed for a remote system. If this limit is reached the remote system is + # declared to be dead and its UID marked as quarantined. + system-message-buffer-size = 20000 + + # unacknowledged system messages are re-delivered with this interval + system-message-resend-interval = 1 second + + + + # The timeout for outbound associations to perform the initial handshake. + # This timeout must be greater than the 'image-liveness-timeout' when + # transport is aeron-udp. + handshake-timeout = 20 seconds + + # incomplete initial handshake attempt is retried with this interval + handshake-retry-interval = 1 second + + # Handshake requests are performed periodically with this interval, + # also after the handshake has been completed to be able to establish + # a new session with a restarted destination system. + inject-handshake-interval = 1 second + + + # System messages that are not acknowledged after re-sending for this period are + # dropped and will trigger quarantine. The value should be longer than the length + # of a network partition that you need to survive. + give-up-system-message-after = 6 hours + + # Outbound streams are stopped when they haven't been used for this duration. + # They are started again when new messages are sent. + stop-idle-outbound-after = 5 minutes + + # Outbound streams are quarantined when they haven't been used for this duration + # to cleanup resources used by the association, such as compression tables. + # This will cleanup association to crashed systems that didn't announce their + # termination. + # The value should be longer than the length of a network partition that you + # need to survive. + # The value must also be greater than stop-idle-outbound-after. + # Once every 1/10 of this duration an extra handshake message will be sent. + # Therfore it's also recommended to use a value that is greater than 10 times + # the stop-idle-outbound-after, since otherwise the idle streams will not be + # stopped. + quarantine-idle-outbound-after = 6 hours + + # Stop outbound stream of a quarantined association after this idle timeout, i.e. + # when not used any more. + stop-quarantined-after-idle = 3 seconds + + # After catastrophic communication failures that could result in the loss of system + # messages or after the remote DeathWatch triggers the remote system gets + # quarantined to prevent inconsistent behavior. + # This setting controls how long the quarantined association will be kept around + # before being removed to avoid long-term memory leaks. It must be quarantined + # and also unused for this duration before it's removed. 
When removed the historical + # information about which UIDs that were quarantined for that hostname:port is + # gone which could result in communication with a previously quarantined node + # if it wakes up again. Therfore this shouldn't be set too low. + remove-quarantined-association-after = 1 h + + # during ActorSystem termination the remoting will wait this long for + # an acknowledgment by the destination system that flushing of outstanding + # remote messages has been completed + shutdown-flush-timeout = 1 second + + # Before sending notificaiton of terminated actor (DeathWatchNotification) other messages + # will be flushed to make sure that the Terminated message arrives after other messages. + # It will wait this long for the flush acknowledgement before continuing. + # The flushing can be disabled by setting this to `off`. + death-watch-notification-flush-timeout = 3 seconds + + # See 'inbound-max-restarts' + inbound-restart-timeout = 5 seconds + + # Max number of restarts within 'inbound-restart-timeout' for the inbound streams. + # If more restarts occurs the ActorSystem will be terminated. + inbound-max-restarts = 5 + + # Retry outbound connection after this backoff. + # Only used when transport is tcp or tls-tcp. + outbound-restart-backoff = 1 second + + # See 'outbound-max-restarts' + outbound-restart-timeout = 5 seconds + + # Max number of restarts within 'outbound-restart-timeout' for the outbound streams. + # If more restarts occurs the ActorSystem will be terminated. + outbound-max-restarts = 5 + + # compression of common strings in remoting messages, like actor destinations, serializers etc + compression { + + actor-refs { + # Max number of compressed actor-refs + # Note that compression tables are "rolling" (i.e. a new table replaces the old + # compression table once in a while), and this setting is only about the total number + # of compressions within a single such table. + # Must be a positive natural number. Can be disabled with "off". + max = 256 + + # interval between new table compression advertisements. + # this means the time during which we collect heavy-hitter data and then turn it into a compression table. + advertisement-interval = 1 minute + } + manifests { + # Max number of compressed manifests + # Note that compression tables are "rolling" (i.e. a new table replaces the old + # compression table once in a while), and this setting is only about the total number + # of compressions within a single such table. + # Must be a positive natural number. Can be disabled with "off". + max = 256 + + # interval between new table compression advertisements. + # this means the time during which we collect heavy-hitter data and then turn it into a compression table. + advertisement-interval = 1 minute + } + } + + # List of fully qualified class names of remote instruments which should + # be initialized and used for monitoring of remote messages. + # The class must extend akka.remote.artery.RemoteInstrument and + # have a public constructor with empty parameters or one ExtendedActorSystem + # parameter. + # A new instance of RemoteInstrument will be created for each encoder and decoder. + # It's only called from the stage, so if it dosn't delegate to any shared instance + # it doesn't have to be thread-safe. + # Refer to `akka.remote.artery.RemoteInstrument` for more information. + instruments = ${?akka.remote.artery.advanced.instruments} [] + + # Only used when transport is aeron-udp + aeron { + # Periodically log out all Aeron counters. 
See https://github.com/real-logic/aeron/wiki/Monitoring-and-Debugging#counters + # Only used when transport is aeron-udp. + log-aeron-counters = false + + # Controls whether to start the Aeron media driver in the same JVM or use external + # process. Set to 'off' when using external media driver, and then also set the + # 'aeron-dir'. + # Only used when transport is aeron-udp. + embedded-media-driver = on + + # Directory used by the Aeron media driver. It's mandatory to define the 'aeron-dir' + # if using external media driver, i.e. when 'embedded-media-driver = off'. + # Embedded media driver will use a this directory, or a temporary directory if this + # property is not defined (empty). + # Only used when transport is aeron-udp. + aeron-dir = "" + + # Whether to delete aeron embedded driver directory upon driver stop. + # Only used when transport is aeron-udp. + delete-aeron-dir = yes + + # Level of CPU time used, on a scale between 1 and 10, during backoff/idle. + # The tradeoff is that to have low latency more CPU time must be used to be + # able to react quickly on incoming messages or send as fast as possible after + # backoff backpressure. + # Level 1 strongly prefer low CPU consumption over low latency. + # Level 10 strongly prefer low latency over low CPU consumption. + # Only used when transport is aeron-udp. + idle-cpu-level = 5 + + # messages that are not accepted by Aeron are dropped after retrying for this period + # Only used when transport is aeron-udp. + give-up-message-after = 60 seconds + + # Timeout after which aeron driver has not had keepalive messages + # from a client before it considers the client dead. + # Only used when transport is aeron-udp. + client-liveness-timeout = 20 seconds + + # Timout after after which an uncommitted publication will be unblocked + # Only used when transport is aeron-udp. + publication-unblock-timeout = 40 seconds + + # Timeout for each the INACTIVE and LINGER stages an aeron image + # will be retained for when it is no longer referenced. + # This timeout must be less than the 'handshake-timeout'. + # Only used when transport is aeron-udp. + image-liveness-timeout = 10 seconds + + # Timeout after which the aeron driver is considered dead + # if it does not update its C'n'C timestamp. + # Only used when transport is aeron-udp. + driver-timeout = 20 seconds + } + + # Only used when transport is tcp or tls-tcp. + tcp { + # Timeout of establishing outbound connections. + connection-timeout = 5 seconds + + # The local address that is used for the client side of the TCP connection. + outbound-client-hostname = "" + } + + } + + # SSL configuration that is used when transport=tls-tcp. + ssl { + # Factory of SSLEngine. + # Must implement akka.remote.artery.tcp.SSLEngineProvider and have a public + # constructor with an ActorSystem parameter. + # The default ConfigSSLEngineProvider is configured by properties in section + # akka.remote.artery.ssl.config-ssl-engine + ssl-engine-provider = akka.remote.artery.tcp.ConfigSSLEngineProvider + + # Config of akka.remote.artery.tcp.ConfigSSLEngineProvider + config-ssl-engine { + + # This is the Java Key Store used by the server connection + key-store = "keystore" + + # This password is used for decrypting the key store + # Use substitution from environment variables for passwords. Don't define + # real passwords in config files. 
key-store-password=${SSL_KEY_STORE_PASSWORD} + key-store-password = "changeme" + + # This password is used for decrypting the key + # Use substitution from environment variables for passwords. Don't define + # real passwords in config files. key-password=${SSL_KEY_PASSWORD} + key-password = "changeme" + + # This is the Java Key Store used by the client connection + trust-store = "truststore" + + # This password is used for decrypting the trust store + # Use substitution from environment variables for passwords. Don't define + # real passwords in config files. trust-store-password=${SSL_TRUST_STORE_PASSWORD} + trust-store-password = "changeme" + + # Protocol to use for SSL encryption. + protocol = "TLSv1.2" + + # Example: ["TLS_DHE_RSA_WITH_AES_128_GCM_SHA256", + # "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", + # "TLS_DHE_RSA_WITH_AES_256_GCM_SHA384", + # "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"] + # When doing rolling upgrades, make sure to include both the algorithm used + # by old nodes and the preferred algorithm. + # If you use a JDK 8 prior to 8u161 you need to install + # the JCE Unlimited Strength Jurisdiction Policy Files to use AES 256. + # More info here: + # https://www.oracle.com/java/technologies/javase-jce-all-downloads.html + enabled-algorithms = ["TLS_DHE_RSA_WITH_AES_256_GCM_SHA384", + "TLS_RSA_WITH_AES_128_CBC_SHA"] + + # There are two options, and the default SecureRandom is recommended: + # "" or "SecureRandom" => (default) + # "SHA1PRNG" => Can be slow because of blocking issues on Linux + # + # Setting a value here may require you to supply the appropriate cipher + # suite (see enabled-algorithms section above) + random-number-generator = "" + + # Require mutual authentication between TLS peers + # + # Without mutual authentication only the peer that actively establishes a connection (TLS client side) + # checks if the passive side (TLS server side) sends over a trusted certificate. With the flag turned on, + # the passive side will also request and verify a certificate from the connecting peer. + # + # To prevent man-in-the-middle attacks this setting is enabled by default. + require-mutual-authentication = on + + # Set this to `on` to verify hostnames with sun.security.util.HostnameChecker + # If possible it is recommended to have this enabled. Hostname verification is designed for + # situations where things locate each other by hostname, in scenarios where host names are dynamic + # and not known up front it can make sense to have this disabled. + hostname-verification = off + } + + # Config of akka.remote.artery.tcp.ssl.RotatingKeysSSLEngineProvider + # This engine provider reads PEM files from a mount point shared with the secret + # manager. The constructed SSLContext is cached some time (configurable) so when + # the credentials rotate the new credentials are eventually picked up. + # By default mTLS is enabled. + # This provider also includes a verification phase that runs after the TLS handshake + # phase. In this verification, both peers run an authorization and verify they are + # part of the same akka cluster. The verification happens via comparing the subject + # names in the peer's certificate with the name on the own certificate so if you + # use this SSLEngineProvider you should make sure all nodes on the cluster include + # at least one common subject name (CN or SAN). + # The Key setup this implementation supports has some limitations: + # 1. the private key must be provided on a PKCS#1 or a non-encrypted PKCS#8 PEM-formatted file + # 2. 
the private key must be be of an algorythm supported by `akka-pki` tools (e.g. "RSA", not "EC") + # 3. the node certificate must be issued by a root CA (not an intermediate CA) + # 4. both the node and the CA certificates must be provided in PEM-formatted files + rotating-keys-engine { + + # This is a convention that people may follow if they wish to save themselves some configuration + secret-mount-point = /var/run/secrets/akka-tls/rotating-keys-engine + + # The absolute path the PEM file with the private key. + key-file = ${akka.remote.artery.ssl.rotating-keys-engine.secret-mount-point}/tls.key + # The absolute path to the PEM file of the certificate for the private key above. + cert-file = ${akka.remote.artery.ssl.rotating-keys-engine.secret-mount-point}/tls.crt + # The absolute path to the PEM file of the certificate of the CA that emited + # the node certificate above. + ca-cert-file = ${akka.remote.artery.ssl.rotating-keys-engine.secret-mount-point}/ca.crt + + # There are two options, and the default SecureRandom is recommended: + # "" or "SecureRandom" => (default) + # "SHA1PRNG" => Can be slow because of blocking issues on Linux + # + # Setting a value here may require you to supply the appropriate cipher + # suite (see enabled-algorithms section) + random-number-generator = "" + + # Example: ["TLS_DHE_RSA_WITH_AES_128_GCM_SHA256", + # "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", + # "TLS_DHE_RSA_WITH_AES_256_GCM_SHA384", + # "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"] + # If you use a JDK 8 prior to 8u161 you need to install + # the JCE Unlimited Strength Jurisdiction Policy Files to use AES 256. + # More info here: + # https://www.oracle.com/java/technologies/javase-jce-all-downloads.html + enabled-algorithms = ["TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"] + + # Protocol to use for SSL encryption. + protocol = "TLSv1.2" + + # How long should an SSLContext instance be cached. When rotating keys and certificates, + # there must a time overlap between the old certificate/key and the new ones. The + # value of this setting should be lower than duration of that overlap. + ssl-context-cache-ttl = 5m + } + } + } + } + +} +#//#artery diff --git a/akka/repackaged-akka-jar/src/main/resources/stream_reference.conf b/akka/repackaged-akka-jar/src/main/resources/stream_reference.conf new file mode 100644 index 0000000000..66d9130e38 --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/stream_reference.conf @@ -0,0 +1,200 @@ +##################################### +# Akka Stream Reference Config File # +##################################### + +# eager creation of the system wide materializer +akka.library-extensions += "akka.stream.SystemMaterializer$" +akka { + stream { + + # Default materializer settings + materializer { + + # Initial size of buffers used in stream elements + initial-input-buffer-size = 4 + # Maximum size of buffers used in stream elements + max-input-buffer-size = 16 + + # Fully qualified config path which holds the dispatcher configuration + # or full dispatcher configuration to be used by ActorMaterializer when creating Actors. 
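The initial/max input buffer sizes above apply materializer-wide; the same knob also exists per operator through stream attributes. A minimal sketch, assuming akka-stream's Java DSL is on the classpath; the class name and the stream itself are illustrative only:

    import akka.actor.ActorSystem;
    import akka.stream.Attributes;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    public final class InputBufferSketch {
        public static void main(final String[] args) {
            final ActorSystem system = ActorSystem.create("sketch");
            Source.range(1, 100)
                .map(i -> i * 2)
                // per-operator equivalent of initial/max-input-buffer-size = 4/16
                .withAttributes(Attributes.inputBuffer(4, 16))
                .runWith(Sink.foreach(System.out::println), system)
                .thenRun(system::terminate);
        }
    }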
+ dispatcher = "akka.actor.default-dispatcher" + + # Fully qualified config path which holds the dispatcher configuration + # or full dispatcher configuration to be used by stream operators that + # perform blocking operations + blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" + + # Cleanup leaked publishers and subscribers when they are not used within a given + # deadline + subscription-timeout { + # when the subscription timeout is reached one of the following strategies on + # the "stale" publisher: + # cancel - cancel it (via `onError` or subscribing to the publisher and + # `cancel()`ing the subscription right away + # warn - log a warning statement about the stale element (then drop the + # reference to it) + # noop - do nothing (not recommended) + mode = cancel + + # time after which a subscriber / publisher is considered stale and eligible + # for cancelation (see `akka.stream.subscription-timeout.mode`) + timeout = 5s + } + + # Enable additional troubleshooting logging at DEBUG log level + debug-logging = off + + # Maximum number of elements emitted in batch if downstream signals large demand + output-burst-limit = 1000 + + # Enable automatic fusing of all graphs that are run. For short-lived streams + # this may cause an initial runtime overhead, but most of the time fusing is + # desirable since it reduces the number of Actors that are created. + # Deprecated, since Akka 2.5.0, setting does not have any effect. + auto-fusing = on + + # Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered, + # buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed + # buffer upon stream materialization if the requested buffer size is less than this + # configuration parameter. The default is very high because failing early is better + # than failing under load. + # + # Buffers sized larger than this will dynamically grow/shrink and consume more memory + # per element than the fixed size buffers. + max-fixed-buffer-size = 1000000000 + + # Maximum number of sync messages that actor can process for stream to substream communication. + # Parameter allows to interrupt synchronous processing to get upstream/downstream messages. + # Allows to accelerate message processing that happening within same actor but keep system responsive. + sync-processing-limit = 1000 + + debug { + # Enables the fuzzing mode which increases the chance of race conditions + # by aggressively reordering events and making certain operations more + # concurrent than usual. + # This setting is for testing purposes, NEVER enable this in a production + # environment! + # To get the best results, try combining this setting with a throughput + # of 1 on the corresponding dispatchers. + fuzzing-mode = off + } + + io.tcp { + # The outgoing bytes are accumulated in a buffer while waiting for acknowledgment + # of pending write. This improves throughput for small messages (frames) without + # sacrificing latency. While waiting for the ack the stage will eagerly pull + # from upstream until the buffer exceeds this size. That means that the buffer may hold + # slightly more bytes than this limit (at most one element more). It can be set to 0 + # to disable the usage of the buffer. + write-buffer-size = 16 KiB + + # In addition to the buffering described for property write-buffer-size, try to collect + # more consecutive writes from the upstream stream producers. 
+ # + # The rationale is to increase write efficiency by avoiding separate small + # writes to the network which is expensive to do. Merging those writes together + # (up to `write-buffer-size`) improves throughput for small writes. + # + # The idea is that a running stream may produce multiple small writes consecutively + # in one go without waiting for any external input. To probe the stream for + # data, this features delays sending a write immediately by probing the stream + # for more writes. This works by rescheduling the TCP connection stage via the + # actor mailbox of the underlying actor. Thus, before the stage is reactivated + # the upstream gets another opportunity to emit writes. + # + # When the stage is reactivated and if new writes are detected another round-trip + # is scheduled. The loop repeats until either the number of round trips given in this + # setting is reached, the buffer reaches `write-buffer-size`, or no new writes + # were detected during the last round-trip. + # + # This mechanism ensures that a write is guaranteed to be sent when the remaining stream + # becomes idle waiting for external signals. + # + # In most cases, the extra latency this mechanism introduces should be negligible, + # but depending on the stream setup it may introduce a noticeable delay, + # if the upstream continuously produces small amounts of writes in a + # blocking (CPU-bound) way. + # + # In that case, the feature can either be disabled, or the producing CPU-bound + # work can be taken off-stream to avoid excessive delays (e.g. using `mapAsync` instead of `map`). + # + # A value of 0 disables this feature. + coalesce-writes = 10 + } + + # Time to wait for async materializer creation before throwing an exception + creation-timeout = 20 seconds + + //#stream-ref + # configure defaults for SourceRef and SinkRef + stream-ref { + # Buffer of a SinkRef that is used to batch Request elements from the other side of the stream ref + # + # The buffer will be attempted to be filled eagerly even while the local stage did not request elements, + # because the delay of requesting over network boundaries is much higher. + buffer-capacity = 32 + + # Demand is signalled by sending a cumulative demand message ("requesting messages until the n-th sequence number) + # Using a cumulative demand model allows us to re-deliver the demand message in case of message loss (which should + # be very rare in any case, yet possible -- mostly under connection break-down and re-establishment). + # + # The semantics of handling and updating the demand however are in-line with what Reactive Streams dictates. + # + # In normal operation, demand is signalled in response to arriving elements, however if no new elements arrive + # within `demand-redelivery-interval` a re-delivery of the demand will be triggered, assuming that it may have gotten lost. + demand-redelivery-interval = 1 second + + # Subscription timeout, during which the "remote side" MUST subscribe (materialize) the handed out stream ref. + # This timeout does not have to be very low in normal situations, since the remote side may also need to + # prepare things before it is ready to materialize the reference. However the timeout is needed to avoid leaking + # in-active streams which are never subscribed to. 
+ subscription-timeout = 30 seconds + + # In order to guard the receiving end of a stream ref from never terminating (since awaiting a Completion or Failed + # message) after / before a Terminated is seen, a special timeout is applied once Terminated is received by it. + # This allows us to terminate stream refs that have been targeted to other nodes which are Downed, and as such the + # other side of the stream ref would never send the "final" terminal message. + # + # The timeout specifically means the time between the Terminated signal being received and when the local SourceRef + # determines to fail itself, assuming there was message loss or a complete partition of the completion signal. + final-termination-signal-deadline = 2 seconds + } + //#stream-ref + } + + # Deprecated, left here to not break Akka HTTP which refers to it + blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" + + # Deprecated, will not be used unless user code refer to it, use 'akka.stream.materializer.blocking-io-dispatcher' + # instead, or if from code, prefer the 'ActorAttributes.IODispatcher' attribute + default-blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" + } + + # configure overrides to ssl-configuration here (to be used by akka-streams, and akka-http – i.e. when serving https connections) + ssl-config { + protocol = "TLSv1.2" + } + + actor { + + serializers { + akka-stream-ref = "akka.stream.serialization.StreamRefSerializer" + } + + serialization-bindings { + "akka.stream.SinkRef" = akka-stream-ref + "akka.stream.SourceRef" = akka-stream-ref + "akka.stream.impl.streamref.StreamRefsProtocol" = akka-stream-ref + } + + serialization-identifiers { + "akka.stream.serialization.StreamRefSerializer" = 30 + } + } +} + +# ssl configuration +# folded in from former ssl-config-akka module +ssl-config { + logger = "com.typesafe.sslconfig.akka.util.AkkaLoggerBridge" +} diff --git a/akka/repackaged-akka/pom.xml b/akka/repackaged-akka/pom.xml new file mode 100644 index 0000000000..cc222188b5 --- /dev/null +++ b/akka/repackaged-akka/pom.xml @@ -0,0 +1,233 @@ + + + + + 4.0.0 + + + org.opendaylight.controller + bundle-parent + 9.0.3-SNAPSHOT + ../../bundle-parent + + + repackaged-akka + bundle + ${project.artifactId} + + + + true + true + + + true + + + + + org.opendaylight.controller + repackaged-akka-jar + ${project.version} + provided + + + + com.typesafe + config + + + com.typesafe + ssl-config-core_2.13 + + + io.aeron + aeron-client + + + io.aeron + aeron-driver + + + io.netty + netty + 3.10.6.Final + + + org.agrona + agrona + + + org.reactivestreams + reactive-streams + + + org.lmdbjava + lmdbjava + 0.7.0 + + + com.github.jnr + jffi + + + com.github.jnr + jnr-ffi + + + com.github.jnr + jnr-constants + + + + + org.scala-lang + scala-library + + + org.scala-lang + scala-reflect + + + org.scala-lang.modules + scala-java8-compat_2.13 + + + org.scala-lang.modules + scala-parser-combinators_2.13 + + + + + + + maven-dependency-plugin + + + unpack-license + + + true + + + + unpack + compile + + unpack + + + + + org.opendaylight.controller + repackaged-akka-jar + ${project.version} + + + com.hierynomus + asn-one + 0.4.0 + + + false + true + ${project.build.directory}/classes + + + + unpack-sources + prepare-package + + unpack-dependencies + + + sources + repackaged-akka-jar + ${project.build.directory}/shaded-sources + + + + + + maven-antrun-plugin + + + move-resources + prepare-package + + run + + + + + + + + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + 
shaded-sources + prepare-package + + add-source + + + ${project.build.directory}/shaded-sources + + + + shaded-resources + prepare-package + + add-resource + + + + + ${project.build.directory}/resources + + + + + + + + org.apache.felix + maven-bundle-plugin + true + + + org.opendaylight.controller.repackaged.akka + + akka.*, + com.typesafe.sslconfig.akka.*, + jdk.jfr, + + + sun.misc;resolution:=optional, + sun.reflect;resolution:=optional, + org.fusesource.leveldbjni;resolution:=optional, + org.iq80.leveldb;resolution:=optional, + org.iq80.leveldb.impl;resolution:=optional, + * + + + + + + + diff --git a/opendaylight/md-sal/mdsal-artifacts/pom.xml b/artifacts/pom.xml similarity index 81% rename from opendaylight/md-sal/mdsal-artifacts/pom.xml rename to artifacts/pom.xml index b9c05e1de2..79e8d88f08 100644 --- a/opendaylight/md-sal/mdsal-artifacts/pom.xml +++ b/artifacts/pom.xml @@ -14,88 +14,37 @@ org.opendaylight.odlparent odlparent-lite - 6.0.1 + 13.0.11 org.opendaylight.controller - mdsal-artifacts - 1.11.0-SNAPSHOT + controller-artifacts + 9.0.3-SNAPSHOT pom - + ${project.groupId} - sal-common-api + repackaged-akka ${project.version} + + ${project.groupId} - sal-common-impl + atomix-storage ${project.version} + + ${project.groupId} sal-common-util ${project.version} - - ${project.groupId} - sal-core-api - ${project.version} - - - ${project.groupId} - sal-core-spi - ${project.version} - - - ${project.groupId} - sal-core-compat - ${project.version} - - - ${project.groupId} - sal-broker-impl - ${project.version} - - - ${project.groupId} - sal-binding-api - ${project.version} - - - ${project.groupId} - sal-binding-broker-impl - ${project.version} - - - ${project.groupId} - sal-binding-util - ${project.version} - - - ${project.groupId} - sal-inmemory-datastore - ${project.version} - - - ${project.groupId} - mdsal-trace-api - ${project.version} - - - ${project.groupId} - mdsal-trace-dom-impl - ${project.version} - - - ${project.groupId} - mdsal-trace-binding-impl - ${project.version} - @@ -112,50 +61,11 @@ ${project.groupId} - sal-binding-broker-impl + mdsal-it-base ${project.version} - test-jar test - - - ${project.groupId} - features-mdsal - ${project.version} - features - xml - runtime - - - ${project.groupId} - features-restconf - ${project.version} - features - xml - runtime - - - ${project.groupId} - features-mdsal-trace - ${project.version} - features - xml - runtime - - - - - org.opendaylight.controller.model - model-inventory - ${project.version} - - - org.opendaylight.controller.model - model-topology - ${project.version} - - ${project.groupId} @@ -209,17 +119,17 @@ org.opendaylight.controller cds-access-api - 1.7.0-SNAPSHOT + ${project.version} org.opendaylight.controller cds-access-client - 1.7.0-SNAPSHOT + ${project.version} org.opendaylight.controller cds-access-client - 1.7.0-SNAPSHOT + ${project.version} test-jar test @@ -233,10 +143,25 @@ sal-cluster-admin-impl ${project.version} + + org.opendaylight.controller + sal-cluster-admin-karaf-cli + ${project.version} + org.opendaylight.controller cds-dom-api - 1.7.0-SNAPSHOT + ${project.version} + + + org.opendaylight.controller + cds-mgmt-api + ${project.version} + + + ${project.groupId} + eos-dom-akka + ${project.version} @@ -269,91 +194,116 @@ runtime - + - org.opendaylight.controller - messagebus-api + org.opendaylight.controller.samples + clustering-it-config ${project.version} - org.opendaylight.controller - messagebus-spi + org.opendaylight.controller.samples + clustering-it-model ${project.version} - 
org.opendaylight.controller - messagebus-impl + org.opendaylight.controller.samples + clustering-it-provider ${project.version} - org.opendaylight.controller - messagebus-util + org.opendaylight.controller.samples + clustering-it-karaf-cli ${project.version} - + - org.opendaylight.controller.samples - clustering-it-config + + ${project.groupId} + sal-clustering-config ${project.version} + xml + akkaconf - org.opendaylight.controller.samples - clustering-it-model + + ${project.groupId} + sal-clustering-config ${project.version} + xml + factoryakkaconf - org.opendaylight.controller.samples - clustering-it-provider + + ${project.groupId} + sal-clustering-config ${project.version} + xml + moduleshardconf - - + ${project.groupId} - odl-mdsal-broker + sal-clustering-config ${project.version} xml + moduleconf + + + + ${project.groupId} + sal-clustering-config + ${project.version} + cfg + datastore + + + + + ${project.groupId} + features-controller + ${project.version} features + xml ${project.groupId} - odl-mdsal-broker-local + odl-controller-akka ${project.version} xml features ${project.groupId} - odl-mdsal-clustering + odl-controller-scala ${project.version} xml features ${project.groupId} - odl-mdsal-clustering-commons + odl-mdsal-broker ${project.version} xml features ${project.groupId} - odl-controller-mdsal-common + odl-controller-broker-local ${project.version} xml features ${project.groupId} - odl-controller-binding-api + odl-mdsal-clustering-commons ${project.version} xml features ${project.groupId} - odl-controller-dom-api + odl-controller-mdsal-common ${project.version} xml features @@ -381,28 +331,30 @@ ${project.groupId} - odl-message-bus-collector + odl-toaster ${project.version} xml features ${project.groupId} - odl-mdsal-model-inventory + odl-jolokia ${project.version} xml features + + ${project.groupId} - odl-controller-model-topology + features-controller-testing ${project.version} - xml features + xml ${project.groupId} - odl-toaster + odl-mdsal-benchmark ${project.version} xml features @@ -415,6 +367,36 @@ features + + + ${project.groupId} + benchmark-api + ${project.version} + + + ${project.groupId} + dsbenchmark + ${project.version} + + + ${project.groupId} + ntfbenchmark + ${project.version} + + + ${project.groupId} + rpcbenchmark + ${project.version} + + + + + ${project.groupId} + features-controller-experimental + ${project.version} + features + xml + diff --git a/atomix-storage/LICENSE b/atomix-storage/LICENSE new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/atomix-storage/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. 
+ + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/atomix-storage/pom.xml b/atomix-storage/pom.xml new file mode 100644 index 0000000000..bb07137137 --- /dev/null +++ b/atomix-storage/pom.xml @@ -0,0 +1,157 @@ + + + 4.0.0 + + + org.opendaylight.controller + bundle-parent + 9.0.3-SNAPSHOT + ../bundle-parent + + + atomix-storage + Atomix Storage + bundle + + + true + false + + + + + com.google.guava + guava + + + io.netty + netty-buffer + + + io.netty + netty-common + + + org.eclipse.jdt + org.eclipse.jdt.annotation + + + com.esotericsoftware + kryo + 4.0.3 + provided + + + com.esotericsoftware + minlog + 1.3.1 + provided + + + com.esotericsoftware + reflectasm + 1.11.9 + provided + + + org.objenesis + objenesis + 2.6 + provided + + + + com.google.guava + guava-testlib + + + + + + + + maven-dependency-plugin + + + unpack-license + + true + + + + + + maven-antrun-plugin + + + copy-license + prepare-package + + run + + + + + + + + + + + maven-checkstyle-plugin + + + check-license + + check + + + true + + + + + + + org.apache.felix + maven-bundle-plugin + true + + + + io.atomix.storage.journal + + + sun.nio.ch;resolution:=optional, + sun.misc;resolution:=optional, + !COM.newmonics.*, + !android.os, + * + + + + + *;inline=true;groupId=com.esotericsoftware, + *;inline=true;groupId=org.objenesis, + + + + + + + diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java new file mode 100644 index 0000000000..767e67fa46 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import org.eclipse.jdt.annotation.NonNullByDefault; + +/** + * A {@link JournalReader} traversing only committed entries. + */ +@NonNullByDefault +final class CommitsSegmentJournalReader extends SegmentedJournalReader { + CommitsSegmentJournalReader(final SegmentedJournal journal, final JournalSegment segment) { + super(journal, segment); + } + + @Override + public T tryNext(final EntryMapper mapper) { + return getNextIndex() <= journal.getCommitIndex() ? super.tryNext(mapper) : null; + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java new file mode 100644 index 0000000000..311d16b150 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import org.eclipse.jdt.annotation.NonNull; + +/** + * A {@link StorageLevel#DISK} implementation of {@link FileReader}. Maintains an internal buffer. + */ +final class DiskFileReader extends FileReader { + /** + * Just do not bother with IO smaller than this many bytes. + */ + private static final int MIN_IO_SIZE = 8192; + + private final FileChannel channel; + private final ByteBuffer buffer; + + // tracks where memory's first available byte maps to in terms of FileChannel.position() + private int bufferPosition; + + DiskFileReader(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) { + this(path, channel, allocateBuffer(maxSegmentSize, maxEntrySize)); + } + + // Note: take ownership of the buffer + DiskFileReader(final Path path, final FileChannel channel, final ByteBuffer buffer) { + super(path); + this.channel = requireNonNull(channel); + this.buffer = buffer.flip(); + bufferPosition = 0; + } + + static ByteBuffer allocateBuffer(final int maxSegmentSize, final int maxEntrySize) { + return ByteBuffer.allocate(chooseBufferSize(maxSegmentSize, maxEntrySize)); + } + + private static int chooseBufferSize(final int maxSegmentSize, final int maxEntrySize) { + if (maxSegmentSize <= MIN_IO_SIZE) { + // just buffer the entire segment + return maxSegmentSize; + } + + // one full entry plus its header, or MIN_IO_SIZE, which benefits the read of many small entries + final int minBufferSize = maxEntrySize + SegmentEntry.HEADER_BYTES; + return minBufferSize <= MIN_IO_SIZE ? MIN_IO_SIZE : minBufferSize; + } + + @Override + void invalidateCache() { + buffer.clear().flip(); + bufferPosition = 0; + } + + @Override + ByteBuffer read(final int position, final int size) { + // calculate logical seek distance between buffer's first byte and position and split flow between + // forward-moving and backwards-moving code paths. + final int seek = bufferPosition - position; + return seek >= 0 ? 
forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size); + } + + private @NonNull ByteBuffer forwardAndRead(final int seek, final int position, final int size) { + final int missing = buffer.limit() - seek - size; + if (missing <= 0) { + // fast path: we have the requested region + return buffer.slice(seek, size).asReadOnlyBuffer(); + } + + // We need to read more data, but let's salvage what we can: + // - set buffer position to seek, which means it points to the same as position + // - run compact, which moves everything between position and limit onto the beginning of buffer and + // sets it up to receive more bytes + // - start the read accounting for the seek + buffer.position(seek).compact(); + readAtLeast(position + seek, missing); + return setAndSlice(position, size); + } + + private @NonNull ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) { + // TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and + // do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and + // read it. + buffer.clear(); + readAtLeast(position, size); + return setAndSlice(position, size); + } + + private void readAtLeast(final int readPosition, final int readAtLeast) { + final int bytesRead; + try { + bytesRead = channel.read(buffer, readPosition); + } catch (IOException e) { + throw new StorageException(e); + } + verify(bytesRead >= readAtLeast, "Short read %s, expected %s", bytesRead, readAtLeast); + buffer.flip(); + } + + private @NonNull ByteBuffer setAndSlice(final int position, final int size) { + bufferPosition = position; + return buffer.slice(0, size).asReadOnlyBuffer(); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java new file mode 100644 index 0000000000..5f468d46a1 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; + +/** + * A {@link StorageLevel#DISK} {@link FileWriter}. 
+ */ +final class DiskFileWriter extends FileWriter { + private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]); + + private final DiskFileReader reader; + private final ByteBuffer buffer; + + DiskFileWriter(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) { + super(path, channel, maxSegmentSize, maxEntrySize); + buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize); + reader = new DiskFileReader(path, channel, buffer); + } + + @Override + DiskFileReader reader() { + return reader; + } + + @Override + MappedByteBuffer buffer() { + return null; + } + + @Override + MappedFileWriter toMapped() { + flush(); + return new MappedFileWriter(path, channel, maxSegmentSize, maxEntrySize); + } + + @Override + DiskFileWriter toDisk() { + return null; + } + + @Override + void writeEmptyHeader(final int position) { + try { + channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position); + } catch (IOException e) { + throw new StorageException(e); + } + } + + @Override + ByteBuffer startWrite(final int position, final int size) { + return buffer.clear().slice(0, size); + } + + @Override + void commitWrite(final int position, final ByteBuffer entry) { + try { + channel.write(entry, position); + } catch (IOException e) { + throw new StorageException(e); + } + } + + @Override + void flush() { + if (channel.isOpen()) { + try { + channel.force(true); + } catch (IOException e) { + throw new StorageException(e); + } + } + } + + @Override + void close() { + flush(); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java new file mode 100644 index 0000000000..fdc0597d36 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import org.eclipse.jdt.annotation.NonNull; + +/** + * An abstraction over how to read a {@link JournalSegmentFile}. + */ +abstract sealed class FileReader permits DiskFileReader, MappedFileReader { + private final Path path; + + FileReader(final Path path) { + this.path = requireNonNull(path); + } + + /** + * Invalidate any cache that is present, so that the next read is coherent with the backing file. + */ + abstract void invalidateCache(); + + /** + * Read the some bytes as specified position. The sum of position and size is guaranteed not to exceed the maximum + * segment size nor maximum entry size. 
+ * + * @param position position to the entry header + * @param size to read + * @return resulting buffer + */ + abstract @NonNull ByteBuffer read(int position, int size); + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).add("path", path).toString(); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java new file mode 100644 index 0000000000..4ead89bfb3 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import org.eclipse.jdt.annotation.Nullable; + +/** + * An abstraction over how to write a {@link JournalSegmentFile}. + */ +abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter { + final Path path; + final FileChannel channel; + final int maxSegmentSize; + final int maxEntrySize; + + FileWriter(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) { + this.path = requireNonNull(path); + this.channel = requireNonNull(channel); + this.maxSegmentSize = maxSegmentSize; + this.maxEntrySize = maxEntrySize; + } + + /** + * Return the internal {@link FileReader}. + * + * @return the internal FileReader + */ + abstract FileReader reader(); + + /** + * Write {@link SegmentEntry#HEADER_BYTES} worth of zeroes at specified position. + * + * @param position position to write to + */ + abstract void writeEmptyHeader(int position); + + abstract ByteBuffer startWrite(int position, int size); + + abstract void commitWrite(int position, ByteBuffer entry); + + /** + * Flushes written entries to disk. + */ + abstract void flush(); + + /** + * Closes this writer. + */ + abstract void close(); + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).add("path", path).toString(); + } + + abstract @Nullable MappedByteBuffer buffer(); + + abstract @Nullable MappedFileWriter toMapped(); + + abstract @Nullable DiskFileWriter toDisk(); +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/Indexed.java b/atomix-storage/src/main/java/io/atomix/storage/journal/Indexed.java new file mode 100644 index 0000000000..5bf7e6f454 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/Indexed.java @@ -0,0 +1,44 @@ +/* + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
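The abstract contract above (startWrite, commitWrite, flush) is what DiskFileWriter implements against a FileChannel. A rough sketch of how a caller might drive it, assuming code living in the same io.atomix.storage.journal package as these package-private classes; the helper and the pre-encoded entry buffer are illustrative, and the actual SegmentEntry header layout is deliberately not reproduced here:

    package io.atomix.storage.journal;

    import java.nio.ByteBuffer;

    final class WriteProtocolSketch {
        static void writeAt(final FileWriter writer, final int position, final ByteBuffer encoded) {
            // Reserve a slice sized for the already-serialized entry (header included) ...
            final ByteBuffer slice = writer.startWrite(position, encoded.remaining());
            // ... fill it, make it readable again, and commit it at the same position
            slice.put(encoded).flip();
            writer.commitWrite(position, slice);
            // force the bytes to durable storage, as DiskFileWriter.flush() does via force(true)
            writer.flush();
        }
    }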
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; +import org.eclipse.jdt.annotation.NonNullByDefault; + +/** + * Indexed journal entry. + * + * @param entry type + * @param index the entry index + * @param entry the indexed entry + * @param size the serialized entry size + */ +// FIXME: it seems 'index' has to be non-zero, we should enforce that if that really is the case +// FIXME: it seems 'size' has not be non-zero, we should enforce that if that really is the case +@NonNullByDefault +public record Indexed(long index, E entry, int size) { + public Indexed { + requireNonNull(entry); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("index", index).add("entry", entry).toString(); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java new file mode 100644 index 0000000000..5e37c12222 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import java.io.Closeable; + +/** + * Journal. + * + * @author Jordan Halterman + */ +public interface Journal extends Closeable { + + /** + * Returns the journal writer. + * + * @return The journal writer. + */ + JournalWriter writer(); + + /** + * Opens a new journal reader. + * + * @param index The index at which to start the reader. + * @return A new journal reader. + */ + JournalReader openReader(long index); + + /** + * Opens a new journal reader. + * + * @param index The index at which to start the reader. + * @param mode the reader mode + * @return A new journal reader. + */ + JournalReader openReader(long index, JournalReader.Mode mode); + + /** + * Returns a boolean indicating whether the journal is open. + * + * @return Indicates whether the journal is open. + */ + boolean isOpen(); + + @Override + void close(); +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java new file mode 100644 index 0000000000..a3c6ea5366 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java @@ -0,0 +1,97 @@ +/* + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; + +/** + * Log reader. + * + * @author Jordan Halterman + */ +@NonNullByDefault +public interface JournalReader extends AutoCloseable { + /** + * Raft log reader mode. + */ + enum Mode { + /** + * Reads all entries from the log. + */ + ALL, + /** + * Reads committed entries from the log. + */ + COMMITS, + } + + /** + * A journal entry processor. Responsible for transforming entries into their internal representation. + * + * @param Entry type + * @param Internal representation type + */ + @FunctionalInterface + interface EntryMapper { + /** + * Process an entry. + * + * @param index entry index + * @param entry entry itself + * @param size entry size + * @return resulting internal representation + */ + T mapEntry(long index, E entry, int size); + } + + /** + * Returns the first index in the journal. + * + * @return the first index in the journal + */ + long getFirstIndex(); + + /** + * Returns the next reader index. + * + * @return The next reader index. + */ + long getNextIndex(); + + /** + * Try to move to the next entry. + * + * @param mapper callback to be invoked for the entry + * @return processed entry, or {@code null} + */ + @Nullable T tryNext(EntryMapper mapper); + + /** + * Resets the reader to the start. + */ + void reset(); + + /** + * Resets the reader to the given index. + * + * @param index The index to which to reset the reader. + */ + void reset(long index); + + @Override + void close(); +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java new file mode 100644 index 0000000000..02921bed2b --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java @@ -0,0 +1,270 @@ +/* + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
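For orientation (not part of the patch), a minimal sketch of reading a journal through the interfaces above, assuming an already-open Journal of String entries; 'journal' and 'process' are illustrative placeholders. The EntryMapper lambda here simply returns the entry itself.

    // Read all committed entries starting at index 1.
    try (var reader = journal.openReader(1, JournalReader.Mode.COMMITS)) {
        String entry;
        while ((entry = reader.tryNext((index, e, size) -> e)) != null) {
            process(entry); // placeholder for application logic
        }
    }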
+ */ +package io.atomix.storage.journal; + +import com.google.common.base.MoreObjects; +import io.atomix.storage.journal.index.JournalIndex; +import io.atomix.storage.journal.index.Position; +import io.atomix.storage.journal.index.SparseJournalIndex; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.eclipse.jdt.annotation.Nullable; + +/** + * Log segment. + * + * @author Jordan Halterman + */ +final class JournalSegment implements AutoCloseable { + private final JournalSegmentFile file; + private final JournalSegmentDescriptor descriptor; + private final StorageLevel storageLevel; + private final int maxEntrySize; + private final JournalIndex journalIndex; + private final Set readers = ConcurrentHashMap.newKeySet(); + private final AtomicInteger references = new AtomicInteger(); + private final FileChannel channel; + + private JournalSegmentWriter writer; + private boolean open = true; + + JournalSegment( + final JournalSegmentFile file, + final JournalSegmentDescriptor descriptor, + final StorageLevel storageLevel, + final int maxEntrySize, + final double indexDensity) { + this.file = file; + this.descriptor = descriptor; + this.storageLevel = storageLevel; + this.maxEntrySize = maxEntrySize; + journalIndex = new SparseJournalIndex(indexDensity); + try { + channel = FileChannel.open(file.file().toPath(), + StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE); + } catch (IOException e) { + throw new StorageException(e); + } + + final var fileWriter = switch (storageLevel) { + case DISK -> new DiskFileWriter(file.file().toPath(), channel, descriptor.maxSegmentSize(), maxEntrySize); + case MAPPED -> new MappedFileWriter(file.file().toPath(), channel, descriptor.maxSegmentSize(), maxEntrySize); + }; + writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex) + // relinquish mapped memory + .toFileChannel(); + } + + /** + * Returns the segment's starting index. + * + * @return The segment's starting index. + */ + long firstIndex() { + return descriptor.index(); + } + + /** + * Returns the last index in the segment. + * + * @return The last index in the segment. + */ + long lastIndex() { + return writer.getLastIndex(); + } + + /** + * Returns the size of the segment. + * + * @return the size of the segment + */ + int size() { + try { + return (int) channel.size(); + } catch (IOException e) { + throw new StorageException(e); + } + } + + /** + * Returns the segment file. + * + * @return The segment file. + */ + JournalSegmentFile file() { + return file; + } + + /** + * Returns the segment descriptor. + * + * @return The segment descriptor. + */ + JournalSegmentDescriptor descriptor() { + return descriptor; + } + + /** + * Looks up the position of the given index. + * + * @param index the index to lookup + * @return the position of the given index or a lesser index, or {@code null} + */ + @Nullable Position lookup(final long index) { + return journalIndex.lookup(index); + } + + /** + * Acquires a reference to the log segment. + */ + private void acquire() { + if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) { + writer = writer.toMapped(); + } + } + + /** + * Releases a reference to the log segment. 
+ */ + private void release() { + if (references.decrementAndGet() == 0) { + if (storageLevel == StorageLevel.MAPPED) { + writer = writer.toFileChannel(); + } + if (!open) { + finishClose(); + } + } + } + + /** + * Acquires a reference to the segment writer. + * + * @return The segment writer. + */ + JournalSegmentWriter acquireWriter() { + checkOpen(); + acquire(); + + return writer; + } + + /** + * Releases the reference to the segment writer. + */ + void releaseWriter() { + release(); + } + + /** + * Creates a new segment reader. + * + * @return A new segment reader. + */ + JournalSegmentReader createReader() { + checkOpen(); + acquire(); + + final var buffer = writer.buffer(); + final var path = file.file().toPath(); + final var fileReader = buffer != null ? new MappedFileReader(path, buffer) + : new DiskFileReader(path, channel, descriptor.maxSegmentSize(), maxEntrySize); + final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize); + reader.setPosition(JournalSegmentDescriptor.BYTES); + readers.add(reader); + return reader; + } + + /** + * Closes a segment reader. + * + * @param reader the closed segment reader + */ + void closeReader(JournalSegmentReader reader) { + if (readers.remove(reader)) { + release(); + } + } + + /** + * Checks whether the segment is open. + */ + private void checkOpen() { + if (!open) { + throw new IllegalStateException("Segment not open"); + } + } + + /** + * Returns a boolean indicating whether the segment is open. + * + * @return indicates whether the segment is open + */ + public boolean isOpen() { + return open; + } + + /** + * Closes the segment. + */ + @Override + public void close() { + if (!open) { + return; + } + + open = false; + readers.forEach(JournalSegmentReader::close); + if (references.get() == 0) { + finishClose(); + } + } + + private void finishClose() { + writer.close(); + try { + channel.close(); + } catch (IOException e) { + throw new StorageException(e); + } + } + + /** + * Deletes the segment. + */ + void delete() { + try { + Files.deleteIfExists(file.file().toPath()); + } catch (IOException e) { + throw new StorageException(e); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("id", descriptor.id()) + .add("version", descriptor.version()) + .add("index", firstIndex()) + .toString(); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java new file mode 100644 index 0000000000..757ca3a078 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java @@ -0,0 +1,289 @@ +/* + * Copyright 2015-2022 Open Networking Foundation and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.atomix.storage.journal; + +import com.google.common.annotations.VisibleForTesting; + +import java.nio.ByteBuffer; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** + * Stores information about a {@link JournalSegment} of the log. + *

+ * <p>
+ * The segment descriptor manages metadata related to a single segment of the log. Descriptors are stored within the
+ * first {@code 64} bytes of each segment in the following order:
+ * <ul>
+ *   <li>{@code id} (64-bit signed integer) - A unique segment identifier. This is a monotonically increasing number
+ *       within each log. Segments with in-sequence identifiers should contain in-sequence indexes.</li>
+ *   <li>{@code index} (64-bit signed integer) - The effective first index of the segment. This indicates the index
+ *       at which the first entry should be written to the segment. Indexes are monotonically increasing
+ *       thereafter.</li>
+ *   <li>{@code version} (64-bit signed integer) - The version of the segment. Versions are monotonically increasing
+ *       starting at {@code 1}. Versions will only be incremented whenever the segment is rewritten to another
+ *       memory/disk space, e.g. after log compaction.</li>
+ *   <li>{@code maxSegmentSize} (32-bit unsigned integer) - The maximum number of bytes allowed in the segment.</li>
+ *   <li>{@code maxEntries} (32-bit signed integer) - The total number of expected entries in the segment. This is
+ *       the final number of entries allowed within the segment both before and after compaction. This entry count
+ *       is used to determine the count of internal indexing and deduplication facilities.</li>
+ *   <li>{@code updated} (64-bit signed integer) - The last update to the segment in terms of milliseconds since the
+ *       epoch. When the segment is first constructed, the {@code updated} time is {@code 0}. Once all entries in
+ *       the segment have been committed, the {@code updated} time should be set to the current time. Log compaction
+ *       should not result in a change to {@code updated}.</li>
+ *   <li>{@code locked} (8-bit boolean) - A boolean indicating whether the segment is locked. Segments will be locked
+ *       once all entries have been committed to the segment. The lock state of each segment is used to determine
+ *       log compaction and recovery behavior.</li>
+ * </ul>
+ *
+ * <p>
+ * The remainder of the 64 segment header bytes are reserved for future metadata. + * + * @author Jordan Halterman + */ +public final class JournalSegmentDescriptor { + public static final int BYTES = 64; + + // Current segment version. + @VisibleForTesting + static final int VERSION = 1; + + // The lengths of each field in the header. + private static final int VERSION_LENGTH = Integer.BYTES; // 32-bit signed integer + private static final int ID_LENGTH = Long.BYTES; // 64-bit signed integer + private static final int INDEX_LENGTH = Long.BYTES; // 64-bit signed integer + private static final int MAX_SIZE_LENGTH = Integer.BYTES; // 32-bit signed integer + private static final int MAX_ENTRIES_LENGTH = Integer.BYTES; // 32-bit signed integer + private static final int UPDATED_LENGTH = Long.BYTES; // 64-bit signed integer + + // The positions of each field in the header. + private static final int VERSION_POSITION = 0; // 0 + private static final int ID_POSITION = VERSION_POSITION + VERSION_LENGTH; // 4 + private static final int INDEX_POSITION = ID_POSITION + ID_LENGTH; // 12 + private static final int MAX_SIZE_POSITION = INDEX_POSITION + INDEX_LENGTH; // 20 + private static final int MAX_ENTRIES_POSITION = MAX_SIZE_POSITION + MAX_SIZE_LENGTH; // 24 + private static final int UPDATED_POSITION = MAX_ENTRIES_POSITION + MAX_ENTRIES_LENGTH; // 28 + + /** + * Returns a descriptor builder. + *

+ * The descriptor builder will write segment metadata to a {@code 48} byte in-memory buffer. + * + * @return The descriptor builder. + */ + public static Builder builder() { + return new Builder(ByteBuffer.allocate(BYTES)); + } + + /** + * Returns a descriptor builder for the given descriptor buffer. + * + * @param buffer The descriptor buffer. + * @return The descriptor builder. + * @throws NullPointerException if {@code buffer} is null + */ + public static Builder builder(ByteBuffer buffer) { + return new Builder(buffer); + } + + private final ByteBuffer buffer; + private final int version; + private final long id; + private final long index; + private final int maxSegmentSize; + private final int maxEntries; + private volatile long updated; + private volatile boolean locked; + + /** + * @throws NullPointerException if {@code buffer} is null + */ + public JournalSegmentDescriptor(ByteBuffer buffer) { + this.buffer = buffer; + this.version = buffer.getInt(); + this.id = buffer.getLong(); + this.index = buffer.getLong(); + this.maxSegmentSize = buffer.getInt(); + this.maxEntries = buffer.getInt(); + this.updated = buffer.getLong(); + this.locked = buffer.get() == 1; + } + + /** + * Returns the segment version. + *

+ * Versions are monotonically increasing starting at {@code 1}. + * + * @return The segment version. + */ + public int version() { + return version; + } + + /** + * Returns the segment identifier. + *

+ * The segment ID is a monotonically increasing number within each log. Segments with in-sequence identifiers should + * contain in-sequence indexes. + * + * @return The segment identifier. + */ + public long id() { + return id; + } + + /** + * Returns the segment index. + *

+ * The index indicates the index at which the first entry should be written to the segment. Indexes are monotonically + * increasing thereafter. + * + * @return The segment index. + */ + public long index() { + return index; + } + + /** + * Returns the maximum count of the segment. + * + * @return The maximum allowed count of the segment. + */ + public int maxSegmentSize() { + return maxSegmentSize; + } + + /** + * Returns the maximum number of entries allowed in the segment. + * + * @return The maximum number of entries allowed in the segment. + */ + public int maxEntries() { + return maxEntries; + } + + /** + * Returns last time the segment was updated. + *

+ * When the segment is first constructed, the {@code updated} time is {@code 0}. Once all entries in the segment have + * been committed, the {@code updated} time should be set to the current time. Log compaction should not result in a + * change to {@code updated}. + * + * @return The last time the segment was updated in terms of milliseconds since the epoch. + */ + public long updated() { + return updated; + } + + /** + * Writes an update to the descriptor. + */ + public void update(long timestamp) { + if (!locked) { + buffer.putLong(UPDATED_POSITION, timestamp); + this.updated = timestamp; + } + } + + /** + * Copies the segment to a new buffer. + */ + JournalSegmentDescriptor copyTo(ByteBuffer buffer) { + buffer.putInt(version); + buffer.putLong(id); + buffer.putLong(index); + buffer.putInt(maxSegmentSize); + buffer.putInt(maxEntries); + buffer.putLong(updated); + buffer.put(locked ? (byte) 1 : (byte) 0); + return this; + } + + @Override + public String toString() { + return toStringHelper(this) + .add("version", version) + .add("id", id) + .add("index", index) + .add("updated", updated) + .toString(); + } + + /** + * Segment descriptor builder. + */ + public static class Builder { + private final ByteBuffer buffer; + + private Builder(ByteBuffer buffer) { + this.buffer = requireNonNull(buffer, "buffer cannot be null"); + buffer.putInt(VERSION_POSITION, VERSION); + } + + /** + * Sets the segment identifier. + * + * @param id The segment identifier. + * @return The segment descriptor builder. + */ + public Builder withId(long id) { + buffer.putLong(ID_POSITION, id); + return this; + } + + /** + * Sets the segment index. + * + * @param index The segment starting index. + * @return The segment descriptor builder. + */ + public Builder withIndex(long index) { + buffer.putLong(INDEX_POSITION, index); + return this; + } + + /** + * Sets maximum count of the segment. + * + * @param maxSegmentSize The maximum count of the segment. + * @return The segment descriptor builder. + */ + public Builder withMaxSegmentSize(int maxSegmentSize) { + buffer.putInt(MAX_SIZE_POSITION, maxSegmentSize); + return this; + } + + /** + * Sets the maximum number of entries in the segment. + * + * @param maxEntries The maximum number of entries in the segment. + * @return The segment descriptor builder. + * @deprecated since 3.0.2 + */ + @Deprecated + public Builder withMaxEntries(int maxEntries) { + buffer.putInt(MAX_ENTRIES_POSITION, maxEntries); + return this; + } + + /** + * Builds the segment descriptor. + * + * @return The built segment descriptor. + */ + public JournalSegmentDescriptor build() { + buffer.rewind(); + return new JournalSegmentDescriptor(buffer); + } + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java new file mode 100644 index 0000000000..2190dee5a7 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java @@ -0,0 +1,94 @@ +/* + * Copyright 2015-2022 Open Networking Foundation and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
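Stepping back to the descriptor builder just shown, here is a minimal sketch of its use (editorial, values made up for illustration, not part of the patch):

    JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
        .withId(1)
        .withIndex(1)
        .withMaxSegmentSize(32 * 1024 * 1024)
        .build();
    // The builder stamps the current VERSION; accessors read back what was set.
    assert descriptor.version() == 1;
    assert descriptor.id() == 1 && descriptor.index() == 1;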
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import java.io.File; + +import static java.util.Objects.requireNonNull; + +/** + * Segment file utility. + * + * @author Jordan Halterman + */ +public final class JournalSegmentFile { + private static final char PART_SEPARATOR = '-'; + private static final char EXTENSION_SEPARATOR = '.'; + private static final String EXTENSION = "log"; + private final File file; + + /** + * Returns a boolean value indicating whether the given file appears to be a parsable segment file. + * + * @throws NullPointerException if {@code file} is null + */ + public static boolean isSegmentFile(String name, File file) { + return isSegmentFile(name, file.getName()); + } + + /** + * Returns a boolean value indicating whether the given file appears to be a parsable segment file. + * + * @param journalName the name of the journal + * @param fileName the name of the file to check + * @throws NullPointerException if {@code file} is null + */ + public static boolean isSegmentFile(String journalName, String fileName) { + requireNonNull(journalName, "journalName cannot be null"); + requireNonNull(fileName, "fileName cannot be null"); + + int partSeparator = fileName.lastIndexOf(PART_SEPARATOR); + int extensionSeparator = fileName.lastIndexOf(EXTENSION_SEPARATOR); + + if (extensionSeparator == -1 + || partSeparator == -1 + || extensionSeparator < partSeparator + || !fileName.endsWith(EXTENSION)) { + return false; + } + + for (int i = partSeparator + 1; i < extensionSeparator; i++) { + if (!Character.isDigit(fileName.charAt(i))) { + return false; + } + } + + return fileName.startsWith(journalName); + } + + /** + * Creates a segment file for the given directory, log name, segment ID, and segment version. + */ + static File createSegmentFile(String name, File directory, long id) { + return new File(directory, String.format("%s-%d.log", requireNonNull(name, "name cannot be null"), id)); + } + + /** + * @throws IllegalArgumentException if {@code file} is not a valid segment file + */ + JournalSegmentFile(File file) { + this.file = file; + } + + /** + * Returns the segment file. + * + * @return The segment file. + */ + public File file() { + return file; + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java new file mode 100644 index 0000000000..d89c720c67 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
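Segment files follow the "journalName-id.log" naming convention enforced by JournalSegmentFile above; a quick illustration (editorial, not part of the patch):

    boolean ok    = JournalSegmentFile.isSegmentFile("raft", "raft-7.log");   // true
    boolean badId = JournalSegmentFile.isSegmentFile("raft", "raft-x.log");   // false: id part must be digits
    boolean other = JournalSegmentFile.isSegmentFile("raft", "other-7.log");  // false: different journal name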
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.zip.CRC32; +import org.eclipse.jdt.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class JournalSegmentReader { + private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class); + + private final JournalSegment segment; + private final FileReader fileReader; + private final int maxSegmentSize; + private final int maxEntrySize; + + private int position; + + JournalSegmentReader(final JournalSegment segment, final FileReader fileReader, final int maxEntrySize) { + this.segment = requireNonNull(segment); + this.fileReader = requireNonNull(fileReader); + maxSegmentSize = segment.descriptor().maxSegmentSize(); + this.maxEntrySize = maxEntrySize; + } + + /** + * Return the current position. + * + * @return current position. + */ + int position() { + return position; + } + + /** + * Set the file position. + * + * @param position new position + */ + void setPosition(final int position) { + verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize, + "Invalid position %s", position); + this.position = position; + fileReader.invalidateCache(); + } + + /** + * Invalidate any cache that is present, so that the next read is coherent with the backing file. + */ + void invalidateCache() { + fileReader.invalidateCache(); + } + + /** + * Reads the next binary data block + * + * @param index entry index + * @return The binary data, or {@code null} + */ + @Nullable ByteBuf readBytes(final long index) { + // Check if there is enough in the buffer remaining + final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES; + if (remaining < 0) { + // Not enough space in the segment, there can never be another entry + return null; + } + + // Calculate maximum entry length not exceeding file size nor maxEntrySize + final var maxLength = Math.min(remaining, maxEntrySize); + final var buffer = fileReader.read(position, maxLength + SegmentEntry.HEADER_BYTES); + + // Read the entry length + final var length = buffer.getInt(0); + if (length < 1 || length > maxLength) { + // Invalid length, make sure next read re-tries + invalidateCache(); + return null; + } + + // Read the entry checksum + final int checksum = buffer.getInt(Integer.BYTES); + + // Slice off the entry's bytes + final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length); + // Compute the checksum for the entry bytes. + final var crc32 = new CRC32(); + crc32.update(entryBuffer); + + // If the stored checksum does not equal the computed checksum, do not proceed further + final var computed = (int) crc32.getValue(); + if (checksum != computed) { + LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed)); + invalidateCache(); + return null; + } + + // update position + position += SegmentEntry.HEADER_BYTES + length; + + // return bytes + entryBuffer.rewind(); + return Unpooled.buffer(length).writeBytes(entryBuffer); + } + + /** + * Close this reader. 
+ */ + void close() { + segment.closeReader(this); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java new file mode 100644 index 0000000000..e381bc25a7 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; +import static java.util.Objects.requireNonNull; + +import io.atomix.storage.journal.index.JournalIndex; +import io.netty.buffer.ByteBuf; +import java.nio.MappedByteBuffer; +import java.util.zip.CRC32; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class JournalSegmentWriter { + private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class); + + private final FileWriter fileWriter; + final @NonNull JournalSegment segment; + private final @NonNull JournalIndex index; + final int maxSegmentSize; + final int maxEntrySize; + + private int currentPosition; + private Long lastIndex; + + JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize, + final JournalIndex index) { + this.fileWriter = requireNonNull(fileWriter); + this.segment = requireNonNull(segment); + this.index = requireNonNull(index); + maxSegmentSize = segment.descriptor().maxSegmentSize(); + this.maxEntrySize = maxEntrySize; + // adjust lastEntry value + reset(0); + } + + JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) { + segment = previous.segment; + index = previous.index; + maxSegmentSize = previous.maxSegmentSize; + maxEntrySize = previous.maxEntrySize; + lastIndex = previous.lastIndex; + currentPosition = previous.currentPosition; + this.fileWriter = requireNonNull(fileWriter); + } + + /** + * Returns the last written index. + * + * @return The last written index. + */ + long getLastIndex() { + return lastIndex != null ? lastIndex : segment.firstIndex() - 1; + } + + /** + * Returns the next index to be written. + * + * @return The next index to be written. + */ + long getNextIndex() { + return lastIndex != null ? lastIndex + 1 : segment.firstIndex(); + } + + /** + * Tries to append a binary data to the journal. + * + * @param buf binary data to append + * @return The index of appended data, or {@code null} if segment has no space + */ + Long append(final ByteBuf buf) { + final var length = buf.readableBytes(); + if (length > maxEntrySize) { + throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes (" + + maxEntrySize + ")"); + } + + // Store the entry index. 
+ final long index = getNextIndex(); + final int position = currentPosition; + + // check space available + final int nextPosition = position + HEADER_BYTES + length; + if (nextPosition >= maxSegmentSize) { + LOG.trace("Not enough space for {} at {}", index, position); + return null; + } + + // allocate buffer and write data + final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES); + writeBuffer.put(buf.nioBuffer()); + + // Compute the checksum for the entry. + final var crc32 = new CRC32(); + crc32.update(writeBuffer.flip().position(HEADER_BYTES)); + + // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. + writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue()); + fileWriter.commitWrite(position, writeBuffer.rewind()); + + // Update the last entry with the correct index/term/length. + currentPosition = nextPosition; + lastIndex = index; + this.index.index(index, position); + + return index; + } + + /** + * Resets the head of the segment to the given index. + * + * @param index the index to which to reset the head of the segment + */ + void reset(final long index) { + // acquire ownership of cache and make sure reader does not see anything we've done once we're done + final var fileReader = fileWriter.reader(); + try { + resetWithBuffer(fileReader, index); + } finally { + // Make sure reader does not see anything we've done + fileReader.invalidateCache(); + } + } + + private void resetWithBuffer(final FileReader fileReader, final long index) { + long nextIndex = segment.firstIndex(); + + // Clear the buffer indexes and acquire ownership of the buffer + currentPosition = JournalSegmentDescriptor.BYTES; + final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize); + reader.setPosition(JournalSegmentDescriptor.BYTES); + + while (index == 0 || nextIndex <= index) { + final var buf = reader.readBytes(nextIndex); + if (buf == null) { + break; + } + + lastIndex = nextIndex; + this.index.index(nextIndex, currentPosition); + nextIndex++; + + // Update the current position for indexing. + currentPosition += HEADER_BYTES + buf.readableBytes(); + } + } + + /** + * Truncates the log to the given index. + * + * @param index The index to which to truncate the log. + */ + void truncate(final long index) { + // If the index is greater than or equal to the last index, skip the truncate. + if (index >= getLastIndex()) { + return; + } + + // Reset the last written + lastIndex = null; + + // Truncate the index. + this.index.truncate(index); + + if (index < segment.firstIndex()) { + // Reset the writer to the first entry. + currentPosition = JournalSegmentDescriptor.BYTES; + } else { + // Reset the writer to the given index. + reset(index); + } + + // Zero the entry header at current channel position. + fileWriter.writeEmptyHeader(currentPosition); + } + + /** + * Flushes written entries to disk. + */ + void flush() { + fileWriter.flush(); + } + + /** + * Closes this writer. + */ + void close() { + fileWriter.close(); + } + + /** + * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a + * buffer. + * + * @return the mapped buffer underlying the segment writer, or {@code null}. + */ + @Nullable MappedByteBuffer buffer() { + return fileWriter.buffer(); + } + + @NonNull JournalSegmentWriter toMapped() { + final var newWriter = fileWriter.toMapped(); + return newWriter == null ? 
this : new JournalSegmentWriter(this, newWriter); + } + + @NonNull JournalSegmentWriter toFileChannel() { + final var newWriter = fileWriter.toDisk(); + return newWriter == null ? this : new JournalSegmentWriter(this, newWriter); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java new file mode 100644 index 0000000000..a970882edf --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java @@ -0,0 +1,209 @@ +/* + * Copyright 2014-2021 Open Networking Foundation + * Copyright 2023 PANTHEON.tech, s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; +import io.atomix.utils.serializer.KryoJournalSerdesBuilder; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * Support for serialization of {@link Journal} entries. + * + * @deprecated due to dependency on outdated Kryo library, {@link JournalSerializer} to be used instead. + */ +@Deprecated(forRemoval = true, since="9.0.3") +public interface JournalSerdes { + /** + * Serializes given object to byte array. + * + * @param obj Object to serialize + * @return serialized bytes + */ + byte[] serialize(Object obj); + + /** + * Serializes given object to byte array. + * + * @param obj Object to serialize + * @param bufferSize maximum size of serialized bytes + * @return serialized bytes + */ + byte[] serialize(Object obj, int bufferSize); + + /** + * Serializes given object to byte buffer. + * + * @param obj Object to serialize + * @param buffer to write to + */ + void serialize(Object obj, ByteBuffer buffer); + + /** + * Serializes given object to OutputStream. + * + * @param obj Object to serialize + * @param stream to write to + */ + void serialize(Object obj, OutputStream stream); + + /** + * Serializes given object to OutputStream. + * + * @param obj Object to serialize + * @param stream to write to + * @param bufferSize size of the buffer in front of the stream + */ + void serialize(Object obj, OutputStream stream, int bufferSize); + + /** + * Deserializes given byte array to Object. + * + * @param bytes serialized bytes + * @param deserialized Object type + * @return deserialized Object + */ + T deserialize(byte[] bytes); + + /** + * Deserializes given byte buffer to Object. + * + * @param buffer input with serialized bytes + * @param deserialized Object type + * @return deserialized Object + */ + T deserialize(final ByteBuffer buffer); + + /** + * Deserializes given InputStream to an Object. + * + * @param stream input stream + * @param deserialized Object type + * @return deserialized Object + */ + T deserialize(InputStream stream); + + /** + * Deserializes given InputStream to an Object. 
+ * + * @param stream input stream + * @param deserialized Object type + * @param bufferSize size of the buffer in front of the stream + * @return deserialized Object + */ + T deserialize(final InputStream stream, final int bufferSize); + + /** + * Creates a new {@link JournalSerdes} builder. + * + * @return builder + */ + static Builder builder() { + return new KryoJournalSerdesBuilder(); + } + + /** + * Builder for {@link JournalSerdes}. + */ + interface Builder { + /** + * Builds a {@link JournalSerdes} instance. + * + * @return A {@link JournalSerdes} implementation. + */ + JournalSerdes build(); + + /** + * Builds a {@link JournalSerdes} instance. + * + * @param friendlyName friendly name for the namespace + * @return A {@link JournalSerdes} implementation. + */ + JournalSerdes build(String friendlyName); + + /** + * Registers serializer for the given set of classes. + *

+ * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees + * all instances will be serialized with the same type ID. + * + * @param classes list of classes to register + * @param serdes serializer to use for the class + * @return this builder + */ + Builder register(EntrySerdes serdes, Class... classes); + + /** + * Sets the namespace class loader. + * + * @param classLoader the namespace class loader + * @return this builder + */ + Builder setClassLoader(ClassLoader classLoader); + } + + /** + * Input data stream exposed to {@link EntrySerdes#read(EntryInput)}. + */ + @Beta + interface EntryInput { + + byte[] readBytes(int length) throws IOException; + + long readLong() throws IOException; + + String readString() throws IOException; + + Object readObject() throws IOException; + + @VisibleForTesting + int readVarInt() throws IOException; + } + + /** + * Output data stream exposed to {@link EntrySerdes#write(EntryOutput, Object)}. + */ + @Beta + interface EntryOutput { + + void writeBytes(byte[] bytes) throws IOException; + + void writeLong(long value) throws IOException; + + void writeObject(Object value) throws IOException; + + void writeString(String value) throws IOException; + + @VisibleForTesting + void writeVarInt(int value) throws IOException; + } + + /** + * A serializer/deserializer for an entry. + * + * @param Entry type + */ + interface EntrySerdes { + + T read(EntryInput input) throws IOException; + + void write(EntryOutput output, T entry) throws IOException; + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java new file mode 100644 index 0000000000..eff9af8559 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024 PANTHEON.tech s.r.o. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package io.atomix.storage.journal; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; + +/** + * Support for serialization of {@link Journal} entries. + */ +public interface JournalSerializer { + + /** + * Serializes given object to byte array. + * + * @param obj Object to serialize + * @return serialized bytes as {@link ByteBuf} + */ + ByteBuf serialize(T obj) ; + + /** + * Deserializes given byte array to Object. + * + * @param buf serialized bytes as {@link ByteBuf} + * @return deserialized Object + */ + T deserialize(final ByteBuf buf); + + static JournalSerializer wrap(final JournalSerdes serdes) { + return new JournalSerializer<>() { + @Override + public ByteBuf serialize(final E obj) { + return Unpooled.wrappedBuffer(serdes.serialize(obj)); + } + + @Override + public E deserialize(final ByteBuf buf) { + return serdes.deserialize(ByteBufUtil.getBytes(buf)); + } + }; + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java new file mode 100644 index 0000000000..064fd019ec --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java @@ -0,0 +1,73 @@ +/* + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. 
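As an illustration (not part of the patch), the deprecated Kryo-based JournalSerdes can be bridged to the new ByteBuf-based JournalSerializer via wrap(); 'Command' and 'CommandSerdes' below are hypothetical application types.

    JournalSerdes serdes = JournalSerdes.builder()
        .register(new CommandSerdes(), Command.class) // hypothetical EntrySerdes for Command
        .build("commands");
    JournalSerializer<Command> serializer = JournalSerializer.wrap(serdes);
    ByteBuf bytes = serializer.serialize(new Command());
    Command copy = serializer.deserialize(bytes);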
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import org.eclipse.jdt.annotation.NonNull; + +/** + * Log writer. + * + * @author Jordan Halterman + */ +public interface JournalWriter { + /** + * Returns the last written index. + * + * @return The last written index. + */ + long getLastIndex(); + + /** + * Returns the next index to be written. + * + * @return The next index to be written. + */ + long getNextIndex(); + + /** + * Appends an entry to the journal. + * + * @param entry The entry to append. + * @return The appended indexed entry. + */ + @NonNull Indexed append(T entry); + + /** + * Commits entries up to the given index. + * + * @param index The index up to which to commit entries. + */ + void commit(long index); + + /** + * Resets the head of the journal to the given index. + * + * @param index the index to which to reset the head of the journal + */ + void reset(long index); + + /** + * Truncates the log to the given index. + * + * @param index The index to which to truncate the log. + */ + void truncate(long index); + + /** + * Flushes written entries to disk. + */ + void flush(); +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java new file mode 100644 index 0000000000..204fd72550 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import java.nio.ByteBuffer; +import java.nio.file.Path; + +/** + * A {@link StorageLevel#MAPPED} implementation of {@link FileReader}. Operates on direct mapping of the entire file. 
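A minimal append/commit/flush sequence against the JournalWriter interface above (editorial sketch; 'writer' and 'entry' are assumed to exist and are not part of the patch):

    var appended = writer.append(entry); // entry is stored and assigned an index
    writer.commit(appended.index());     // mark entries up to that index as committed
    writer.flush();                      // force written entries to disk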
+ */ +final class MappedFileReader extends FileReader { + private final ByteBuffer buffer; + + MappedFileReader(final Path path, final ByteBuffer buffer) { + super(path); + this.buffer = buffer.slice().asReadOnlyBuffer(); + } + + @Override + void invalidateCache() { + // No-op: the mapping is guaranteed to be coherent + } + + @Override + ByteBuffer read(final int position, final int size) { + return buffer.slice(position, size); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java new file mode 100644 index 0000000000..47f26ba151 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import io.netty.util.internal.PlatformDependent; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import org.eclipse.jdt.annotation.NonNull; + +/** + * A {@link StorageLevel#MAPPED} {@link FileWriter}. + */ +final class MappedFileWriter extends FileWriter { + private final @NonNull MappedByteBuffer mappedBuffer; + private final MappedFileReader reader; + private final ByteBuffer buffer; + + MappedFileWriter(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) { + super(path, channel, maxSegmentSize, maxEntrySize); + + mappedBuffer = mapBuffer(channel, maxSegmentSize); + buffer = mappedBuffer.slice(); + reader = new MappedFileReader(path, mappedBuffer); + } + + private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) { + try { + return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize); + } catch (IOException e) { + throw new StorageException(e); + } + } + + @Override + MappedFileReader reader() { + return reader; + } + + @Override + MappedByteBuffer buffer() { + return mappedBuffer; + } + + @Override + MappedFileWriter toMapped() { + return null; + } + + @Override + DiskFileWriter toDisk() { + close(); + return new DiskFileWriter(path, channel, maxSegmentSize, maxEntrySize); + } + + @Override + void writeEmptyHeader(final int position) { + // Note: we issue a single putLong() instead of two putInt()s. 
+ buffer.putLong(position, 0L); + } + + @Override + ByteBuffer startWrite(final int position, final int size) { + return buffer.slice(position, size); + } + + @Override + void commitWrite(final int position, final ByteBuffer entry) { + // No-op, buffer is write-through + } + + @Override + void flush() { + mappedBuffer.force(); + } + + @Override + void close() { + flush(); + PlatformDependent.freeDirectBuffer(mappedBuffer); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java new file mode 100644 index 0000000000..be6c6ba831 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import java.nio.ByteBuffer; + +/** + * An {@link Indexed} entry read from {@link JournalSegment}. + * + * @param checksum The CRC32 checksum of data + * @param bytes Entry bytes + */ +record SegmentEntry(int checksum, ByteBuffer bytes) { + /** + * The size of the header, comprising of: + *