<artifactId>akka-actor_2.13</artifactId>
<version>2.6.12</version>
</dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor-typed_2.13</artifactId>
+ <version>2.6.12</version>
+ </dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.13</artifactId>
<version>2.6.12</version>
</dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-cluster-typed_2.13</artifactId>
+ <version>2.6.12</version>
+ </dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-osgi_2.13</artifactId>
--- /dev/null
+akka.actor.typed {
+
+ # List FQCN of `akka.actor.typed.ExtensionId`s which shall be loaded at actor system startup.
+ # Should be on the format: 'extensions = ["com.example.MyExtId1", "com.example.MyExtId2"]' etc.
+ # See the Akka Documentation for more info about Extensions
+ extensions = []
+
+ # List FQCN of extensions which shall be loaded at actor system startup.
+ # Library extensions are regular extensions that are loaded at startup and are
+ # available for third party library authors to enable auto-loading of extensions when
+ # present on the classpath. This is done by appending entries:
+ # 'library-extensions += "Extension"' in the library `reference.conf`.
+ #
+ # Should not be set by end user applications in 'application.conf', use the extensions property for that
+ #
+ library-extensions = ${?akka.actor.typed.library-extensions} []
+
+ # Receptionist is started eagerly to allow clustered receptionist to gather remote registrations early on.
+ library-extensions += "akka.actor.typed.receptionist.Receptionist$"
+
+ # While an actor is restarted (waiting for backoff to expire and children to stop)
+ # incoming messages and signals are stashed, and delivered later to the newly restarted
+ # behavior. This property defines the capacity in number of messages of the stash
+ # buffer. If the capacity is exceed then additional incoming messages are dropped.
+ restart-stash-capacity = 1000
+
+ # Typed mailbox defaults to the single consumber mailbox as balancing dispatcher is not supported
+ default-mailbox {
+ mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
+ }
+}
+
+# Load typed extensions by a classic extension.
+akka.library-extensions += "akka.actor.typed.internal.adapter.ActorSystemAdapter$LoadTypedExtensions"
+
+akka.actor {
+ serializers {
+ typed-misc = "akka.actor.typed.internal.MiscMessageSerializer"
+ service-key = "akka.actor.typed.internal.receptionist.ServiceKeySerializer"
+ }
+
+ serialization-identifiers {
+ "akka.actor.typed.internal.MiscMessageSerializer" = 24
+ "akka.actor.typed.internal.receptionist.ServiceKeySerializer" = 26
+ }
+
+ serialization-bindings {
+ "akka.actor.typed.ActorRef" = typed-misc
+ "akka.actor.typed.internal.adapter.ActorRefAdapter" = typed-misc
+ "akka.actor.typed.internal.receptionist.DefaultServiceKey" = service-key
+ }
+}
+
+# When using Akka Typed (having akka-actor-typed in classpath) the
+# akka.event.slf4j.Slf4jLogger is enabled instead of the DefaultLogger
+# even though it has not been explicitly defined in `akka.loggers`
+# configuration.
+#
+# Slf4jLogger will be used for all Akka classic logging via eventStream,
+# including logging from Akka internals. The Slf4jLogger is then using
+# an ordinary org.slf4j.Logger to emit the log events.
+#
+# The Slf4jLoggingFilter is also enabled automatically.
+#
+# This behavior can be disabled by setting this property to `off`.
+akka.use-slf4j = on
+
+akka.reliable-delivery {
+ producer-controller {
+
+ # To avoid head of line blocking from serialization and transfer
+ # of large messages this can be enabled.
+ # Large messages are chunked into pieces of the given size in bytes. The
+ # chunked messages are sent separatetely and assembled on the consumer side.
+ # Serialization and deserialization is performed by the ProducerController and
+ # ConsumerController respectively instead of in the remote transport layer.
+ chunk-large-messages = off
+
+ durable-queue {
+ # The ProducerController uses this timeout for the requests to
+ # the durable queue. If there is no reply within the timeout it
+ # will be retried.
+ request-timeout = 3s
+
+ # The ProducerController retries requests to the durable queue this
+ # number of times before failing.
+ retry-attempts = 10
+
+ # The ProducerController retries sending the first message with this interval
+ # until it has been confirmed.
+ resend-first-interval = 1s
+ }
+ }
+
+ consumer-controller {
+ # Number of messages in flight between ProducerController and
+ # ConsumerController. The ConsumerController requests for more messages
+ # when half of the window has been used.
+ flow-control-window = 50
+
+ # The ConsumerController resends flow control messages to the
+ # ProducerController with the resend-interval-min, and increasing
+ # it gradually to resend-interval-max when idle.
+ resend-interval-min = 2s
+ resend-interval-max = 30s
+
+ # If this is enabled lost messages will not be resent, but flow control is used.
+ # This can be more efficient since messages don't have to be
+ # kept in memory in the `ProducerController` until they have been
+ # confirmed, but the drawback is that lost messages will not be delivered.
+ only-flow-control = false
+ }
+
+ work-pulling {
+ producer-controller = ${akka.reliable-delivery.producer-controller}
+ producer-controller {
+ # Limit of how many messages that can be buffered when there
+ # is no demand from the consumer side.
+ buffer-size = 1000
+
+ # Ask timeout for sending message to worker until receiving Ack from worker
+ internal-ask-timeout = 60s
+
+ # Chunked messages not implemented for work-pulling yet. Override to not
+ # propagate property from akka.reliable-delivery.producer-controller.
+ chunk-large-messages = off
+ }
+ }
+}
--- /dev/null
+############################################
+# Akka Cluster Tools Reference Config File #
+############################################
+
+# This is the reference config file that contains all the default settings.
+# Make your edits/overrides in your application.conf.
+
+# //#pub-sub-ext-config
+# Settings for the DistributedPubSub extension
+akka.cluster.pub-sub {
+ # Actor name of the mediator actor, /system/distributedPubSubMediator
+ name = distributedPubSubMediator
+
+ # Start the mediator on members tagged with this role.
+ # All members are used if undefined or empty.
+ role = ""
+
+ # The routing logic to use for 'Send'
+ # Possible values: random, round-robin, broadcast
+ routing-logic = random
+
+ # How often the DistributedPubSubMediator should send out gossip information
+ gossip-interval = 1s
+
+ # Removed entries are pruned after this duration
+ removed-time-to-live = 120s
+
+ # Maximum number of elements to transfer in one message when synchronizing the registries.
+ # Next chunk will be transferred in next round of gossip.
+ max-delta-elements = 3000
+
+ # When a message is published to a topic with no subscribers send it to the dead letters.
+ send-to-dead-letters-when-no-subscribers = on
+
+ # The id of the dispatcher to use for DistributedPubSubMediator actors.
+ # If specified you need to define the settings of the actual dispatcher.
+ use-dispatcher = "akka.actor.internal-dispatcher"
+}
+# //#pub-sub-ext-config
+
+# Protobuf serializer for cluster DistributedPubSubMeditor messages
+akka.actor {
+ serializers {
+ akka-pubsub = "akka.cluster.pubsub.protobuf.DistributedPubSubMessageSerializer"
+ }
+ serialization-bindings {
+ "akka.cluster.pubsub.DistributedPubSubMessage" = akka-pubsub
+ "akka.cluster.pubsub.DistributedPubSubMediator$Internal$SendToOneSubscriber" = akka-pubsub
+ }
+ serialization-identifiers {
+ "akka.cluster.pubsub.protobuf.DistributedPubSubMessageSerializer" = 9
+ }
+}
+
+
+# //#receptionist-ext-config
+# Settings for the ClusterClientReceptionist extension
+akka.cluster.client.receptionist {
+ # Actor name of the ClusterReceptionist actor, /system/receptionist
+ name = receptionist
+
+ # Start the receptionist on members tagged with this role.
+ # All members are used if undefined or empty.
+ role = ""
+
+ # The receptionist will send this number of contact points to the client
+ number-of-contacts = 3
+
+ # The actor that tunnel response messages to the client will be stopped
+ # after this time of inactivity.
+ response-tunnel-receive-timeout = 30s
+
+ # The id of the dispatcher to use for ClusterReceptionist actors.
+ # If specified you need to define the settings of the actual dispatcher.
+ use-dispatcher = "akka.actor.internal-dispatcher"
+
+ # How often failure detection heartbeat messages should be received for
+ # each ClusterClient
+ heartbeat-interval = 2s
+
+ # Number of potentially lost/delayed heartbeats that will be
+ # accepted before considering it to be an anomaly.
+ # The ClusterReceptionist is using the akka.remote.DeadlineFailureDetector, which
+ # will trigger if there are no heartbeats within the duration
+ # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
+ # the default settings.
+ acceptable-heartbeat-pause = 13s
+
+ # Failure detection checking interval for checking all ClusterClients
+ failure-detection-interval = 2s
+}
+# //#receptionist-ext-config
+
+# //#cluster-client-config
+# Settings for the ClusterClient
+akka.cluster.client {
+ # Actor paths of the ClusterReceptionist actors on the servers (cluster nodes)
+ # that the client will try to contact initially. It is mandatory to specify
+ # at least one initial contact.
+ # Comma separated full actor paths defined by a string on the form of
+ # "akka://system@hostname:port/system/receptionist"
+ initial-contacts = []
+
+ # Interval at which the client retries to establish contact with one of
+ # ClusterReceptionist on the servers (cluster nodes)
+ establishing-get-contacts-interval = 3s
+
+ # Interval at which the client will ask the ClusterReceptionist for
+ # new contact points to be used for next reconnect.
+ refresh-contacts-interval = 60s
+
+ # How often failure detection heartbeat messages should be sent
+ heartbeat-interval = 2s
+
+ # Number of potentially lost/delayed heartbeats that will be
+ # accepted before considering it to be an anomaly.
+ # The ClusterClient is using the akka.remote.DeadlineFailureDetector, which
+ # will trigger if there are no heartbeats within the duration
+ # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
+ # the default settings.
+ acceptable-heartbeat-pause = 13s
+
+ # If connection to the receptionist is not established the client will buffer
+ # this number of messages and deliver them the connection is established.
+ # When the buffer is full old messages will be dropped when new messages are sent
+ # via the client. Use 0 to disable buffering, i.e. messages will be dropped
+ # immediately if the location of the singleton is unknown.
+ # Maximum allowed buffer size is 10000.
+ buffer-size = 1000
+
+ # If connection to the receiptionist is lost and the client has not been
+ # able to acquire a new connection for this long the client will stop itself.
+ # This duration makes it possible to watch the cluster client and react on a more permanent
+ # loss of connection with the cluster, for example by accessing some kind of
+ # service registry for an updated set of initial contacts to start a new cluster client with.
+ # If this is not wanted it can be set to "off" to disable the timeout and retry
+ # forever.
+ reconnect-timeout = off
+}
+# //#cluster-client-config
+
+# Protobuf serializer for ClusterClient messages
+akka.actor {
+ serializers {
+ akka-cluster-client = "akka.cluster.client.protobuf.ClusterClientMessageSerializer"
+ }
+ serialization-bindings {
+ "akka.cluster.client.ClusterClientMessage" = akka-cluster-client
+ }
+ serialization-identifiers {
+ "akka.cluster.client.protobuf.ClusterClientMessageSerializer" = 15
+ }
+}
+
+# //#singleton-config
+akka.cluster.singleton {
+ # The actor name of the child singleton actor.
+ singleton-name = "singleton"
+
+ # Singleton among the nodes tagged with specified role.
+ # If the role is not specified it's a singleton among all nodes in the cluster.
+ role = ""
+
+ # When a node is becoming oldest it sends hand-over request to previous oldest,
+ # that might be leaving the cluster. This is retried with this interval until
+ # the previous oldest confirms that the hand over has started or the previous
+ # oldest member is removed from the cluster (+ akka.cluster.down-removal-margin).
+ hand-over-retry-interval = 1s
+
+ # The number of retries are derived from hand-over-retry-interval and
+ # akka.cluster.down-removal-margin (or ClusterSingletonManagerSettings.removalMargin),
+ # but it will never be less than this property.
+ # After the hand over retries and it's still not able to exchange the hand over messages
+ # with the previous oldest it will restart itself by throwing ClusterSingletonManagerIsStuck,
+ # to start from a clean state. After that it will still not start the singleton instance
+ # until the previous oldest node has been removed from the cluster.
+ # On the other side, on the previous oldest node, the same number of retries - 3 are used
+ # and after that the singleton instance is stopped.
+ # For large clusters it might be necessary to increase this to avoid too early timeouts while
+ # gossip dissemination of the Leaving to Exiting phase occurs. For normal leaving scenarios
+ # it will not be a quicker hand over by reducing this value, but in extreme failure scenarios
+ # the recovery might be faster.
+ min-number-of-hand-over-retries = 15
+
+ # Config path of the lease to be taken before creating the singleton actor
+ # if the lease is lost then the actor is restarted and it will need to re-acquire the lease
+ # the default is no lease
+ use-lease = ""
+
+ # The interval between retries for acquiring the lease
+ lease-retry-interval = 5s
+}
+# //#singleton-config
+
+# //#singleton-proxy-config
+akka.cluster.singleton-proxy {
+ # The actor name of the singleton actor that is started by the ClusterSingletonManager
+ singleton-name = ${akka.cluster.singleton.singleton-name}
+
+ # The role of the cluster nodes where the singleton can be deployed.
+ # Corresponding to the role used by the `ClusterSingletonManager`. If the role is not
+ # specified it's a singleton among all nodes in the cluster, and the `ClusterSingletonManager`
+ # must then also be configured in same way.
+ role = ""
+
+ # Interval at which the proxy will try to resolve the singleton instance.
+ singleton-identification-interval = 1s
+
+ # If the location of the singleton is unknown the proxy will buffer this
+ # number of messages and deliver them when the singleton is identified.
+ # When the buffer is full old messages will be dropped when new messages are
+ # sent via the proxy.
+ # Use 0 to disable buffering, i.e. messages will be dropped immediately if
+ # the location of the singleton is unknown.
+ # Maximum allowed buffer size is 10000.
+ buffer-size = 1000
+}
+# //#singleton-proxy-config
+
+# Serializer for cluster ClusterSingleton messages
+akka.actor {
+ serializers {
+ akka-singleton = "akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer"
+ }
+ serialization-bindings {
+ "akka.cluster.singleton.ClusterSingletonMessage" = akka-singleton
+ }
+ serialization-identifiers {
+ "akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer" = 14
+ }
+}
\ No newline at end of file
--- /dev/null
+############################################
+# Akka Cluster Typed Reference Config File #
+############################################
+
+# This is the reference config file that contains all the default settings.
+# Make your edits/overrides in your application.conf.
+
+akka.cluster.typed.receptionist {
+ # Updates with Distributed Data are done with this consistency level.
+ # Possible values: local, majority, all, 2, 3, 4 (n)
+ write-consistency = local
+
+ # Period task to remove actor references that are hosted by removed nodes,
+ # in case of abrupt termination.
+ pruning-interval = 3 s
+
+ # The periodic task to remove actor references that are hosted by removed nodes
+ # will only remove entries older than this duration. The reason for this
+ # is to avoid removing entries of nodes that haven't been visible as joining.
+ prune-removed-older-than = 60 s
+
+ # Shard the services over this many Distributed Data keys, with large amounts of different
+ # service keys storing all of them in the same Distributed Data entry would lead to large updates
+ # etc. instead the keys are sharded across this number of keys. This must be the same on all nodes
+ # in a cluster, changing it requires a full cluster restart (stopping all nodes before starting them again)
+ distributed-key-count = 5
+
+ # Settings for the Distributed Data replicator used by Receptionist.
+ # Same layout as akka.cluster.distributed-data.
+ distributed-data = ${akka.cluster.distributed-data}
+ # make sure that by default it's for all roles (Play loads config in different way)
+ distributed-data.role = ""
+}
+
+akka.cluster.ddata.typed {
+ # The timeout to use for ask operations in ReplicatorMessageAdapter.
+ # This should be longer than the timeout given in Replicator.WriteConsistency and
+ # Replicator.ReadConsistency. The replicator will always send a reply within those
+ # timeouts so the unexpected ask timeout should not occur, but for cleanup in a
+ # failure situation it must still exist.
+ # If askUpdate, askGet or askDelete takes longer then this timeout a
+ # java.util.concurrent.TimeoutException will be thrown by the requesting actor and
+ # may be handled by supervision.
+ replicator-message-adapter-unexpected-ask-timeout = 20 s
+}
+
+akka {
+ actor {
+ serialization-identifiers {
+ "akka.cluster.typed.internal.AkkaClusterTypedSerializer" = 28
+ "akka.cluster.typed.internal.delivery.ReliableDeliverySerializer" = 36
+ }
+ serializers {
+ typed-cluster = "akka.cluster.typed.internal.AkkaClusterTypedSerializer"
+ reliable-delivery = "akka.cluster.typed.internal.delivery.ReliableDeliverySerializer"
+ }
+ serialization-bindings {
+ "akka.cluster.typed.internal.receptionist.ClusterReceptionist$Entry" = typed-cluster
+ "akka.actor.typed.internal.pubsub.TopicImpl$MessagePublished" = typed-cluster
+ "akka.actor.typed.delivery.internal.DeliverySerializable" = reliable-delivery
+ }
+ }
+ cluster.configuration-compatibility-check.checkers {
+ receptionist = "akka.cluster.typed.internal.receptionist.ClusterReceptionistConfigCompatChecker"
+ }
+}
--- /dev/null
+##############################################
+# Akka Distributed DataReference Config File #
+##############################################
+
+# This is the reference config file that contains all the default settings.
+# Make your edits/overrides in your application.conf.
+
+
+#//#distributed-data
+# Settings for the DistributedData extension
+akka.cluster.distributed-data {
+ # Actor name of the Replicator actor, /system/ddataReplicator
+ name = ddataReplicator
+
+ # Replicas are running on members tagged with this role.
+ # All members are used if undefined or empty.
+ role = ""
+
+ # How often the Replicator should send out gossip information
+ gossip-interval = 2 s
+
+ # How often the subscribers will be notified of changes, if any
+ notify-subscribers-interval = 500 ms
+
+ # Logging of data with payload size in bytes larger than
+ # this value. Maximum detected size per key is logged once,
+ # with an increase threshold of 10%.
+ # It can be disabled by setting the property to off.
+ log-data-size-exceeding = 10 KiB
+
+ # Maximum number of entries to transfer in one round of gossip exchange when
+ # synchronizing the replicas. Next chunk will be transferred in next round of gossip.
+ # The actual number of data entries in each Gossip message is dynamically
+ # adjusted to not exceed the maximum remote message size (maximum-frame-size).
+ max-delta-elements = 500
+
+ # The id of the dispatcher to use for Replicator actors.
+ # If specified you need to define the settings of the actual dispatcher.
+ use-dispatcher = "akka.actor.internal-dispatcher"
+
+ # How often the Replicator checks for pruning of data associated with
+ # removed cluster nodes. If this is set to 'off' the pruning feature will
+ # be completely disabled.
+ pruning-interval = 120 s
+
+ # How long time it takes to spread the data to all other replica nodes.
+ # This is used when initiating and completing the pruning process of data associated
+ # with removed cluster nodes. The time measurement is stopped when any replica is
+ # unreachable, but it's still recommended to configure this with certain margin.
+ # It should be in the magnitude of minutes even though typical dissemination time
+ # is shorter (grows logarithmic with number of nodes). There is no advantage of
+ # setting this too low. Setting it to large value will delay the pruning process.
+ max-pruning-dissemination = 300 s
+
+ # The markers of that pruning has been performed for a removed node are kept for this
+ # time and thereafter removed. If and old data entry that was never pruned is somehow
+ # injected and merged with existing data after this time the value will not be correct.
+ # This would be possible (although unlikely) in the case of a long network partition.
+ # It should be in the magnitude of hours. For durable data it is configured by
+ # 'akka.cluster.distributed-data.durable.pruning-marker-time-to-live'.
+ pruning-marker-time-to-live = 6 h
+
+ # Serialized Write and Read messages are cached when they are sent to
+ # several nodes. If no further activity they are removed from the cache
+ # after this duration.
+ serializer-cache-time-to-live = 10s
+
+ # Update and Get operations are sent to oldest nodes first.
+ # This is useful together with Cluster Singleton, which is running on oldest nodes.
+ prefer-oldest = off
+
+ # Settings for delta-CRDT
+ delta-crdt {
+ # enable or disable delta-CRDT replication
+ enabled = on
+
+ # Some complex deltas grow in size for each update and above this
+ # threshold such deltas are discarded and sent as full state instead.
+ # This is number of elements or similar size hint, not size in bytes.
+ max-delta-size = 50
+ }
+
+ durable {
+ # List of keys that are durable. Prefix matching is supported by using * at the
+ # end of a key.
+ keys = []
+
+ # The markers of that pruning has been performed for a removed node are kept for this
+ # time and thereafter removed. If and old data entry that was never pruned is
+ # injected and merged with existing data after this time the value will not be correct.
+ # This would be possible if replica with durable data didn't participate in the pruning
+ # (e.g. it was shutdown) and later started after this time. A durable replica should not
+ # be stopped for longer time than this duration and if it is joining again after this
+ # duration its data should first be manually removed (from the lmdb directory).
+ # It should be in the magnitude of days. Note that there is a corresponding setting
+ # for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'.
+ pruning-marker-time-to-live = 10 d
+
+ # Fully qualified class name of the durable store actor. It must be a subclass
+ # of akka.actor.Actor and handle the protocol defined in
+ # akka.cluster.ddata.DurableStore. The class must have a constructor with
+ # com.typesafe.config.Config parameter.
+ store-actor-class = akka.cluster.ddata.LmdbDurableStore
+
+ use-dispatcher = akka.cluster.distributed-data.durable.pinned-store
+
+ pinned-store {
+ executor = thread-pool-executor
+ type = PinnedDispatcher
+ }
+
+ # Config for the LmdbDurableStore
+ lmdb {
+ # Directory of LMDB file. There are two options:
+ # 1. A relative or absolute path to a directory that ends with 'ddata'
+ # the full name of the directory will contain name of the ActorSystem
+ # and its remote port.
+ # 2. Otherwise the path is used as is, as a relative or absolute path to
+ # a directory.
+ #
+ # When running in production you may want to configure this to a specific
+ # path (alt 2), since the default directory contains the remote port of the
+ # actor system to make the name unique. If using a dynamically assigned
+ # port (0) it will be different each time and the previously stored data
+ # will not be loaded.
+ dir = "ddata"
+
+ # Size in bytes of the memory mapped file.
+ map-size = 100 MiB
+
+ # Accumulate changes before storing improves performance with the
+ # risk of losing the last writes if the JVM crashes.
+ # The interval is by default set to 'off' to write each update immediately.
+ # Enabling write behind by specifying a duration, e.g. 200ms, is especially
+ # efficient when performing many writes to the same key, because it is only
+ # the last value for each key that will be serialized and stored.
+ # write-behind-interval = 200 ms
+ write-behind-interval = off
+ }
+ }
+
+}
+#//#distributed-data
+
+# Protobuf serializer for cluster DistributedData messages
+akka.actor {
+ serializers {
+ akka-data-replication = "akka.cluster.ddata.protobuf.ReplicatorMessageSerializer"
+ akka-replicated-data = "akka.cluster.ddata.protobuf.ReplicatedDataSerializer"
+ }
+ serialization-bindings {
+ "akka.cluster.ddata.Replicator$ReplicatorMessage" = akka-data-replication
+ "akka.cluster.ddata.ReplicatedDataSerialization" = akka-replicated-data
+ }
+ serialization-identifiers {
+ "akka.cluster.ddata.protobuf.ReplicatedDataSerializer" = 11
+ "akka.cluster.ddata.protobuf.ReplicatorMessageSerializer" = 12
+ }
+}
include "actor_reference.conf"
+include "actor_typed_reference.conf"
include "cluster_reference.conf"
+include "cluster_tools_reference.conf"
+include "cluster_typed_reference.conf"
+include "distributed_data_reference.conf"
include "persistence_reference.conf"
include "remote_reference.conf"
include "stream_reference.conf"
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.lmdbjava</groupId>
+ <artifactId>lmdbjava</artifactId>
+ <version>0.7.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.github.jnr</groupId>
+ <artifactId>jffi</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.github.jnr</groupId>
+ <artifactId>jnr-ffi</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.github.jnr</groupId>
+ <artifactId>jnr-constants</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<artifactId>cds-mgmt-api</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>eos-dom-akka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Toaster -->
<dependency>
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor-testkit-typed_2.13</artifactId>
+ <version>2.6.12</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor-typed_2.13</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-slf4j_2.13</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence-tck_2.13</artifactId>
<bundle>mvn:org.agrona/agrona/1.8.0</bundle>
<bundle>mvn:org.opendaylight.controller/repackaged-akka/${project.version}</bundle>
<bundle>mvn:org.reactivestreams/reactive-streams/1.0.3</bundle>
+ <feature>wrap</feature>
+ <bundle>wrap:mvn:org.lmdbjava/lmdbjava/0.7.0</bundle>
</feature>
</features>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-distributed-eos</artifactId>
+ <artifactId>eos-dom-akka</artifactId>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>bundle-parent</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ <relativePath>../../../bundle-parent</relativePath>
+ </parent>
+
+ <artifactId>eos-dom-akka</artifactId>
+ <packaging>bundle</packaging>
+
+ <properties>
+ <odlparent.dependency.enforce>true</odlparent.dependency.enforce>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>repackaged-akka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-clustering-commons</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-eos-common-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-eos-dom-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>concepts</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.component.annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.guicedee.services</groupId>
+ <artifactId>javax.inject</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor-testkit-typed_2.13</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
+ </dependencies>
+</project>
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Scheduler;
+import akka.actor.typed.javadsl.Adapter;
+import akka.actor.typed.javadsl.AskPattern;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.cluster.typed.Cluster;
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
+import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
+import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
+import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
+import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DOMEntityOwnershipService implementation backed by native Akka clustering constructs. We use distributed-data
+ * to track all registered candidates and cluster-singleton to maintain a single cluster-wide authority which selects
+ * the appropriate owners.
+ */
+@Singleton
+@Component(immediate = true, service = DOMEntityOwnershipService.class)
+public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
+ private static final String DATACENTER_PREFIX = "dc";
+
+ private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
+ private final String localCandidate;
+ private final Scheduler scheduler;
+
+ private final ActorRef<BootstrapCommand> bootstrap;
+ private final RunningContext runningContext;
+ private final ActorRef<CandidateRegistryCommand> candidateRegistry;
+ private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
+ private final ActorRef<StateCheckerCommand> ownerStateChecker;
+
+ @VisibleForTesting
+ AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException {
+ final var typedActorSystem = Adapter.toTyped(actorSystem);
+
+ scheduler = typedActorSystem.scheduler();
+ localCandidate = Cluster.get(typedActorSystem).selfMember().getRoles().stream()
+ .filter(role -> !role.contains(DATACENTER_PREFIX))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+
+ bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(context -> EOSMain.create()), "EOSBootstrap");
+
+ final CompletionStage<RunningContext> ask = AskPattern.ask(bootstrap,
+ GetRunningContext::new, Duration.ofSeconds(5), scheduler);
+ runningContext = ask.toCompletableFuture().get();
+
+ candidateRegistry = runningContext.getCandidateRegistry();
+ listenerRegistry = runningContext.getListenerRegistry();
+ ownerStateChecker = runningContext.getOwnerStateChecker();
+ }
+
+ @Inject
+ @Activate
+ public AkkaEntityOwnershipService(@Reference final ActorSystemProvider provider)
+ throws ExecutionException, InterruptedException {
+ this(provider.getActorSystem());
+ }
+
+ @PreDestroy
+ @Deactivate
+ @Override
+ public void close() throws InterruptedException, ExecutionException {
+ AskPattern.ask(bootstrap, Terminate::new, Duration.ofSeconds(5), scheduler).toCompletableFuture().get();
+ }
+
+ @Override
+ public DOMEntityOwnershipCandidateRegistration registerCandidate(final DOMEntity entity)
+ throws CandidateAlreadyRegisteredException {
+ if (!registeredEntities.add(entity)) {
+ throw new CandidateAlreadyRegisteredException(entity);
+ }
+
+ final RegisterCandidate msg = new RegisterCandidate(entity, localCandidate);
+ LOG.debug("Registering candidate with message: {}", msg);
+ candidateRegistry.tell(msg);
+
+ return new CandidateRegistration(entity, this);
+ }
+
+ @Override
+ public DOMEntityOwnershipListenerRegistration registerListener(final String entityType,
+ final DOMEntityOwnershipListener listener) {
+ LOG.debug("Registering listener {} for type {}", listener, entityType);
+ listenerRegistry.tell(new RegisterListener(entityType, listener));
+
+ return new ListenerRegistration(listener, entityType, this);
+ }
+
+ @Override
+ public Optional<EntityOwnershipState> getOwnershipState(final DOMEntity entity) {
+ LOG.debug("Retrieving ownership state for {}", entity);
+
+ final CompletionStage<GetOwnershipStateReply> result = AskPattern.ask(ownerStateChecker,
+ replyTo -> new GetOwnershipState(entity, replyTo),
+ Duration.ofSeconds(5), scheduler);
+
+ final GetOwnershipStateReply reply;
+ try {
+ reply = result.toCompletableFuture().get();
+ } catch (final InterruptedException | ExecutionException exception) {
+ LOG.warn("Failed to retrieve ownership state for {}", entity, exception);
+ return Optional.empty();
+ }
+
+ return Optional.ofNullable(reply.getOwnershipState());
+ }
+
+ @Override
+ public boolean isCandidateRegistered(final DOMEntity forEntity) {
+ return registeredEntities.contains(forEntity);
+ }
+
+ void unregisterCandidate(final DOMEntity entity) {
+ LOG.debug("Unregistering candidate for {}", entity);
+
+ if (registeredEntities.remove(entity)) {
+ candidateRegistry.tell(new UnregisterCandidate(entity, localCandidate));
+ }
+ }
+
+ void unregisterListener(final String entityType, final DOMEntityOwnershipListener listener) {
+ LOG.debug("Unregistering listener {} for type {}", listener, entityType);
+
+ listenerRegistry.tell(new UnregisterListener(entityType, listener));
+ }
+
+ @VisibleForTesting
+ RunningContext getRunningContext() {
+ return runningContext;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import static java.util.Objects.requireNonNull;
+
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+
+final class CandidateRegistration extends AbstractObjectRegistration<DOMEntity>
+ implements DOMEntityOwnershipCandidateRegistration {
+ private final AkkaEntityOwnershipService service;
+
+ CandidateRegistration(final DOMEntity instance, final AkkaEntityOwnershipService service) {
+ super(instance);
+ this.service = requireNonNull(service);
+ }
+
+ @Override
+ protected void removeRegistration() {
+ service.unregisterCandidate(getInstance());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+
+final class ListenerRegistration extends AbstractObjectRegistration<DOMEntityOwnershipListener>
+ implements DOMEntityOwnershipListenerRegistration {
+ private final AkkaEntityOwnershipService service;
+ private final @NonNull String entityType;
+
+ ListenerRegistration(final DOMEntityOwnershipListener listener, final String entityType,
+ final AkkaEntityOwnershipService service) {
+ super(listener);
+ this.entityType = requireNonNull(entityType);
+ this.service = requireNonNull(service);
+ }
+
+ @Override
+ public String getEntityType() {
+ return entityType;
+ }
+
+ @Override
+ protected void removeRegistration() {
+ service.unregisterListener(entityType, getInstance());
+ }
+
+ @Override
+ protected MoreObjects.ToStringHelper addToStringAttributes(final MoreObjects.ToStringHelper toStringHelper) {
+ return toStringHelper.add("entityType", entityType);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.bootstrap;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.typed.Cluster;
+import akka.cluster.typed.ClusterSingleton;
+import akka.cluster.typed.SingletonActor;
+import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
+import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
+import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
+import org.opendaylight.controller.eos.akka.owner.checker.OwnerStateChecker;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.OwnerSyncer;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.EntityTypeListenerRegistry;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.yangtools.yang.common.Empty;
+
+public final class EOSMain extends AbstractBehavior<BootstrapCommand> {
+ private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
+ private final ActorRef<CandidateRegistryCommand> candidateRegistry;
+ private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+ private final ActorRef<StateCheckerCommand> ownerStateChecker;
+
+ private EOSMain(final ActorContext<BootstrapCommand> context) {
+ super(context);
+
+ final String role = Cluster.get(context.getSystem()).selfMember().getRoles().iterator().next();
+
+ listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry");
+ candidateRegistry = context.spawn(CandidateRegistry.create(), "CandidateRegistry");
+ ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker");
+
+ final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
+ // start the initial sync behavior that switches to the regular one after syncing
+ ownerSupervisor = clusterSingleton.init(SingletonActor.of(OwnerSyncer.create(), "OwnerSupervisor"));
+ }
+
+ public static Behavior<BootstrapCommand> create() {
+ return Behaviors.setup(EOSMain::new);
+ }
+
+ @Override
+ public Receive<BootstrapCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(GetRunningContext.class, this::onGetRunningContext)
+ .onMessage(Terminate.class, this::onTerminate)
+ .build();
+ }
+
+ private Behavior<BootstrapCommand> onGetRunningContext(final GetRunningContext request) {
+ request.getReplyTo().tell(
+ new RunningContext(listenerRegistry, candidateRegistry, ownerStateChecker, ownerSupervisor));
+ return this;
+ }
+
+ private Behavior<BootstrapCommand> onTerminate(final Terminate request) {
+ request.getReplyTo().tell(Empty.getInstance());
+ return Behaviors.stopped();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.bootstrap.command;
+
+public abstract class BootstrapCommand {
+ BootstrapCommand() {
+ // Hidden on purpose
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.bootstrap.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+
+public final class GetRunningContext extends BootstrapCommand {
+ private final ActorRef<RunningContext> replyTo;
+
+ public GetRunningContext(final ActorRef<RunningContext> replyTo) {
+ this.replyTo = requireNonNull(replyTo);
+ }
+
+ public ActorRef<RunningContext> getReplyTo() {
+ return replyTo;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.bootstrap.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+
+public final class RunningContext extends BootstrapCommand {
+ private final @NonNull ActorRef<TypeListenerRegistryCommand> listenerRegistry;
+ private final @NonNull ActorRef<CandidateRegistryCommand> candidateRegistry;
+ private final @NonNull ActorRef<StateCheckerCommand> ownerStateChecker;
+ private final @NonNull ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+
+ public RunningContext(final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
+ final ActorRef<CandidateRegistryCommand> candidateRegistry,
+ final ActorRef<StateCheckerCommand> ownerStateChecker,
+ final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
+ this.listenerRegistry = requireNonNull(listenerRegistry);
+ this.candidateRegistry = requireNonNull(candidateRegistry);
+ this.ownerStateChecker = requireNonNull(ownerStateChecker);
+ this.ownerSupervisor = requireNonNull(ownerSupervisor);
+ }
+
+ public @NonNull ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
+ return listenerRegistry;
+ }
+
+ public @NonNull ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
+ return candidateRegistry;
+ }
+
+ public @NonNull ActorRef<StateCheckerCommand> getOwnerStateChecker() {
+ return ownerStateChecker;
+ }
+
+ public @NonNull ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
+ return ownerSupervisor;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.bootstrap.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yangtools.yang.common.Empty;
+
+public final class Terminate extends BootstrapCommand {
+ private final @NonNull ActorRef<Empty> replyTo;
+
+ public Terminate(final ActorRef<Empty> replyTo) {
+ this.replyTo = requireNonNull(replyTo);
+ }
+
+ public @NonNull ActorRef<Empty> getReplyTo() {
+ return replyTo;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator.Get;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetFailure;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetSuccess;
+import akka.cluster.ddata.typed.javadsl.Replicator.NotFound;
+import akka.cluster.ddata.typed.javadsl.Replicator.ReadMajority;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.InternalGetReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class OwnerStateChecker extends AbstractBehavior<StateCheckerCommand> {
+ private static final Logger LOG = LoggerFactory.getLogger(OwnerStateChecker.class);
+ private static final Duration GET_OWNERSHIP_TIMEOUT = Duration.ofSeconds(5);
+ private static final Duration UNEXPECTED_ASK_TIMEOUT = Duration.ofSeconds(5);
+
+ private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> replicatorAdapter;
+ private final String localMember;
+
+ private OwnerStateChecker(final ActorContext<StateCheckerCommand> context, final String localMember) {
+ super(context);
+ this.localMember = requireNonNull(localMember);
+ replicatorAdapter = new ReplicatorMessageAdapter<>(context,
+ DistributedData.get(context.getSystem()).replicator(), UNEXPECTED_ASK_TIMEOUT);
+ }
+
+ public static Behavior<StateCheckerCommand> create(final String localMember) {
+ return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember));
+ }
+
+ @Override
+ public Receive<StateCheckerCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(GetOwnershipState.class, this::onGetOwnershipState)
+ .onMessage(InternalGetReply.class, this::respondWithState)
+ .build();
+ }
+
+ private Behavior<StateCheckerCommand> onGetOwnershipState(final GetOwnershipState message) {
+ replicatorAdapter.askGet(
+ askReplyTo -> new Get<>(
+ new LWWRegisterKey<>(message.getEntity().toString()),
+ new ReadMajority(GET_OWNERSHIP_TIMEOUT),
+ askReplyTo),
+ reply -> new InternalGetReply(reply, message.getEntity(), message.getReplyTo()));
+ return this;
+ }
+
+ private Behavior<StateCheckerCommand> respondWithState(final InternalGetReply reply) {
+ final GetResponse<LWWRegister<String>> response = reply.getResponse();
+ if (response instanceof NotFound) {
+ LOG.debug("Data for owner not found, most likely no owner has beed picked for entity: {}",
+ reply.getEntity());
+ reply.getReplyTo().tell(new GetOwnershipStateReply(null));
+ } else if (response instanceof GetFailure) {
+ LOG.warn("Failure retrieving data for entity: {}", reply.getEntity());
+ reply.getReplyTo().tell(new GetOwnershipStateReply(null));
+ } else if (response instanceof GetSuccess) {
+ final String owner = ((GetSuccess<LWWRegister<String>>) response).get(response.key()).getValue();
+ LOG.debug("Data for owner received. {}, owner: {}", response, owner);
+
+ final boolean isOwner = localMember.equals(owner);
+ final boolean hasOwner = !owner.isEmpty();
+
+ reply.getReplyTo().tell(new GetOwnershipStateReply(EntityOwnershipState.from(isOwner, hasOwner)));
+ }
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class GetOwnershipState extends StateCheckerCommand {
+ private final @NonNull DOMEntity entity;
+ private final @NonNull ActorRef<GetOwnershipStateReply> replyTo;
+
+ public GetOwnershipState(final DOMEntity entity, final ActorRef<GetOwnershipStateReply> replyTo) {
+ this.entity = requireNonNull(entity);
+ this.replyTo = requireNonNull(replyTo);
+ }
+
+ public @NonNull DOMEntity getEntity() {
+ return entity;
+ }
+
+ public @NonNull ActorRef<GetOwnershipStateReply> getReplyTo() {
+ return replyTo;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
+
+public final class GetOwnershipStateReply extends StateCheckerReply {
+ private final @Nullable EntityOwnershipState ownershipState;
+
+ public GetOwnershipStateReply(final EntityOwnershipState ownershipState) {
+ this.ownershipState = ownershipState;
+ }
+
+ public @Nullable EntityOwnershipState getOwnershipState() {
+ return ownershipState;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class InternalGetReply extends StateCheckerCommand {
+ private final @NonNull GetResponse<LWWRegister<String>> response;
+ private final @NonNull ActorRef<GetOwnershipStateReply> replyTo;
+ private final @NonNull DOMEntity entity;
+
+ public InternalGetReply(final GetResponse<LWWRegister<String>> response, final DOMEntity entity,
+ final ActorRef<GetOwnershipStateReply> replyTo) {
+ this.response = requireNonNull(response);
+ this.entity = requireNonNull(entity);
+ this.replyTo = requireNonNull(replyTo);
+ }
+
+ public @NonNull GetResponse<LWWRegister<String>> getResponse() {
+ return response;
+ }
+
+ public @NonNull DOMEntity getEntity() {
+ return entity;
+ }
+
+ public @NonNull ActorRef<GetOwnershipStateReply> getReplyTo() {
+ return replyTo;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+public abstract class StateCheckerCommand {
+ StateCheckerCommand() {
+ // Hidden on purpose
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+public abstract class StateCheckerReply {
+ StateCheckerReply() {
+ // Hidden on purpose
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.SelfUniqueAddress;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import akka.cluster.typed.Cluster;
+import akka.cluster.typed.Subscribe;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEvent;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * Responsible for tracking candidates and assigning ownership of entities. This behavior is subscribed to the candidate
+ * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
+ * On cluster up/down etc. events the owners are reassigned if possible.
+ */
+public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
+ private static final String DATACENTER_PREFIX = "dc";
+
+ private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
+
+ // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
+ // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
+ // running in a cluster-singleton
+ private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
+
+ private final Cluster cluster;
+ private final SelfUniqueAddress node;
+
+ private final Set<String> activeMembers;
+
+ // currently registered candidates
+ private final Map<DOMEntity, Set<String>> currentCandidates;
+ // current owners
+ private final Map<DOMEntity, String> currentOwners;
+ // reverse lookup of owner to entity
+ private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
+
+ private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
+ final Map<DOMEntity, Set<String>> currentCandidates,
+ final Map<DOMEntity, String> currentOwners) {
+ super(context);
+
+ final DistributedData distributedData = DistributedData.get(context.getSystem());
+ final ActorRef<Replicator.Command> replicator = distributedData.replicator();
+
+ this.cluster = Cluster.get(context.getSystem());
+ this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+
+ this.node = distributedData.selfUniqueAddress();
+ this.activeMembers = getActiveMembers(cluster);
+
+ this.currentCandidates = currentCandidates;
+ this.currentOwners = currentOwners;
+
+ for (final Map.Entry<DOMEntity, String> entry : currentOwners.entrySet()) {
+ ownerToEntity.put(entry.getValue(), entry.getKey());
+ }
+
+ // check whether we have any unreachable/missing owners
+ reassignUnreachableOwners();
+ assignMissingOwners();
+
+ final ActorRef<ClusterEvent.MemberEvent> memberEventAdapter =
+ context.messageAdapter(ClusterEvent.MemberEvent.class, event -> {
+ if (event instanceof ClusterEvent.MemberUp) {
+ return new MemberUpEvent(event.member().address(), event.member().getRoles());
+ } else {
+ return new MemberDownEvent(event.member().address(), event.member().getRoles());
+ }
+ });
+ cluster.subscriptions().tell(Subscribe.create(memberEventAdapter, ClusterEvent.MemberEvent.class));
+
+ final ActorRef<ClusterEvent.ReachabilityEvent> reachabilityEventAdapter =
+ context.messageAdapter(ClusterEvent.ReachabilityEvent.class, event -> {
+ if (event instanceof ClusterEvent.ReachableMember) {
+ return new MemberReachableEvent(event.member().address(), event.member().getRoles());
+ } else {
+ return new MemberUnreachableEvent(event.member().address(), event.member().getRoles());
+ }
+ });
+ cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
+
+ new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
+ Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
+
+ LOG.debug("Owner Supervisor started");
+ }
+
+ public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
+ final Map<DOMEntity, String> currentOwners) {
+ return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners));
+ }
+
+ @Override
+ public Receive<OwnerSupervisorCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
+ .onMessage(MemberUpEvent.class, this::onPeerUp)
+ .onMessage(MemberDownEvent.class, this::onPeerDown)
+ .onMessage(MemberReachableEvent.class, this::onPeerReachable)
+ .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
+ .build();
+ }
+
+ private void reassignUnreachableOwners() {
+ final Set<String> ownersToReassign = new HashSet<>();
+ for (final String owner : ownerToEntity.keys()) {
+ if (!activeMembers.contains(owner)) {
+ ownersToReassign.add(owner);
+ }
+ }
+
+ for (final String owner : ownersToReassign) {
+ reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)));
+ }
+ }
+
+ private void assignMissingOwners() {
+ for (final Map.Entry<DOMEntity, Set<String>> entry : currentCandidates.entrySet()) {
+ if (!currentOwners.containsKey(entry.getKey())) {
+ assignOwnerFor(entry.getKey());
+ }
+ }
+ }
+
+ private Behavior<OwnerSupervisorCommand> onCandidatesChanged(final CandidatesChanged message) {
+ LOG.debug("onCandidatesChanged {}", message.getResponse());
+ if (message.getResponse() instanceof Replicator.Changed) {
+ final Replicator.Changed<ORMap<DOMEntity, ORSet<String>>> changed =
+ (Replicator.Changed<ORMap<DOMEntity, ORSet<String>>>) message.getResponse();
+ processCandidateChanges(changed.get(CandidateRegistry.KEY));
+ }
+ return this;
+ }
+
+ private void processCandidateChanges(final ORMap<DOMEntity, ORSet<String>> candidates) {
+ final Map<DOMEntity, ORSet<String>> entries = candidates.getEntries();
+ for (final Map.Entry<DOMEntity, ORSet<String>> entry : entries.entrySet()) {
+ processCandidatesFor(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void processCandidatesFor(final DOMEntity entity, final ORSet<String> receivedCandidates) {
+ LOG.debug("Processing candidates for : {}, new value: {}", entity, receivedCandidates.elements());
+
+ final Set<String> candidates = JavaConverters.asJava(receivedCandidates.elements());
+ // only insert candidates if there are any to insert, otherwise we would generate unnecessary notification with
+ // no owner
+ if (!currentCandidates.containsKey(entity) && !candidates.isEmpty()) {
+ LOG.debug("Candidates missing for entity: {} adding all candidates", entity);
+ currentCandidates.put(entity, new HashSet<>(candidates));
+
+ LOG.debug("Current state for {} : {}", entity, currentCandidates.get(entity).toString());
+ assignOwnerFor(entity);
+
+ return;
+ }
+
+ final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet());
+ final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
+
+ LOG.debug("currently present candidates: {}", currentlyPresent);
+ LOG.debug("difference: {}", difference);
+
+ final List<String> ownersToReassign = new ArrayList<>();
+
+ // first add/remove candidates from entities
+ for (final String toCheck : difference) {
+ if (!currentlyPresent.contains(toCheck)) {
+ // add new candidate
+ LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
+ currentCandidates.get(entity).add(toCheck);
+
+ if (!currentOwners.containsKey(entity)) {
+ // might as well assign right away when we don't have an owner
+ assignOwnerFor(entity);
+ }
+
+ LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
+ continue;
+ }
+
+ if (!candidates.contains(toCheck)) {
+ // remove candidate
+ LOG.debug("Removing candidate from entity: {} - {}", entity, toCheck);
+ currentCandidates.get(entity).remove(toCheck);
+ if (ownerToEntity.containsKey(toCheck)) {
+ ownersToReassign.add(toCheck);
+ }
+ }
+ }
+
+ // then reassign those that need new owners
+ for (final String toReassign : ownersToReassign) {
+ reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)));
+ }
+
+ if (currentCandidates.get(entity) == null) {
+ LOG.debug("Last candidate removed for {}", entity);
+ } else {
+ LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
+ }
+ }
+
+ private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities) {
+ LOG.debug("Reassigning owners for {}", entities);
+ for (final DOMEntity entity : entities) {
+
+ // only reassign owner for those entities that lost this candidate or is not reachable
+ if (!activeMembers.contains(oldOwner)
+ || !currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(oldOwner)) {
+ ownerToEntity.remove(oldOwner, entity);
+ assignOwnerFor(entity);
+ }
+ }
+ }
+
+ private void assignOwnerFor(final DOMEntity entity) {
+ final Set<String> candidatesForEntity = currentCandidates.get(entity);
+ if (candidatesForEntity.isEmpty()) {
+ LOG.debug("No candidates present for entity: {}", entity);
+ removeOwner(entity);
+ return;
+ }
+
+ String pickedCandidate = null;
+ for (final String candidate : candidatesForEntity) {
+ if (activeMembers.contains(candidate)) {
+ pickedCandidate = candidate;
+ break;
+ }
+ }
+ if (pickedCandidate == null) {
+ LOG.debug("No candidate is reachable for {}, activeMembers: {}, currentCandidates: {}",
+ entity, activeMembers, currentCandidates.get(entity));
+ // no candidate is reachable so only remove owner if necessary
+ removeOwner(entity);
+ return;
+ }
+ ownerToEntity.put(pickedCandidate, entity);
+
+ LOG.debug("Entity {} new owner: {}", entity, pickedCandidate);
+ currentOwners.put(entity, pickedCandidate);
+ writeNewOwner(entity, pickedCandidate);
+ }
+
+ private void removeOwner(final DOMEntity entity) {
+ if (currentOwners.containsKey(entity)) {
+ // assign empty owner to dd, as we cannot delete data for a key since that would prevent
+ // writes for the same key
+ currentOwners.remove(entity);
+
+ writeNewOwner(entity, "");
+ }
+ }
+
+ private void writeNewOwner(final DOMEntity entity, final String candidate) {
+ ownerReplicator.askUpdate(
+ askReplyTo -> new Replicator.Update<>(
+ new LWWRegisterKey<>(entity.toString()),
+ new LWWRegister<>(node.uniqueAddress(), candidate, 0),
+ Replicator.writeLocal(),
+ askReplyTo,
+ register -> register.withValue(node, candidate, clock)),
+ OwnerChanged::new);
+ }
+
+ private Behavior<OwnerSupervisorCommand> onPeerUp(final MemberUpEvent event) {
+ LOG.debug("Received MemberUp : {}", event);
+
+ handleReachableEvent(event.getRoles());
+ return this;
+ }
+
+ private Behavior<OwnerSupervisorCommand> onPeerReachable(final MemberReachableEvent event) {
+ LOG.debug("Received MemberReachable : {}", event);
+
+ handleReachableEvent(event.getRoles());
+ return this;
+ }
+
+ private void handleReachableEvent(final Set<String> roles) {
+ activeMembers.add(extractRole(roles));
+ assignMissingOwners();
+ }
+
+ private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
+ LOG.debug("Received MemberDown : {}", event);
+
+ handleUnreachableEvent(event.getRoles());
+ return this;
+ }
+
+ private Behavior<OwnerSupervisorCommand> onPeerUnreachable(final MemberUnreachableEvent event) {
+ LOG.debug("Received MemberUnreachable : {}", event);
+
+ handleUnreachableEvent(event.getRoles());
+ return this;
+ }
+
+ private void handleUnreachableEvent(final Set<String> roles) {
+ activeMembers.remove(extractRole(roles));
+ reassignUnreachableOwners();
+ }
+
+ private static Set<String> getActiveMembers(final Cluster cluster) {
+ final Set<String> activeMembers = new HashSet<>();
+ cluster.state().getMembers().forEach(member -> activeMembers.add(extractRole(member)));
+ activeMembers.removeAll(cluster.state().getUnreachable().stream()
+ .map(OwnerSupervisor::extractRole).collect(Collectors.toSet()));
+
+ return activeMembers;
+ }
+
+ private static String extractRole(final Member member) {
+ return extractRole(member.getRoles());
+ }
+
+ private static String extractRole(final Set<String> roles) {
+ return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
+ .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Behavior that retrieves current candidates/owners from distributed-data and switches to OwnerSupervisor when the
+ * sync has finished.
+ */
+public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand> {
+ private static final Logger LOG = LoggerFactory.getLogger(OwnerSyncer.class);
+
+ private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
+ private final Map<DOMEntity, Set<String>> currentCandidates = new HashMap<>();
+ private final Map<DOMEntity, String> currentOwners = new HashMap<>();
+
+ // String representation of Entity to DOMEntity
+ private final Map<String, DOMEntity> entityLookup = new HashMap<>();
+
+ private int toSync = -1;
+
+ private OwnerSyncer(final ActorContext<OwnerSupervisorCommand> context) {
+ super(context);
+ LOG.debug("Starting candidate and owner sync");
+
+ final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
+
+ this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+
+ new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
+ Duration.ofSeconds(5)).askGet(
+ askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
+ InitialCandidateSync::new);
+ }
+
+ public static Behavior<OwnerSupervisorCommand> create() {
+ return Behaviors.setup(OwnerSyncer::new);
+ }
+
+ @Override
+ public Receive<OwnerSupervisorCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(InitialCandidateSync.class, this::onInitialCandidateSync)
+ .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
+ .build();
+ }
+
+ private Behavior<OwnerSupervisorCommand> onInitialCandidateSync(final InitialCandidateSync rsp) {
+ final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = rsp.getResponse();
+ if (response instanceof Replicator.GetSuccess) {
+ return doInitialSync((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response);
+ } else if (response instanceof Replicator.NotFound) {
+ LOG.debug("No candidates found switching to supervisor");
+ return switchToSupervisor();
+ } else {
+ LOG.debug("Initial candidate sync failed, switching to supervisor. Sync reply: {}", response);
+ return switchToSupervisor();
+ }
+ }
+
+ private Behavior<OwnerSupervisorCommand> doInitialSync(
+ final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response) {
+
+ final ORMap<DOMEntity, ORSet<String>> candidates = response.get(CandidateRegistry.KEY);
+ candidates.getEntries().entrySet().forEach(entry -> {
+ currentCandidates.put(entry.getKey(), new HashSet<>(entry.getValue().getElements()));
+ });
+
+ toSync = candidates.keys().size();
+ for (final DOMEntity entity : candidates.keys().getElements()) {
+ entityLookup.put(entity.toString(), entity);
+
+ ownerReplicator.askGet(
+ askReplyTo -> new Replicator.Get<>(
+ new LWWRegisterKey<>(entity.toString()),
+ Replicator.readLocal(),
+ askReplyTo),
+ InitialOwnerSync::new);
+ }
+
+ return this;
+ }
+
+ private Behavior<OwnerSupervisorCommand> onInitialOwnerSync(final InitialOwnerSync rsp) {
+ final Replicator.GetResponse<LWWRegister<String>> response = rsp.getResponse();
+ if (response instanceof Replicator.GetSuccess) {
+ handleOwnerRsp((Replicator.GetSuccess<LWWRegister<String>>) response);
+ } else if (response instanceof Replicator.NotFound) {
+ handleNotFoundOwnerRsp((Replicator.NotFound<LWWRegister<String>>) response);
+ } else {
+ LOG.debug("Initial sync failed response: {}", response);
+ }
+
+ // count the responses, on last switch behaviors
+ toSync--;
+ if (toSync == 0) {
+ return switchToSupervisor();
+ }
+
+ return this;
+ }
+
+ private Behavior<OwnerSupervisorCommand> switchToSupervisor() {
+ LOG.debug("Initial sync done, switching to supervisor. candidates: {}, owners: {}",
+ currentCandidates, currentOwners);
+ return Behaviors.setup(ctx ->
+ OwnerSupervisor.create(currentCandidates, currentOwners));
+ }
+
+ private void handleOwnerRsp(final Replicator.GetSuccess<LWWRegister<String>> rsp) {
+ final DOMEntity entity = entityLookup.get(rsp.key().id());
+ final String owner = rsp.get(rsp.key()).getValue();
+
+ currentOwners.put(entity, owner);
+ }
+
+ private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
+ LOG.debug("Owner not found. {}", rsp);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse;
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class CandidatesChanged extends OwnerSupervisorCommand {
+ private final @NonNull SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response;
+
+ public CandidatesChanged(final SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> subscribeResponse) {
+ this.response = requireNonNull(subscribeResponse);
+ }
+
+ public @NonNull SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+ return response;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("response", response).toString();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class InitialCandidateSync extends OwnerSupervisorCommand {
+ private final @Nullable GetResponse<ORMap<DOMEntity, ORSet<String>>> response;
+
+ public InitialCandidateSync(final GetResponse<ORMap<DOMEntity, ORSet<String>>> response) {
+ this.response = response;
+ }
+
+ public @Nullable GetResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+ return response;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+
+public final class InitialOwnerSync extends OwnerSupervisorCommand {
+ private final @NonNull GetResponse<LWWRegister<String>> response;
+
+ public InitialOwnerSync(final GetResponse<LWWRegister<String>> response) {
+ this.response = requireNonNull(response);
+ }
+
+ public @NonNull GetResponse<LWWRegister<String>> getResponse() {
+ return response;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.Address;
+import com.google.common.base.MoreObjects;
+import java.util.Set;
+import org.eclipse.jdt.annotation.NonNull;
+
+public abstract class InternalClusterEvent extends OwnerSupervisorCommand {
+ private final @NonNull Set<String> roles;
+ private final @NonNull Address address;
+
+ InternalClusterEvent(final Address address, final Set<String> roles) {
+ this.address = requireNonNull(address);
+ this.roles = Set.copyOf(roles);
+ }
+
+ public final @NonNull Address getAddress() {
+ return address;
+ }
+
+ public final @NonNull Set<String> getRoles() {
+ return roles;
+ }
+
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this).add("address", address).add("roles", roles).toString();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.Address;
+import java.util.Set;
+
+public final class MemberDownEvent extends InternalClusterEvent {
+ public MemberDownEvent(final Address address, final Set<String> roles) {
+ super(address, roles);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.Address;
+import java.util.Set;
+
+public final class MemberReachableEvent extends InternalClusterEvent {
+ public MemberReachableEvent(final Address address, final Set<String> roles) {
+ super(address, roles);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.Address;
+import java.util.Set;
+
+public final class MemberUnreachableEvent extends InternalClusterEvent {
+ public MemberUnreachableEvent(final Address address, final Set<String> roles) {
+ super(address, roles);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.Address;
+import java.util.Set;
+
+public final class MemberUpEvent extends InternalClusterEvent {
+ public MemberUpEvent(final Address address, final Set<String> roles) {
+ super(address, roles);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.UpdateResponse;
+import org.eclipse.jdt.annotation.NonNull;
+
+public final class OwnerChanged extends OwnerSupervisorCommand {
+ private final @NonNull UpdateResponse<LWWRegister<String>> rsp;
+
+ public OwnerChanged(final UpdateResponse<LWWRegister<String>> rsp) {
+ this.rsp = requireNonNull(rsp);
+ }
+
+ public @NonNull UpdateResponse<LWWRegister<String>> getResponse() {
+ return rsp;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+public abstract class OwnerSupervisorCommand {
+ OwnerSupervisorCommand() {
+ // Hidden on purpose
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate;
+
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.Key;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORMapKey;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.SelfUniqueAddress;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Actor responsible for handling registrations of candidates into distributed-data.
+ */
+public final class CandidateRegistry extends AbstractBehavior<CandidateRegistryCommand> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistry.class);
+
+ public static final Key<ORMap<DOMEntity, ORSet<String>>> KEY = new ORMapKey<>("candidateRegistry");
+
+ private final ReplicatorMessageAdapter<CandidateRegistryCommand, ORMap<DOMEntity, ORSet<String>>> replicatorAdapter;
+ private final SelfUniqueAddress node;
+
+ private CandidateRegistry(final ActorContext<CandidateRegistryCommand> context,
+ final ReplicatorMessageAdapter<CandidateRegistryCommand,
+ ORMap<DOMEntity, ORSet<String>>> replicatorAdapter) {
+ super(context);
+ this.replicatorAdapter = replicatorAdapter;
+
+ this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();
+
+ LOG.debug("Candidate registry started");
+ }
+
+ public static Behavior<CandidateRegistryCommand> create() {
+ return Behaviors.setup(ctx ->
+ DistributedData.withReplicatorMessageAdapter(
+ (ReplicatorMessageAdapter<CandidateRegistryCommand,
+ ORMap<DOMEntity,ORSet<String>>> replicatorAdapter) ->
+ new CandidateRegistry(ctx, replicatorAdapter)));
+ }
+
+ @Override
+ public Receive<CandidateRegistryCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(RegisterCandidate.class, this::onRegisterCandidate)
+ .onMessage(UnregisterCandidate.class, this::onUnregisterCandidate)
+ .onMessage(InternalUpdateResponse.class, this::onInternalUpdateResponse)
+ .build();
+ }
+
+ private Behavior<CandidateRegistryCommand> onRegisterCandidate(final RegisterCandidate registerCandidate) {
+ LOG.debug("Registering candidate({}) for entity: {}",
+ registerCandidate.getCandidate(), registerCandidate.getEntity());
+ replicatorAdapter.askUpdate(
+ askReplyTo -> new Replicator.Update<>(
+ KEY,
+ ORMap.empty(),
+ Replicator.writeLocal(),
+ askReplyTo,
+ map -> map.update(node, registerCandidate.getEntity(), ORSet.empty(),
+ value -> value.add(node, registerCandidate.getCandidate()))),
+ InternalUpdateResponse::new);
+ return this;
+ }
+
+ private Behavior<CandidateRegistryCommand> onUnregisterCandidate(final UnregisterCandidate unregisterCandidate) {
+ LOG.debug("Removing candidate({}) from entity: {}",
+ unregisterCandidate.getCandidate(), unregisterCandidate.getEntity());
+ replicatorAdapter.askUpdate(
+ askReplyTo -> new Replicator.Update<>(
+ KEY,
+ ORMap.empty(),
+ Replicator.writeLocal(),
+ askReplyTo,
+ map -> map.update(node, unregisterCandidate.getEntity(), ORSet.empty(),
+ value -> value.remove(node, unregisterCandidate.getCandidate()))),
+ InternalUpdateResponse::new);
+ return this;
+ }
+
+ private Behavior<CandidateRegistryCommand> onInternalUpdateResponse(final InternalUpdateResponse updateResponse) {
+ LOG.debug("Received update response: {}", updateResponse.getRsp());
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public abstract class AbstractCandidateCommand extends CandidateRegistryCommand {
+ private final @NonNull DOMEntity entity;
+ private final @NonNull String candidate;
+
+ AbstractCandidateCommand(final DOMEntity entity, final String candidate) {
+ this.entity = requireNonNull(entity);
+ this.candidate = requireNonNull(candidate);
+ }
+
+ public final @NonNull DOMEntity getEntity() {
+ return entity;
+ }
+
+ public final @NonNull String getCandidate() {
+ return candidate;
+ }
+
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this).add("entity", entity).add("candidate", candidate).toString();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+public abstract class CandidateRegistryCommand {
+ CandidateRegistryCommand() {
+ // Hidden on purpose
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator.UpdateResponse;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class InternalUpdateResponse extends CandidateRegistryCommand {
+ private final @NonNull UpdateResponse<ORMap<DOMEntity, ORSet<String>>> rsp;
+
+ public InternalUpdateResponse(final UpdateResponse<ORMap<DOMEntity, ORSet<String>>> rsp) {
+ this.rsp = requireNonNull(rsp);
+ }
+
+ public @NonNull UpdateResponse<ORMap<DOMEntity, ORSet<String>>> getRsp() {
+ return rsp;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+/**
+ * Sent to Candidate registry to register the candidate for a given entity.
+ */
+public final class RegisterCandidate extends AbstractCandidateCommand {
+ public RegisterCandidate(final DOMEntity entity, final String candidate) {
+ super(entity, candidate);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+/**
+ * Sent to CandidateRegistry to unregister the candidate for a given entity.
+ */
+public final class UnregisterCandidate extends AbstractCandidateCommand {
+ public UnregisterCandidate(final DOMEntity entity, final String candidate) {
+ super(entity, candidate);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.owner;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.command.InitialOwnerSync;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.command.OwnerChanged;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Keeps track of owners for a single entity, which is mapped to a single LWWRegister in distributed-data.
+ * Notifies the listener responsible for tracking the whole entity-type of changes.
+ */
+public class SingleEntityListenerActor extends AbstractBehavior<ListenerCommand> {
+ private static final Logger LOG = LoggerFactory.getLogger(SingleEntityListenerActor.class);
+
+ private final String localMember;
+ private final DOMEntity entity;
+ private final ActorRef<TypeListenerCommand> toNotify;
+ private final ReplicatorMessageAdapter<ListenerCommand, LWWRegister<String>> ownerReplicator;
+
+ private String currentOwner = "";
+
+ public SingleEntityListenerActor(final ActorContext<ListenerCommand> context, final String localMember,
+ final DOMEntity entity, final ActorRef<TypeListenerCommand> toNotify) {
+ super(context);
+ this.localMember = localMember;
+ this.entity = entity;
+ this.toNotify = toNotify;
+
+ final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
+ ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+
+ ownerReplicator.askGet(
+ replyTo -> new Replicator.Get<>(new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
+ InitialOwnerSync::new);
+ LOG.debug("OwnerListenerActor for {} started", entity.toString());
+ }
+
+ public static Behavior<ListenerCommand> create(final String localMember, final DOMEntity entity,
+ final ActorRef<TypeListenerCommand> toNotify) {
+ return Behaviors.setup(ctx -> new SingleEntityListenerActor(ctx, localMember, entity, toNotify));
+ }
+
+ @Override
+ public Receive<ListenerCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(OwnerChanged.class, this::onOwnerChanged)
+ .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
+ .build();
+ }
+
+ private Behavior<ListenerCommand> onInitialOwnerSync(final InitialOwnerSync ownerSync) {
+ final Replicator.GetResponse<LWWRegister<String>> response = ownerSync.getResponse();
+ LOG.debug("Received initial sync response for: {}, response: {}", entity, response);
+
+ // only trigger initial notification when there is no owner present as we wont get a subscription callback
+ // when distributed-data does not have any data for a key
+ if (response instanceof Replicator.NotFound) {
+
+ // no data is present, trigger initial notification with no owner
+ triggerNoOwnerNotification();
+ } else if (response instanceof Replicator.GetSuccess) {
+
+ // when we get a success just let subscribe callback handle the initial notification
+ LOG.debug("Owner present for entity: {} at the time of initial sync.", entity);
+ } else {
+ LOG.warn("Get has failed for entity: {}", response);
+ }
+
+ // make sure to subscribe AFTER initial notification
+ ownerReplicator.subscribe(new LWWRegisterKey<>(entity.toString()), OwnerChanged::new);
+
+ return this;
+ }
+
+ private void triggerNoOwnerNotification() {
+ LOG.debug("Triggering initial notification without an owner for: {}", entity);
+
+ toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
+ entity, EntityOwnershipChangeState.REMOTE_OWNERSHIP_LOST_NO_OWNER)));
+ }
+
+ private Behavior<ListenerCommand> onOwnerChanged(final OwnerChanged ownerChanged) {
+
+ final Replicator.SubscribeResponse<LWWRegister<String>> response = ownerChanged.getResponse();
+ if (response instanceof Replicator.Changed) {
+
+ final Replicator.Changed<LWWRegister<String>> registerChanged =
+ (Replicator.Changed<LWWRegister<String>>) response;
+ LOG.debug("Owner changed for: {}, prevOwner: {}, newOwner: {}",
+ entity, currentOwner, registerChanged.get(registerChanged.key()).getValue());
+ handleOwnerChange(registerChanged);
+ } else if (response instanceof Replicator.Deleted) {
+ handleOwnerLost((Replicator.Deleted<LWWRegister<String>>) response);
+ }
+
+ return this;
+ }
+
+ private void handleOwnerChange(final Replicator.Changed<LWWRegister<String>> changed) {
+ final String newOwner = changed.get(changed.key()).getValue();
+
+ final boolean wasOwner = currentOwner.equals(localMember);
+ final boolean isOwner = newOwner.equals(localMember);
+ final boolean hasOwner = !newOwner.equals("");
+
+ LOG.debug("Owner changed for entity:{}, currentOwner: {}, wasOwner: {}, isOwner: {}, hasOwner:{}",
+ entity, currentOwner, wasOwner, isOwner, hasOwner);
+
+ currentOwner = newOwner;
+
+ toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
+ entity, EntityOwnershipChangeState.from(wasOwner, isOwner, hasOwner))));
+ }
+
+ private void handleOwnerLost(final Replicator.Deleted<LWWRegister<String>> changed) {
+ final boolean wasOwner = currentOwner.equals(localMember);
+
+ LOG.debug("Owner lost for entity:{}, currentOwner: {}, wasOwner: {}", entity, currentOwner, wasOwner);
+
+ currentOwner = "";
+ toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
+ entity, EntityOwnershipChangeState.from(wasOwner, false, false))));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.owner.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+
+public final class InitialOwnerSync extends ListenerCommand {
+ private final @NonNull GetResponse<LWWRegister<String>> response;
+
+ public InitialOwnerSync(final GetResponse<LWWRegister<String>> response) {
+ this.response = requireNonNull(response);
+ }
+
+ public @NonNull GetResponse<LWWRegister<String>> getResponse() {
+ return response;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.owner.command;
+
+public abstract class ListenerCommand {
+ ListenerCommand() {
+ // Hidden on purpose
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.owner.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Notification from distributed-data sent to the SingleEntityListenerActor when owner changes for the tracked entity.
+ */
+public final class OwnerChanged extends ListenerCommand {
+ private final @NonNull SubscribeResponse<LWWRegister<String>> response;
+
+ public OwnerChanged(final SubscribeResponse<LWWRegister<String>> response) {
+ this.response = requireNonNull(response);
+ }
+
+ public @NonNull SubscribeResponse<LWWRegister<String>> getResponse() {
+ return response;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator.Changed;
+import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.SingleEntityListenerActor;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EntityTypeListenerActor extends AbstractBehavior<TypeListenerCommand> {
+ private static final Logger LOG = LoggerFactory.getLogger(EntityTypeListenerActor.class);
+
+ private final Map<DOMEntity, ActorRef<ListenerCommand>> activeListeners = new HashMap<>();
+ private final String localMember;
+ private final String entityType;
+ private final DOMEntityOwnershipListener listener;
+
+ public EntityTypeListenerActor(final ActorContext<TypeListenerCommand> context, final String localMember,
+ final String entityType, final DOMEntityOwnershipListener listener) {
+ super(context);
+ this.localMember = localMember;
+ this.entityType = entityType;
+ this.listener = listener;
+
+ new ReplicatorMessageAdapter<TypeListenerCommand, ORMap<DOMEntity, ORSet<String>>>(context,
+ DistributedData.get(context.getSystem()).replicator(), Duration.ofSeconds(5))
+ .subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
+ }
+
+ public static Behavior<TypeListenerCommand> create(final String localMember, final String entityType,
+ final DOMEntityOwnershipListener listener) {
+ return Behaviors.setup(ctx -> new EntityTypeListenerActor(ctx, localMember, entityType, listener));
+ }
+
+ @Override
+ public Receive<TypeListenerCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
+ .onMessage(EntityOwnerChanged.class, this::onOwnerChanged)
+ .build();
+ }
+
+ private Behavior<TypeListenerCommand> onCandidatesChanged(final CandidatesChanged notification) {
+ final SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response = notification.getResponse();
+ if (response instanceof Changed) {
+ processCandidates(((Changed<ORMap<DOMEntity, ORSet<String>>>) response).get(response.key()).getEntries());
+ } else {
+ LOG.warn("Unexpected notification from replicator: {}", response);
+ }
+ return this;
+ }
+
+ private void processCandidates(final Map<DOMEntity, ORSet<String>> entries) {
+ final Map<DOMEntity, ORSet<String>> filteredCandidates = entries.entrySet().stream()
+ .filter(entry -> entry.getKey().getType().equals(entityType))
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+ LOG.debug("Entity-type: {} current candidates: {}", entityType, filteredCandidates);
+
+ final Set<DOMEntity> removed =
+ ImmutableSet.copyOf(Sets.difference(activeListeners.keySet(), filteredCandidates.keySet()));
+ if (!removed.isEmpty()) {
+ LOG.debug("Stopping listeners for {}", removed);
+ // kill actors for the removed
+ removed.forEach(removedEntity -> getContext().stop(activeListeners.remove(removedEntity)));
+ }
+
+ for (final Entry<DOMEntity, ORSet<String>> entry : filteredCandidates.entrySet()) {
+ activeListeners.computeIfAbsent(entry.getKey(), key -> {
+ // spawn actor for this entity
+ LOG.debug("Starting listener for {}", key);
+ return getContext().spawn(SingleEntityListenerActor.create(localMember, key, getContext().getSelf()),
+ "SingleEntityListener-" + encodeEntityToActorName(key));
+ });
+ }
+ }
+
+ private Behavior<TypeListenerCommand> onOwnerChanged(final EntityOwnerChanged rsp) {
+ LOG.debug("Entity-type: {} listener, owner change: {}", entityType, rsp);
+
+ listener.ownershipChanged(rsp.getOwnershipChange());
+ return this;
+ }
+
+ private static String encodeEntityToActorName(final DOMEntity entity) {
+ return "type=" + entity.getType() + ",entity="
+ + entity.getIdentifier().getLastPathArgument().getNodeType().getLocalName() + "-" + UUID.randomUUID();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EntityTypeListenerRegistry extends AbstractBehavior<TypeListenerRegistryCommand> {
+ private static final Logger LOG = LoggerFactory.getLogger(EntityTypeListenerRegistry.class);
+
+ private final Map<DOMEntityOwnershipListener, ActorRef<TypeListenerCommand>> spawnedListenerActors =
+ new HashMap<>();
+ private final String localMember;
+
+ public EntityTypeListenerRegistry(final ActorContext<TypeListenerRegistryCommand> context,
+ final String localMember) {
+ super(context);
+ this.localMember = requireNonNull(localMember);
+ }
+
+ public static Behavior<TypeListenerRegistryCommand> create(final String role) {
+ return Behaviors.setup(ctx -> new EntityTypeListenerRegistry(ctx, role));
+ }
+
+ @Override
+ public Receive<TypeListenerRegistryCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(RegisterListener.class, this::onRegisterListener)
+ .onMessage(UnregisterListener.class, this::onUnregisterListener)
+ .build();
+ }
+
+ private Behavior<TypeListenerRegistryCommand> onRegisterListener(final RegisterListener command) {
+ LOG.debug("Spawning entity type listener actor for: {}", command.getEntityType());
+
+ final ActorRef<TypeListenerCommand> listenerActor =
+ getContext().spawn(EntityTypeListenerActor.create(localMember,
+ command.getEntityType(), command.getDelegateListener()),
+ "TypeListener:" + encodeEntityToActorName(command.getEntityType()));
+ spawnedListenerActors.put(command.getDelegateListener(), listenerActor);
+ return this;
+ }
+
+ private Behavior<TypeListenerRegistryCommand> onUnregisterListener(final UnregisterListener command) {
+ LOG.debug("Stopping entity type listener actor for: {}", command.getEntityType());
+
+ getContext().stop(spawnedListenerActors.remove(command.getDelegateListener()));
+ return this;
+ }
+
+ private static String encodeEntityToActorName(final String entityType) {
+ return "type=" + entityType + "-" + UUID.randomUUID();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse;
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+/**
+ * Adapted notification from distributed-data sent to EntityTypeListenerActor when candidates change.
+ */
+public final class CandidatesChanged extends TypeListenerCommand {
+ private final @NonNull SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response;
+
+ public CandidatesChanged(final SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response) {
+ this.response = requireNonNull(response);
+ }
+
+ public @NonNull SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+ return response;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("response", response).toString();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type.command;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
+
+/**
+ * Notification sent to EntityTypeListenerActor when there is an owner change for an Entity of a given type.
+ */
+public final class EntityOwnerChanged extends TypeListenerCommand {
+ private final @NonNull DOMEntityOwnershipChange ownershipChange;
+
+ public EntityOwnerChanged(final DOMEntityOwnershipChange ownershipChange) {
+ this.ownershipChange = requireNonNull(ownershipChange);
+ }
+
+ public @NonNull DOMEntityOwnershipChange getOwnershipChange() {
+ return ownershipChange;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("ownershipChange", ownershipChange).toString();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type.command;
+
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+
+/**
+ * Register a DOMEntityOwnershipListener for a given entity-type.
+ */
+public final class RegisterListener extends TypeListenerRegistryCommand {
+ public RegisterListener(final String entityType, final DOMEntityOwnershipListener delegateListener) {
+ super(entityType, delegateListener);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type.command;
+
+public abstract class TypeListenerCommand {
+ TypeListenerCommand() {
+ // Hidden on purpose
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type.command;
+
+import static java.util.Objects.requireNonNull;
+
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+
+public abstract class TypeListenerRegistryCommand {
+ private final @NonNull String entityType;
+ private final @NonNull DOMEntityOwnershipListener delegateListener;
+
+ TypeListenerRegistryCommand(final String entityType, final DOMEntityOwnershipListener delegateListener) {
+ this.entityType = requireNonNull(entityType);
+ this.delegateListener = requireNonNull(delegateListener);
+ }
+
+ public final @NonNull String getEntityType() {
+ return entityType;
+ }
+
+ public final @NonNull DOMEntityOwnershipListener getDelegateListener() {
+ return delegateListener;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type.command;
+
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+
+/**
+ * Unregister a listener from the EntityTypeListenerRegistry.
+ */
+public final class UnregisterListener extends TypeListenerRegistryCommand {
+ public UnregisterListener(final String entityType, final DOMEntityOwnershipListener delegateListener) {
+ super(entityType, delegateListener);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.Adapter;
+import akka.actor.typed.javadsl.AskPattern;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
+import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
+import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
+import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractNativeEosTest {
+
+ public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
+ public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
+
+ protected static final List<String> TWO_NODE_SEED_NODES =
+ List.of("akka://ClusterSystem@127.0.0.1:2550",
+ "akka://ClusterSystem@127.0.0.1:2551");
+
+ protected static final List<String> THREE_NODE_SEED_NODES =
+ List.of("akka://ClusterSystem@127.0.0.1:2550",
+ "akka://ClusterSystem@127.0.0.1:2551",
+ "akka://ClusterSystem@127.0.0.1:2552");
+
+ private static final String REMOTE_PROTOCOL = "akka";
+ private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
+ private static final String ROLE_PARAM = "akka.cluster.roles";
+ private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
+
+
+ protected static ClusterNode startupRemote(final int port, final List<String> roles)
+ throws ExecutionException, InterruptedException {
+ return startup(port, roles, THREE_NODE_SEED_NODES);
+ }
+
+ protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
+ throws ExecutionException, InterruptedException {
+ return startup(port, roles, seedNodes);
+ }
+
+ protected static ClusterNode startup(final int port, final List<String> roles)
+ throws ExecutionException, InterruptedException {
+ return startup(port, roles, List.of());
+ }
+
+ protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
+ throws ExecutionException, InterruptedException {
+
+ return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
+ }
+
+ protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
+ final Supplier<Behavior<BootstrapCommand>> bootstrap)
+ throws ExecutionException, InterruptedException {
+ // Override the configuration
+ final Map<String, Object> overrides = new HashMap<>(4);
+ overrides.put(PORT_PARAM, port);
+ overrides.put(ROLE_PARAM, roles);
+ if (!seedNodes.isEmpty()) {
+ overrides.put(SEED_NODES_PARAM, seedNodes);
+ }
+
+ final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
+
+ // Create a classic Akka system since thats what we will have in osgi
+ final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
+ final ActorRef<BootstrapCommand> eosBootstrap =
+ Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
+
+ final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
+ GetRunningContext::new,
+ Duration.ofSeconds(5),
+ Adapter.toTyped(system.scheduler()));
+ final RunningContext runningContext = ask.toCompletableFuture().get();
+
+ return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
+ runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+ }
+
+ private static Behavior<BootstrapCommand> rootBehavior() {
+ return Behaviors.setup(context -> EOSMain.create());
+ }
+
+ protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
+ final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
+ registerCandidates(candidateRegistry, entity, members);
+ }
+
+ protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
+ final DOMEntity entity, final String... members) {
+ for (final String member : members) {
+ candidateRegistry.tell(new RegisterCandidate(entity, member));
+ }
+ }
+
+ protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
+ final String... members) {
+ final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
+ for (final String member : members) {
+ candidateRegistry.tell(new UnregisterCandidate(entity, member));
+ }
+ }
+
+ protected static MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
+ final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
+ final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
+ listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
+
+ return listener;
+ }
+
+ protected static void reachableMember(final ClusterNode node, final String role) {
+ reachableMember(node.getOwnerSupervisor(), role);
+ }
+
+ protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor, final String role) {
+ ownerSupervisor.tell(new MemberReachableEvent(
+ new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
+ }
+
+ protected static void unreachableMember(final ClusterNode node, final String role) {
+ unreachableMember(node.getOwnerSupervisor(), role);
+ }
+
+ protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor, final String role) {
+ ownerSupervisor.tell(new MemberUnreachableEvent(
+ new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
+ }
+
+ protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
+ await().until(() -> {
+ final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
+ final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
+ AskPattern.ask(distributedData.replicator(),
+ replyTo -> new Replicator.Get<>(
+ new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
+ Duration.ofSeconds(5),
+ clusterNode.getActorSystem().scheduler());
+
+ final Replicator.GetResponse<LWWRegister<String>> response =
+ ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+ if (response instanceof Replicator.GetSuccess) {
+ final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
+ return !owner.isEmpty();
+ }
+
+ return false;
+ });
+ }
+
+ protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
+ final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
+ await().until(() -> !listener.getChanges().isEmpty());
+
+ await().untilAsserted(() -> {
+ final List<DOMEntityOwnershipChange> changes = listener.getChanges();
+ final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
+ assertEquals(entity, domEntityOwnershipChange.getEntity());
+
+ assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner());
+ assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner());
+ assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner());
+ });
+ }
+
+ protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
+ await().pollDelay(2, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
+ }
+
+ protected static final class ClusterNode {
+ private final int port;
+ private final List<String> roles;
+ private final akka.actor.typed.ActorSystem<Void> actorSystem;
+ private final ActorRef<BootstrapCommand> eosBootstrap;
+ private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
+ private final ActorRef<CandidateRegistryCommand> candidateRegistry;
+ private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+
+ private ClusterNode(final int port,
+ final List<String> roles,
+ final ActorSystem actorSystem,
+ final ActorRef<BootstrapCommand> eosBootstrap,
+ final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
+ final ActorRef<CandidateRegistryCommand> candidateRegistry,
+ final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
+ this.port = port;
+ this.roles = roles;
+ this.actorSystem = Adapter.toTyped(actorSystem);
+ this.eosBootstrap = eosBootstrap;
+ this.listenerRegistry = listenerRegistry;
+ this.candidateRegistry = candidateRegistry;
+ this.ownerSupervisor = ownerSupervisor;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public akka.actor.typed.ActorSystem<Void> getActorSystem() {
+ return actorSystem;
+ }
+
+ public ActorRef<BootstrapCommand> getEosBootstrap() {
+ return eosBootstrap;
+ }
+
+ public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
+ return listenerRegistry;
+ }
+
+ public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
+ return candidateRegistry;
+ }
+
+ public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
+ return ownerSupervisor;
+ }
+
+ public List<String> getRoles() {
+ return roles;
+ }
+ }
+
+ protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
+
+ private final Logger log = LoggerFactory.getLogger(MockEntityOwnershipListener.class);
+
+ private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
+ private final String member;
+
+ public MockEntityOwnershipListener(final String member) {
+
+ this.member = member;
+ }
+
+ @Override
+ public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) {
+ log.info("{} Received ownershipCHanged: {}", member, ownershipChange);
+ log.info("{} changes: {}", member, changes.size());
+ changes.add(ownershipChange);
+ }
+
+ public List<DOMEntityOwnershipChange> getChanges() {
+ return changes;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import akka.actor.ActorSystem;
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.javadsl.Adapter;
+import akka.actor.typed.javadsl.AskPattern;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import com.typesafe.config.ConfigFactory;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import org.awaitility.Durations;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public class AkkaEntityOwnershipServiceTest extends AbstractNativeEosTest {
+ static final String ENTITY_TYPE = "test";
+ static final String ENTITY_TYPE2 = "test2";
+ static final QName QNAME = QName.create("test", "2015-08-11", "foo");
+ static int ID_COUNTER = 1;
+
+ private ActorSystem system;
+ private akka.actor.typed.ActorSystem<Void> typedSystem;
+ private AkkaEntityOwnershipService service;
+ private ActorRef<Replicator.Command> replicator;
+
+ @Before
+ public void setUp() throws Exception {
+ system = ActorSystem.create("ClusterSystem", ConfigFactory.load());
+ typedSystem = Adapter.toTyped(this.system);
+ replicator = DistributedData.get(typedSystem).replicator();
+
+ service = new AkkaEntityOwnershipService(system);
+ }
+
+ @After
+ public void tearDown() throws InterruptedException, ExecutionException {
+ service.close();
+ ActorTestKit.shutdown(Adapter.toTyped(system));
+ }
+
+ @Test
+ public void testRegisterCandidate() throws Exception {
+ final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
+ final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
+
+ final DOMEntityOwnershipCandidateRegistration reg = service.registerCandidate(entity);
+
+ verifyEntityOwnershipCandidateRegistration(entity, reg);
+ verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1");
+
+ try {
+ service.registerCandidate(entity);
+ fail("Expected CandidateAlreadyRegisteredException");
+ } catch (final CandidateAlreadyRegisteredException e) {
+ // expected
+ assertEquals("getEntity", entity, e.getEntity());
+ }
+
+ final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE2, entityId);
+ final DOMEntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2);
+
+ verifyEntityOwnershipCandidateRegistration(entity2, reg2);
+ verifyEntityCandidateRegistered(ENTITY_TYPE2, entityId, "member-1");
+ }
+
+ @Test
+ public void testUnregisterCandidate() throws Exception {
+ final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
+ final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
+
+ final DOMEntityOwnershipCandidateRegistration reg = service.registerCandidate(entity);
+
+ verifyEntityOwnershipCandidateRegistration(entity, reg);
+ verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1");
+
+ reg.close();
+ verifyEntityCandidateMissing(ENTITY_TYPE, entityId, "member-1");
+
+ service.registerCandidate(entity);
+ verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1");
+ }
+
+ @Test
+ public void testListenerRegistration() throws Exception {
+
+ final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
+ final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
+ final MockEntityOwnershipListener listener = new MockEntityOwnershipListener("member-1");
+
+ final DOMEntityOwnershipListenerRegistration reg = service.registerListener(entity.getType(), listener);
+
+ assertNotNull("EntityOwnershipListenerRegistration null", reg);
+ assertEquals("getEntityType", entity.getType(), reg.getEntityType());
+ assertEquals("getInstance", listener, reg.getInstance());
+
+ final DOMEntityOwnershipCandidateRegistration candidate = service.registerCandidate(entity);
+
+ verifyListenerState(listener, entity, true, true, false);
+ final int changes = listener.getChanges().size();
+
+ reg.close();
+ candidate.close();
+
+ verifyEntityCandidateMissing(ENTITY_TYPE, entityId, "member-1");
+
+ service.registerCandidate(entity);
+ // check listener not called when listener registration closed
+ await().pollDelay(Durations.TWO_SECONDS).until(() -> listener.getChanges().size() == changes);
+ }
+
+ @Test
+ public void testGetOwnershipState() throws Exception {
+ final DOMEntity entity = new DOMEntity(ENTITY_TYPE, "one");
+
+ final DOMEntityOwnershipCandidateRegistration registration = service.registerCandidate(entity);
+ verifyGetOwnershipState(service, entity, EntityOwnershipState.IS_OWNER);
+
+ final RunningContext runningContext = service.getRunningContext();
+ registerCandidates(runningContext.getCandidateRegistry(), entity, "member-2");
+
+ final ActorRef<OwnerSupervisorCommand> ownerSupervisor = runningContext.getOwnerSupervisor();
+ reachableMember(ownerSupervisor, "member-2");
+ unreachableMember(ownerSupervisor, "member-1");
+ verifyGetOwnershipState(service, entity, EntityOwnershipState.OWNED_BY_OTHER);
+
+ final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, "two");
+ final Optional<EntityOwnershipState> state = service.getOwnershipState(entity2);
+ assertFalse(state.isPresent());
+
+ unreachableMember(ownerSupervisor, "member-2");
+ verifyGetOwnershipState(service, entity, EntityOwnershipState.NO_OWNER);
+ }
+
+ @Test
+ public void testIsCandidateRegistered() throws Exception {
+ final DOMEntity test = new DOMEntity("test-type", "test");
+
+ assertFalse(service.isCandidateRegistered(test));
+
+ service.registerCandidate(test);
+
+ assertTrue(service.isCandidateRegistered(test));
+ }
+
+ private static void verifyGetOwnershipState(final DOMEntityOwnershipService service, final DOMEntity entity,
+ final EntityOwnershipState expState) {
+ await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
+ final Optional<EntityOwnershipState> state = service.getOwnershipState(entity);
+ assertTrue("getOwnershipState present", state.isPresent());
+ assertEquals("EntityOwnershipState", expState, state.get());
+ });
+ }
+
+ private void verifyEntityCandidateRegistered(final String entityType,
+ final YangInstanceIdentifier entityId,
+ final String candidateName) {
+ await().atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> doVerifyEntityCandidateRegistered(entityType, entityId, candidateName));
+ }
+
+ private void doVerifyEntityCandidateRegistered(final String entityType,
+ final YangInstanceIdentifier entityId,
+ final String candidateName)
+ throws ExecutionException, InterruptedException {
+ final Map<DOMEntity, ORSet<String>> entries = getCandidateData();
+ final DOMEntity entity = new DOMEntity(entityType, entityId);
+ assertTrue(entries.containsKey(entity));
+ assertTrue(entries.get(entity).getElements().contains(candidateName));
+ }
+
+ private void verifyEntityCandidateMissing(final String entityType,
+ final YangInstanceIdentifier entityId,
+ final String candidateName) {
+ await().atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> doVerifyEntityCandidateMissing(entityType, entityId, candidateName));
+ }
+
+ private void doVerifyEntityCandidateMissing(final String entityType,
+ final YangInstanceIdentifier entityId,
+ final String candidateName)
+ throws ExecutionException, InterruptedException {
+ final Map<DOMEntity, ORSet<String>> entries = getCandidateData();
+ final DOMEntity entity = new DOMEntity(entityType, entityId);
+ assertTrue(entries.containsKey(entity));
+ assertFalse(entries.get(entity).getElements().contains(candidateName));
+ }
+
+ private Map<DOMEntity, ORSet<String>> getCandidateData() throws ExecutionException, InterruptedException {
+ final CompletionStage<Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>>> ask =
+ AskPattern.ask(replicator, replyTo ->
+ new Replicator.Get<>(
+ CandidateRegistry.KEY,
+ Replicator.readLocal(),
+ replyTo),
+ Duration.ofSeconds(5),
+ typedSystem.scheduler());
+
+ final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = ask.toCompletableFuture().get();
+ assertTrue(response instanceof Replicator.GetSuccess);
+
+ final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> success =
+ (Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response;
+
+ return success.get(CandidateRegistry.KEY).getEntries();
+ }
+
+ private static void verifyEntityOwnershipCandidateRegistration(final DOMEntity entity,
+ final DOMEntityOwnershipCandidateRegistration reg) {
+ assertNotNull("EntityOwnershipCandidateRegistration null", reg);
+ assertEquals("getInstance", entity, reg.getInstance());
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class SingleNodeTest extends AbstractNativeEosTest {
+
+ public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
+ public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
+
+ private ClusterNode clusterNode;
+
+ @Before
+ public void setUp() throws Exception {
+ clusterNode = startup(2550, List.of("member-1"));
+
+ reachableMember(clusterNode, "member-2");
+ reachableMember(clusterNode, "member-3");
+ }
+
+ @After
+ public void tearDown() {
+ ActorTestKit.shutdown(clusterNode.getActorSystem());
+ }
+
+ @Test
+ public void testNotificationPriorToCandidateRegistration() {
+ final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1);
+ verifyNoNotifications(listener);
+
+ registerCandidates(clusterNode, ENTITY_1, "member-1");
+ verifyListenerState(listener, ENTITY_1, true, true, false);
+ }
+
+ @Test
+ public void testListenerPriorToAddingCandidates() {
+ final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1);
+
+ registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
+ verifyListenerState(listener, ENTITY_1, true, true, false);
+
+ unregisterCandidates(clusterNode, ENTITY_1, "member-1");
+ verifyListenerState(listener, ENTITY_1, true, false, true);
+ }
+
+ @Test
+ public void testListenerRegistrationAfterCandidates() {
+ registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
+ waitUntillOwnerPresent(clusterNode, ENTITY_1);
+
+ final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1);
+ verifyListenerState(listener, ENTITY_1, true, true, false);
+
+ unregisterCandidates(clusterNode, ENTITY_1, "member-1", "member-2");
+ verifyListenerState(listener, ENTITY_1, true, false, true);
+ }
+
+ @Test
+ public void testMultipleEntities() {
+ registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
+ waitUntillOwnerPresent(clusterNode, ENTITY_1);
+
+ final MockEntityOwnershipListener listener1 = registerListener(clusterNode, ENTITY_1);
+ final MockEntityOwnershipListener listener2 = registerListener(clusterNode, ENTITY_2);
+
+ verifyListenerState(listener1, ENTITY_1, true, true, false);
+ verifyNoNotifications(listener2);
+
+ unregisterCandidates(clusterNode, ENTITY_1, "member-1");
+ verifyListenerState(listener1, ENTITY_1, true, false, true);
+ verifyNoNotifications(listener2);
+
+ registerCandidates(clusterNode, ENTITY_2, "member-2");
+ verifyListenerState(listener1, ENTITY_1, true, false, true);
+ verifyListenerState(listener2, ENTITY_2, true, false, false);
+
+ unregisterCandidates(clusterNode, ENTITY_2, "member-2");
+
+ verifyListenerState(listener1, ENTITY_1, true, false, true);
+ verifyListenerState(listener2, ENTITY_2, false, false, false);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.typed.Cluster;
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.List;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class ThreeNodeBaseTest extends AbstractNativeEosTest {
+ public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
+ public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
+
+ private ClusterNode node1;
+ private ClusterNode node2;
+ private ClusterNode node3;
+
+ @Before
+ public void setUp() throws Exception {
+ node1 = startupRemote(2550, List.of("member-1"));
+ node2 = startupRemote(2551, List.of("member-2"));
+ node3 = startupRemote(2552, List.of("member-3"));
+
+ // need to wait until all nodes are ready
+ final Cluster cluster = Cluster.get(node3.getActorSystem());
+ // need a longer timeout with classic remoting, artery.tcp doesnt need to wait as long for init
+ Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+ final List<Member> members = ImmutableList.copyOf(cluster.state().getMembers());
+ if (members.size() != 3) {
+ return false;
+ }
+
+ for (final Member member : members) {
+ if (!member.status().equals(MemberStatus.up())) {
+ return false;
+ }
+ }
+
+ return true;
+ });
+ }
+
+ @After
+ public void tearDown() {
+ // same issue with classic remoting as in setup
+ ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
+ ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
+ ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20));
+ }
+
+ @Test
+ public void testInitialNotificationsWithoutOwner() throws Exception {
+ final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
+ verifyNoNotifications(listener1);
+
+ final MockEntityOwnershipListener listener2 = registerListener(node2, ENTITY_1);
+ verifyNoNotifications(listener2);
+
+ final MockEntityOwnershipListener listener3 = registerListener(node3, ENTITY_1);
+ verifyNoNotifications(listener3);
+ }
+
+ @Test
+ public void testInitialNotificationsWithOwner() {
+ registerCandidates(node1, ENTITY_1, "member-1");
+ // make sure we register other candidates after the first is seen everywhere to prevent different results due
+ // to timing
+ waitUntillOwnerPresent(node3, ENTITY_1);
+
+ registerCandidates(node2, ENTITY_1, "member-2");
+ registerCandidates(node3, ENTITY_1, "member-3");
+
+ final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
+ verifyListenerState(listener1, ENTITY_1, true, true, false);
+
+ final MockEntityOwnershipListener listener2 = registerListener(node2, ENTITY_1);
+ verifyListenerState(listener2, ENTITY_1, true, false, false);
+
+ final MockEntityOwnershipListener listener3 = registerListener(node3, ENTITY_1);
+ verifyListenerState(listener3, ENTITY_1, true, false, false);
+ }
+
+ @Test
+ public void testMultipleEntities() {
+ registerCandidates(node1, ENTITY_1, "member-1");
+ registerCandidates(node2, ENTITY_1, "member-2");
+ registerCandidates(node3, ENTITY_1, "member-3");
+
+ waitUntillOwnerPresent(node3, ENTITY_1);
+
+ registerCandidates(node2, ENTITY_2, "member-2");
+ waitUntillOwnerPresent(node2, ENTITY_2);
+ registerCandidates(node1, ENTITY_2, "member-1");
+
+ final MockEntityOwnershipListener firstEntityListener1 = registerListener(node1, ENTITY_1);
+ final MockEntityOwnershipListener firstEntityListener2 = registerListener(node2, ENTITY_1);
+ final MockEntityOwnershipListener firstEntityListener3 = registerListener(node3, ENTITY_1);
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
+ verifyListenerState(firstEntityListener2, ENTITY_1, true, false, false);
+ verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+
+ final MockEntityOwnershipListener secondEntityListener1 = registerListener(node1, ENTITY_2);
+ final MockEntityOwnershipListener secondEntityListener2 = registerListener(node2, ENTITY_2);
+ final MockEntityOwnershipListener secondEntityListener3 = registerListener(node3, ENTITY_2);
+
+ verifyListenerState(secondEntityListener1, ENTITY_2, true, false, false);
+ verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false);
+ verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
+
+ unregisterCandidates(node1, ENTITY_1, "member-1");
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, true, false, true);
+ verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false);
+ verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+
+ unregisterCandidates(node2, ENTITY_1, "member-2");
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false);
+ verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true);
+ verifyListenerState(firstEntityListener3, ENTITY_1, true, true, false);
+
+ unregisterCandidates(node3, ENTITY_1, "member-3");
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, false, false, false);
+ verifyListenerState(firstEntityListener2, ENTITY_1, false, false, false);
+ verifyListenerState(firstEntityListener3, ENTITY_1, false, false, true);
+
+ // check second listener hasnt moved
+ verifyListenerState(secondEntityListener1, ENTITY_2, true, false, false);
+ verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false);
+ verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
+
+ registerCandidates(node1, ENTITY_1, "member-1");
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
+ verifyListenerState(firstEntityListener2, ENTITY_1, true, false, false);
+ verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import static org.awaitility.Awaitility.await;
+
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.typed.Cluster;
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class ThreeNodeReachabilityTest extends AbstractNativeEosTest {
+ public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
+ public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
+
+ private ClusterNode node1 = null;
+ private ClusterNode node2 = null;
+ private ClusterNode node3 = null;
+
+ @Before
+ public void setUp() throws Exception {
+ node1 = startupRemote(2550, List.of("member-1"), TWO_NODE_SEED_NODES);
+ node2 = startupRemote(2551, List.of("member-2"), TWO_NODE_SEED_NODES);
+
+ // need to wait until all nodes are ready
+ final Cluster cluster = Cluster.get(node2.getActorSystem());
+ await().atMost(Duration.ofSeconds(20)).until(() -> {
+ final List<Member> members = ImmutableList.copyOf(cluster.state().getMembers());
+ if (members.size() != 2) {
+ return false;
+ }
+
+ for (final Member member : members) {
+ if (!member.status().equals(MemberStatus.up())) {
+ return false;
+ }
+ }
+
+ return true;
+ });
+ }
+
+ @After
+ public void tearDown() {
+ ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
+ ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
+
+
+ if (node3 != null) {
+ ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20));
+ }
+ }
+
+ @Test
+ public void testNodeLateStart() throws Exception {
+ registerCandidates(node1, ENTITY_1, "member-1");
+ registerCandidates(node2, ENTITY_1, "member-2");
+
+ registerCandidates(node2, ENTITY_2, "member-2");
+ waitUntillOwnerPresent(node2, ENTITY_2);
+ registerCandidates(node1, ENTITY_2, "member-1");
+
+ final MockEntityOwnershipListener firstEntityListener1 = registerListener(node1, ENTITY_1);
+ final MockEntityOwnershipListener firstEntityListener2 = registerListener(node2, ENTITY_1);
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
+ verifyListenerState(firstEntityListener2, ENTITY_1, true, false, false);
+
+ final MockEntityOwnershipListener secondEntityListener1 = registerListener(node1, ENTITY_2);
+ final MockEntityOwnershipListener secondEntityListener2 = registerListener(node2, ENTITY_2);
+
+ verifyListenerState(secondEntityListener1, ENTITY_2, true, false, false);
+ verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false);
+
+ unregisterCandidates(node1, ENTITY_1, "member-1");
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, true, false, true);
+ verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false);
+
+ unregisterCandidates(node2, ENTITY_1, "member-2");
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, false, false, false);
+ verifyListenerState(firstEntityListener2, ENTITY_1, false, false, true);
+
+ startNode3();
+
+ final MockEntityOwnershipListener firstEntityListener3 = registerListener(node3, ENTITY_1);
+ verifyListenerState(firstEntityListener3, ENTITY_1, false, false, false);
+
+ final MockEntityOwnershipListener secondEntityListener3 = registerListener(node3, ENTITY_2);
+ verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
+
+ registerCandidates(node3, ENTITY_1, "member-3");
+ waitUntillOwnerPresent(node3, ENTITY_1);
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false);
+ verifyListenerState(firstEntityListener2, ENTITY_1, true, false, false);
+
+ verifyListenerState(firstEntityListener3, ENTITY_1, true, true, false);
+ }
+
+ @Test
+ public void testReachabilityChangesDuringRuntime() throws Exception {
+ startNode3();
+
+ registerCandidates(node2, ENTITY_1, "member-2");
+ // we want singleton on node1 but owner on node2
+ waitUntillOwnerPresent(node2, ENTITY_1);
+
+ registerCandidates(node1, ENTITY_1, "member-1");
+ registerCandidates(node3, ENTITY_1, "member-3");
+
+ registerCandidates(node2, ENTITY_2, "member-2");
+ waitUntillOwnerPresent(node2, ENTITY_2);
+ registerCandidates(node1, ENTITY_2, "member-1");
+
+ final MockEntityOwnershipListener firstEntityListener1 = registerListener(node1, ENTITY_1);
+ final MockEntityOwnershipListener firstEntityListener2 = registerListener(node2, ENTITY_1);
+ final MockEntityOwnershipListener firstEntityListener3 = registerListener(node3, ENTITY_1);
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false);
+ verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false);
+ verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+
+ final MockEntityOwnershipListener secondEntityListener1 = registerListener(node1, ENTITY_2);
+ final MockEntityOwnershipListener secondEntityListener2 = registerListener(node2, ENTITY_2);
+ final MockEntityOwnershipListener secondEntityListener3 = registerListener(node3, ENTITY_2);
+
+ verifyListenerState(secondEntityListener1, ENTITY_2, true, false, false);
+ verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false);
+ verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
+
+ unreachableMember(node1, "member-2");
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
+ verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true);
+ verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+
+ verifyListenerState(secondEntityListener1, ENTITY_2, true, true, false);
+ verifyListenerState(secondEntityListener2, ENTITY_2, true, false, true);
+ verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
+
+ unreachableMember(node1, "member-3");
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
+ verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true);
+ verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+
+ unregisterCandidates(node1, ENTITY_1, "member-1");
+ unregisterCandidates(node1, ENTITY_2, "member-1");
+
+ verifyListenerState(firstEntityListener1, ENTITY_1, false, false, true);
+ verifyListenerState(firstEntityListener2, ENTITY_1, false, false, false);
+ verifyListenerState(firstEntityListener3, ENTITY_1, false, false, false);
+
+ verifyListenerState(secondEntityListener1, ENTITY_2, false, false, true);
+ verifyListenerState(secondEntityListener2, ENTITY_2, false, false, false);
+ verifyListenerState(secondEntityListener3, ENTITY_2, false, false, false);
+
+ reachableMember(node1, "member-2");
+ verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false);
+ verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false);
+ verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+ }
+
+ @Test
+ public void testSingletonMoving() throws Exception {
+ final MockEntityOwnershipListener listener1 = registerListener(node2, ENTITY_1);
+ final MockEntityOwnershipListener listener2 = registerListener(node2, ENTITY_2);
+ verifyNoNotifications(listener1);
+ verifyNoNotifications(listener2);
+
+ registerCandidates(node1, ENTITY_1, "member-1");
+ registerCandidates(node2, ENTITY_1, "member-2");
+
+ registerCandidates(node2, ENTITY_2, "member-2");
+ waitUntillOwnerPresent(node2, ENTITY_2);
+ registerCandidates(node1, ENTITY_2, "member-1");
+ // end up with node1 - member-1, node2 - member-2 owners
+ verifyListenerState(listener1, ENTITY_1, true, false, false);
+ verifyListenerState(listener2, ENTITY_2, true, true, false);
+
+ ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
+
+ verifyListenerState(listener1, ENTITY_1, true, true, false);
+ verifyListenerState(listener2, ENTITY_2, true, true, false);
+
+ startNode3(2);
+
+ final MockEntityOwnershipListener listener3 = registerListener(node3, ENTITY_2);
+ verifyListenerState(listener3, ENTITY_2, true, false, false);
+
+ node1 = startupRemote(2550, List.of("member-1"));
+
+ final Cluster cluster = Cluster.get(node2.getActorSystem());
+ await().atMost(Duration.ofSeconds(20)).until(() -> {
+ final List<Member> members = ImmutableList.copyOf(cluster.state().getMembers());
+ if (members.size() != 3) {
+ return false;
+ }
+
+ for (final Member member : members) {
+ if (!member.status().equals(MemberStatus.up())) {
+ return false;
+ }
+ }
+
+ return true;
+ });
+
+ final MockEntityOwnershipListener node1Listener = registerListener(node1, ENTITY_1);
+ verifyListenerState(node1Listener, ENTITY_1, true, false, false);
+ }
+
+ private void startNode3() throws Exception {
+ startNode3(3);
+ }
+
+ private void startNode3(final int membersPresent) throws Exception {
+ node3 = startupRemote(2552, List.of("member-3"), THREE_NODE_SEED_NODES);
+
+ // need to wait until all nodes are ready
+ final Cluster cluster = Cluster.get(node2.getActorSystem());
+ await().atMost(Duration.ofSeconds(20)).until(() -> {
+ final List<Member> members = ImmutableList.copyOf(cluster.state().getMembers());
+ if (members.size() != membersPresent) {
+ return false;
+ }
+
+ for (final Member member : members) {
+ if (!member.status().equals(MemberStatus.up())) {
+ return false;
+ }
+ }
+
+ return true;
+ });
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor;
+
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.typed.Cluster;
+import akka.cluster.typed.ClusterSingleton;
+import akka.cluster.typed.SingletonActor;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Test;
+import org.opendaylight.controller.eos.akka.AbstractNativeEosTest;
+import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
+import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
+import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.owner.checker.OwnerStateChecker;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.EntityTypeListenerRegistry;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class OwnerSupervisorTest extends AbstractNativeEosTest {
+
+ @Test
+ public void testCandidatePickingWhenUnreachableCandidates() throws Exception {
+
+ final ClusterNode node = startup(2550, Collections.singletonList("member-1"));
+ try {
+ reachableMember(node, "member-2");
+ reachableMember(node, "member-3");
+ registerCandidates(node, ENTITY_1, "member-1", "member-2", "member-3");
+
+ final MockEntityOwnershipListener listener = registerListener(node, ENTITY_1);
+ verifyListenerState(listener, ENTITY_1,true, true, false);
+
+ unreachableMember(node, "member-1");
+ verifyListenerState(listener, ENTITY_1, true, false, true);
+
+ unreachableMember(node, "member-2");
+ verifyListenerState(listener, ENTITY_1, true, false, false);
+
+ unreachableMember(node, "member-3");
+ verifyListenerState(listener, ENTITY_1, false, false, false);
+
+ reachableMember(node, "member-2");
+ verifyListenerState(listener, ENTITY_1, true, false, false);
+
+ // no notification here as member-2 is already the owner
+ reachableMember(node, "member-1");
+
+ unreachableMember(node, "member-2");
+ verifyListenerState(listener, ENTITY_1,true, true, false);
+ } finally {
+ ActorTestKit.shutdown(node.getActorSystem());
+ }
+ }
+
+ @Test
+ public void testSupervisorInitWithMissingOwners() throws Exception {
+ final Map<DOMEntity, Set<String>> candidates = new HashMap<>();
+ candidates.put(ENTITY_1, Set.of("member-1"));
+ candidates.put(ENTITY_2, Set.of("member-2"));
+
+ final ClusterNode node = startup(2550, Collections.singletonList("member-1"), Collections.emptyList(),
+ () -> mockedBootstrap(candidates, new HashMap<>()));
+
+ try {
+ waitUntillOwnerPresent(node, ENTITY_1);
+
+ // also do a proper register so the listener from the type lister actor are spawned
+ registerCandidates(node, ENTITY_1, "member-1");
+ registerCandidates(node, ENTITY_2, "member-2");
+
+ final MockEntityOwnershipListener listener1 = registerListener(node, ENTITY_1);
+ final MockEntityOwnershipListener listener2 = registerListener(node, ENTITY_2);
+
+ // first entity should have correctly assigned owner as its reachable
+ verifyListenerState(listener1, ENTITY_1, true, true, false);
+ // this one could not be assigned during init as we dont have member-2 thats reachable
+ verifyListenerState(listener2, ENTITY_2, false, false, false);
+
+ reachableMember(node, "member-2");
+ verifyListenerState(listener2, ENTITY_2, true, false, false);
+ } finally {
+ ActorTestKit.shutdown(node.getActorSystem());
+ }
+ }
+
+ private static Behavior<BootstrapCommand> mockedBootstrap(final Map<DOMEntity, Set<String>> currentCandidates,
+ final Map<DOMEntity, String> currentOwners) {
+ return Behaviors.setup(context -> MockBootstrap.create(currentCandidates, currentOwners));
+ }
+
+ /**
+ * Initial behavior that skips initial sync and instead initializes OwnerSupervisor with provided values.
+ */
+ private static final class MockSyncer extends AbstractBehavior<OwnerSupervisorCommand> {
+
+ private final Map<DOMEntity, Set<String>> currentCandidates;
+ private final Map<DOMEntity, String> currentOwners;
+
+ private MockSyncer(final ActorContext<OwnerSupervisorCommand> context,
+ final Map<DOMEntity, Set<String>> currentCandidates,
+ final Map<DOMEntity, String> currentOwners) {
+ super(context);
+ this.currentCandidates = currentCandidates;
+ this.currentOwners = currentOwners;
+
+ context.getSelf().tell(new InitialCandidateSync(null));
+ }
+
+ public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
+ final Map<DOMEntity, String> currentOwners) {
+ return Behaviors.setup(ctx -> new MockSyncer(ctx, currentCandidates, currentOwners));
+ }
+
+ @Override
+ public Receive<OwnerSupervisorCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(InitialCandidateSync.class, this::switchToSupervisor)
+ .build();
+ }
+
+ private Behavior<OwnerSupervisorCommand> switchToSupervisor(final InitialCandidateSync message) {
+ return OwnerSupervisor.create(currentCandidates, currentOwners);
+ }
+ }
+
+ /**
+ * Bootstrap with OwnerSyncer replaced with the testing syncer behavior.
+ */
+ private static final class MockBootstrap extends AbstractBehavior<BootstrapCommand> {
+
+ private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
+ private final ActorRef<CandidateRegistryCommand> candidateRegistry;
+ private final ActorRef<StateCheckerCommand> ownerStateChecker;
+ private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+
+ private MockBootstrap(final ActorContext<BootstrapCommand> context,
+ final Map<DOMEntity, Set<String>> currentCandidates,
+ final Map<DOMEntity, String> currentOwners) {
+ super(context);
+
+ final Cluster cluster = Cluster.get(context.getSystem());
+ final String role = cluster.selfMember().getRoles().iterator().next();
+
+ listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry");
+ candidateRegistry = context.spawn(CandidateRegistry.create(), "CandidateRegistry");
+ ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker");
+
+ final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
+ // start the initial sync behavior that switches to the regular one after syncing
+ ownerSupervisor = clusterSingleton.init(SingletonActor.of(
+ MockSyncer.create(currentCandidates, currentOwners), "OwnerSupervisor"));
+ }
+
+ public static Behavior<BootstrapCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
+ final Map<DOMEntity, String> currentOwners) {
+ return Behaviors.setup(ctx -> new MockBootstrap(ctx, currentCandidates, currentOwners));
+ }
+
+ @Override
+ public Receive<BootstrapCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(GetRunningContext.class, this::onGetRunningContext)
+ .build();
+ }
+
+ private Behavior<BootstrapCommand> onGetRunningContext(final GetRunningContext request) {
+ request.getReplyTo().tell(
+ new RunningContext(listenerRegistry, candidateRegistry,ownerStateChecker, ownerSupervisor));
+ return this;
+ }
+ }
+
+}
\ No newline at end of file
--- /dev/null
+akka {
+ loglevel = debug
+ actor {
+ warn-about-java-serializer-usage = off
+ allow-java-serialization = on
+ provider = cluster
+ }
+
+ remote {
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2550
+ }
+ }
+ cluster {
+ seed-nodes = [
+ "akka://ClusterSystem@127.0.0.1:2550"]
+ roles = [
+ "member-1"
+ ]
+ downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
+ }
+}
+
--- /dev/null
+org.slf4j.simpleLogger.defaultLogLevel=info
+org.slf4j.simpleLogger.showDateTime=true
+org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
+org.slf4j.simpleLogger.logFile=System.out
+org.slf4j.simpleLogger.showShortLogName=true
+org.slf4j.simpleLogger.levelInBrackets=true
+org.slf4j.simpleLogger.log.org.opendaylight.controller.eos.akka=debug
<module>sal-cluster-admin-api</module>
<module>sal-cluster-admin-impl</module>
<module>sal-distributed-eos</module>
+ <module>eos-dom-akka</module>
<!-- Yang Test Models for MD-SAL -->
<module>sal-test-model</module>