From e1e6d8e34fd4c5c5c07c7a8063ffa94a8dbe2062 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Thu, 18 Mar 2021 11:32:54 +0100 Subject: [PATCH] Introduce DOMEntityOwnershipService replacement Introduce a new DOMEntityOwnershipService implementation backed by Akka's distributed-data and cluster-singleton. This severs an implementation link to sal-distributed-datastore, reducing overall complexity. JIRA: CONTROLLER-1982 Change-Id: I30753e83d1af10658141311842bdceb315b9237f Signed-off-by: Tomas Cere Signed-off-by: Robert Varga --- akka/repackaged-akka-jar/pom.xml | 10 + .../main/resources/actor_typed_reference.conf | 129 ++++++ .../resources/cluster_tools_reference.conf | 231 +++++++++++ .../resources/cluster_typed_reference.conf | 66 ++++ .../resources/distributed_data_reference.conf | 159 ++++++++ .../src/main/resources/reference.conf | 4 + akka/repackaged-akka/pom.xml | 19 + artifacts/pom.xml | 5 + bundle-parent/pom.xml | 16 + .../src/main/history/dependencies.xml | 2 + .../odl-mdsal-distributed-datastore/pom.xml | 2 +- opendaylight/md-sal/eos-dom-akka/pom.xml | 94 +++++ .../eos/akka/AkkaEntityOwnershipService.java | 177 +++++++++ .../eos/akka/CandidateRegistration.java | 29 ++ .../eos/akka/ListenerRegistration.java | 44 +++ .../eos/akka/bootstrap/EOSMain.java | 75 ++++ .../bootstrap/command/BootstrapCommand.java | 14 + .../bootstrap/command/GetRunningContext.java | 24 ++ .../bootstrap/command/RunningContext.java | 50 +++ .../eos/akka/bootstrap/command/Terminate.java | 26 ++ .../akka/owner/checker/OwnerStateChecker.java | 93 +++++ .../checker/command/GetOwnershipState.java | 32 ++ .../command/GetOwnershipStateReply.java | 23 ++ .../checker/command/InternalGetReply.java | 41 ++ .../checker/command/StateCheckerCommand.java | 14 + .../checker/command/StateCheckerReply.java | 14 + .../owner/supervisor/OwnerSupervisor.java | 369 ++++++++++++++++++ .../akka/owner/supervisor/OwnerSyncer.java | 150 +++++++ .../supervisor/command/CandidatesChanged.java | 34 ++ .../command/InitialCandidateSync.java | 26 ++ .../supervisor/command/InitialOwnerSync.java | 26 ++ .../command/InternalClusterEvent.java | 38 ++ .../supervisor/command/MemberDownEvent.java | 17 + .../command/MemberReachableEvent.java | 17 + .../command/MemberUnreachableEvent.java | 17 + .../supervisor/command/MemberUpEvent.java | 17 + .../supervisor/command/OwnerChanged.java | 26 ++ .../command/OwnerSupervisorCommand.java | 14 + .../registry/candidate/CandidateRegistry.java | 105 +++++ .../command/AbstractCandidateCommand.java | 37 ++ .../command/CandidateRegistryCommand.java | 14 + .../command/InternalUpdateResponse.java | 28 ++ .../candidate/command/RegisterCandidate.java | 19 + .../command/UnregisterCandidate.java | 19 + .../owner/SingleEntityListenerActor.java | 149 +++++++ .../owner/command/InitialOwnerSync.java | 26 ++ .../owner/command/ListenerCommand.java | 14 + .../listener/owner/command/OwnerChanged.java | 29 ++ .../type/EntityTypeListenerActor.java | 120 ++++++ .../type/EntityTypeListenerRegistry.java | 75 ++++ .../type/command/CandidatesChanged.java | 37 ++ .../type/command/EntityOwnerChanged.java | 34 ++ .../type/command/RegisterListener.java | 19 + .../type/command/TypeListenerCommand.java | 14 + .../command/TypeListenerRegistryCommand.java | 31 ++ .../type/command/UnregisterListener.java | 19 + .../eos/akka/AbstractNativeEosTest.java | 291 ++++++++++++++ .../akka/AkkaEntityOwnershipServiceTest.java | 245 ++++++++++++ .../controller/eos/akka/SingleNodeTest.java | 93 +++++ .../eos/akka/ThreeNodeBaseTest.java | 153 ++++++++ .../eos/akka/ThreeNodeReachabilityTest.java | 251 ++++++++++++ .../owner/supervisor/OwnerSupervisorTest.java | 193 +++++++++ .../src/test/resources/application.conf | 25 ++ .../test/resources/simplelogger.properties | 7 + opendaylight/md-sal/pom.xml | 1 + 65 files changed, 4191 insertions(+), 1 deletion(-) create mode 100644 akka/repackaged-akka-jar/src/main/resources/actor_typed_reference.conf create mode 100644 akka/repackaged-akka-jar/src/main/resources/cluster_tools_reference.conf create mode 100644 akka/repackaged-akka-jar/src/main/resources/cluster_typed_reference.conf create mode 100644 akka/repackaged-akka-jar/src/main/resources/distributed_data_reference.conf create mode 100644 opendaylight/md-sal/eos-dom-akka/pom.xml create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/CandidateRegistration.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/ListenerRegistration.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/BootstrapCommand.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/GetRunningContext.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/RunningContext.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/Terminate.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/OwnerStateChecker.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipState.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipStateReply.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/InternalGetReply.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerCommand.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerReply.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/CandidatesChanged.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialCandidateSync.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialOwnerSync.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InternalClusterEvent.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberDownEvent.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberReachableEvent.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUnreachableEvent.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUpEvent.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerChanged.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorCommand.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistry.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/AbstractCandidateCommand.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRegistryCommand.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InternalUpdateResponse.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/RegisterCandidate.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/UnregisterCandidate.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/SingleEntityListenerActor.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/InitialOwnerSync.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/ListenerCommand.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/OwnerChanged.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerActor.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerRegistry.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/CandidatesChanged.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/EntityOwnerChanged.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/RegisterListener.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerCommand.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerRegistryCommand.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/UnregisterListener.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/SingleNodeTest.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeBaseTest.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeReachabilityTest.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/test/resources/application.conf create mode 100644 opendaylight/md-sal/eos-dom-akka/src/test/resources/simplelogger.properties diff --git a/akka/repackaged-akka-jar/pom.xml b/akka/repackaged-akka-jar/pom.xml index eb2393b4ff..e637cadbd3 100644 --- a/akka/repackaged-akka-jar/pom.xml +++ b/akka/repackaged-akka-jar/pom.xml @@ -35,11 +35,21 @@ akka-actor_2.13 2.6.12 + + com.typesafe.akka + akka-actor-typed_2.13 + 2.6.12 + com.typesafe.akka akka-cluster_2.13 2.6.12 + + com.typesafe.akka + akka-cluster-typed_2.13 + 2.6.12 + com.typesafe.akka akka-osgi_2.13 diff --git a/akka/repackaged-akka-jar/src/main/resources/actor_typed_reference.conf b/akka/repackaged-akka-jar/src/main/resources/actor_typed_reference.conf new file mode 100644 index 0000000000..d34d52aeef --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/actor_typed_reference.conf @@ -0,0 +1,129 @@ +akka.actor.typed { + + # List FQCN of `akka.actor.typed.ExtensionId`s which shall be loaded at actor system startup. + # Should be on the format: 'extensions = ["com.example.MyExtId1", "com.example.MyExtId2"]' etc. + # See the Akka Documentation for more info about Extensions + extensions = [] + + # List FQCN of extensions which shall be loaded at actor system startup. + # Library extensions are regular extensions that are loaded at startup and are + # available for third party library authors to enable auto-loading of extensions when + # present on the classpath. This is done by appending entries: + # 'library-extensions += "Extension"' in the library `reference.conf`. + # + # Should not be set by end user applications in 'application.conf', use the extensions property for that + # + library-extensions = ${?akka.actor.typed.library-extensions} [] + + # Receptionist is started eagerly to allow clustered receptionist to gather remote registrations early on. + library-extensions += "akka.actor.typed.receptionist.Receptionist$" + + # While an actor is restarted (waiting for backoff to expire and children to stop) + # incoming messages and signals are stashed, and delivered later to the newly restarted + # behavior. This property defines the capacity in number of messages of the stash + # buffer. If the capacity is exceed then additional incoming messages are dropped. + restart-stash-capacity = 1000 + + # Typed mailbox defaults to the single consumber mailbox as balancing dispatcher is not supported + default-mailbox { + mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + } +} + +# Load typed extensions by a classic extension. +akka.library-extensions += "akka.actor.typed.internal.adapter.ActorSystemAdapter$LoadTypedExtensions" + +akka.actor { + serializers { + typed-misc = "akka.actor.typed.internal.MiscMessageSerializer" + service-key = "akka.actor.typed.internal.receptionist.ServiceKeySerializer" + } + + serialization-identifiers { + "akka.actor.typed.internal.MiscMessageSerializer" = 24 + "akka.actor.typed.internal.receptionist.ServiceKeySerializer" = 26 + } + + serialization-bindings { + "akka.actor.typed.ActorRef" = typed-misc + "akka.actor.typed.internal.adapter.ActorRefAdapter" = typed-misc + "akka.actor.typed.internal.receptionist.DefaultServiceKey" = service-key + } +} + +# When using Akka Typed (having akka-actor-typed in classpath) the +# akka.event.slf4j.Slf4jLogger is enabled instead of the DefaultLogger +# even though it has not been explicitly defined in `akka.loggers` +# configuration. +# +# Slf4jLogger will be used for all Akka classic logging via eventStream, +# including logging from Akka internals. The Slf4jLogger is then using +# an ordinary org.slf4j.Logger to emit the log events. +# +# The Slf4jLoggingFilter is also enabled automatically. +# +# This behavior can be disabled by setting this property to `off`. +akka.use-slf4j = on + +akka.reliable-delivery { + producer-controller { + + # To avoid head of line blocking from serialization and transfer + # of large messages this can be enabled. + # Large messages are chunked into pieces of the given size in bytes. The + # chunked messages are sent separatetely and assembled on the consumer side. + # Serialization and deserialization is performed by the ProducerController and + # ConsumerController respectively instead of in the remote transport layer. + chunk-large-messages = off + + durable-queue { + # The ProducerController uses this timeout for the requests to + # the durable queue. If there is no reply within the timeout it + # will be retried. + request-timeout = 3s + + # The ProducerController retries requests to the durable queue this + # number of times before failing. + retry-attempts = 10 + + # The ProducerController retries sending the first message with this interval + # until it has been confirmed. + resend-first-interval = 1s + } + } + + consumer-controller { + # Number of messages in flight between ProducerController and + # ConsumerController. The ConsumerController requests for more messages + # when half of the window has been used. + flow-control-window = 50 + + # The ConsumerController resends flow control messages to the + # ProducerController with the resend-interval-min, and increasing + # it gradually to resend-interval-max when idle. + resend-interval-min = 2s + resend-interval-max = 30s + + # If this is enabled lost messages will not be resent, but flow control is used. + # This can be more efficient since messages don't have to be + # kept in memory in the `ProducerController` until they have been + # confirmed, but the drawback is that lost messages will not be delivered. + only-flow-control = false + } + + work-pulling { + producer-controller = ${akka.reliable-delivery.producer-controller} + producer-controller { + # Limit of how many messages that can be buffered when there + # is no demand from the consumer side. + buffer-size = 1000 + + # Ask timeout for sending message to worker until receiving Ack from worker + internal-ask-timeout = 60s + + # Chunked messages not implemented for work-pulling yet. Override to not + # propagate property from akka.reliable-delivery.producer-controller. + chunk-large-messages = off + } + } +} diff --git a/akka/repackaged-akka-jar/src/main/resources/cluster_tools_reference.conf b/akka/repackaged-akka-jar/src/main/resources/cluster_tools_reference.conf new file mode 100644 index 0000000000..6fb9b323ca --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/cluster_tools_reference.conf @@ -0,0 +1,231 @@ +############################################ +# Akka Cluster Tools Reference Config File # +############################################ + +# This is the reference config file that contains all the default settings. +# Make your edits/overrides in your application.conf. + +# //#pub-sub-ext-config +# Settings for the DistributedPubSub extension +akka.cluster.pub-sub { + # Actor name of the mediator actor, /system/distributedPubSubMediator + name = distributedPubSubMediator + + # Start the mediator on members tagged with this role. + # All members are used if undefined or empty. + role = "" + + # The routing logic to use for 'Send' + # Possible values: random, round-robin, broadcast + routing-logic = random + + # How often the DistributedPubSubMediator should send out gossip information + gossip-interval = 1s + + # Removed entries are pruned after this duration + removed-time-to-live = 120s + + # Maximum number of elements to transfer in one message when synchronizing the registries. + # Next chunk will be transferred in next round of gossip. + max-delta-elements = 3000 + + # When a message is published to a topic with no subscribers send it to the dead letters. + send-to-dead-letters-when-no-subscribers = on + + # The id of the dispatcher to use for DistributedPubSubMediator actors. + # If specified you need to define the settings of the actual dispatcher. + use-dispatcher = "akka.actor.internal-dispatcher" +} +# //#pub-sub-ext-config + +# Protobuf serializer for cluster DistributedPubSubMeditor messages +akka.actor { + serializers { + akka-pubsub = "akka.cluster.pubsub.protobuf.DistributedPubSubMessageSerializer" + } + serialization-bindings { + "akka.cluster.pubsub.DistributedPubSubMessage" = akka-pubsub + "akka.cluster.pubsub.DistributedPubSubMediator$Internal$SendToOneSubscriber" = akka-pubsub + } + serialization-identifiers { + "akka.cluster.pubsub.protobuf.DistributedPubSubMessageSerializer" = 9 + } +} + + +# //#receptionist-ext-config +# Settings for the ClusterClientReceptionist extension +akka.cluster.client.receptionist { + # Actor name of the ClusterReceptionist actor, /system/receptionist + name = receptionist + + # Start the receptionist on members tagged with this role. + # All members are used if undefined or empty. + role = "" + + # The receptionist will send this number of contact points to the client + number-of-contacts = 3 + + # The actor that tunnel response messages to the client will be stopped + # after this time of inactivity. + response-tunnel-receive-timeout = 30s + + # The id of the dispatcher to use for ClusterReceptionist actors. + # If specified you need to define the settings of the actual dispatcher. + use-dispatcher = "akka.actor.internal-dispatcher" + + # How often failure detection heartbeat messages should be received for + # each ClusterClient + heartbeat-interval = 2s + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # The ClusterReceptionist is using the akka.remote.DeadlineFailureDetector, which + # will trigger if there are no heartbeats within the duration + # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with + # the default settings. + acceptable-heartbeat-pause = 13s + + # Failure detection checking interval for checking all ClusterClients + failure-detection-interval = 2s +} +# //#receptionist-ext-config + +# //#cluster-client-config +# Settings for the ClusterClient +akka.cluster.client { + # Actor paths of the ClusterReceptionist actors on the servers (cluster nodes) + # that the client will try to contact initially. It is mandatory to specify + # at least one initial contact. + # Comma separated full actor paths defined by a string on the form of + # "akka://system@hostname:port/system/receptionist" + initial-contacts = [] + + # Interval at which the client retries to establish contact with one of + # ClusterReceptionist on the servers (cluster nodes) + establishing-get-contacts-interval = 3s + + # Interval at which the client will ask the ClusterReceptionist for + # new contact points to be used for next reconnect. + refresh-contacts-interval = 60s + + # How often failure detection heartbeat messages should be sent + heartbeat-interval = 2s + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # The ClusterClient is using the akka.remote.DeadlineFailureDetector, which + # will trigger if there are no heartbeats within the duration + # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with + # the default settings. + acceptable-heartbeat-pause = 13s + + # If connection to the receptionist is not established the client will buffer + # this number of messages and deliver them the connection is established. + # When the buffer is full old messages will be dropped when new messages are sent + # via the client. Use 0 to disable buffering, i.e. messages will be dropped + # immediately if the location of the singleton is unknown. + # Maximum allowed buffer size is 10000. + buffer-size = 1000 + + # If connection to the receiptionist is lost and the client has not been + # able to acquire a new connection for this long the client will stop itself. + # This duration makes it possible to watch the cluster client and react on a more permanent + # loss of connection with the cluster, for example by accessing some kind of + # service registry for an updated set of initial contacts to start a new cluster client with. + # If this is not wanted it can be set to "off" to disable the timeout and retry + # forever. + reconnect-timeout = off +} +# //#cluster-client-config + +# Protobuf serializer for ClusterClient messages +akka.actor { + serializers { + akka-cluster-client = "akka.cluster.client.protobuf.ClusterClientMessageSerializer" + } + serialization-bindings { + "akka.cluster.client.ClusterClientMessage" = akka-cluster-client + } + serialization-identifiers { + "akka.cluster.client.protobuf.ClusterClientMessageSerializer" = 15 + } +} + +# //#singleton-config +akka.cluster.singleton { + # The actor name of the child singleton actor. + singleton-name = "singleton" + + # Singleton among the nodes tagged with specified role. + # If the role is not specified it's a singleton among all nodes in the cluster. + role = "" + + # When a node is becoming oldest it sends hand-over request to previous oldest, + # that might be leaving the cluster. This is retried with this interval until + # the previous oldest confirms that the hand over has started or the previous + # oldest member is removed from the cluster (+ akka.cluster.down-removal-margin). + hand-over-retry-interval = 1s + + # The number of retries are derived from hand-over-retry-interval and + # akka.cluster.down-removal-margin (or ClusterSingletonManagerSettings.removalMargin), + # but it will never be less than this property. + # After the hand over retries and it's still not able to exchange the hand over messages + # with the previous oldest it will restart itself by throwing ClusterSingletonManagerIsStuck, + # to start from a clean state. After that it will still not start the singleton instance + # until the previous oldest node has been removed from the cluster. + # On the other side, on the previous oldest node, the same number of retries - 3 are used + # and after that the singleton instance is stopped. + # For large clusters it might be necessary to increase this to avoid too early timeouts while + # gossip dissemination of the Leaving to Exiting phase occurs. For normal leaving scenarios + # it will not be a quicker hand over by reducing this value, but in extreme failure scenarios + # the recovery might be faster. + min-number-of-hand-over-retries = 15 + + # Config path of the lease to be taken before creating the singleton actor + # if the lease is lost then the actor is restarted and it will need to re-acquire the lease + # the default is no lease + use-lease = "" + + # The interval between retries for acquiring the lease + lease-retry-interval = 5s +} +# //#singleton-config + +# //#singleton-proxy-config +akka.cluster.singleton-proxy { + # The actor name of the singleton actor that is started by the ClusterSingletonManager + singleton-name = ${akka.cluster.singleton.singleton-name} + + # The role of the cluster nodes where the singleton can be deployed. + # Corresponding to the role used by the `ClusterSingletonManager`. If the role is not + # specified it's a singleton among all nodes in the cluster, and the `ClusterSingletonManager` + # must then also be configured in same way. + role = "" + + # Interval at which the proxy will try to resolve the singleton instance. + singleton-identification-interval = 1s + + # If the location of the singleton is unknown the proxy will buffer this + # number of messages and deliver them when the singleton is identified. + # When the buffer is full old messages will be dropped when new messages are + # sent via the proxy. + # Use 0 to disable buffering, i.e. messages will be dropped immediately if + # the location of the singleton is unknown. + # Maximum allowed buffer size is 10000. + buffer-size = 1000 +} +# //#singleton-proxy-config + +# Serializer for cluster ClusterSingleton messages +akka.actor { + serializers { + akka-singleton = "akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer" + } + serialization-bindings { + "akka.cluster.singleton.ClusterSingletonMessage" = akka-singleton + } + serialization-identifiers { + "akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer" = 14 + } +} \ No newline at end of file diff --git a/akka/repackaged-akka-jar/src/main/resources/cluster_typed_reference.conf b/akka/repackaged-akka-jar/src/main/resources/cluster_typed_reference.conf new file mode 100644 index 0000000000..4cd45a5d24 --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/cluster_typed_reference.conf @@ -0,0 +1,66 @@ +############################################ +# Akka Cluster Typed Reference Config File # +############################################ + +# This is the reference config file that contains all the default settings. +# Make your edits/overrides in your application.conf. + +akka.cluster.typed.receptionist { + # Updates with Distributed Data are done with this consistency level. + # Possible values: local, majority, all, 2, 3, 4 (n) + write-consistency = local + + # Period task to remove actor references that are hosted by removed nodes, + # in case of abrupt termination. + pruning-interval = 3 s + + # The periodic task to remove actor references that are hosted by removed nodes + # will only remove entries older than this duration. The reason for this + # is to avoid removing entries of nodes that haven't been visible as joining. + prune-removed-older-than = 60 s + + # Shard the services over this many Distributed Data keys, with large amounts of different + # service keys storing all of them in the same Distributed Data entry would lead to large updates + # etc. instead the keys are sharded across this number of keys. This must be the same on all nodes + # in a cluster, changing it requires a full cluster restart (stopping all nodes before starting them again) + distributed-key-count = 5 + + # Settings for the Distributed Data replicator used by Receptionist. + # Same layout as akka.cluster.distributed-data. + distributed-data = ${akka.cluster.distributed-data} + # make sure that by default it's for all roles (Play loads config in different way) + distributed-data.role = "" +} + +akka.cluster.ddata.typed { + # The timeout to use for ask operations in ReplicatorMessageAdapter. + # This should be longer than the timeout given in Replicator.WriteConsistency and + # Replicator.ReadConsistency. The replicator will always send a reply within those + # timeouts so the unexpected ask timeout should not occur, but for cleanup in a + # failure situation it must still exist. + # If askUpdate, askGet or askDelete takes longer then this timeout a + # java.util.concurrent.TimeoutException will be thrown by the requesting actor and + # may be handled by supervision. + replicator-message-adapter-unexpected-ask-timeout = 20 s +} + +akka { + actor { + serialization-identifiers { + "akka.cluster.typed.internal.AkkaClusterTypedSerializer" = 28 + "akka.cluster.typed.internal.delivery.ReliableDeliverySerializer" = 36 + } + serializers { + typed-cluster = "akka.cluster.typed.internal.AkkaClusterTypedSerializer" + reliable-delivery = "akka.cluster.typed.internal.delivery.ReliableDeliverySerializer" + } + serialization-bindings { + "akka.cluster.typed.internal.receptionist.ClusterReceptionist$Entry" = typed-cluster + "akka.actor.typed.internal.pubsub.TopicImpl$MessagePublished" = typed-cluster + "akka.actor.typed.delivery.internal.DeliverySerializable" = reliable-delivery + } + } + cluster.configuration-compatibility-check.checkers { + receptionist = "akka.cluster.typed.internal.receptionist.ClusterReceptionistConfigCompatChecker" + } +} diff --git a/akka/repackaged-akka-jar/src/main/resources/distributed_data_reference.conf b/akka/repackaged-akka-jar/src/main/resources/distributed_data_reference.conf new file mode 100644 index 0000000000..f716157bd5 --- /dev/null +++ b/akka/repackaged-akka-jar/src/main/resources/distributed_data_reference.conf @@ -0,0 +1,159 @@ +############################################## +# Akka Distributed DataReference Config File # +############################################## + +# This is the reference config file that contains all the default settings. +# Make your edits/overrides in your application.conf. + + +#//#distributed-data +# Settings for the DistributedData extension +akka.cluster.distributed-data { + # Actor name of the Replicator actor, /system/ddataReplicator + name = ddataReplicator + + # Replicas are running on members tagged with this role. + # All members are used if undefined or empty. + role = "" + + # How often the Replicator should send out gossip information + gossip-interval = 2 s + + # How often the subscribers will be notified of changes, if any + notify-subscribers-interval = 500 ms + + # Logging of data with payload size in bytes larger than + # this value. Maximum detected size per key is logged once, + # with an increase threshold of 10%. + # It can be disabled by setting the property to off. + log-data-size-exceeding = 10 KiB + + # Maximum number of entries to transfer in one round of gossip exchange when + # synchronizing the replicas. Next chunk will be transferred in next round of gossip. + # The actual number of data entries in each Gossip message is dynamically + # adjusted to not exceed the maximum remote message size (maximum-frame-size). + max-delta-elements = 500 + + # The id of the dispatcher to use for Replicator actors. + # If specified you need to define the settings of the actual dispatcher. + use-dispatcher = "akka.actor.internal-dispatcher" + + # How often the Replicator checks for pruning of data associated with + # removed cluster nodes. If this is set to 'off' the pruning feature will + # be completely disabled. + pruning-interval = 120 s + + # How long time it takes to spread the data to all other replica nodes. + # This is used when initiating and completing the pruning process of data associated + # with removed cluster nodes. The time measurement is stopped when any replica is + # unreachable, but it's still recommended to configure this with certain margin. + # It should be in the magnitude of minutes even though typical dissemination time + # is shorter (grows logarithmic with number of nodes). There is no advantage of + # setting this too low. Setting it to large value will delay the pruning process. + max-pruning-dissemination = 300 s + + # The markers of that pruning has been performed for a removed node are kept for this + # time and thereafter removed. If and old data entry that was never pruned is somehow + # injected and merged with existing data after this time the value will not be correct. + # This would be possible (although unlikely) in the case of a long network partition. + # It should be in the magnitude of hours. For durable data it is configured by + # 'akka.cluster.distributed-data.durable.pruning-marker-time-to-live'. + pruning-marker-time-to-live = 6 h + + # Serialized Write and Read messages are cached when they are sent to + # several nodes. If no further activity they are removed from the cache + # after this duration. + serializer-cache-time-to-live = 10s + + # Update and Get operations are sent to oldest nodes first. + # This is useful together with Cluster Singleton, which is running on oldest nodes. + prefer-oldest = off + + # Settings for delta-CRDT + delta-crdt { + # enable or disable delta-CRDT replication + enabled = on + + # Some complex deltas grow in size for each update and above this + # threshold such deltas are discarded and sent as full state instead. + # This is number of elements or similar size hint, not size in bytes. + max-delta-size = 50 + } + + durable { + # List of keys that are durable. Prefix matching is supported by using * at the + # end of a key. + keys = [] + + # The markers of that pruning has been performed for a removed node are kept for this + # time and thereafter removed. If and old data entry that was never pruned is + # injected and merged with existing data after this time the value will not be correct. + # This would be possible if replica with durable data didn't participate in the pruning + # (e.g. it was shutdown) and later started after this time. A durable replica should not + # be stopped for longer time than this duration and if it is joining again after this + # duration its data should first be manually removed (from the lmdb directory). + # It should be in the magnitude of days. Note that there is a corresponding setting + # for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'. + pruning-marker-time-to-live = 10 d + + # Fully qualified class name of the durable store actor. It must be a subclass + # of akka.actor.Actor and handle the protocol defined in + # akka.cluster.ddata.DurableStore. The class must have a constructor with + # com.typesafe.config.Config parameter. + store-actor-class = akka.cluster.ddata.LmdbDurableStore + + use-dispatcher = akka.cluster.distributed-data.durable.pinned-store + + pinned-store { + executor = thread-pool-executor + type = PinnedDispatcher + } + + # Config for the LmdbDurableStore + lmdb { + # Directory of LMDB file. There are two options: + # 1. A relative or absolute path to a directory that ends with 'ddata' + # the full name of the directory will contain name of the ActorSystem + # and its remote port. + # 2. Otherwise the path is used as is, as a relative or absolute path to + # a directory. + # + # When running in production you may want to configure this to a specific + # path (alt 2), since the default directory contains the remote port of the + # actor system to make the name unique. If using a dynamically assigned + # port (0) it will be different each time and the previously stored data + # will not be loaded. + dir = "ddata" + + # Size in bytes of the memory mapped file. + map-size = 100 MiB + + # Accumulate changes before storing improves performance with the + # risk of losing the last writes if the JVM crashes. + # The interval is by default set to 'off' to write each update immediately. + # Enabling write behind by specifying a duration, e.g. 200ms, is especially + # efficient when performing many writes to the same key, because it is only + # the last value for each key that will be serialized and stored. + # write-behind-interval = 200 ms + write-behind-interval = off + } + } + +} +#//#distributed-data + +# Protobuf serializer for cluster DistributedData messages +akka.actor { + serializers { + akka-data-replication = "akka.cluster.ddata.protobuf.ReplicatorMessageSerializer" + akka-replicated-data = "akka.cluster.ddata.protobuf.ReplicatedDataSerializer" + } + serialization-bindings { + "akka.cluster.ddata.Replicator$ReplicatorMessage" = akka-data-replication + "akka.cluster.ddata.ReplicatedDataSerialization" = akka-replicated-data + } + serialization-identifiers { + "akka.cluster.ddata.protobuf.ReplicatedDataSerializer" = 11 + "akka.cluster.ddata.protobuf.ReplicatorMessageSerializer" = 12 + } +} diff --git a/akka/repackaged-akka-jar/src/main/resources/reference.conf b/akka/repackaged-akka-jar/src/main/resources/reference.conf index e096415d61..7e60da8488 100644 --- a/akka/repackaged-akka-jar/src/main/resources/reference.conf +++ b/akka/repackaged-akka-jar/src/main/resources/reference.conf @@ -1,5 +1,9 @@ include "actor_reference.conf" +include "actor_typed_reference.conf" include "cluster_reference.conf" +include "cluster_tools_reference.conf" +include "cluster_typed_reference.conf" +include "distributed_data_reference.conf" include "persistence_reference.conf" include "remote_reference.conf" include "stream_reference.conf" diff --git a/akka/repackaged-akka/pom.xml b/akka/repackaged-akka/pom.xml index 4416bf203f..1b436a98de 100644 --- a/akka/repackaged-akka/pom.xml +++ b/akka/repackaged-akka/pom.xml @@ -67,6 +67,25 @@ org.reactivestreams reactive-streams + + org.lmdbjava + lmdbjava + 0.7.0 + + + com.github.jnr + jffi + + + com.github.jnr + jnr-ffi + + + com.github.jnr + jnr-constants + + + org.scala-lang scala-library diff --git a/artifacts/pom.xml b/artifacts/pom.xml index 33c5a42f85..6e83e398fe 100644 --- a/artifacts/pom.xml +++ b/artifacts/pom.xml @@ -151,6 +151,11 @@ cds-mgmt-api ${project.version} + + ${project.groupId} + eos-dom-akka + ${project.version} + diff --git a/bundle-parent/pom.xml b/bundle-parent/pom.xml index 7bc1df3a67..c2776c0c76 100644 --- a/bundle-parent/pom.xml +++ b/bundle-parent/pom.xml @@ -84,6 +84,22 @@ + + com.typesafe.akka + akka-actor-testkit-typed_2.13 + 2.6.12 + test + + + com.typesafe.akka + akka-actor-typed_2.13 + + + com.typesafe.akka + akka-slf4j_2.13 + + + com.typesafe.akka akka-persistence-tck_2.13 diff --git a/features/odl-controller-akka/src/main/history/dependencies.xml b/features/odl-controller-akka/src/main/history/dependencies.xml index eec1a763ba..35f150fd53 100644 --- a/features/odl-controller-akka/src/main/history/dependencies.xml +++ b/features/odl-controller-akka/src/main/history/dependencies.xml @@ -10,5 +10,7 @@ mvn:org.agrona/agrona/1.8.0 mvn:org.opendaylight.controller/repackaged-akka/${project.version} mvn:org.reactivestreams/reactive-streams/1.0.3 + wrap + wrap:mvn:org.lmdbjava/lmdbjava/0.7.0 diff --git a/features/odl-mdsal-distributed-datastore/pom.xml b/features/odl-mdsal-distributed-datastore/pom.xml index 800b038622..dd3bc1ef87 100644 --- a/features/odl-mdsal-distributed-datastore/pom.xml +++ b/features/odl-mdsal-distributed-datastore/pom.xml @@ -87,7 +87,7 @@ org.opendaylight.controller - sal-distributed-eos + eos-dom-akka org.opendaylight.controller diff --git a/opendaylight/md-sal/eos-dom-akka/pom.xml b/opendaylight/md-sal/eos-dom-akka/pom.xml new file mode 100644 index 0000000000..510ef37c1b --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/pom.xml @@ -0,0 +1,94 @@ + + + + + 4.0.0 + + org.opendaylight.controller + bundle-parent + 4.0.0-SNAPSHOT + ../../../bundle-parent + + + eos-dom-akka + bundle + + + true + + + + + com.google.guava + guava + + + com.typesafe + config + + + org.opendaylight.controller + repackaged-akka + + + org.opendaylight.controller + sal-clustering-commons + + + org.opendaylight.mdsal + mdsal-eos-common-api + + + org.opendaylight.mdsal + mdsal-eos-dom-api + + + org.opendaylight.yangtools + concepts + + + org.opendaylight.yangtools + yang-common + + + org.opendaylight.yangtools + yang-data-api + + + org.osgi + org.osgi.service.component.annotations + + + com.guicedee.services + javax.inject + true + + + javax.annotation + javax.annotation-api + provided + true + + + org.scala-lang + scala-library + + + + com.typesafe.akka + akka-actor-testkit-typed_2.13 + + + org.awaitility + awaitility + + + diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java new file mode 100644 index 0000000000..27fbad4474 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka; + +import akka.actor.ActorSystem; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Scheduler; +import akka.actor.typed.javadsl.Adapter; +import akka.actor.typed.javadsl.AskPattern; +import akka.actor.typed.javadsl.Behaviors; +import akka.cluster.typed.Cluster; +import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.opendaylight.controller.cluster.ActorSystemProvider; +import org.opendaylight.controller.eos.akka.bootstrap.EOSMain; +import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand; +import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext; +import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext; +import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply; +import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate; +import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener; +import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException; +import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DOMEntityOwnershipService implementation backed by native Akka clustering constructs. We use distributed-data + * to track all registered candidates and cluster-singleton to maintain a single cluster-wide authority which selects + * the appropriate owners. + */ +@Singleton +@Component(immediate = true, service = DOMEntityOwnershipService.class) +public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class); + private static final String DATACENTER_PREFIX = "dc"; + + private final Set registeredEntities = ConcurrentHashMap.newKeySet(); + private final String localCandidate; + private final Scheduler scheduler; + + private final ActorRef bootstrap; + private final RunningContext runningContext; + private final ActorRef candidateRegistry; + private final ActorRef listenerRegistry; + private final ActorRef ownerStateChecker; + + @VisibleForTesting + AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException { + final var typedActorSystem = Adapter.toTyped(actorSystem); + + scheduler = typedActorSystem.scheduler(); + localCandidate = Cluster.get(typedActorSystem).selfMember().getRoles().stream() + .filter(role -> !role.contains(DATACENTER_PREFIX)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No valid role found.")); + + bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(context -> EOSMain.create()), "EOSBootstrap"); + + final CompletionStage ask = AskPattern.ask(bootstrap, + GetRunningContext::new, Duration.ofSeconds(5), scheduler); + runningContext = ask.toCompletableFuture().get(); + + candidateRegistry = runningContext.getCandidateRegistry(); + listenerRegistry = runningContext.getListenerRegistry(); + ownerStateChecker = runningContext.getOwnerStateChecker(); + } + + @Inject + @Activate + public AkkaEntityOwnershipService(@Reference final ActorSystemProvider provider) + throws ExecutionException, InterruptedException { + this(provider.getActorSystem()); + } + + @PreDestroy + @Deactivate + @Override + public void close() throws InterruptedException, ExecutionException { + AskPattern.ask(bootstrap, Terminate::new, Duration.ofSeconds(5), scheduler).toCompletableFuture().get(); + } + + @Override + public DOMEntityOwnershipCandidateRegistration registerCandidate(final DOMEntity entity) + throws CandidateAlreadyRegisteredException { + if (!registeredEntities.add(entity)) { + throw new CandidateAlreadyRegisteredException(entity); + } + + final RegisterCandidate msg = new RegisterCandidate(entity, localCandidate); + LOG.debug("Registering candidate with message: {}", msg); + candidateRegistry.tell(msg); + + return new CandidateRegistration(entity, this); + } + + @Override + public DOMEntityOwnershipListenerRegistration registerListener(final String entityType, + final DOMEntityOwnershipListener listener) { + LOG.debug("Registering listener {} for type {}", listener, entityType); + listenerRegistry.tell(new RegisterListener(entityType, listener)); + + return new ListenerRegistration(listener, entityType, this); + } + + @Override + public Optional getOwnershipState(final DOMEntity entity) { + LOG.debug("Retrieving ownership state for {}", entity); + + final CompletionStage result = AskPattern.ask(ownerStateChecker, + replyTo -> new GetOwnershipState(entity, replyTo), + Duration.ofSeconds(5), scheduler); + + final GetOwnershipStateReply reply; + try { + reply = result.toCompletableFuture().get(); + } catch (final InterruptedException | ExecutionException exception) { + LOG.warn("Failed to retrieve ownership state for {}", entity, exception); + return Optional.empty(); + } + + return Optional.ofNullable(reply.getOwnershipState()); + } + + @Override + public boolean isCandidateRegistered(final DOMEntity forEntity) { + return registeredEntities.contains(forEntity); + } + + void unregisterCandidate(final DOMEntity entity) { + LOG.debug("Unregistering candidate for {}", entity); + + if (registeredEntities.remove(entity)) { + candidateRegistry.tell(new UnregisterCandidate(entity, localCandidate)); + } + } + + void unregisterListener(final String entityType, final DOMEntityOwnershipListener listener) { + LOG.debug("Unregistering listener {} for type {}", listener, entityType); + + listenerRegistry.tell(new UnregisterListener(entityType, listener)); + } + + @VisibleForTesting + RunningContext getRunningContext() { + return runningContext; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/CandidateRegistration.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/CandidateRegistration.java new file mode 100644 index 0000000000..56a2f099f6 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/CandidateRegistration.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka; + +import static java.util.Objects.requireNonNull; + +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration; +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; + +final class CandidateRegistration extends AbstractObjectRegistration + implements DOMEntityOwnershipCandidateRegistration { + private final AkkaEntityOwnershipService service; + + CandidateRegistration(final DOMEntity instance, final AkkaEntityOwnershipService service) { + super(instance); + this.service = requireNonNull(service); + } + + @Override + protected void removeRegistration() { + service.unregisterCandidate(getInstance()); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/ListenerRegistration.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/ListenerRegistration.java new file mode 100644 index 0000000000..435babe8ec --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/ListenerRegistration.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka; + +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration; +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; + +final class ListenerRegistration extends AbstractObjectRegistration + implements DOMEntityOwnershipListenerRegistration { + private final AkkaEntityOwnershipService service; + private final @NonNull String entityType; + + ListenerRegistration(final DOMEntityOwnershipListener listener, final String entityType, + final AkkaEntityOwnershipService service) { + super(listener); + this.entityType = requireNonNull(entityType); + this.service = requireNonNull(service); + } + + @Override + public String getEntityType() { + return entityType; + } + + @Override + protected void removeRegistration() { + service.unregisterListener(entityType, getInstance()); + } + + @Override + protected MoreObjects.ToStringHelper addToStringAttributes(final MoreObjects.ToStringHelper toStringHelper) { + return toStringHelper.add("entityType", entityType); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java new file mode 100644 index 0000000000..54f9a6b148 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.bootstrap; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.cluster.typed.Cluster; +import akka.cluster.typed.ClusterSingleton; +import akka.cluster.typed.SingletonActor; +import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand; +import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext; +import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext; +import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate; +import org.opendaylight.controller.eos.akka.owner.checker.OwnerStateChecker; +import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand; +import org.opendaylight.controller.eos.akka.owner.supervisor.OwnerSyncer; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; +import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; +import org.opendaylight.controller.eos.akka.registry.listener.type.EntityTypeListenerRegistry; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand; +import org.opendaylight.yangtools.yang.common.Empty; + +public final class EOSMain extends AbstractBehavior { + private final ActorRef listenerRegistry; + private final ActorRef candidateRegistry; + private final ActorRef ownerSupervisor; + private final ActorRef ownerStateChecker; + + private EOSMain(final ActorContext context) { + super(context); + + final String role = Cluster.get(context.getSystem()).selfMember().getRoles().iterator().next(); + + listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry"); + candidateRegistry = context.spawn(CandidateRegistry.create(), "CandidateRegistry"); + ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker"); + + final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem()); + // start the initial sync behavior that switches to the regular one after syncing + ownerSupervisor = clusterSingleton.init(SingletonActor.of(OwnerSyncer.create(), "OwnerSupervisor")); + } + + public static Behavior create() { + return Behaviors.setup(EOSMain::new); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(GetRunningContext.class, this::onGetRunningContext) + .onMessage(Terminate.class, this::onTerminate) + .build(); + } + + private Behavior onGetRunningContext(final GetRunningContext request) { + request.getReplyTo().tell( + new RunningContext(listenerRegistry, candidateRegistry, ownerStateChecker, ownerSupervisor)); + return this; + } + + private Behavior onTerminate(final Terminate request) { + request.getReplyTo().tell(Empty.getInstance()); + return Behaviors.stopped(); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/BootstrapCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/BootstrapCommand.java new file mode 100644 index 0000000000..122a53f360 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/BootstrapCommand.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.bootstrap.command; + +public abstract class BootstrapCommand { + BootstrapCommand() { + // Hidden on purpose + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/GetRunningContext.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/GetRunningContext.java new file mode 100644 index 0000000000..6804dcb998 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/GetRunningContext.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.bootstrap.command; + +import static java.util.Objects.requireNonNull; + +import akka.actor.typed.ActorRef; + +public final class GetRunningContext extends BootstrapCommand { + private final ActorRef replyTo; + + public GetRunningContext(final ActorRef replyTo) { + this.replyTo = requireNonNull(replyTo); + } + + public ActorRef getReplyTo() { + return replyTo; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/RunningContext.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/RunningContext.java new file mode 100644 index 0000000000..6bbffca556 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/RunningContext.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.bootstrap.command; + +import static java.util.Objects.requireNonNull; + +import akka.actor.typed.ActorRef; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand; + +public final class RunningContext extends BootstrapCommand { + private final @NonNull ActorRef listenerRegistry; + private final @NonNull ActorRef candidateRegistry; + private final @NonNull ActorRef ownerStateChecker; + private final @NonNull ActorRef ownerSupervisor; + + public RunningContext(final ActorRef listenerRegistry, + final ActorRef candidateRegistry, + final ActorRef ownerStateChecker, + final ActorRef ownerSupervisor) { + this.listenerRegistry = requireNonNull(listenerRegistry); + this.candidateRegistry = requireNonNull(candidateRegistry); + this.ownerStateChecker = requireNonNull(ownerStateChecker); + this.ownerSupervisor = requireNonNull(ownerSupervisor); + } + + public @NonNull ActorRef getListenerRegistry() { + return listenerRegistry; + } + + public @NonNull ActorRef getCandidateRegistry() { + return candidateRegistry; + } + + public @NonNull ActorRef getOwnerStateChecker() { + return ownerStateChecker; + } + + public @NonNull ActorRef getOwnerSupervisor() { + return ownerSupervisor; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/Terminate.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/Terminate.java new file mode 100644 index 0000000000..116b5e411c --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/Terminate.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.bootstrap.command; + +import static java.util.Objects.requireNonNull; + +import akka.actor.typed.ActorRef; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.yangtools.yang.common.Empty; + +public final class Terminate extends BootstrapCommand { + private final @NonNull ActorRef replyTo; + + public Terminate(final ActorRef replyTo) { + this.replyTo = requireNonNull(replyTo); + } + + public @NonNull ActorRef getReplyTo() { + return replyTo; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/OwnerStateChecker.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/OwnerStateChecker.java new file mode 100644 index 0000000000..57c40aca78 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/OwnerStateChecker.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker; + +import static java.util.Objects.requireNonNull; + +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.LWWRegisterKey; +import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator.Get; +import akka.cluster.ddata.typed.javadsl.Replicator.GetFailure; +import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse; +import akka.cluster.ddata.typed.javadsl.Replicator.GetSuccess; +import akka.cluster.ddata.typed.javadsl.Replicator.NotFound; +import akka.cluster.ddata.typed.javadsl.Replicator.ReadMajority; +import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; +import java.time.Duration; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply; +import org.opendaylight.controller.eos.akka.owner.checker.command.InternalGetReply; +import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand; +import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class OwnerStateChecker extends AbstractBehavior { + private static final Logger LOG = LoggerFactory.getLogger(OwnerStateChecker.class); + private static final Duration GET_OWNERSHIP_TIMEOUT = Duration.ofSeconds(5); + private static final Duration UNEXPECTED_ASK_TIMEOUT = Duration.ofSeconds(5); + + private final ReplicatorMessageAdapter> replicatorAdapter; + private final String localMember; + + private OwnerStateChecker(final ActorContext context, final String localMember) { + super(context); + this.localMember = requireNonNull(localMember); + replicatorAdapter = new ReplicatorMessageAdapter<>(context, + DistributedData.get(context.getSystem()).replicator(), UNEXPECTED_ASK_TIMEOUT); + } + + public static Behavior create(final String localMember) { + return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember)); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(GetOwnershipState.class, this::onGetOwnershipState) + .onMessage(InternalGetReply.class, this::respondWithState) + .build(); + } + + private Behavior onGetOwnershipState(final GetOwnershipState message) { + replicatorAdapter.askGet( + askReplyTo -> new Get<>( + new LWWRegisterKey<>(message.getEntity().toString()), + new ReadMajority(GET_OWNERSHIP_TIMEOUT), + askReplyTo), + reply -> new InternalGetReply(reply, message.getEntity(), message.getReplyTo())); + return this; + } + + private Behavior respondWithState(final InternalGetReply reply) { + final GetResponse> response = reply.getResponse(); + if (response instanceof NotFound) { + LOG.debug("Data for owner not found, most likely no owner has beed picked for entity: {}", + reply.getEntity()); + reply.getReplyTo().tell(new GetOwnershipStateReply(null)); + } else if (response instanceof GetFailure) { + LOG.warn("Failure retrieving data for entity: {}", reply.getEntity()); + reply.getReplyTo().tell(new GetOwnershipStateReply(null)); + } else if (response instanceof GetSuccess) { + final String owner = ((GetSuccess>) response).get(response.key()).getValue(); + LOG.debug("Data for owner received. {}, owner: {}", response, owner); + + final boolean isOwner = localMember.equals(owner); + final boolean hasOwner = !owner.isEmpty(); + + reply.getReplyTo().tell(new GetOwnershipStateReply(EntityOwnershipState.from(isOwner, hasOwner))); + } + return this; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipState.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipState.java new file mode 100644 index 0000000000..617b65e185 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipState.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +import static java.util.Objects.requireNonNull; + +import akka.actor.typed.ActorRef; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public final class GetOwnershipState extends StateCheckerCommand { + private final @NonNull DOMEntity entity; + private final @NonNull ActorRef replyTo; + + public GetOwnershipState(final DOMEntity entity, final ActorRef replyTo) { + this.entity = requireNonNull(entity); + this.replyTo = requireNonNull(replyTo); + } + + public @NonNull DOMEntity getEntity() { + return entity; + } + + public @NonNull ActorRef getReplyTo() { + return replyTo; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipStateReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipStateReply.java new file mode 100644 index 0000000000..58fb5a048e --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipStateReply.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState; + +public final class GetOwnershipStateReply extends StateCheckerReply { + private final @Nullable EntityOwnershipState ownershipState; + + public GetOwnershipStateReply(final EntityOwnershipState ownershipState) { + this.ownershipState = ownershipState; + } + + public @Nullable EntityOwnershipState getOwnershipState() { + return ownershipState; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/InternalGetReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/InternalGetReply.java new file mode 100644 index 0000000000..1a8f3058f6 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/InternalGetReply.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +import static java.util.Objects.requireNonNull; + +import akka.actor.typed.ActorRef; +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public final class InternalGetReply extends StateCheckerCommand { + private final @NonNull GetResponse> response; + private final @NonNull ActorRef replyTo; + private final @NonNull DOMEntity entity; + + public InternalGetReply(final GetResponse> response, final DOMEntity entity, + final ActorRef replyTo) { + this.response = requireNonNull(response); + this.entity = requireNonNull(entity); + this.replyTo = requireNonNull(replyTo); + } + + public @NonNull GetResponse> getResponse() { + return response; + } + + public @NonNull DOMEntity getEntity() { + return entity; + } + + public @NonNull ActorRef getReplyTo() { + return replyTo; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerCommand.java new file mode 100644 index 0000000000..e6b5412545 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerCommand.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +public abstract class StateCheckerCommand { + StateCheckerCommand() { + // Hidden on purpose + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerReply.java new file mode 100644 index 0000000000..39347cc696 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerReply.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +public abstract class StateCheckerReply { + StateCheckerReply() { + // Hidden on purpose + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java new file mode 100644 index 0000000000..41c83d4723 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java @@ -0,0 +1,369 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.cluster.ClusterEvent; +import akka.cluster.Member; +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.LWWRegisterKey; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.SelfUniqueAddress; +import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator; +import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; +import akka.cluster.typed.Cluster; +import akka.cluster.typed.Subscribe; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEvent; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; + +/** + * Responsible for tracking candidates and assigning ownership of entities. This behavior is subscribed to the candidate + * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates. + * On cluster up/down etc. events the owners are reassigned if possible. + */ +public final class OwnerSupervisor extends AbstractBehavior { + + private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class); + private static final String DATACENTER_PREFIX = "dc"; + + private final ReplicatorMessageAdapter> ownerReplicator; + + // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an + // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is + // running in a cluster-singleton + private final LWWRegister.Clock clock = (currentTimestamp, value) -> currentTimestamp + 1; + + private final Cluster cluster; + private final SelfUniqueAddress node; + + private final Set activeMembers; + + // currently registered candidates + private final Map> currentCandidates; + // current owners + private final Map currentOwners; + // reverse lookup of owner to entity + private final Multimap ownerToEntity = HashMultimap.create(); + + private OwnerSupervisor(final ActorContext context, + final Map> currentCandidates, + final Map currentOwners) { + super(context); + + final DistributedData distributedData = DistributedData.get(context.getSystem()); + final ActorRef replicator = distributedData.replicator(); + + this.cluster = Cluster.get(context.getSystem()); + this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5)); + + this.node = distributedData.selfUniqueAddress(); + this.activeMembers = getActiveMembers(cluster); + + this.currentCandidates = currentCandidates; + this.currentOwners = currentOwners; + + for (final Map.Entry entry : currentOwners.entrySet()) { + ownerToEntity.put(entry.getValue(), entry.getKey()); + } + + // check whether we have any unreachable/missing owners + reassignUnreachableOwners(); + assignMissingOwners(); + + final ActorRef memberEventAdapter = + context.messageAdapter(ClusterEvent.MemberEvent.class, event -> { + if (event instanceof ClusterEvent.MemberUp) { + return new MemberUpEvent(event.member().address(), event.member().getRoles()); + } else { + return new MemberDownEvent(event.member().address(), event.member().getRoles()); + } + }); + cluster.subscriptions().tell(Subscribe.create(memberEventAdapter, ClusterEvent.MemberEvent.class)); + + final ActorRef reachabilityEventAdapter = + context.messageAdapter(ClusterEvent.ReachabilityEvent.class, event -> { + if (event instanceof ClusterEvent.ReachableMember) { + return new MemberReachableEvent(event.member().address(), event.member().getRoles()); + } else { + return new MemberUnreachableEvent(event.member().address(), event.member().getRoles()); + } + }); + cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class)); + + new ReplicatorMessageAdapter>>(context, replicator, + Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new); + + LOG.debug("Owner Supervisor started"); + } + + public static Behavior create(final Map> currentCandidates, + final Map currentOwners) { + return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners)); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(CandidatesChanged.class, this::onCandidatesChanged) + .onMessage(MemberUpEvent.class, this::onPeerUp) + .onMessage(MemberDownEvent.class, this::onPeerDown) + .onMessage(MemberReachableEvent.class, this::onPeerReachable) + .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable) + .build(); + } + + private void reassignUnreachableOwners() { + final Set ownersToReassign = new HashSet<>(); + for (final String owner : ownerToEntity.keys()) { + if (!activeMembers.contains(owner)) { + ownersToReassign.add(owner); + } + } + + for (final String owner : ownersToReassign) { + reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner))); + } + } + + private void assignMissingOwners() { + for (final Map.Entry> entry : currentCandidates.entrySet()) { + if (!currentOwners.containsKey(entry.getKey())) { + assignOwnerFor(entry.getKey()); + } + } + } + + private Behavior onCandidatesChanged(final CandidatesChanged message) { + LOG.debug("onCandidatesChanged {}", message.getResponse()); + if (message.getResponse() instanceof Replicator.Changed) { + final Replicator.Changed>> changed = + (Replicator.Changed>>) message.getResponse(); + processCandidateChanges(changed.get(CandidateRegistry.KEY)); + } + return this; + } + + private void processCandidateChanges(final ORMap> candidates) { + final Map> entries = candidates.getEntries(); + for (final Map.Entry> entry : entries.entrySet()) { + processCandidatesFor(entry.getKey(), entry.getValue()); + } + } + + private void processCandidatesFor(final DOMEntity entity, final ORSet receivedCandidates) { + LOG.debug("Processing candidates for : {}, new value: {}", entity, receivedCandidates.elements()); + + final Set candidates = JavaConverters.asJava(receivedCandidates.elements()); + // only insert candidates if there are any to insert, otherwise we would generate unnecessary notification with + // no owner + if (!currentCandidates.containsKey(entity) && !candidates.isEmpty()) { + LOG.debug("Candidates missing for entity: {} adding all candidates", entity); + currentCandidates.put(entity, new HashSet<>(candidates)); + + LOG.debug("Current state for {} : {}", entity, currentCandidates.get(entity).toString()); + assignOwnerFor(entity); + + return; + } + + final Set currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet()); + final Set difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates)); + + LOG.debug("currently present candidates: {}", currentlyPresent); + LOG.debug("difference: {}", difference); + + final List ownersToReassign = new ArrayList<>(); + + // first add/remove candidates from entities + for (final String toCheck : difference) { + if (!currentlyPresent.contains(toCheck)) { + // add new candidate + LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck); + currentCandidates.get(entity).add(toCheck); + + if (!currentOwners.containsKey(entity)) { + // might as well assign right away when we don't have an owner + assignOwnerFor(entity); + } + + LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString()); + continue; + } + + if (!candidates.contains(toCheck)) { + // remove candidate + LOG.debug("Removing candidate from entity: {} - {}", entity, toCheck); + currentCandidates.get(entity).remove(toCheck); + if (ownerToEntity.containsKey(toCheck)) { + ownersToReassign.add(toCheck); + } + } + } + + // then reassign those that need new owners + for (final String toReassign : ownersToReassign) { + reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign))); + } + + if (currentCandidates.get(entity) == null) { + LOG.debug("Last candidate removed for {}", entity); + } else { + LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString()); + } + } + + private void reassignCandidatesFor(final String oldOwner, final Collection entities) { + LOG.debug("Reassigning owners for {}", entities); + for (final DOMEntity entity : entities) { + + // only reassign owner for those entities that lost this candidate or is not reachable + if (!activeMembers.contains(oldOwner) + || !currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(oldOwner)) { + ownerToEntity.remove(oldOwner, entity); + assignOwnerFor(entity); + } + } + } + + private void assignOwnerFor(final DOMEntity entity) { + final Set candidatesForEntity = currentCandidates.get(entity); + if (candidatesForEntity.isEmpty()) { + LOG.debug("No candidates present for entity: {}", entity); + removeOwner(entity); + return; + } + + String pickedCandidate = null; + for (final String candidate : candidatesForEntity) { + if (activeMembers.contains(candidate)) { + pickedCandidate = candidate; + break; + } + } + if (pickedCandidate == null) { + LOG.debug("No candidate is reachable for {}, activeMembers: {}, currentCandidates: {}", + entity, activeMembers, currentCandidates.get(entity)); + // no candidate is reachable so only remove owner if necessary + removeOwner(entity); + return; + } + ownerToEntity.put(pickedCandidate, entity); + + LOG.debug("Entity {} new owner: {}", entity, pickedCandidate); + currentOwners.put(entity, pickedCandidate); + writeNewOwner(entity, pickedCandidate); + } + + private void removeOwner(final DOMEntity entity) { + if (currentOwners.containsKey(entity)) { + // assign empty owner to dd, as we cannot delete data for a key since that would prevent + // writes for the same key + currentOwners.remove(entity); + + writeNewOwner(entity, ""); + } + } + + private void writeNewOwner(final DOMEntity entity, final String candidate) { + ownerReplicator.askUpdate( + askReplyTo -> new Replicator.Update<>( + new LWWRegisterKey<>(entity.toString()), + new LWWRegister<>(node.uniqueAddress(), candidate, 0), + Replicator.writeLocal(), + askReplyTo, + register -> register.withValue(node, candidate, clock)), + OwnerChanged::new); + } + + private Behavior onPeerUp(final MemberUpEvent event) { + LOG.debug("Received MemberUp : {}", event); + + handleReachableEvent(event.getRoles()); + return this; + } + + private Behavior onPeerReachable(final MemberReachableEvent event) { + LOG.debug("Received MemberReachable : {}", event); + + handleReachableEvent(event.getRoles()); + return this; + } + + private void handleReachableEvent(final Set roles) { + activeMembers.add(extractRole(roles)); + assignMissingOwners(); + } + + private Behavior onPeerDown(final MemberDownEvent event) { + LOG.debug("Received MemberDown : {}", event); + + handleUnreachableEvent(event.getRoles()); + return this; + } + + private Behavior onPeerUnreachable(final MemberUnreachableEvent event) { + LOG.debug("Received MemberUnreachable : {}", event); + + handleUnreachableEvent(event.getRoles()); + return this; + } + + private void handleUnreachableEvent(final Set roles) { + activeMembers.remove(extractRole(roles)); + reassignUnreachableOwners(); + } + + private static Set getActiveMembers(final Cluster cluster) { + final Set activeMembers = new HashSet<>(); + cluster.state().getMembers().forEach(member -> activeMembers.add(extractRole(member))); + activeMembers.removeAll(cluster.state().getUnreachable().stream() + .map(OwnerSupervisor::extractRole).collect(Collectors.toSet())); + + return activeMembers; + } + + private static String extractRole(final Member member) { + return extractRole(member.getRoles()); + } + + private static String extractRole(final Set roles) { + return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX)) + .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found.")); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java new file mode 100644 index 0000000000..1a8df09f1d --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.LWWRegisterKey; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator; +import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Behavior that retrieves current candidates/owners from distributed-data and switches to OwnerSupervisor when the + * sync has finished. + */ +public final class OwnerSyncer extends AbstractBehavior { + private static final Logger LOG = LoggerFactory.getLogger(OwnerSyncer.class); + + private final ReplicatorMessageAdapter> ownerReplicator; + private final Map> currentCandidates = new HashMap<>(); + private final Map currentOwners = new HashMap<>(); + + // String representation of Entity to DOMEntity + private final Map entityLookup = new HashMap<>(); + + private int toSync = -1; + + private OwnerSyncer(final ActorContext context) { + super(context); + LOG.debug("Starting candidate and owner sync"); + + final ActorRef replicator = DistributedData.get(context.getSystem()).replicator(); + + this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5)); + + new ReplicatorMessageAdapter>>(context, replicator, + Duration.ofSeconds(5)).askGet( + askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo), + InitialCandidateSync::new); + } + + public static Behavior create() { + return Behaviors.setup(OwnerSyncer::new); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(InitialCandidateSync.class, this::onInitialCandidateSync) + .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync) + .build(); + } + + private Behavior onInitialCandidateSync(final InitialCandidateSync rsp) { + final Replicator.GetResponse>> response = rsp.getResponse(); + if (response instanceof Replicator.GetSuccess) { + return doInitialSync((Replicator.GetSuccess>>) response); + } else if (response instanceof Replicator.NotFound) { + LOG.debug("No candidates found switching to supervisor"); + return switchToSupervisor(); + } else { + LOG.debug("Initial candidate sync failed, switching to supervisor. Sync reply: {}", response); + return switchToSupervisor(); + } + } + + private Behavior doInitialSync( + final Replicator.GetSuccess>> response) { + + final ORMap> candidates = response.get(CandidateRegistry.KEY); + candidates.getEntries().entrySet().forEach(entry -> { + currentCandidates.put(entry.getKey(), new HashSet<>(entry.getValue().getElements())); + }); + + toSync = candidates.keys().size(); + for (final DOMEntity entity : candidates.keys().getElements()) { + entityLookup.put(entity.toString(), entity); + + ownerReplicator.askGet( + askReplyTo -> new Replicator.Get<>( + new LWWRegisterKey<>(entity.toString()), + Replicator.readLocal(), + askReplyTo), + InitialOwnerSync::new); + } + + return this; + } + + private Behavior onInitialOwnerSync(final InitialOwnerSync rsp) { + final Replicator.GetResponse> response = rsp.getResponse(); + if (response instanceof Replicator.GetSuccess) { + handleOwnerRsp((Replicator.GetSuccess>) response); + } else if (response instanceof Replicator.NotFound) { + handleNotFoundOwnerRsp((Replicator.NotFound>) response); + } else { + LOG.debug("Initial sync failed response: {}", response); + } + + // count the responses, on last switch behaviors + toSync--; + if (toSync == 0) { + return switchToSupervisor(); + } + + return this; + } + + private Behavior switchToSupervisor() { + LOG.debug("Initial sync done, switching to supervisor. candidates: {}, owners: {}", + currentCandidates, currentOwners); + return Behaviors.setup(ctx -> + OwnerSupervisor.create(currentCandidates, currentOwners)); + } + + private void handleOwnerRsp(final Replicator.GetSuccess> rsp) { + final DOMEntity entity = entityLookup.get(rsp.key().id()); + final String owner = rsp.get(rsp.key()).getValue(); + + currentOwners.put(entity, owner); + } + + private static void handleNotFoundOwnerRsp(final Replicator.NotFound> rsp) { + LOG.debug("Owner not found. {}", rsp); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/CandidatesChanged.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/CandidatesChanged.java new file mode 100644 index 0000000000..6334d977db --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/CandidatesChanged.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import static java.util.Objects.requireNonNull; + +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse; +import com.google.common.base.MoreObjects; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public final class CandidatesChanged extends OwnerSupervisorCommand { + private final @NonNull SubscribeResponse>> response; + + public CandidatesChanged(final SubscribeResponse>> subscribeResponse) { + this.response = requireNonNull(subscribeResponse); + } + + public @NonNull SubscribeResponse>> getResponse() { + return response; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("response", response).toString(); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialCandidateSync.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialCandidateSync.java new file mode 100644 index 0000000000..53bf10ae12 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialCandidateSync.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public final class InitialCandidateSync extends OwnerSupervisorCommand { + private final @Nullable GetResponse>> response; + + public InitialCandidateSync(final GetResponse>> response) { + this.response = response; + } + + public @Nullable GetResponse>> getResponse() { + return response; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialOwnerSync.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialOwnerSync.java new file mode 100644 index 0000000000..e734c6664b --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialOwnerSync.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import static java.util.Objects.requireNonNull; + +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse; +import org.eclipse.jdt.annotation.NonNull; + +public final class InitialOwnerSync extends OwnerSupervisorCommand { + private final @NonNull GetResponse> response; + + public InitialOwnerSync(final GetResponse> response) { + this.response = requireNonNull(response); + } + + public @NonNull GetResponse> getResponse() { + return response; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InternalClusterEvent.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InternalClusterEvent.java new file mode 100644 index 0000000000..0825c3246c --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InternalClusterEvent.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import static java.util.Objects.requireNonNull; + +import akka.actor.Address; +import com.google.common.base.MoreObjects; +import java.util.Set; +import org.eclipse.jdt.annotation.NonNull; + +public abstract class InternalClusterEvent extends OwnerSupervisorCommand { + private final @NonNull Set roles; + private final @NonNull Address address; + + InternalClusterEvent(final Address address, final Set roles) { + this.address = requireNonNull(address); + this.roles = Set.copyOf(roles); + } + + public final @NonNull Address getAddress() { + return address; + } + + public final @NonNull Set getRoles() { + return roles; + } + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).add("address", address).add("roles", roles).toString(); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberDownEvent.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberDownEvent.java new file mode 100644 index 0000000000..9ff778900d --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberDownEvent.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import akka.actor.Address; +import java.util.Set; + +public final class MemberDownEvent extends InternalClusterEvent { + public MemberDownEvent(final Address address, final Set roles) { + super(address, roles); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberReachableEvent.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberReachableEvent.java new file mode 100644 index 0000000000..dc6d79838d --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberReachableEvent.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import akka.actor.Address; +import java.util.Set; + +public final class MemberReachableEvent extends InternalClusterEvent { + public MemberReachableEvent(final Address address, final Set roles) { + super(address, roles); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUnreachableEvent.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUnreachableEvent.java new file mode 100644 index 0000000000..24999fb3bc --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUnreachableEvent.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import akka.actor.Address; +import java.util.Set; + +public final class MemberUnreachableEvent extends InternalClusterEvent { + public MemberUnreachableEvent(final Address address, final Set roles) { + super(address, roles); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUpEvent.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUpEvent.java new file mode 100644 index 0000000000..18eb765d0f --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUpEvent.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import akka.actor.Address; +import java.util.Set; + +public final class MemberUpEvent extends InternalClusterEvent { + public MemberUpEvent(final Address address, final Set roles) { + super(address, roles); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerChanged.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerChanged.java new file mode 100644 index 0000000000..b7ce5b2d0c --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerChanged.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import static java.util.Objects.requireNonNull; + +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.typed.javadsl.Replicator.UpdateResponse; +import org.eclipse.jdt.annotation.NonNull; + +public final class OwnerChanged extends OwnerSupervisorCommand { + private final @NonNull UpdateResponse> rsp; + + public OwnerChanged(final UpdateResponse> rsp) { + this.rsp = requireNonNull(rsp); + } + + public @NonNull UpdateResponse> getResponse() { + return rsp; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorCommand.java new file mode 100644 index 0000000000..ba6ca1d9ab --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorCommand.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +public abstract class OwnerSupervisorCommand { + OwnerSupervisorCommand() { + // Hidden on purpose + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistry.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistry.java new file mode 100644 index 0000000000..16c2ab6258 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistry.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.candidate; + +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.cluster.ddata.Key; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORMapKey; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.SelfUniqueAddress; +import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator; +import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; +import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse; +import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate; +import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Actor responsible for handling registrations of candidates into distributed-data. + */ +public final class CandidateRegistry extends AbstractBehavior { + + private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistry.class); + + public static final Key>> KEY = new ORMapKey<>("candidateRegistry"); + + private final ReplicatorMessageAdapter>> replicatorAdapter; + private final SelfUniqueAddress node; + + private CandidateRegistry(final ActorContext context, + final ReplicatorMessageAdapter>> replicatorAdapter) { + super(context); + this.replicatorAdapter = replicatorAdapter; + + this.node = DistributedData.get(context.getSystem()).selfUniqueAddress(); + + LOG.debug("Candidate registry started"); + } + + public static Behavior create() { + return Behaviors.setup(ctx -> + DistributedData.withReplicatorMessageAdapter( + (ReplicatorMessageAdapter>> replicatorAdapter) -> + new CandidateRegistry(ctx, replicatorAdapter))); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(RegisterCandidate.class, this::onRegisterCandidate) + .onMessage(UnregisterCandidate.class, this::onUnregisterCandidate) + .onMessage(InternalUpdateResponse.class, this::onInternalUpdateResponse) + .build(); + } + + private Behavior onRegisterCandidate(final RegisterCandidate registerCandidate) { + LOG.debug("Registering candidate({}) for entity: {}", + registerCandidate.getCandidate(), registerCandidate.getEntity()); + replicatorAdapter.askUpdate( + askReplyTo -> new Replicator.Update<>( + KEY, + ORMap.empty(), + Replicator.writeLocal(), + askReplyTo, + map -> map.update(node, registerCandidate.getEntity(), ORSet.empty(), + value -> value.add(node, registerCandidate.getCandidate()))), + InternalUpdateResponse::new); + return this; + } + + private Behavior onUnregisterCandidate(final UnregisterCandidate unregisterCandidate) { + LOG.debug("Removing candidate({}) from entity: {}", + unregisterCandidate.getCandidate(), unregisterCandidate.getEntity()); + replicatorAdapter.askUpdate( + askReplyTo -> new Replicator.Update<>( + KEY, + ORMap.empty(), + Replicator.writeLocal(), + askReplyTo, + map -> map.update(node, unregisterCandidate.getEntity(), ORSet.empty(), + value -> value.remove(node, unregisterCandidate.getCandidate()))), + InternalUpdateResponse::new); + return this; + } + + private Behavior onInternalUpdateResponse(final InternalUpdateResponse updateResponse) { + LOG.debug("Received update response: {}", updateResponse.getRsp()); + return this; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/AbstractCandidateCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/AbstractCandidateCommand.java new file mode 100644 index 0000000000..5949f4d27a --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/AbstractCandidateCommand.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.candidate.command; + +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public abstract class AbstractCandidateCommand extends CandidateRegistryCommand { + private final @NonNull DOMEntity entity; + private final @NonNull String candidate; + + AbstractCandidateCommand(final DOMEntity entity, final String candidate) { + this.entity = requireNonNull(entity); + this.candidate = requireNonNull(candidate); + } + + public final @NonNull DOMEntity getEntity() { + return entity; + } + + public final @NonNull String getCandidate() { + return candidate; + } + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).add("entity", entity).add("candidate", candidate).toString(); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRegistryCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRegistryCommand.java new file mode 100644 index 0000000000..1cd96a4508 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRegistryCommand.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.candidate.command; + +public abstract class CandidateRegistryCommand { + CandidateRegistryCommand() { + // Hidden on purpose + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InternalUpdateResponse.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InternalUpdateResponse.java new file mode 100644 index 0000000000..1759615603 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InternalUpdateResponse.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.candidate.command; + +import static java.util.Objects.requireNonNull; + +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.typed.javadsl.Replicator.UpdateResponse; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public final class InternalUpdateResponse extends CandidateRegistryCommand { + private final @NonNull UpdateResponse>> rsp; + + public InternalUpdateResponse(final UpdateResponse>> rsp) { + this.rsp = requireNonNull(rsp); + } + + public @NonNull UpdateResponse>> getRsp() { + return rsp; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/RegisterCandidate.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/RegisterCandidate.java new file mode 100644 index 0000000000..b76a203681 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/RegisterCandidate.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.candidate.command; + +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +/** + * Sent to Candidate registry to register the candidate for a given entity. + */ +public final class RegisterCandidate extends AbstractCandidateCommand { + public RegisterCandidate(final DOMEntity entity, final String candidate) { + super(entity, candidate); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/UnregisterCandidate.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/UnregisterCandidate.java new file mode 100644 index 0000000000..a39f3d2430 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/UnregisterCandidate.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.candidate.command; + +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +/** + * Sent to CandidateRegistry to unregister the candidate for a given entity. + */ +public final class UnregisterCandidate extends AbstractCandidateCommand { + public UnregisterCandidate(final DOMEntity entity, final String candidate) { + super(entity, candidate); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/SingleEntityListenerActor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/SingleEntityListenerActor.java new file mode 100644 index 0000000000..279ee8fa8e --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/SingleEntityListenerActor.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.listener.owner; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.LWWRegisterKey; +import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator; +import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; +import java.time.Duration; +import org.opendaylight.controller.eos.akka.registry.listener.owner.command.InitialOwnerSync; +import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand; +import org.opendaylight.controller.eos.akka.registry.listener.owner.command.OwnerChanged; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand; +import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Keeps track of owners for a single entity, which is mapped to a single LWWRegister in distributed-data. + * Notifies the listener responsible for tracking the whole entity-type of changes. + */ +public class SingleEntityListenerActor extends AbstractBehavior { + private static final Logger LOG = LoggerFactory.getLogger(SingleEntityListenerActor.class); + + private final String localMember; + private final DOMEntity entity; + private final ActorRef toNotify; + private final ReplicatorMessageAdapter> ownerReplicator; + + private String currentOwner = ""; + + public SingleEntityListenerActor(final ActorContext context, final String localMember, + final DOMEntity entity, final ActorRef toNotify) { + super(context); + this.localMember = localMember; + this.entity = entity; + this.toNotify = toNotify; + + final ActorRef replicator = DistributedData.get(context.getSystem()).replicator(); + ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5)); + + ownerReplicator.askGet( + replyTo -> new Replicator.Get<>(new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo), + InitialOwnerSync::new); + LOG.debug("OwnerListenerActor for {} started", entity.toString()); + } + + public static Behavior create(final String localMember, final DOMEntity entity, + final ActorRef toNotify) { + return Behaviors.setup(ctx -> new SingleEntityListenerActor(ctx, localMember, entity, toNotify)); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(OwnerChanged.class, this::onOwnerChanged) + .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync) + .build(); + } + + private Behavior onInitialOwnerSync(final InitialOwnerSync ownerSync) { + final Replicator.GetResponse> response = ownerSync.getResponse(); + LOG.debug("Received initial sync response for: {}, response: {}", entity, response); + + // only trigger initial notification when there is no owner present as we wont get a subscription callback + // when distributed-data does not have any data for a key + if (response instanceof Replicator.NotFound) { + + // no data is present, trigger initial notification with no owner + triggerNoOwnerNotification(); + } else if (response instanceof Replicator.GetSuccess) { + + // when we get a success just let subscribe callback handle the initial notification + LOG.debug("Owner present for entity: {} at the time of initial sync.", entity); + } else { + LOG.warn("Get has failed for entity: {}", response); + } + + // make sure to subscribe AFTER initial notification + ownerReplicator.subscribe(new LWWRegisterKey<>(entity.toString()), OwnerChanged::new); + + return this; + } + + private void triggerNoOwnerNotification() { + LOG.debug("Triggering initial notification without an owner for: {}", entity); + + toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange( + entity, EntityOwnershipChangeState.REMOTE_OWNERSHIP_LOST_NO_OWNER))); + } + + private Behavior onOwnerChanged(final OwnerChanged ownerChanged) { + + final Replicator.SubscribeResponse> response = ownerChanged.getResponse(); + if (response instanceof Replicator.Changed) { + + final Replicator.Changed> registerChanged = + (Replicator.Changed>) response; + LOG.debug("Owner changed for: {}, prevOwner: {}, newOwner: {}", + entity, currentOwner, registerChanged.get(registerChanged.key()).getValue()); + handleOwnerChange(registerChanged); + } else if (response instanceof Replicator.Deleted) { + handleOwnerLost((Replicator.Deleted>) response); + } + + return this; + } + + private void handleOwnerChange(final Replicator.Changed> changed) { + final String newOwner = changed.get(changed.key()).getValue(); + + final boolean wasOwner = currentOwner.equals(localMember); + final boolean isOwner = newOwner.equals(localMember); + final boolean hasOwner = !newOwner.equals(""); + + LOG.debug("Owner changed for entity:{}, currentOwner: {}, wasOwner: {}, isOwner: {}, hasOwner:{}", + entity, currentOwner, wasOwner, isOwner, hasOwner); + + currentOwner = newOwner; + + toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange( + entity, EntityOwnershipChangeState.from(wasOwner, isOwner, hasOwner)))); + } + + private void handleOwnerLost(final Replicator.Deleted> changed) { + final boolean wasOwner = currentOwner.equals(localMember); + + LOG.debug("Owner lost for entity:{}, currentOwner: {}, wasOwner: {}", entity, currentOwner, wasOwner); + + currentOwner = ""; + toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange( + entity, EntityOwnershipChangeState.from(wasOwner, false, false)))); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/InitialOwnerSync.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/InitialOwnerSync.java new file mode 100644 index 0000000000..402389d064 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/InitialOwnerSync.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.listener.owner.command; + +import static java.util.Objects.requireNonNull; + +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse; +import org.eclipse.jdt.annotation.NonNull; + +public final class InitialOwnerSync extends ListenerCommand { + private final @NonNull GetResponse> response; + + public InitialOwnerSync(final GetResponse> response) { + this.response = requireNonNull(response); + } + + public @NonNull GetResponse> getResponse() { + return response; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/ListenerCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/ListenerCommand.java new file mode 100644 index 0000000000..b0502f5174 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/ListenerCommand.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.listener.owner.command; + +public abstract class ListenerCommand { + ListenerCommand() { + // Hidden on purpose + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/OwnerChanged.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/OwnerChanged.java new file mode 100644 index 0000000000..4e1298ab3d --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/OwnerChanged.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.listener.owner.command; + +import static java.util.Objects.requireNonNull; + +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse; +import org.eclipse.jdt.annotation.NonNull; + +/** + * Notification from distributed-data sent to the SingleEntityListenerActor when owner changes for the tracked entity. + */ +public final class OwnerChanged extends ListenerCommand { + private final @NonNull SubscribeResponse> response; + + public OwnerChanged(final SubscribeResponse> response) { + this.response = requireNonNull(response); + } + + public @NonNull SubscribeResponse> getResponse() { + return response; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerActor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerActor.java new file mode 100644 index 0000000000..81772cf155 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerActor.java @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.listener.type; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator.Changed; +import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse; +import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; +import org.opendaylight.controller.eos.akka.registry.listener.owner.SingleEntityListenerActor; +import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.CandidatesChanged; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EntityTypeListenerActor extends AbstractBehavior { + private static final Logger LOG = LoggerFactory.getLogger(EntityTypeListenerActor.class); + + private final Map> activeListeners = new HashMap<>(); + private final String localMember; + private final String entityType; + private final DOMEntityOwnershipListener listener; + + public EntityTypeListenerActor(final ActorContext context, final String localMember, + final String entityType, final DOMEntityOwnershipListener listener) { + super(context); + this.localMember = localMember; + this.entityType = entityType; + this.listener = listener; + + new ReplicatorMessageAdapter>>(context, + DistributedData.get(context.getSystem()).replicator(), Duration.ofSeconds(5)) + .subscribe(CandidateRegistry.KEY, CandidatesChanged::new); + } + + public static Behavior create(final String localMember, final String entityType, + final DOMEntityOwnershipListener listener) { + return Behaviors.setup(ctx -> new EntityTypeListenerActor(ctx, localMember, entityType, listener)); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(CandidatesChanged.class, this::onCandidatesChanged) + .onMessage(EntityOwnerChanged.class, this::onOwnerChanged) + .build(); + } + + private Behavior onCandidatesChanged(final CandidatesChanged notification) { + final SubscribeResponse>> response = notification.getResponse(); + if (response instanceof Changed) { + processCandidates(((Changed>>) response).get(response.key()).getEntries()); + } else { + LOG.warn("Unexpected notification from replicator: {}", response); + } + return this; + } + + private void processCandidates(final Map> entries) { + final Map> filteredCandidates = entries.entrySet().stream() + .filter(entry -> entry.getKey().getType().equals(entityType)) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + LOG.debug("Entity-type: {} current candidates: {}", entityType, filteredCandidates); + + final Set removed = + ImmutableSet.copyOf(Sets.difference(activeListeners.keySet(), filteredCandidates.keySet())); + if (!removed.isEmpty()) { + LOG.debug("Stopping listeners for {}", removed); + // kill actors for the removed + removed.forEach(removedEntity -> getContext().stop(activeListeners.remove(removedEntity))); + } + + for (final Entry> entry : filteredCandidates.entrySet()) { + activeListeners.computeIfAbsent(entry.getKey(), key -> { + // spawn actor for this entity + LOG.debug("Starting listener for {}", key); + return getContext().spawn(SingleEntityListenerActor.create(localMember, key, getContext().getSelf()), + "SingleEntityListener-" + encodeEntityToActorName(key)); + }); + } + } + + private Behavior onOwnerChanged(final EntityOwnerChanged rsp) { + LOG.debug("Entity-type: {} listener, owner change: {}", entityType, rsp); + + listener.ownershipChanged(rsp.getOwnershipChange()); + return this; + } + + private static String encodeEntityToActorName(final DOMEntity entity) { + return "type=" + entity.getType() + ",entity=" + + entity.getIdentifier().getLastPathArgument().getNodeType().getLocalName() + "-" + UUID.randomUUID(); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerRegistry.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerRegistry.java new file mode 100644 index 0000000000..c853124d9a --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerRegistry.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.listener.type; + +import static java.util.Objects.requireNonNull; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EntityTypeListenerRegistry extends AbstractBehavior { + private static final Logger LOG = LoggerFactory.getLogger(EntityTypeListenerRegistry.class); + + private final Map> spawnedListenerActors = + new HashMap<>(); + private final String localMember; + + public EntityTypeListenerRegistry(final ActorContext context, + final String localMember) { + super(context); + this.localMember = requireNonNull(localMember); + } + + public static Behavior create(final String role) { + return Behaviors.setup(ctx -> new EntityTypeListenerRegistry(ctx, role)); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(RegisterListener.class, this::onRegisterListener) + .onMessage(UnregisterListener.class, this::onUnregisterListener) + .build(); + } + + private Behavior onRegisterListener(final RegisterListener command) { + LOG.debug("Spawning entity type listener actor for: {}", command.getEntityType()); + + final ActorRef listenerActor = + getContext().spawn(EntityTypeListenerActor.create(localMember, + command.getEntityType(), command.getDelegateListener()), + "TypeListener:" + encodeEntityToActorName(command.getEntityType())); + spawnedListenerActors.put(command.getDelegateListener(), listenerActor); + return this; + } + + private Behavior onUnregisterListener(final UnregisterListener command) { + LOG.debug("Stopping entity type listener actor for: {}", command.getEntityType()); + + getContext().stop(spawnedListenerActors.remove(command.getDelegateListener())); + return this; + } + + private static String encodeEntityToActorName(final String entityType) { + return "type=" + entityType + "-" + UUID.randomUUID(); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/CandidatesChanged.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/CandidatesChanged.java new file mode 100644 index 0000000000..07a4994fdb --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/CandidatesChanged.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.listener.type.command; + +import static java.util.Objects.requireNonNull; + +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse; +import com.google.common.base.MoreObjects; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +/** + * Adapted notification from distributed-data sent to EntityTypeListenerActor when candidates change. + */ +public final class CandidatesChanged extends TypeListenerCommand { + private final @NonNull SubscribeResponse>> response; + + public CandidatesChanged(final SubscribeResponse>> response) { + this.response = requireNonNull(response); + } + + public @NonNull SubscribeResponse>> getResponse() { + return response; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("response", response).toString(); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/EntityOwnerChanged.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/EntityOwnerChanged.java new file mode 100644 index 0000000000..02d0e2fe50 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/EntityOwnerChanged.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.listener.type.command; + +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange; + +/** + * Notification sent to EntityTypeListenerActor when there is an owner change for an Entity of a given type. + */ +public final class EntityOwnerChanged extends TypeListenerCommand { + private final @NonNull DOMEntityOwnershipChange ownershipChange; + + public EntityOwnerChanged(final DOMEntityOwnershipChange ownershipChange) { + this.ownershipChange = requireNonNull(ownershipChange); + } + + public @NonNull DOMEntityOwnershipChange getOwnershipChange() { + return ownershipChange; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("ownershipChange", ownershipChange).toString(); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/RegisterListener.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/RegisterListener.java new file mode 100644 index 0000000000..ffa3d47ff2 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/RegisterListener.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.listener.type.command; + +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; + +/** + * Register a DOMEntityOwnershipListener for a given entity-type. + */ +public final class RegisterListener extends TypeListenerRegistryCommand { + public RegisterListener(final String entityType, final DOMEntityOwnershipListener delegateListener) { + super(entityType, delegateListener); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerCommand.java new file mode 100644 index 0000000000..939d4a24a3 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerCommand.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.listener.type.command; + +public abstract class TypeListenerCommand { + TypeListenerCommand() { + // Hidden on purpose + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerRegistryCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerRegistryCommand.java new file mode 100644 index 0000000000..3bfce42ed2 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerRegistryCommand.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.listener.type.command; + +import static java.util.Objects.requireNonNull; + +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; + +public abstract class TypeListenerRegistryCommand { + private final @NonNull String entityType; + private final @NonNull DOMEntityOwnershipListener delegateListener; + + TypeListenerRegistryCommand(final String entityType, final DOMEntityOwnershipListener delegateListener) { + this.entityType = requireNonNull(entityType); + this.delegateListener = requireNonNull(delegateListener); + } + + public final @NonNull String getEntityType() { + return entityType; + } + + public final @NonNull DOMEntityOwnershipListener getDelegateListener() { + return delegateListener; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/UnregisterListener.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/UnregisterListener.java new file mode 100644 index 0000000000..4bd09c899f --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/UnregisterListener.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.listener.type.command; + +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; + +/** + * Unregister a listener from the EntityTypeListenerRegistry. + */ +public final class UnregisterListener extends TypeListenerRegistryCommand { + public UnregisterListener(final String entityType, final DOMEntityOwnershipListener delegateListener) { + super(entityType, delegateListener); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java new file mode 100644 index 0000000000..8d10119c79 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java @@ -0,0 +1,291 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; + +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.Adapter; +import akka.actor.typed.javadsl.AskPattern; +import akka.actor.typed.javadsl.Behaviors; +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.LWWRegisterKey; +import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.opendaylight.controller.eos.akka.bootstrap.EOSMain; +import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand; +import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext; +import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate; +import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractNativeEosTest { + + public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1"); + public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2"); + + protected static final List TWO_NODE_SEED_NODES = + List.of("akka://ClusterSystem@127.0.0.1:2550", + "akka://ClusterSystem@127.0.0.1:2551"); + + protected static final List THREE_NODE_SEED_NODES = + List.of("akka://ClusterSystem@127.0.0.1:2550", + "akka://ClusterSystem@127.0.0.1:2551", + "akka://ClusterSystem@127.0.0.1:2552"); + + private static final String REMOTE_PROTOCOL = "akka"; + private static final String PORT_PARAM = "akka.remote.artery.canonical.port"; + private static final String ROLE_PARAM = "akka.cluster.roles"; + private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes"; + + + protected static ClusterNode startupRemote(final int port, final List roles) + throws ExecutionException, InterruptedException { + return startup(port, roles, THREE_NODE_SEED_NODES); + } + + protected static ClusterNode startupRemote(final int port, final List roles, final List seedNodes) + throws ExecutionException, InterruptedException { + return startup(port, roles, seedNodes); + } + + protected static ClusterNode startup(final int port, final List roles) + throws ExecutionException, InterruptedException { + return startup(port, roles, List.of()); + } + + protected static ClusterNode startup(final int port, final List roles, final List seedNodes) + throws ExecutionException, InterruptedException { + + return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior); + } + + protected static ClusterNode startup(final int port, final List roles, final List seedNodes, + final Supplier> bootstrap) + throws ExecutionException, InterruptedException { + // Override the configuration + final Map overrides = new HashMap<>(4); + overrides.put(PORT_PARAM, port); + overrides.put(ROLE_PARAM, roles); + if (!seedNodes.isEmpty()) { + overrides.put(SEED_NODES_PARAM, seedNodes); + } + + final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load()); + + // Create a classic Akka system since thats what we will have in osgi + final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config); + final ActorRef eosBootstrap = + Adapter.spawn(system, bootstrap.get(), "EOSBootstrap"); + + final CompletionStage ask = AskPattern.ask(eosBootstrap, + GetRunningContext::new, + Duration.ofSeconds(5), + Adapter.toTyped(system.scheduler())); + final RunningContext runningContext = ask.toCompletableFuture().get(); + + return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(), + runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor()); + } + + private static Behavior rootBehavior() { + return Behaviors.setup(context -> EOSMain.create()); + } + + protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) { + final ActorRef candidateRegistry = node.getCandidateRegistry(); + registerCandidates(candidateRegistry, entity, members); + } + + protected static void registerCandidates(final ActorRef candidateRegistry, + final DOMEntity entity, final String... members) { + for (final String member : members) { + candidateRegistry.tell(new RegisterCandidate(entity, member)); + } + } + + protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity, + final String... members) { + final ActorRef candidateRegistry = node.getCandidateRegistry(); + for (final String member : members) { + candidateRegistry.tell(new UnregisterCandidate(entity, member)); + } + } + + protected static MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) { + final ActorRef listenerRegistry = node.getListenerRegistry(); + final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0)); + listenerRegistry.tell(new RegisterListener(entity.getType(), listener)); + + return listener; + } + + protected static void reachableMember(final ClusterNode node, final String role) { + reachableMember(node.getOwnerSupervisor(), role); + } + + protected static void reachableMember(final ActorRef ownerSupervisor, final String role) { + ownerSupervisor.tell(new MemberReachableEvent( + new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role))); + } + + protected static void unreachableMember(final ClusterNode node, final String role) { + unreachableMember(node.getOwnerSupervisor(), role); + } + + protected static void unreachableMember(final ActorRef ownerSupervisor, final String role) { + ownerSupervisor.tell(new MemberUnreachableEvent( + new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role))); + } + + protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) { + await().until(() -> { + final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem()); + final CompletionStage>> ask = + AskPattern.ask(distributedData.replicator(), + replyTo -> new Replicator.Get<>( + new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo), + Duration.ofSeconds(5), + clusterNode.getActorSystem().scheduler()); + + final Replicator.GetResponse> response = + ask.toCompletableFuture().get(5, TimeUnit.SECONDS); + + if (response instanceof Replicator.GetSuccess) { + final String owner = ((Replicator.GetSuccess>) response).dataValue().getValue(); + return !owner.isEmpty(); + } + + return false; + }); + } + + protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity, + final boolean hasOwner, final boolean isOwner, final boolean wasOwner) { + await().until(() -> !listener.getChanges().isEmpty()); + + await().untilAsserted(() -> { + final List changes = listener.getChanges(); + final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1); + assertEquals(entity, domEntityOwnershipChange.getEntity()); + + assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner()); + assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner()); + assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner()); + }); + } + + protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) { + await().pollDelay(2, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty()); + } + + protected static final class ClusterNode { + private final int port; + private final List roles; + private final akka.actor.typed.ActorSystem actorSystem; + private final ActorRef eosBootstrap; + private final ActorRef listenerRegistry; + private final ActorRef candidateRegistry; + private final ActorRef ownerSupervisor; + + private ClusterNode(final int port, + final List roles, + final ActorSystem actorSystem, + final ActorRef eosBootstrap, + final ActorRef listenerRegistry, + final ActorRef candidateRegistry, + final ActorRef ownerSupervisor) { + this.port = port; + this.roles = roles; + this.actorSystem = Adapter.toTyped(actorSystem); + this.eosBootstrap = eosBootstrap; + this.listenerRegistry = listenerRegistry; + this.candidateRegistry = candidateRegistry; + this.ownerSupervisor = ownerSupervisor; + } + + public int getPort() { + return port; + } + + public akka.actor.typed.ActorSystem getActorSystem() { + return actorSystem; + } + + public ActorRef getEosBootstrap() { + return eosBootstrap; + } + + public ActorRef getListenerRegistry() { + return listenerRegistry; + } + + public ActorRef getCandidateRegistry() { + return candidateRegistry; + } + + public ActorRef getOwnerSupervisor() { + return ownerSupervisor; + } + + public List getRoles() { + return roles; + } + } + + protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener { + + private final Logger log = LoggerFactory.getLogger(MockEntityOwnershipListener.class); + + private final List changes = new ArrayList<>(); + private final String member; + + public MockEntityOwnershipListener(final String member) { + + this.member = member; + } + + @Override + public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) { + log.info("{} Received ownershipCHanged: {}", member, ownershipChange); + log.info("{} changes: {}", member, changes.size()); + changes.add(ownershipChange); + } + + public List getChanges() { + return changes; + } + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java new file mode 100644 index 0000000000..0c4b5ea9f6 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java @@ -0,0 +1,245 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import akka.actor.ActorSystem; +import akka.actor.testkit.typed.javadsl.ActorTestKit; +import akka.actor.typed.ActorRef; +import akka.actor.typed.javadsl.Adapter; +import akka.actor.typed.javadsl.AskPattern; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator; +import com.typesafe.config.ConfigFactory; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import org.awaitility.Durations; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; +import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException; +import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +public class AkkaEntityOwnershipServiceTest extends AbstractNativeEosTest { + static final String ENTITY_TYPE = "test"; + static final String ENTITY_TYPE2 = "test2"; + static final QName QNAME = QName.create("test", "2015-08-11", "foo"); + static int ID_COUNTER = 1; + + private ActorSystem system; + private akka.actor.typed.ActorSystem typedSystem; + private AkkaEntityOwnershipService service; + private ActorRef replicator; + + @Before + public void setUp() throws Exception { + system = ActorSystem.create("ClusterSystem", ConfigFactory.load()); + typedSystem = Adapter.toTyped(this.system); + replicator = DistributedData.get(typedSystem).replicator(); + + service = new AkkaEntityOwnershipService(system); + } + + @After + public void tearDown() throws InterruptedException, ExecutionException { + service.close(); + ActorTestKit.shutdown(Adapter.toTyped(system)); + } + + @Test + public void testRegisterCandidate() throws Exception { + final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME); + final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); + + final DOMEntityOwnershipCandidateRegistration reg = service.registerCandidate(entity); + + verifyEntityOwnershipCandidateRegistration(entity, reg); + verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1"); + + try { + service.registerCandidate(entity); + fail("Expected CandidateAlreadyRegisteredException"); + } catch (final CandidateAlreadyRegisteredException e) { + // expected + assertEquals("getEntity", entity, e.getEntity()); + } + + final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE2, entityId); + final DOMEntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2); + + verifyEntityOwnershipCandidateRegistration(entity2, reg2); + verifyEntityCandidateRegistered(ENTITY_TYPE2, entityId, "member-1"); + } + + @Test + public void testUnregisterCandidate() throws Exception { + final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME); + final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); + + final DOMEntityOwnershipCandidateRegistration reg = service.registerCandidate(entity); + + verifyEntityOwnershipCandidateRegistration(entity, reg); + verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1"); + + reg.close(); + verifyEntityCandidateMissing(ENTITY_TYPE, entityId, "member-1"); + + service.registerCandidate(entity); + verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1"); + } + + @Test + public void testListenerRegistration() throws Exception { + + final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME); + final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); + final MockEntityOwnershipListener listener = new MockEntityOwnershipListener("member-1"); + + final DOMEntityOwnershipListenerRegistration reg = service.registerListener(entity.getType(), listener); + + assertNotNull("EntityOwnershipListenerRegistration null", reg); + assertEquals("getEntityType", entity.getType(), reg.getEntityType()); + assertEquals("getInstance", listener, reg.getInstance()); + + final DOMEntityOwnershipCandidateRegistration candidate = service.registerCandidate(entity); + + verifyListenerState(listener, entity, true, true, false); + final int changes = listener.getChanges().size(); + + reg.close(); + candidate.close(); + + verifyEntityCandidateMissing(ENTITY_TYPE, entityId, "member-1"); + + service.registerCandidate(entity); + // check listener not called when listener registration closed + await().pollDelay(Durations.TWO_SECONDS).until(() -> listener.getChanges().size() == changes); + } + + @Test + public void testGetOwnershipState() throws Exception { + final DOMEntity entity = new DOMEntity(ENTITY_TYPE, "one"); + + final DOMEntityOwnershipCandidateRegistration registration = service.registerCandidate(entity); + verifyGetOwnershipState(service, entity, EntityOwnershipState.IS_OWNER); + + final RunningContext runningContext = service.getRunningContext(); + registerCandidates(runningContext.getCandidateRegistry(), entity, "member-2"); + + final ActorRef ownerSupervisor = runningContext.getOwnerSupervisor(); + reachableMember(ownerSupervisor, "member-2"); + unreachableMember(ownerSupervisor, "member-1"); + verifyGetOwnershipState(service, entity, EntityOwnershipState.OWNED_BY_OTHER); + + final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, "two"); + final Optional state = service.getOwnershipState(entity2); + assertFalse(state.isPresent()); + + unreachableMember(ownerSupervisor, "member-2"); + verifyGetOwnershipState(service, entity, EntityOwnershipState.NO_OWNER); + } + + @Test + public void testIsCandidateRegistered() throws Exception { + final DOMEntity test = new DOMEntity("test-type", "test"); + + assertFalse(service.isCandidateRegistered(test)); + + service.registerCandidate(test); + + assertTrue(service.isCandidateRegistered(test)); + } + + private static void verifyGetOwnershipState(final DOMEntityOwnershipService service, final DOMEntity entity, + final EntityOwnershipState expState) { + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + final Optional state = service.getOwnershipState(entity); + assertTrue("getOwnershipState present", state.isPresent()); + assertEquals("EntityOwnershipState", expState, state.get()); + }); + } + + private void verifyEntityCandidateRegistered(final String entityType, + final YangInstanceIdentifier entityId, + final String candidateName) { + await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> doVerifyEntityCandidateRegistered(entityType, entityId, candidateName)); + } + + private void doVerifyEntityCandidateRegistered(final String entityType, + final YangInstanceIdentifier entityId, + final String candidateName) + throws ExecutionException, InterruptedException { + final Map> entries = getCandidateData(); + final DOMEntity entity = new DOMEntity(entityType, entityId); + assertTrue(entries.containsKey(entity)); + assertTrue(entries.get(entity).getElements().contains(candidateName)); + } + + private void verifyEntityCandidateMissing(final String entityType, + final YangInstanceIdentifier entityId, + final String candidateName) { + await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> doVerifyEntityCandidateMissing(entityType, entityId, candidateName)); + } + + private void doVerifyEntityCandidateMissing(final String entityType, + final YangInstanceIdentifier entityId, + final String candidateName) + throws ExecutionException, InterruptedException { + final Map> entries = getCandidateData(); + final DOMEntity entity = new DOMEntity(entityType, entityId); + assertTrue(entries.containsKey(entity)); + assertFalse(entries.get(entity).getElements().contains(candidateName)); + } + + private Map> getCandidateData() throws ExecutionException, InterruptedException { + final CompletionStage>>> ask = + AskPattern.ask(replicator, replyTo -> + new Replicator.Get<>( + CandidateRegistry.KEY, + Replicator.readLocal(), + replyTo), + Duration.ofSeconds(5), + typedSystem.scheduler()); + + final Replicator.GetResponse>> response = ask.toCompletableFuture().get(); + assertTrue(response instanceof Replicator.GetSuccess); + + final Replicator.GetSuccess>> success = + (Replicator.GetSuccess>>) response; + + return success.get(CandidateRegistry.KEY).getEntries(); + } + + private static void verifyEntityOwnershipCandidateRegistration(final DOMEntity entity, + final DOMEntityOwnershipCandidateRegistration reg) { + assertNotNull("EntityOwnershipCandidateRegistration null", reg); + assertEquals("getInstance", entity, reg.getInstance()); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/SingleNodeTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/SingleNodeTest.java new file mode 100644 index 0000000000..043af99aa4 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/SingleNodeTest.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka; + +import akka.actor.testkit.typed.javadsl.ActorTestKit; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public class SingleNodeTest extends AbstractNativeEosTest { + + public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1"); + public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2"); + + private ClusterNode clusterNode; + + @Before + public void setUp() throws Exception { + clusterNode = startup(2550, List.of("member-1")); + + reachableMember(clusterNode, "member-2"); + reachableMember(clusterNode, "member-3"); + } + + @After + public void tearDown() { + ActorTestKit.shutdown(clusterNode.getActorSystem()); + } + + @Test + public void testNotificationPriorToCandidateRegistration() { + final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1); + verifyNoNotifications(listener); + + registerCandidates(clusterNode, ENTITY_1, "member-1"); + verifyListenerState(listener, ENTITY_1, true, true, false); + } + + @Test + public void testListenerPriorToAddingCandidates() { + final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1); + + registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3"); + verifyListenerState(listener, ENTITY_1, true, true, false); + + unregisterCandidates(clusterNode, ENTITY_1, "member-1"); + verifyListenerState(listener, ENTITY_1, true, false, true); + } + + @Test + public void testListenerRegistrationAfterCandidates() { + registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3"); + waitUntillOwnerPresent(clusterNode, ENTITY_1); + + final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1); + verifyListenerState(listener, ENTITY_1, true, true, false); + + unregisterCandidates(clusterNode, ENTITY_1, "member-1", "member-2"); + verifyListenerState(listener, ENTITY_1, true, false, true); + } + + @Test + public void testMultipleEntities() { + registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3"); + waitUntillOwnerPresent(clusterNode, ENTITY_1); + + final MockEntityOwnershipListener listener1 = registerListener(clusterNode, ENTITY_1); + final MockEntityOwnershipListener listener2 = registerListener(clusterNode, ENTITY_2); + + verifyListenerState(listener1, ENTITY_1, true, true, false); + verifyNoNotifications(listener2); + + unregisterCandidates(clusterNode, ENTITY_1, "member-1"); + verifyListenerState(listener1, ENTITY_1, true, false, true); + verifyNoNotifications(listener2); + + registerCandidates(clusterNode, ENTITY_2, "member-2"); + verifyListenerState(listener1, ENTITY_1, true, false, true); + verifyListenerState(listener2, ENTITY_2, true, false, false); + + unregisterCandidates(clusterNode, ENTITY_2, "member-2"); + + verifyListenerState(listener1, ENTITY_1, true, false, true); + verifyListenerState(listener2, ENTITY_2, false, false, false); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeBaseTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeBaseTest.java new file mode 100644 index 0000000000..352a842273 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeBaseTest.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka; + +import akka.actor.testkit.typed.javadsl.ActorTestKit; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import akka.cluster.typed.Cluster; +import com.google.common.collect.ImmutableList; +import java.time.Duration; +import java.util.List; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public class ThreeNodeBaseTest extends AbstractNativeEosTest { + public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1"); + public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2"); + + private ClusterNode node1; + private ClusterNode node2; + private ClusterNode node3; + + @Before + public void setUp() throws Exception { + node1 = startupRemote(2550, List.of("member-1")); + node2 = startupRemote(2551, List.of("member-2")); + node3 = startupRemote(2552, List.of("member-3")); + + // need to wait until all nodes are ready + final Cluster cluster = Cluster.get(node3.getActorSystem()); + // need a longer timeout with classic remoting, artery.tcp doesnt need to wait as long for init + Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> { + final List members = ImmutableList.copyOf(cluster.state().getMembers()); + if (members.size() != 3) { + return false; + } + + for (final Member member : members) { + if (!member.status().equals(MemberStatus.up())) { + return false; + } + } + + return true; + }); + } + + @After + public void tearDown() { + // same issue with classic remoting as in setup + ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20)); + ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20)); + ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20)); + } + + @Test + public void testInitialNotificationsWithoutOwner() throws Exception { + final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1); + verifyNoNotifications(listener1); + + final MockEntityOwnershipListener listener2 = registerListener(node2, ENTITY_1); + verifyNoNotifications(listener2); + + final MockEntityOwnershipListener listener3 = registerListener(node3, ENTITY_1); + verifyNoNotifications(listener3); + } + + @Test + public void testInitialNotificationsWithOwner() { + registerCandidates(node1, ENTITY_1, "member-1"); + // make sure we register other candidates after the first is seen everywhere to prevent different results due + // to timing + waitUntillOwnerPresent(node3, ENTITY_1); + + registerCandidates(node2, ENTITY_1, "member-2"); + registerCandidates(node3, ENTITY_1, "member-3"); + + final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1); + verifyListenerState(listener1, ENTITY_1, true, true, false); + + final MockEntityOwnershipListener listener2 = registerListener(node2, ENTITY_1); + verifyListenerState(listener2, ENTITY_1, true, false, false); + + final MockEntityOwnershipListener listener3 = registerListener(node3, ENTITY_1); + verifyListenerState(listener3, ENTITY_1, true, false, false); + } + + @Test + public void testMultipleEntities() { + registerCandidates(node1, ENTITY_1, "member-1"); + registerCandidates(node2, ENTITY_1, "member-2"); + registerCandidates(node3, ENTITY_1, "member-3"); + + waitUntillOwnerPresent(node3, ENTITY_1); + + registerCandidates(node2, ENTITY_2, "member-2"); + waitUntillOwnerPresent(node2, ENTITY_2); + registerCandidates(node1, ENTITY_2, "member-1"); + + final MockEntityOwnershipListener firstEntityListener1 = registerListener(node1, ENTITY_1); + final MockEntityOwnershipListener firstEntityListener2 = registerListener(node2, ENTITY_1); + final MockEntityOwnershipListener firstEntityListener3 = registerListener(node3, ENTITY_1); + + verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false); + verifyListenerState(firstEntityListener2, ENTITY_1, true, false, false); + verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false); + + final MockEntityOwnershipListener secondEntityListener1 = registerListener(node1, ENTITY_2); + final MockEntityOwnershipListener secondEntityListener2 = registerListener(node2, ENTITY_2); + final MockEntityOwnershipListener secondEntityListener3 = registerListener(node3, ENTITY_2); + + verifyListenerState(secondEntityListener1, ENTITY_2, true, false, false); + verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false); + verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false); + + unregisterCandidates(node1, ENTITY_1, "member-1"); + + verifyListenerState(firstEntityListener1, ENTITY_1, true, false, true); + verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false); + verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false); + + unregisterCandidates(node2, ENTITY_1, "member-2"); + + verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false); + verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true); + verifyListenerState(firstEntityListener3, ENTITY_1, true, true, false); + + unregisterCandidates(node3, ENTITY_1, "member-3"); + + verifyListenerState(firstEntityListener1, ENTITY_1, false, false, false); + verifyListenerState(firstEntityListener2, ENTITY_1, false, false, false); + verifyListenerState(firstEntityListener3, ENTITY_1, false, false, true); + + // check second listener hasnt moved + verifyListenerState(secondEntityListener1, ENTITY_2, true, false, false); + verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false); + verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false); + + registerCandidates(node1, ENTITY_1, "member-1"); + + verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false); + verifyListenerState(firstEntityListener2, ENTITY_1, true, false, false); + verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeReachabilityTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeReachabilityTest.java new file mode 100644 index 0000000000..871bc005cc --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeReachabilityTest.java @@ -0,0 +1,251 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka; + +import static org.awaitility.Awaitility.await; + +import akka.actor.testkit.typed.javadsl.ActorTestKit; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import akka.cluster.typed.Cluster; +import com.google.common.collect.ImmutableList; +import java.time.Duration; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public class ThreeNodeReachabilityTest extends AbstractNativeEosTest { + public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1"); + public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2"); + + private ClusterNode node1 = null; + private ClusterNode node2 = null; + private ClusterNode node3 = null; + + @Before + public void setUp() throws Exception { + node1 = startupRemote(2550, List.of("member-1"), TWO_NODE_SEED_NODES); + node2 = startupRemote(2551, List.of("member-2"), TWO_NODE_SEED_NODES); + + // need to wait until all nodes are ready + final Cluster cluster = Cluster.get(node2.getActorSystem()); + await().atMost(Duration.ofSeconds(20)).until(() -> { + final List members = ImmutableList.copyOf(cluster.state().getMembers()); + if (members.size() != 2) { + return false; + } + + for (final Member member : members) { + if (!member.status().equals(MemberStatus.up())) { + return false; + } + } + + return true; + }); + } + + @After + public void tearDown() { + ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20)); + ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20)); + + + if (node3 != null) { + ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20)); + } + } + + @Test + public void testNodeLateStart() throws Exception { + registerCandidates(node1, ENTITY_1, "member-1"); + registerCandidates(node2, ENTITY_1, "member-2"); + + registerCandidates(node2, ENTITY_2, "member-2"); + waitUntillOwnerPresent(node2, ENTITY_2); + registerCandidates(node1, ENTITY_2, "member-1"); + + final MockEntityOwnershipListener firstEntityListener1 = registerListener(node1, ENTITY_1); + final MockEntityOwnershipListener firstEntityListener2 = registerListener(node2, ENTITY_1); + + verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false); + verifyListenerState(firstEntityListener2, ENTITY_1, true, false, false); + + final MockEntityOwnershipListener secondEntityListener1 = registerListener(node1, ENTITY_2); + final MockEntityOwnershipListener secondEntityListener2 = registerListener(node2, ENTITY_2); + + verifyListenerState(secondEntityListener1, ENTITY_2, true, false, false); + verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false); + + unregisterCandidates(node1, ENTITY_1, "member-1"); + + verifyListenerState(firstEntityListener1, ENTITY_1, true, false, true); + verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false); + + unregisterCandidates(node2, ENTITY_1, "member-2"); + + verifyListenerState(firstEntityListener1, ENTITY_1, false, false, false); + verifyListenerState(firstEntityListener2, ENTITY_1, false, false, true); + + startNode3(); + + final MockEntityOwnershipListener firstEntityListener3 = registerListener(node3, ENTITY_1); + verifyListenerState(firstEntityListener3, ENTITY_1, false, false, false); + + final MockEntityOwnershipListener secondEntityListener3 = registerListener(node3, ENTITY_2); + verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false); + + registerCandidates(node3, ENTITY_1, "member-3"); + waitUntillOwnerPresent(node3, ENTITY_1); + + verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false); + verifyListenerState(firstEntityListener2, ENTITY_1, true, false, false); + + verifyListenerState(firstEntityListener3, ENTITY_1, true, true, false); + } + + @Test + public void testReachabilityChangesDuringRuntime() throws Exception { + startNode3(); + + registerCandidates(node2, ENTITY_1, "member-2"); + // we want singleton on node1 but owner on node2 + waitUntillOwnerPresent(node2, ENTITY_1); + + registerCandidates(node1, ENTITY_1, "member-1"); + registerCandidates(node3, ENTITY_1, "member-3"); + + registerCandidates(node2, ENTITY_2, "member-2"); + waitUntillOwnerPresent(node2, ENTITY_2); + registerCandidates(node1, ENTITY_2, "member-1"); + + final MockEntityOwnershipListener firstEntityListener1 = registerListener(node1, ENTITY_1); + final MockEntityOwnershipListener firstEntityListener2 = registerListener(node2, ENTITY_1); + final MockEntityOwnershipListener firstEntityListener3 = registerListener(node3, ENTITY_1); + + verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false); + verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false); + verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false); + + final MockEntityOwnershipListener secondEntityListener1 = registerListener(node1, ENTITY_2); + final MockEntityOwnershipListener secondEntityListener2 = registerListener(node2, ENTITY_2); + final MockEntityOwnershipListener secondEntityListener3 = registerListener(node3, ENTITY_2); + + verifyListenerState(secondEntityListener1, ENTITY_2, true, false, false); + verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false); + verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false); + + unreachableMember(node1, "member-2"); + + verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false); + verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true); + verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false); + + verifyListenerState(secondEntityListener1, ENTITY_2, true, true, false); + verifyListenerState(secondEntityListener2, ENTITY_2, true, false, true); + verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false); + + unreachableMember(node1, "member-3"); + + verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false); + verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true); + verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false); + + unregisterCandidates(node1, ENTITY_1, "member-1"); + unregisterCandidates(node1, ENTITY_2, "member-1"); + + verifyListenerState(firstEntityListener1, ENTITY_1, false, false, true); + verifyListenerState(firstEntityListener2, ENTITY_1, false, false, false); + verifyListenerState(firstEntityListener3, ENTITY_1, false, false, false); + + verifyListenerState(secondEntityListener1, ENTITY_2, false, false, true); + verifyListenerState(secondEntityListener2, ENTITY_2, false, false, false); + verifyListenerState(secondEntityListener3, ENTITY_2, false, false, false); + + reachableMember(node1, "member-2"); + verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false); + verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false); + verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false); + } + + @Test + public void testSingletonMoving() throws Exception { + final MockEntityOwnershipListener listener1 = registerListener(node2, ENTITY_1); + final MockEntityOwnershipListener listener2 = registerListener(node2, ENTITY_2); + verifyNoNotifications(listener1); + verifyNoNotifications(listener2); + + registerCandidates(node1, ENTITY_1, "member-1"); + registerCandidates(node2, ENTITY_1, "member-2"); + + registerCandidates(node2, ENTITY_2, "member-2"); + waitUntillOwnerPresent(node2, ENTITY_2); + registerCandidates(node1, ENTITY_2, "member-1"); + // end up with node1 - member-1, node2 - member-2 owners + verifyListenerState(listener1, ENTITY_1, true, false, false); + verifyListenerState(listener2, ENTITY_2, true, true, false); + + ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20)); + + verifyListenerState(listener1, ENTITY_1, true, true, false); + verifyListenerState(listener2, ENTITY_2, true, true, false); + + startNode3(2); + + final MockEntityOwnershipListener listener3 = registerListener(node3, ENTITY_2); + verifyListenerState(listener3, ENTITY_2, true, false, false); + + node1 = startupRemote(2550, List.of("member-1")); + + final Cluster cluster = Cluster.get(node2.getActorSystem()); + await().atMost(Duration.ofSeconds(20)).until(() -> { + final List members = ImmutableList.copyOf(cluster.state().getMembers()); + if (members.size() != 3) { + return false; + } + + for (final Member member : members) { + if (!member.status().equals(MemberStatus.up())) { + return false; + } + } + + return true; + }); + + final MockEntityOwnershipListener node1Listener = registerListener(node1, ENTITY_1); + verifyListenerState(node1Listener, ENTITY_1, true, false, false); + } + + private void startNode3() throws Exception { + startNode3(3); + } + + private void startNode3(final int membersPresent) throws Exception { + node3 = startupRemote(2552, List.of("member-3"), THREE_NODE_SEED_NODES); + + // need to wait until all nodes are ready + final Cluster cluster = Cluster.get(node2.getActorSystem()); + await().atMost(Duration.ofSeconds(20)).until(() -> { + final List members = ImmutableList.copyOf(cluster.state().getMembers()); + if (members.size() != membersPresent) { + return false; + } + + for (final Member member : members) { + if (!member.status().equals(MemberStatus.up())) { + return false; + } + } + + return true; + }); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java new file mode 100644 index 0000000000..61747622d1 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor; + +import akka.actor.testkit.typed.javadsl.ActorTestKit; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.cluster.typed.Cluster; +import akka.cluster.typed.ClusterSingleton; +import akka.cluster.typed.SingletonActor; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.junit.Test; +import org.opendaylight.controller.eos.akka.AbstractNativeEosTest; +import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand; +import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext; +import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext; +import org.opendaylight.controller.eos.akka.owner.checker.OwnerStateChecker; +import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; +import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; +import org.opendaylight.controller.eos.akka.registry.listener.type.EntityTypeListenerRegistry; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public class OwnerSupervisorTest extends AbstractNativeEosTest { + + @Test + public void testCandidatePickingWhenUnreachableCandidates() throws Exception { + + final ClusterNode node = startup(2550, Collections.singletonList("member-1")); + try { + reachableMember(node, "member-2"); + reachableMember(node, "member-3"); + registerCandidates(node, ENTITY_1, "member-1", "member-2", "member-3"); + + final MockEntityOwnershipListener listener = registerListener(node, ENTITY_1); + verifyListenerState(listener, ENTITY_1,true, true, false); + + unreachableMember(node, "member-1"); + verifyListenerState(listener, ENTITY_1, true, false, true); + + unreachableMember(node, "member-2"); + verifyListenerState(listener, ENTITY_1, true, false, false); + + unreachableMember(node, "member-3"); + verifyListenerState(listener, ENTITY_1, false, false, false); + + reachableMember(node, "member-2"); + verifyListenerState(listener, ENTITY_1, true, false, false); + + // no notification here as member-2 is already the owner + reachableMember(node, "member-1"); + + unreachableMember(node, "member-2"); + verifyListenerState(listener, ENTITY_1,true, true, false); + } finally { + ActorTestKit.shutdown(node.getActorSystem()); + } + } + + @Test + public void testSupervisorInitWithMissingOwners() throws Exception { + final Map> candidates = new HashMap<>(); + candidates.put(ENTITY_1, Set.of("member-1")); + candidates.put(ENTITY_2, Set.of("member-2")); + + final ClusterNode node = startup(2550, Collections.singletonList("member-1"), Collections.emptyList(), + () -> mockedBootstrap(candidates, new HashMap<>())); + + try { + waitUntillOwnerPresent(node, ENTITY_1); + + // also do a proper register so the listener from the type lister actor are spawned + registerCandidates(node, ENTITY_1, "member-1"); + registerCandidates(node, ENTITY_2, "member-2"); + + final MockEntityOwnershipListener listener1 = registerListener(node, ENTITY_1); + final MockEntityOwnershipListener listener2 = registerListener(node, ENTITY_2); + + // first entity should have correctly assigned owner as its reachable + verifyListenerState(listener1, ENTITY_1, true, true, false); + // this one could not be assigned during init as we dont have member-2 thats reachable + verifyListenerState(listener2, ENTITY_2, false, false, false); + + reachableMember(node, "member-2"); + verifyListenerState(listener2, ENTITY_2, true, false, false); + } finally { + ActorTestKit.shutdown(node.getActorSystem()); + } + } + + private static Behavior mockedBootstrap(final Map> currentCandidates, + final Map currentOwners) { + return Behaviors.setup(context -> MockBootstrap.create(currentCandidates, currentOwners)); + } + + /** + * Initial behavior that skips initial sync and instead initializes OwnerSupervisor with provided values. + */ + private static final class MockSyncer extends AbstractBehavior { + + private final Map> currentCandidates; + private final Map currentOwners; + + private MockSyncer(final ActorContext context, + final Map> currentCandidates, + final Map currentOwners) { + super(context); + this.currentCandidates = currentCandidates; + this.currentOwners = currentOwners; + + context.getSelf().tell(new InitialCandidateSync(null)); + } + + public static Behavior create(final Map> currentCandidates, + final Map currentOwners) { + return Behaviors.setup(ctx -> new MockSyncer(ctx, currentCandidates, currentOwners)); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(InitialCandidateSync.class, this::switchToSupervisor) + .build(); + } + + private Behavior switchToSupervisor(final InitialCandidateSync message) { + return OwnerSupervisor.create(currentCandidates, currentOwners); + } + } + + /** + * Bootstrap with OwnerSyncer replaced with the testing syncer behavior. + */ + private static final class MockBootstrap extends AbstractBehavior { + + private final ActorRef listenerRegistry; + private final ActorRef candidateRegistry; + private final ActorRef ownerStateChecker; + private final ActorRef ownerSupervisor; + + private MockBootstrap(final ActorContext context, + final Map> currentCandidates, + final Map currentOwners) { + super(context); + + final Cluster cluster = Cluster.get(context.getSystem()); + final String role = cluster.selfMember().getRoles().iterator().next(); + + listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry"); + candidateRegistry = context.spawn(CandidateRegistry.create(), "CandidateRegistry"); + ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker"); + + final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem()); + // start the initial sync behavior that switches to the regular one after syncing + ownerSupervisor = clusterSingleton.init(SingletonActor.of( + MockSyncer.create(currentCandidates, currentOwners), "OwnerSupervisor")); + } + + public static Behavior create(final Map> currentCandidates, + final Map currentOwners) { + return Behaviors.setup(ctx -> new MockBootstrap(ctx, currentCandidates, currentOwners)); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(GetRunningContext.class, this::onGetRunningContext) + .build(); + } + + private Behavior onGetRunningContext(final GetRunningContext request) { + request.getReplyTo().tell( + new RunningContext(listenerRegistry, candidateRegistry,ownerStateChecker, ownerSupervisor)); + return this; + } + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/resources/application.conf b/opendaylight/md-sal/eos-dom-akka/src/test/resources/application.conf new file mode 100644 index 0000000000..71c983a3fc --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/test/resources/application.conf @@ -0,0 +1,25 @@ +akka { + loglevel = debug + actor { + warn-about-java-serializer-usage = off + allow-java-serialization = on + provider = cluster + } + + remote { + artery { + enabled = on + canonical.hostname = "127.0.0.1" + canonical.port = 2550 + } + } + cluster { + seed-nodes = [ + "akka://ClusterSystem@127.0.0.1:2550"] + roles = [ + "member-1" + ] + downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + } +} + diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/resources/simplelogger.properties b/opendaylight/md-sal/eos-dom-akka/src/test/resources/simplelogger.properties new file mode 100644 index 0000000000..942eeee610 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/test/resources/simplelogger.properties @@ -0,0 +1,7 @@ +org.slf4j.simpleLogger.defaultLogLevel=info +org.slf4j.simpleLogger.showDateTime=true +org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a +org.slf4j.simpleLogger.logFile=System.out +org.slf4j.simpleLogger.showShortLogName=true +org.slf4j.simpleLogger.levelInBrackets=true +org.slf4j.simpleLogger.log.org.opendaylight.controller.eos.akka=debug diff --git a/opendaylight/md-sal/pom.xml b/opendaylight/md-sal/pom.xml index 976d74bcef..6b63828472 100644 --- a/opendaylight/md-sal/pom.xml +++ b/opendaylight/md-sal/pom.xml @@ -47,6 +47,7 @@ sal-cluster-admin-api sal-cluster-admin-impl sal-distributed-eos + eos-dom-akka sal-test-model -- 2.36.6