Introduce DOMEntityOwnershipService replacement 31/95731/19
authorTomas Cere <tomas.cere@pantheon.tech>
Thu, 18 Mar 2021 10:32:54 +0000 (11:32 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 3 Jun 2021 12:20:39 +0000 (14:20 +0200)
Introduce a new DOMEntityOwnershipService implementation backed by
Akka's distributed-data and cluster-singleton. This severs an
implementation link to sal-distributed-datastore, reducing overall
complexity.

JIRA: CONTROLLER-1982
Change-Id: I30753e83d1af10658141311842bdceb315b9237f
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
65 files changed:
akka/repackaged-akka-jar/pom.xml
akka/repackaged-akka-jar/src/main/resources/actor_typed_reference.conf [new file with mode: 0644]
akka/repackaged-akka-jar/src/main/resources/cluster_tools_reference.conf [new file with mode: 0644]
akka/repackaged-akka-jar/src/main/resources/cluster_typed_reference.conf [new file with mode: 0644]
akka/repackaged-akka-jar/src/main/resources/distributed_data_reference.conf [new file with mode: 0644]
akka/repackaged-akka-jar/src/main/resources/reference.conf
akka/repackaged-akka/pom.xml
artifacts/pom.xml
bundle-parent/pom.xml
features/odl-controller-akka/src/main/history/dependencies.xml
features/odl-mdsal-distributed-datastore/pom.xml
opendaylight/md-sal/eos-dom-akka/pom.xml [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/CandidateRegistration.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/ListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/BootstrapCommand.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/GetRunningContext.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/RunningContext.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/Terminate.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/OwnerStateChecker.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipState.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipStateReply.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/InternalGetReply.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerCommand.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerReply.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/CandidatesChanged.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialCandidateSync.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialOwnerSync.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InternalClusterEvent.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberDownEvent.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberReachableEvent.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUnreachableEvent.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUpEvent.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerChanged.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorCommand.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistry.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/AbstractCandidateCommand.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRegistryCommand.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InternalUpdateResponse.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/RegisterCandidate.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/UnregisterCandidate.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/SingleEntityListenerActor.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/InitialOwnerSync.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/ListenerCommand.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/OwnerChanged.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerActor.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerRegistry.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/CandidatesChanged.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/EntityOwnerChanged.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/RegisterListener.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerCommand.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerRegistryCommand.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/UnregisterListener.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/SingleNodeTest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeBaseTest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeReachabilityTest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/resources/application.conf [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/resources/simplelogger.properties [new file with mode: 0644]
opendaylight/md-sal/pom.xml

index eb2393b..e637cad 100644 (file)
             <artifactId>akka-actor_2.13</artifactId>
             <version>2.6.12</version>
         </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor-typed_2.13</artifactId>
+            <version>2.6.12</version>
+        </dependency>
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-cluster_2.13</artifactId>
             <version>2.6.12</version>
         </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster-typed_2.13</artifactId>
+            <version>2.6.12</version>
+        </dependency>
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-osgi_2.13</artifactId>
diff --git a/akka/repackaged-akka-jar/src/main/resources/actor_typed_reference.conf b/akka/repackaged-akka-jar/src/main/resources/actor_typed_reference.conf
new file mode 100644 (file)
index 0000000..d34d52a
--- /dev/null
@@ -0,0 +1,129 @@
+akka.actor.typed {
+
+  # List FQCN of `akka.actor.typed.ExtensionId`s which shall be loaded at actor system startup.
+  # Should be on the format: 'extensions = ["com.example.MyExtId1", "com.example.MyExtId2"]' etc.
+  # See the Akka Documentation for more info about Extensions
+  extensions = []
+
+  # List FQCN of extensions which shall be loaded at actor system startup.
+  # Library extensions are regular extensions that are loaded at startup and are
+  # available for third party library authors to enable auto-loading of extensions when
+  # present on the classpath. This is done by appending entries:
+  # 'library-extensions += "Extension"' in the library `reference.conf`.
+  #
+  # Should not be set by end user applications in 'application.conf', use the extensions property for that
+  #
+  library-extensions = ${?akka.actor.typed.library-extensions} []
+
+  # Receptionist is started eagerly to allow clustered receptionist to gather remote registrations early on.
+  library-extensions += "akka.actor.typed.receptionist.Receptionist$"
+
+  # While an actor is restarted (waiting for backoff to expire and children to stop)
+  # incoming messages and signals are stashed, and delivered later to the newly restarted
+  # behavior. This property defines the capacity in number of messages of the stash
+  # buffer. If the capacity is exceed then additional incoming messages are dropped.
+  restart-stash-capacity = 1000
+
+  # Typed mailbox defaults to the single consumber mailbox as balancing dispatcher is not supported
+  default-mailbox {
+    mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
+  }
+}
+
+# Load typed extensions by a classic extension.
+akka.library-extensions += "akka.actor.typed.internal.adapter.ActorSystemAdapter$LoadTypedExtensions"
+
+akka.actor {
+  serializers {
+    typed-misc = "akka.actor.typed.internal.MiscMessageSerializer"
+    service-key = "akka.actor.typed.internal.receptionist.ServiceKeySerializer"
+  }
+
+  serialization-identifiers {
+    "akka.actor.typed.internal.MiscMessageSerializer" = 24
+    "akka.actor.typed.internal.receptionist.ServiceKeySerializer" = 26
+  }
+
+  serialization-bindings {
+    "akka.actor.typed.ActorRef" = typed-misc
+    "akka.actor.typed.internal.adapter.ActorRefAdapter" = typed-misc
+    "akka.actor.typed.internal.receptionist.DefaultServiceKey" = service-key
+  }
+}
+
+# When using Akka Typed (having akka-actor-typed in classpath) the
+# akka.event.slf4j.Slf4jLogger is enabled instead of the DefaultLogger
+# even though it has not been explicitly defined in `akka.loggers`
+# configuration.
+#
+# Slf4jLogger will be used for all Akka classic logging via eventStream,
+# including logging from Akka internals. The Slf4jLogger is then using
+# an ordinary org.slf4j.Logger to emit the log events.
+#
+# The Slf4jLoggingFilter is also enabled automatically.
+#
+# This behavior can be disabled by setting this property to `off`.
+akka.use-slf4j = on
+
+akka.reliable-delivery {
+  producer-controller {
+
+    # To avoid head of line blocking from serialization and transfer
+    # of large messages this can be enabled.
+    # Large messages are chunked into pieces of the given size in bytes. The
+    # chunked messages are sent separatetely and assembled on the consumer side.
+    # Serialization and deserialization is performed by the ProducerController and
+    # ConsumerController respectively instead of in the remote transport layer.
+    chunk-large-messages = off
+
+    durable-queue {
+      # The ProducerController uses this timeout for the requests to
+      # the durable queue. If there is no reply within the timeout it
+      # will be retried.
+      request-timeout = 3s
+
+      # The ProducerController retries requests to the durable queue this
+      # number of times before failing.
+      retry-attempts = 10
+
+      # The ProducerController retries sending the first message with this interval
+      # until it has been confirmed.
+      resend-first-interval = 1s
+    }
+  }
+
+  consumer-controller {
+    # Number of messages in flight between ProducerController and
+    # ConsumerController. The ConsumerController requests for more messages
+    # when half of the window has been used.
+    flow-control-window = 50
+
+    # The ConsumerController resends flow control messages to the
+    # ProducerController with the resend-interval-min, and increasing
+    # it gradually to resend-interval-max when idle.
+    resend-interval-min = 2s
+    resend-interval-max = 30s
+
+    # If this is enabled lost messages will not be resent, but flow control is used.
+    # This can be more efficient since messages don't have to be
+    # kept in memory in the `ProducerController` until they have been
+    # confirmed, but the drawback is that lost messages will not be delivered.
+    only-flow-control = false
+  }
+
+  work-pulling {
+    producer-controller = ${akka.reliable-delivery.producer-controller}
+    producer-controller {
+      # Limit of how many messages that can be buffered when there
+      # is no demand from the consumer side.
+      buffer-size = 1000
+
+      # Ask timeout for sending message to worker until receiving Ack from worker
+      internal-ask-timeout = 60s
+
+      # Chunked messages not implemented for work-pulling yet. Override to not
+      # propagate property from akka.reliable-delivery.producer-controller.
+      chunk-large-messages = off
+    }
+  }
+}
diff --git a/akka/repackaged-akka-jar/src/main/resources/cluster_tools_reference.conf b/akka/repackaged-akka-jar/src/main/resources/cluster_tools_reference.conf
new file mode 100644 (file)
index 0000000..6fb9b32
--- /dev/null
@@ -0,0 +1,231 @@
+############################################
+# Akka Cluster Tools Reference Config File #
+############################################
+
+# This is the reference config file that contains all the default settings.
+# Make your edits/overrides in your application.conf.
+
+# //#pub-sub-ext-config
+# Settings for the DistributedPubSub extension
+akka.cluster.pub-sub {
+  # Actor name of the mediator actor, /system/distributedPubSubMediator
+  name = distributedPubSubMediator
+
+  # Start the mediator on members tagged with this role.
+  # All members are used if undefined or empty.
+  role = ""
+
+  # The routing logic to use for 'Send'
+  # Possible values: random, round-robin, broadcast
+  routing-logic = random
+
+  # How often the DistributedPubSubMediator should send out gossip information
+  gossip-interval = 1s
+
+  # Removed entries are pruned after this duration
+  removed-time-to-live = 120s
+
+  # Maximum number of elements to transfer in one message when synchronizing the registries.
+  # Next chunk will be transferred in next round of gossip.
+  max-delta-elements = 3000
+
+  # When a message is published to a topic with no subscribers send it to the dead letters.
+  send-to-dead-letters-when-no-subscribers = on
+
+  # The id of the dispatcher to use for DistributedPubSubMediator actors.
+  # If specified you need to define the settings of the actual dispatcher.
+  use-dispatcher = "akka.actor.internal-dispatcher"
+}
+# //#pub-sub-ext-config
+
+# Protobuf serializer for cluster DistributedPubSubMeditor messages
+akka.actor {
+  serializers {
+    akka-pubsub = "akka.cluster.pubsub.protobuf.DistributedPubSubMessageSerializer"
+  }
+  serialization-bindings {
+    "akka.cluster.pubsub.DistributedPubSubMessage" = akka-pubsub
+    "akka.cluster.pubsub.DistributedPubSubMediator$Internal$SendToOneSubscriber" = akka-pubsub
+  }
+  serialization-identifiers {
+    "akka.cluster.pubsub.protobuf.DistributedPubSubMessageSerializer" = 9
+  }
+}
+
+
+# //#receptionist-ext-config
+# Settings for the ClusterClientReceptionist extension
+akka.cluster.client.receptionist {
+  # Actor name of the ClusterReceptionist actor, /system/receptionist
+  name = receptionist
+
+  # Start the receptionist on members tagged with this role.
+  # All members are used if undefined or empty.
+  role = ""
+
+  # The receptionist will send this number of contact points to the client
+  number-of-contacts = 3
+
+  # The actor that tunnel response messages to the client will be stopped
+  # after this time of inactivity.
+  response-tunnel-receive-timeout = 30s
+
+  # The id of the dispatcher to use for ClusterReceptionist actors.
+  # If specified you need to define the settings of the actual dispatcher.
+  use-dispatcher = "akka.actor.internal-dispatcher"
+
+  # How often failure detection heartbeat messages should be received for
+  # each ClusterClient
+  heartbeat-interval = 2s
+
+  # Number of potentially lost/delayed heartbeats that will be
+  # accepted before considering it to be an anomaly.
+  # The ClusterReceptionist is using the akka.remote.DeadlineFailureDetector, which
+  # will trigger if there are no heartbeats within the duration
+  # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
+  # the default settings.
+  acceptable-heartbeat-pause = 13s
+
+  # Failure detection checking interval for checking all ClusterClients
+  failure-detection-interval = 2s
+}
+# //#receptionist-ext-config
+
+# //#cluster-client-config
+# Settings for the ClusterClient
+akka.cluster.client {
+  # Actor paths of the ClusterReceptionist actors on the servers (cluster nodes)
+  # that the client will try to contact initially. It is mandatory to specify
+  # at least one initial contact.
+  # Comma separated full actor paths defined by a string on the form of
+  # "akka://system@hostname:port/system/receptionist"
+  initial-contacts = []
+
+  # Interval at which the client retries to establish contact with one of
+  # ClusterReceptionist on the servers (cluster nodes)
+  establishing-get-contacts-interval = 3s
+
+  # Interval at which the client will ask the ClusterReceptionist for
+  # new contact points to be used for next reconnect.
+  refresh-contacts-interval = 60s
+
+  # How often failure detection heartbeat messages should be sent
+  heartbeat-interval = 2s
+
+  # Number of potentially lost/delayed heartbeats that will be
+  # accepted before considering it to be an anomaly.
+  # The ClusterClient is using the akka.remote.DeadlineFailureDetector, which
+  # will trigger if there are no heartbeats within the duration
+  # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
+  # the default settings.
+  acceptable-heartbeat-pause = 13s
+
+  # If connection to the receptionist is not established the client will buffer
+  # this number of messages and deliver them the connection is established.
+  # When the buffer is full old messages will be dropped when new messages are sent
+  # via the client. Use 0 to disable buffering, i.e. messages will be dropped
+  # immediately if the location of the singleton is unknown.
+  # Maximum allowed buffer size is 10000.
+  buffer-size = 1000
+
+  # If connection to the receiptionist is lost and the client has not been
+  # able to acquire a new connection for this long the client will stop itself.
+  # This duration makes it possible to watch the cluster client and react on a more permanent
+  # loss of connection with the cluster, for example by accessing some kind of
+  # service registry for an updated set of initial contacts to start a new cluster client with.
+  # If this is not wanted it can be set to "off" to disable the timeout and retry
+  # forever.
+  reconnect-timeout = off
+}
+# //#cluster-client-config
+
+# Protobuf serializer for ClusterClient messages
+akka.actor {
+  serializers {
+    akka-cluster-client = "akka.cluster.client.protobuf.ClusterClientMessageSerializer"
+  }
+  serialization-bindings {
+    "akka.cluster.client.ClusterClientMessage" = akka-cluster-client
+  }
+  serialization-identifiers {
+    "akka.cluster.client.protobuf.ClusterClientMessageSerializer" = 15
+  }
+}
+
+# //#singleton-config
+akka.cluster.singleton {
+  # The actor name of the child singleton actor.
+  singleton-name = "singleton"
+
+  # Singleton among the nodes tagged with specified role.
+  # If the role is not specified it's a singleton among all nodes in the cluster.
+  role = ""
+
+  # When a node is becoming oldest it sends hand-over request to previous oldest,
+  # that might be leaving the cluster. This is retried with this interval until
+  # the previous oldest confirms that the hand over has started or the previous
+  # oldest member is removed from the cluster (+ akka.cluster.down-removal-margin).
+  hand-over-retry-interval = 1s
+
+  # The number of retries are derived from hand-over-retry-interval and
+  # akka.cluster.down-removal-margin (or ClusterSingletonManagerSettings.removalMargin),
+  # but it will never be less than this property.
+  # After the hand over retries and it's still not able to exchange the hand over messages
+  # with the previous oldest it will restart itself by throwing ClusterSingletonManagerIsStuck,
+  # to start from a clean state. After that it will still not start the singleton instance
+  # until the previous oldest node has been removed from the cluster.
+  # On the other side, on the previous oldest node, the same number of retries - 3 are used
+  # and after that the singleton instance is stopped.
+  # For large clusters it might be necessary to increase this to avoid too early timeouts while
+  # gossip dissemination of the Leaving to Exiting phase occurs. For normal leaving scenarios
+  # it will not be a quicker hand over by reducing this value, but in extreme failure scenarios
+  # the recovery might be faster.
+  min-number-of-hand-over-retries = 15
+
+  # Config path of the lease to be taken before creating the singleton actor
+  # if the lease is lost then the actor is restarted and it will need to re-acquire the lease
+  # the default is no lease
+  use-lease = ""
+
+  # The interval between retries for acquiring the lease
+  lease-retry-interval = 5s
+}
+# //#singleton-config
+
+# //#singleton-proxy-config
+akka.cluster.singleton-proxy {
+  # The actor name of the singleton actor that is started by the ClusterSingletonManager
+  singleton-name = ${akka.cluster.singleton.singleton-name}
+
+  # The role of the cluster nodes where the singleton can be deployed.
+  # Corresponding to the role used by the `ClusterSingletonManager`. If the role is not
+  # specified it's a singleton among all nodes in the cluster, and the `ClusterSingletonManager`
+  # must then also be configured in same way.
+  role = ""
+
+  # Interval at which the proxy will try to resolve the singleton instance.
+  singleton-identification-interval = 1s
+
+  # If the location of the singleton is unknown the proxy will buffer this
+  # number of messages and deliver them when the singleton is identified.
+  # When the buffer is full old messages will be dropped when new messages are
+  # sent via the proxy.
+  # Use 0 to disable buffering, i.e. messages will be dropped immediately if
+  # the location of the singleton is unknown.
+  # Maximum allowed buffer size is 10000.
+  buffer-size = 1000
+}
+# //#singleton-proxy-config
+
+# Serializer for cluster ClusterSingleton messages
+akka.actor {
+  serializers {
+    akka-singleton = "akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer"
+  }
+  serialization-bindings {
+    "akka.cluster.singleton.ClusterSingletonMessage" = akka-singleton
+  }
+  serialization-identifiers {
+    "akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer" = 14
+  }
+}
\ No newline at end of file
diff --git a/akka/repackaged-akka-jar/src/main/resources/cluster_typed_reference.conf b/akka/repackaged-akka-jar/src/main/resources/cluster_typed_reference.conf
new file mode 100644 (file)
index 0000000..4cd45a5
--- /dev/null
@@ -0,0 +1,66 @@
+############################################
+# Akka Cluster Typed Reference Config File #
+############################################
+
+# This is the reference config file that contains all the default settings.
+# Make your edits/overrides in your application.conf.
+
+akka.cluster.typed.receptionist {
+  # Updates with Distributed Data are done with this consistency level.
+  # Possible values: local, majority, all, 2, 3, 4 (n)
+  write-consistency = local
+
+  # Period task to remove actor references that are hosted by removed nodes,
+  # in case of abrupt termination.
+  pruning-interval = 3 s
+
+  # The periodic task to remove actor references that are hosted by removed nodes
+  # will only remove entries older than this duration. The reason for this
+  # is to avoid removing entries of nodes that haven't been visible as joining.
+  prune-removed-older-than = 60 s
+
+  # Shard the services over this many Distributed Data keys, with large amounts of different
+  # service keys storing all of them in the same Distributed Data entry would lead to large updates
+  # etc. instead the keys are sharded across this number of keys. This must be the same on all nodes
+  # in a cluster, changing it requires a full cluster restart (stopping all nodes before starting them again)
+  distributed-key-count = 5
+
+  # Settings for the Distributed Data replicator used by Receptionist.
+  # Same layout as akka.cluster.distributed-data.
+  distributed-data = ${akka.cluster.distributed-data}
+  # make sure that by default it's for all roles (Play loads config in different way)
+  distributed-data.role = ""
+}
+
+akka.cluster.ddata.typed {
+  # The timeout to use for ask operations in ReplicatorMessageAdapter.
+  # This should be longer than the timeout given in Replicator.WriteConsistency and
+  # Replicator.ReadConsistency. The replicator will always send a reply within those
+  # timeouts so the unexpected ask timeout should not occur, but for cleanup in a
+  # failure situation it must still exist.
+  # If askUpdate, askGet or askDelete takes longer then this timeout a
+  # java.util.concurrent.TimeoutException will be thrown by the requesting actor and
+  # may be handled by supervision.
+  replicator-message-adapter-unexpected-ask-timeout = 20 s
+}
+
+akka {
+  actor {
+    serialization-identifiers {
+      "akka.cluster.typed.internal.AkkaClusterTypedSerializer" = 28
+      "akka.cluster.typed.internal.delivery.ReliableDeliverySerializer" = 36
+    }
+    serializers {
+      typed-cluster = "akka.cluster.typed.internal.AkkaClusterTypedSerializer"
+      reliable-delivery = "akka.cluster.typed.internal.delivery.ReliableDeliverySerializer"
+    }
+    serialization-bindings {
+      "akka.cluster.typed.internal.receptionist.ClusterReceptionist$Entry" = typed-cluster
+      "akka.actor.typed.internal.pubsub.TopicImpl$MessagePublished" = typed-cluster
+      "akka.actor.typed.delivery.internal.DeliverySerializable" = reliable-delivery
+    }
+  }
+  cluster.configuration-compatibility-check.checkers {
+    receptionist = "akka.cluster.typed.internal.receptionist.ClusterReceptionistConfigCompatChecker"
+  }
+}
diff --git a/akka/repackaged-akka-jar/src/main/resources/distributed_data_reference.conf b/akka/repackaged-akka-jar/src/main/resources/distributed_data_reference.conf
new file mode 100644 (file)
index 0000000..f716157
--- /dev/null
@@ -0,0 +1,159 @@
+##############################################
+# Akka Distributed DataReference Config File #
+##############################################
+
+# This is the reference config file that contains all the default settings.
+# Make your edits/overrides in your application.conf.
+
+
+#//#distributed-data
+# Settings for the DistributedData extension
+akka.cluster.distributed-data {
+  # Actor name of the Replicator actor, /system/ddataReplicator
+  name = ddataReplicator
+
+  # Replicas are running on members tagged with this role.
+  # All members are used if undefined or empty.
+  role = ""
+
+  # How often the Replicator should send out gossip information
+  gossip-interval = 2 s
+  
+  # How often the subscribers will be notified of changes, if any
+  notify-subscribers-interval = 500 ms
+
+  # Logging of data with payload size in bytes larger than
+  # this value. Maximum detected size per key is logged once,
+  # with an increase threshold of 10%.
+  # It can be disabled by setting the property to off.
+  log-data-size-exceeding = 10 KiB
+
+  # Maximum number of entries to transfer in one round of gossip exchange when
+  # synchronizing the replicas. Next chunk will be transferred in next round of gossip.
+  # The actual number of data entries in each Gossip message is dynamically
+  # adjusted to not exceed the maximum remote message size (maximum-frame-size).
+  max-delta-elements = 500
+  
+  # The id of the dispatcher to use for Replicator actors.
+  # If specified you need to define the settings of the actual dispatcher.
+  use-dispatcher = "akka.actor.internal-dispatcher"
+
+  # How often the Replicator checks for pruning of data associated with
+  # removed cluster nodes. If this is set to 'off' the pruning feature will
+  # be completely disabled.
+  pruning-interval = 120 s
+  
+  # How long time it takes to spread the data to all other replica nodes.
+  # This is used when initiating and completing the pruning process of data associated
+  # with removed cluster nodes. The time measurement is stopped when any replica is 
+  # unreachable, but it's still recommended to configure this with certain margin.
+  # It should be in the magnitude of minutes even though typical dissemination time
+  # is shorter (grows logarithmic with number of nodes). There is no advantage of 
+  # setting this too low. Setting it to large value will delay the pruning process.
+  max-pruning-dissemination = 300 s
+  
+  # The markers of that pruning has been performed for a removed node are kept for this
+  # time and thereafter removed. If and old data entry that was never pruned is somehow
+  # injected and merged with existing data after this time the value will not be correct.
+  # This would be possible (although unlikely) in the case of a long network partition.
+  # It should be in the magnitude of hours. For durable data it is configured by 
+  # 'akka.cluster.distributed-data.durable.pruning-marker-time-to-live'.
+ pruning-marker-time-to-live = 6 h
+  
+  # Serialized Write and Read messages are cached when they are sent to 
+  # several nodes. If no further activity they are removed from the cache
+  # after this duration.
+  serializer-cache-time-to-live = 10s
+
+  # Update and Get operations are sent to oldest nodes first.
+  # This is useful together with Cluster Singleton, which is running on oldest nodes.
+  prefer-oldest = off
+  
+  # Settings for delta-CRDT
+  delta-crdt {
+    # enable or disable delta-CRDT replication
+    enabled = on
+    
+    # Some complex deltas grow in size for each update and above this
+    # threshold such deltas are discarded and sent as full state instead.
+    # This is number of elements or similar size hint, not size in bytes.
+    max-delta-size = 50
+  }
+  
+  durable {
+    # List of keys that are durable. Prefix matching is supported by using * at the
+    # end of a key.  
+    keys = []
+    
+    # The markers of that pruning has been performed for a removed node are kept for this
+    # time and thereafter removed. If and old data entry that was never pruned is
+    # injected and merged with existing data after this time the value will not be correct.
+    # This would be possible if replica with durable data didn't participate in the pruning
+    # (e.g. it was shutdown) and later started after this time. A durable replica should not 
+    # be stopped for longer time than this duration and if it is joining again after this
+    # duration its data should first be manually removed (from the lmdb directory).
+    # It should be in the magnitude of days. Note that there is a corresponding setting
+    # for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'.
+    pruning-marker-time-to-live = 10 d
+    
+    # Fully qualified class name of the durable store actor. It must be a subclass
+    # of akka.actor.Actor and handle the protocol defined in 
+    # akka.cluster.ddata.DurableStore. The class must have a constructor with 
+    # com.typesafe.config.Config parameter.
+    store-actor-class = akka.cluster.ddata.LmdbDurableStore
+    
+    use-dispatcher = akka.cluster.distributed-data.durable.pinned-store
+    
+    pinned-store {
+      executor = thread-pool-executor
+      type = PinnedDispatcher
+    }
+    
+    # Config for the LmdbDurableStore
+    lmdb {
+      # Directory of LMDB file. There are two options:
+      # 1. A relative or absolute path to a directory that ends with 'ddata'
+      #    the full name of the directory will contain name of the ActorSystem
+      #    and its remote port.
+      # 2. Otherwise the path is used as is, as a relative or absolute path to
+      #    a directory.
+      #
+      # When running in production you may want to configure this to a specific
+      # path (alt 2), since the default directory contains the remote port of the
+      # actor system to make the name unique. If using a dynamically assigned 
+      # port (0) it will be different each time and the previously stored data 
+      # will not be loaded.
+      dir = "ddata"
+      
+      # Size in bytes of the memory mapped file.
+      map-size = 100 MiB
+      
+      # Accumulate changes before storing improves performance with the
+      # risk of losing the last writes if the JVM crashes.
+      # The interval is by default set to 'off' to write each update immediately.
+      # Enabling write behind by specifying a duration, e.g. 200ms, is especially 
+      # efficient when performing many writes to the same key, because it is only 
+      # the last value for each key that will be serialized and stored.  
+      # write-behind-interval = 200 ms
+      write-behind-interval = off
+    }
+  }
+  
+}
+#//#distributed-data
+
+# Protobuf serializer for cluster DistributedData messages
+akka.actor {
+  serializers {
+    akka-data-replication = "akka.cluster.ddata.protobuf.ReplicatorMessageSerializer"
+    akka-replicated-data = "akka.cluster.ddata.protobuf.ReplicatedDataSerializer"
+  }
+  serialization-bindings {
+    "akka.cluster.ddata.Replicator$ReplicatorMessage" = akka-data-replication
+    "akka.cluster.ddata.ReplicatedDataSerialization" = akka-replicated-data
+  }
+  serialization-identifiers {
+    "akka.cluster.ddata.protobuf.ReplicatedDataSerializer" = 11
+    "akka.cluster.ddata.protobuf.ReplicatorMessageSerializer" = 12
+  }
+}
index e096415..7e60da8 100644 (file)
@@ -1,5 +1,9 @@
 include "actor_reference.conf"
+include "actor_typed_reference.conf"
 include "cluster_reference.conf"
+include "cluster_tools_reference.conf"
+include "cluster_typed_reference.conf"
+include "distributed_data_reference.conf"
 include "persistence_reference.conf"
 include "remote_reference.conf"
 include "stream_reference.conf"
index 4416bf2..1b436a9 100644 (file)
             <groupId>org.reactivestreams</groupId>
             <artifactId>reactive-streams</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.lmdbjava</groupId>
+            <artifactId>lmdbjava</artifactId>
+            <version>0.7.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.github.jnr</groupId>
+                    <artifactId>jffi</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.github.jnr</groupId>
+                    <artifactId>jnr-ffi</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.github.jnr</groupId>
+                    <artifactId>jnr-constants</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
index 33c5a42..6e83e39 100644 (file)
                 <artifactId>cds-mgmt-api</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>eos-dom-akka</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 
             <!-- Toaster -->
             <dependency>
index 7bc1df3..c2776c0 100644 (file)
                     </exclusion>
                 </exclusions>
             </dependency>
+            <dependency>
+                <groupId>com.typesafe.akka</groupId>
+                <artifactId>akka-actor-testkit-typed_2.13</artifactId>
+                <version>2.6.12</version>
+                <scope>test</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>com.typesafe.akka</groupId>
+                        <artifactId>akka-actor-typed_2.13</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.typesafe.akka</groupId>
+                        <artifactId>akka-slf4j_2.13</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
             <dependency>
                 <groupId>com.typesafe.akka</groupId>
                 <artifactId>akka-persistence-tck_2.13</artifactId>
index eec1a76..35f150f 100644 (file)
@@ -10,5 +10,7 @@
         <bundle>mvn:org.agrona/agrona/1.8.0</bundle>
         <bundle>mvn:org.opendaylight.controller/repackaged-akka/${project.version}</bundle>
         <bundle>mvn:org.reactivestreams/reactive-streams/1.0.3</bundle>
+        <feature>wrap</feature>
+        <bundle>wrap:mvn:org.lmdbjava/lmdbjava/0.7.0</bundle>
     </feature>
 </features>
index 800b038..dd3bc1e 100644 (file)
@@ -87,7 +87,7 @@
         </dependency>
         <dependency>
             <groupId>org.opendaylight.controller</groupId>
-            <artifactId>sal-distributed-eos</artifactId>
+            <artifactId>eos-dom-akka</artifactId>
         </dependency>
         <dependency>
             <groupId>org.opendaylight.controller</groupId>
diff --git a/opendaylight/md-sal/eos-dom-akka/pom.xml b/opendaylight/md-sal/eos-dom-akka/pom.xml
new file mode 100644 (file)
index 0000000..510ef37
--- /dev/null
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+  ~
+  ~ This program and the accompanying materials are made available under the
+  ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+  ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>bundle-parent</artifactId>
+        <version>4.0.0-SNAPSHOT</version>
+        <relativePath>../../../bundle-parent</relativePath>
+    </parent>
+
+    <artifactId>eos-dom-akka</artifactId>
+    <packaging>bundle</packaging>
+
+    <properties>
+        <odlparent.dependency.enforce>true</odlparent.dependency.enforce>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>repackaged-akka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-clustering-commons</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-eos-common-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-eos-dom-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>concepts</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-data-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.component.annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.guicedee.services</groupId>
+            <artifactId>javax.inject</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>javax.annotation-api</artifactId>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor-testkit-typed_2.13</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java
new file mode 100644 (file)
index 0000000..27fbad4
--- /dev/null
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Scheduler;
+import akka.actor.typed.javadsl.Adapter;
+import akka.actor.typed.javadsl.AskPattern;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.cluster.typed.Cluster;
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
+import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
+import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
+import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
+import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DOMEntityOwnershipService implementation backed by native Akka clustering constructs. We use distributed-data
+ * to track all registered candidates and cluster-singleton to maintain a single cluster-wide authority which selects
+ * the appropriate owners.
+ */
+@Singleton
+@Component(immediate = true, service = DOMEntityOwnershipService.class)
+public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
+    private static final String DATACENTER_PREFIX = "dc";
+
+    private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
+    private final String localCandidate;
+    private final Scheduler scheduler;
+
+    private final ActorRef<BootstrapCommand> bootstrap;
+    private final RunningContext runningContext;
+    private final ActorRef<CandidateRegistryCommand> candidateRegistry;
+    private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
+    private final ActorRef<StateCheckerCommand> ownerStateChecker;
+
+    @VisibleForTesting
+    AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException {
+        final var typedActorSystem = Adapter.toTyped(actorSystem);
+
+        scheduler = typedActorSystem.scheduler();
+        localCandidate = Cluster.get(typedActorSystem).selfMember().getRoles().stream()
+            .filter(role -> !role.contains(DATACENTER_PREFIX))
+            .findFirst()
+            .orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+
+        bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(context -> EOSMain.create()), "EOSBootstrap");
+
+        final CompletionStage<RunningContext> ask = AskPattern.ask(bootstrap,
+                GetRunningContext::new, Duration.ofSeconds(5), scheduler);
+        runningContext = ask.toCompletableFuture().get();
+
+        candidateRegistry = runningContext.getCandidateRegistry();
+        listenerRegistry = runningContext.getListenerRegistry();
+        ownerStateChecker = runningContext.getOwnerStateChecker();
+    }
+
+    @Inject
+    @Activate
+    public AkkaEntityOwnershipService(@Reference final ActorSystemProvider provider)
+            throws ExecutionException, InterruptedException {
+        this(provider.getActorSystem());
+    }
+
+    @PreDestroy
+    @Deactivate
+    @Override
+    public void close() throws InterruptedException, ExecutionException {
+        AskPattern.ask(bootstrap, Terminate::new, Duration.ofSeconds(5), scheduler).toCompletableFuture().get();
+    }
+
+    @Override
+    public DOMEntityOwnershipCandidateRegistration registerCandidate(final DOMEntity entity)
+            throws CandidateAlreadyRegisteredException {
+        if (!registeredEntities.add(entity)) {
+            throw new CandidateAlreadyRegisteredException(entity);
+        }
+
+        final RegisterCandidate msg = new RegisterCandidate(entity, localCandidate);
+        LOG.debug("Registering candidate with message: {}", msg);
+        candidateRegistry.tell(msg);
+
+        return new CandidateRegistration(entity, this);
+    }
+
+    @Override
+    public DOMEntityOwnershipListenerRegistration registerListener(final String entityType,
+                                                                   final DOMEntityOwnershipListener listener) {
+        LOG.debug("Registering listener {} for type {}", listener, entityType);
+        listenerRegistry.tell(new RegisterListener(entityType, listener));
+
+        return new ListenerRegistration(listener, entityType, this);
+    }
+
+    @Override
+    public Optional<EntityOwnershipState> getOwnershipState(final DOMEntity entity) {
+        LOG.debug("Retrieving ownership state for {}", entity);
+
+        final CompletionStage<GetOwnershipStateReply> result = AskPattern.ask(ownerStateChecker,
+            replyTo -> new GetOwnershipState(entity, replyTo),
+            Duration.ofSeconds(5), scheduler);
+
+        final GetOwnershipStateReply reply;
+        try {
+            reply = result.toCompletableFuture().get();
+        } catch (final InterruptedException | ExecutionException exception) {
+            LOG.warn("Failed to retrieve ownership state for {}", entity, exception);
+            return Optional.empty();
+        }
+
+        return Optional.ofNullable(reply.getOwnershipState());
+    }
+
+    @Override
+    public boolean isCandidateRegistered(final DOMEntity forEntity) {
+        return registeredEntities.contains(forEntity);
+    }
+
+    void unregisterCandidate(final DOMEntity entity) {
+        LOG.debug("Unregistering candidate for {}", entity);
+
+        if (registeredEntities.remove(entity)) {
+            candidateRegistry.tell(new UnregisterCandidate(entity, localCandidate));
+        }
+    }
+
+    void unregisterListener(final String entityType, final DOMEntityOwnershipListener listener) {
+        LOG.debug("Unregistering listener {} for type {}", listener, entityType);
+
+        listenerRegistry.tell(new UnregisterListener(entityType, listener));
+    }
+
+    @VisibleForTesting
+    RunningContext getRunningContext() {
+        return runningContext;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/CandidateRegistration.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/CandidateRegistration.java
new file mode 100644 (file)
index 0000000..56a2f09
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import static java.util.Objects.requireNonNull;
+
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+
+final class CandidateRegistration extends AbstractObjectRegistration<DOMEntity>
+        implements DOMEntityOwnershipCandidateRegistration {
+    private final AkkaEntityOwnershipService service;
+
+    CandidateRegistration(final DOMEntity instance, final AkkaEntityOwnershipService service) {
+        super(instance);
+        this.service = requireNonNull(service);
+    }
+
+    @Override
+    protected void removeRegistration() {
+        service.unregisterCandidate(getInstance());
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/ListenerRegistration.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/ListenerRegistration.java
new file mode 100644 (file)
index 0000000..435babe
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+
+final class ListenerRegistration extends AbstractObjectRegistration<DOMEntityOwnershipListener>
+        implements DOMEntityOwnershipListenerRegistration {
+    private final AkkaEntityOwnershipService service;
+    private final @NonNull String entityType;
+
+    ListenerRegistration(final DOMEntityOwnershipListener listener, final String entityType,
+            final AkkaEntityOwnershipService service) {
+        super(listener);
+        this.entityType = requireNonNull(entityType);
+        this.service = requireNonNull(service);
+    }
+
+    @Override
+    public  String getEntityType() {
+        return entityType;
+    }
+
+    @Override
+    protected void removeRegistration() {
+        service.unregisterListener(entityType, getInstance());
+    }
+
+    @Override
+    protected MoreObjects.ToStringHelper addToStringAttributes(final MoreObjects.ToStringHelper toStringHelper) {
+        return toStringHelper.add("entityType", entityType);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java
new file mode 100644 (file)
index 0000000..54f9a6b
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.bootstrap;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.typed.Cluster;
+import akka.cluster.typed.ClusterSingleton;
+import akka.cluster.typed.SingletonActor;
+import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
+import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
+import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
+import org.opendaylight.controller.eos.akka.owner.checker.OwnerStateChecker;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.OwnerSyncer;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.EntityTypeListenerRegistry;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.yangtools.yang.common.Empty;
+
+public final class EOSMain extends AbstractBehavior<BootstrapCommand> {
+    private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
+    private final ActorRef<CandidateRegistryCommand> candidateRegistry;
+    private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+    private final ActorRef<StateCheckerCommand> ownerStateChecker;
+
+    private EOSMain(final ActorContext<BootstrapCommand> context) {
+        super(context);
+
+        final String role = Cluster.get(context.getSystem()).selfMember().getRoles().iterator().next();
+
+        listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry");
+        candidateRegistry = context.spawn(CandidateRegistry.create(), "CandidateRegistry");
+        ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker");
+
+        final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
+        // start the initial sync behavior that switches to the regular one after syncing
+        ownerSupervisor = clusterSingleton.init(SingletonActor.of(OwnerSyncer.create(), "OwnerSupervisor"));
+    }
+
+    public static Behavior<BootstrapCommand> create() {
+        return Behaviors.setup(EOSMain::new);
+    }
+
+    @Override
+    public Receive<BootstrapCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(GetRunningContext.class, this::onGetRunningContext)
+                .onMessage(Terminate.class, this::onTerminate)
+                .build();
+    }
+
+    private Behavior<BootstrapCommand> onGetRunningContext(final GetRunningContext request) {
+        request.getReplyTo().tell(
+                new RunningContext(listenerRegistry, candidateRegistry, ownerStateChecker, ownerSupervisor));
+        return this;
+    }
+
+    private Behavior<BootstrapCommand> onTerminate(final Terminate request) {
+        request.getReplyTo().tell(Empty.getInstance());
+        return Behaviors.stopped();
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/BootstrapCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/BootstrapCommand.java
new file mode 100644 (file)
index 0000000..122a53f
--- /dev/null
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.bootstrap.command;
+
+public abstract class BootstrapCommand {
+    BootstrapCommand() {
+        // Hidden on purpose
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/GetRunningContext.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/GetRunningContext.java
new file mode 100644 (file)
index 0000000..6804dcb
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.bootstrap.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+
+public final class GetRunningContext extends BootstrapCommand {
+    private final ActorRef<RunningContext> replyTo;
+
+    public GetRunningContext(final ActorRef<RunningContext> replyTo) {
+        this.replyTo = requireNonNull(replyTo);
+    }
+
+    public ActorRef<RunningContext> getReplyTo() {
+        return replyTo;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/RunningContext.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/RunningContext.java
new file mode 100644 (file)
index 0000000..6bbffca
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.bootstrap.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+
+public final class RunningContext extends BootstrapCommand {
+    private final @NonNull ActorRef<TypeListenerRegistryCommand> listenerRegistry;
+    private final @NonNull ActorRef<CandidateRegistryCommand> candidateRegistry;
+    private final @NonNull ActorRef<StateCheckerCommand> ownerStateChecker;
+    private final @NonNull ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+
+    public RunningContext(final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
+                          final ActorRef<CandidateRegistryCommand> candidateRegistry,
+                          final ActorRef<StateCheckerCommand> ownerStateChecker,
+                          final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
+        this.listenerRegistry = requireNonNull(listenerRegistry);
+        this.candidateRegistry = requireNonNull(candidateRegistry);
+        this.ownerStateChecker = requireNonNull(ownerStateChecker);
+        this.ownerSupervisor = requireNonNull(ownerSupervisor);
+    }
+
+    public @NonNull ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
+        return listenerRegistry;
+    }
+
+    public @NonNull ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
+        return candidateRegistry;
+    }
+
+    public @NonNull ActorRef<StateCheckerCommand> getOwnerStateChecker() {
+        return ownerStateChecker;
+    }
+
+    public @NonNull ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
+        return ownerSupervisor;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/Terminate.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/command/Terminate.java
new file mode 100644 (file)
index 0000000..116b5e4
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.bootstrap.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yangtools.yang.common.Empty;
+
+public final class Terminate extends BootstrapCommand {
+    private final @NonNull ActorRef<Empty> replyTo;
+
+    public Terminate(final ActorRef<Empty> replyTo) {
+        this.replyTo = requireNonNull(replyTo);
+    }
+
+    public @NonNull ActorRef<Empty> getReplyTo() {
+        return replyTo;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/OwnerStateChecker.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/OwnerStateChecker.java
new file mode 100644 (file)
index 0000000..57c40ac
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator.Get;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetFailure;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetSuccess;
+import akka.cluster.ddata.typed.javadsl.Replicator.NotFound;
+import akka.cluster.ddata.typed.javadsl.Replicator.ReadMajority;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.InternalGetReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class OwnerStateChecker extends AbstractBehavior<StateCheckerCommand> {
+    private static final Logger LOG = LoggerFactory.getLogger(OwnerStateChecker.class);
+    private static final Duration GET_OWNERSHIP_TIMEOUT = Duration.ofSeconds(5);
+    private static final Duration UNEXPECTED_ASK_TIMEOUT = Duration.ofSeconds(5);
+
+    private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> replicatorAdapter;
+    private final String localMember;
+
+    private OwnerStateChecker(final ActorContext<StateCheckerCommand> context, final String localMember) {
+        super(context);
+        this.localMember = requireNonNull(localMember);
+        replicatorAdapter = new ReplicatorMessageAdapter<>(context,
+            DistributedData.get(context.getSystem()).replicator(), UNEXPECTED_ASK_TIMEOUT);
+    }
+
+    public static Behavior<StateCheckerCommand> create(final String localMember) {
+        return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember));
+    }
+
+    @Override
+    public Receive<StateCheckerCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(GetOwnershipState.class, this::onGetOwnershipState)
+                .onMessage(InternalGetReply.class, this::respondWithState)
+                .build();
+    }
+
+    private Behavior<StateCheckerCommand> onGetOwnershipState(final GetOwnershipState message) {
+        replicatorAdapter.askGet(
+                askReplyTo -> new Get<>(
+                        new LWWRegisterKey<>(message.getEntity().toString()),
+                        new ReadMajority(GET_OWNERSHIP_TIMEOUT),
+                        askReplyTo),
+                reply -> new InternalGetReply(reply, message.getEntity(), message.getReplyTo()));
+        return this;
+    }
+
+    private Behavior<StateCheckerCommand> respondWithState(final InternalGetReply reply) {
+        final GetResponse<LWWRegister<String>> response = reply.getResponse();
+        if (response instanceof NotFound) {
+            LOG.debug("Data for owner not found, most likely no owner has beed picked for entity: {}",
+                    reply.getEntity());
+            reply.getReplyTo().tell(new GetOwnershipStateReply(null));
+        } else if (response instanceof GetFailure) {
+            LOG.warn("Failure retrieving data for entity: {}", reply.getEntity());
+            reply.getReplyTo().tell(new GetOwnershipStateReply(null));
+        } else if (response instanceof GetSuccess) {
+            final String owner = ((GetSuccess<LWWRegister<String>>) response).get(response.key()).getValue();
+            LOG.debug("Data for owner received. {}, owner: {}", response, owner);
+
+            final boolean isOwner = localMember.equals(owner);
+            final boolean hasOwner = !owner.isEmpty();
+
+            reply.getReplyTo().tell(new GetOwnershipStateReply(EntityOwnershipState.from(isOwner, hasOwner)));
+        }
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipState.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipState.java
new file mode 100644 (file)
index 0000000..617b65e
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class GetOwnershipState extends StateCheckerCommand {
+    private final @NonNull DOMEntity entity;
+    private final @NonNull ActorRef<GetOwnershipStateReply> replyTo;
+
+    public GetOwnershipState(final DOMEntity entity, final ActorRef<GetOwnershipStateReply> replyTo) {
+        this.entity = requireNonNull(entity);
+        this.replyTo = requireNonNull(replyTo);
+    }
+
+    public @NonNull DOMEntity getEntity() {
+        return entity;
+    }
+
+    public @NonNull ActorRef<GetOwnershipStateReply> getReplyTo() {
+        return replyTo;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipStateReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnershipStateReply.java
new file mode 100644 (file)
index 0000000..58fb5a0
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
+
+public final class GetOwnershipStateReply extends StateCheckerReply {
+    private final @Nullable EntityOwnershipState ownershipState;
+
+    public GetOwnershipStateReply(final EntityOwnershipState ownershipState) {
+        this.ownershipState = ownershipState;
+    }
+
+    public @Nullable EntityOwnershipState getOwnershipState() {
+        return ownershipState;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/InternalGetReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/InternalGetReply.java
new file mode 100644 (file)
index 0000000..1a8f305
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class InternalGetReply extends StateCheckerCommand {
+    private final @NonNull GetResponse<LWWRegister<String>> response;
+    private final @NonNull ActorRef<GetOwnershipStateReply> replyTo;
+    private final @NonNull DOMEntity entity;
+
+    public InternalGetReply(final GetResponse<LWWRegister<String>> response, final DOMEntity entity,
+                            final ActorRef<GetOwnershipStateReply> replyTo) {
+        this.response = requireNonNull(response);
+        this.entity = requireNonNull(entity);
+        this.replyTo = requireNonNull(replyTo);
+    }
+
+    public @NonNull GetResponse<LWWRegister<String>> getResponse() {
+        return response;
+    }
+
+    public @NonNull DOMEntity getEntity() {
+        return entity;
+    }
+
+    public @NonNull ActorRef<GetOwnershipStateReply> getReplyTo() {
+        return replyTo;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerCommand.java
new file mode 100644 (file)
index 0000000..e6b5412
--- /dev/null
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+public abstract class StateCheckerCommand {
+    StateCheckerCommand() {
+        // Hidden on purpose
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerReply.java
new file mode 100644 (file)
index 0000000..39347cc
--- /dev/null
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+public abstract class StateCheckerReply {
+    StateCheckerReply() {
+        // Hidden on purpose
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java
new file mode 100644 (file)
index 0000000..41c83d4
--- /dev/null
@@ -0,0 +1,369 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.SelfUniqueAddress;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import akka.cluster.typed.Cluster;
+import akka.cluster.typed.Subscribe;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEvent;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * Responsible for tracking candidates and assigning ownership of entities. This behavior is subscribed to the candidate
+ * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
+ * On cluster up/down etc. events the owners are reassigned if possible.
+ */
+public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
+    private static final String DATACENTER_PREFIX = "dc";
+
+    private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
+
+    // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
+    // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
+    // running in a cluster-singleton
+    private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
+
+    private final Cluster cluster;
+    private final SelfUniqueAddress node;
+
+    private final Set<String> activeMembers;
+
+    // currently registered candidates
+    private final Map<DOMEntity, Set<String>> currentCandidates;
+    // current owners
+    private final Map<DOMEntity, String> currentOwners;
+    // reverse lookup of owner to entity
+    private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
+
+    private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
+                            final Map<DOMEntity, Set<String>> currentCandidates,
+                            final Map<DOMEntity, String> currentOwners) {
+        super(context);
+
+        final DistributedData distributedData = DistributedData.get(context.getSystem());
+        final ActorRef<Replicator.Command> replicator = distributedData.replicator();
+
+        this.cluster = Cluster.get(context.getSystem());
+        this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+
+        this.node = distributedData.selfUniqueAddress();
+        this.activeMembers = getActiveMembers(cluster);
+
+        this.currentCandidates = currentCandidates;
+        this.currentOwners = currentOwners;
+
+        for (final Map.Entry<DOMEntity, String> entry : currentOwners.entrySet()) {
+            ownerToEntity.put(entry.getValue(), entry.getKey());
+        }
+
+        // check whether we have any unreachable/missing owners
+        reassignUnreachableOwners();
+        assignMissingOwners();
+
+        final ActorRef<ClusterEvent.MemberEvent> memberEventAdapter =
+                context.messageAdapter(ClusterEvent.MemberEvent.class, event -> {
+                    if (event instanceof ClusterEvent.MemberUp) {
+                        return new MemberUpEvent(event.member().address(), event.member().getRoles());
+                    } else {
+                        return new MemberDownEvent(event.member().address(), event.member().getRoles());
+                    }
+                });
+        cluster.subscriptions().tell(Subscribe.create(memberEventAdapter, ClusterEvent.MemberEvent.class));
+
+        final ActorRef<ClusterEvent.ReachabilityEvent> reachabilityEventAdapter =
+                context.messageAdapter(ClusterEvent.ReachabilityEvent.class, event -> {
+                    if (event instanceof ClusterEvent.ReachableMember) {
+                        return new MemberReachableEvent(event.member().address(), event.member().getRoles());
+                    } else {
+                        return new MemberUnreachableEvent(event.member().address(), event.member().getRoles());
+                    }
+                });
+        cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
+
+        new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
+            Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
+
+        LOG.debug("Owner Supervisor started");
+    }
+
+    public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
+                                                          final Map<DOMEntity, String> currentOwners) {
+        return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners));
+    }
+
+    @Override
+    public Receive<OwnerSupervisorCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
+                .onMessage(MemberUpEvent.class, this::onPeerUp)
+                .onMessage(MemberDownEvent.class, this::onPeerDown)
+                .onMessage(MemberReachableEvent.class, this::onPeerReachable)
+                .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
+                .build();
+    }
+
+    private void reassignUnreachableOwners() {
+        final Set<String> ownersToReassign = new HashSet<>();
+        for (final String owner : ownerToEntity.keys()) {
+            if (!activeMembers.contains(owner)) {
+                ownersToReassign.add(owner);
+            }
+        }
+
+        for (final String owner : ownersToReassign) {
+            reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)));
+        }
+    }
+
+    private void assignMissingOwners() {
+        for (final Map.Entry<DOMEntity, Set<String>> entry : currentCandidates.entrySet()) {
+            if (!currentOwners.containsKey(entry.getKey())) {
+                assignOwnerFor(entry.getKey());
+            }
+        }
+    }
+
+    private Behavior<OwnerSupervisorCommand> onCandidatesChanged(final CandidatesChanged message) {
+        LOG.debug("onCandidatesChanged {}", message.getResponse());
+        if (message.getResponse() instanceof Replicator.Changed) {
+            final Replicator.Changed<ORMap<DOMEntity, ORSet<String>>> changed =
+                    (Replicator.Changed<ORMap<DOMEntity, ORSet<String>>>) message.getResponse();
+            processCandidateChanges(changed.get(CandidateRegistry.KEY));
+        }
+        return this;
+    }
+
+    private void processCandidateChanges(final ORMap<DOMEntity, ORSet<String>> candidates) {
+        final Map<DOMEntity, ORSet<String>> entries = candidates.getEntries();
+        for (final Map.Entry<DOMEntity, ORSet<String>> entry : entries.entrySet()) {
+            processCandidatesFor(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private void processCandidatesFor(final DOMEntity entity, final ORSet<String> receivedCandidates) {
+        LOG.debug("Processing candidates for : {}, new value: {}", entity, receivedCandidates.elements());
+
+        final Set<String> candidates = JavaConverters.asJava(receivedCandidates.elements());
+        // only insert candidates if there are any to insert, otherwise we would generate unnecessary notification with
+        // no owner
+        if (!currentCandidates.containsKey(entity) && !candidates.isEmpty()) {
+            LOG.debug("Candidates missing for entity: {} adding all candidates", entity);
+            currentCandidates.put(entity, new HashSet<>(candidates));
+
+            LOG.debug("Current state for {} : {}", entity, currentCandidates.get(entity).toString());
+            assignOwnerFor(entity);
+
+            return;
+        }
+
+        final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet());
+        final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
+
+        LOG.debug("currently present candidates: {}", currentlyPresent);
+        LOG.debug("difference: {}", difference);
+
+        final List<String> ownersToReassign = new ArrayList<>();
+
+        // first add/remove candidates from entities
+        for (final String toCheck : difference) {
+            if (!currentlyPresent.contains(toCheck)) {
+                // add new candidate
+                LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
+                currentCandidates.get(entity).add(toCheck);
+
+                if (!currentOwners.containsKey(entity)) {
+                    // might as well assign right away when we don't have an owner
+                    assignOwnerFor(entity);
+                }
+
+                LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
+                continue;
+            }
+
+            if (!candidates.contains(toCheck)) {
+                // remove candidate
+                LOG.debug("Removing candidate from entity: {} - {}", entity, toCheck);
+                currentCandidates.get(entity).remove(toCheck);
+                if (ownerToEntity.containsKey(toCheck)) {
+                    ownersToReassign.add(toCheck);
+                }
+            }
+        }
+
+        // then reassign those that need new owners
+        for (final String toReassign : ownersToReassign) {
+            reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)));
+        }
+
+        if (currentCandidates.get(entity) == null) {
+            LOG.debug("Last candidate removed for {}", entity);
+        } else {
+            LOG.debug("Current state for entity: {} : {}", entity, currentCandidates.get(entity).toString());
+        }
+    }
+
+    private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities) {
+        LOG.debug("Reassigning owners for {}", entities);
+        for (final DOMEntity entity : entities) {
+
+            // only reassign owner for those entities that lost this candidate or is not reachable
+            if (!activeMembers.contains(oldOwner)
+                    || !currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(oldOwner)) {
+                ownerToEntity.remove(oldOwner, entity);
+                assignOwnerFor(entity);
+            }
+        }
+    }
+
+    private void assignOwnerFor(final DOMEntity entity) {
+        final Set<String> candidatesForEntity = currentCandidates.get(entity);
+        if (candidatesForEntity.isEmpty()) {
+            LOG.debug("No candidates present for entity: {}", entity);
+            removeOwner(entity);
+            return;
+        }
+
+        String pickedCandidate = null;
+        for (final String candidate : candidatesForEntity) {
+            if (activeMembers.contains(candidate)) {
+                pickedCandidate = candidate;
+                break;
+            }
+        }
+        if (pickedCandidate == null) {
+            LOG.debug("No candidate is reachable for {}, activeMembers: {}, currentCandidates: {}",
+                    entity, activeMembers, currentCandidates.get(entity));
+            // no candidate is reachable so only remove owner if necessary
+            removeOwner(entity);
+            return;
+        }
+        ownerToEntity.put(pickedCandidate, entity);
+
+        LOG.debug("Entity {} new owner: {}", entity, pickedCandidate);
+        currentOwners.put(entity, pickedCandidate);
+        writeNewOwner(entity, pickedCandidate);
+    }
+
+    private void removeOwner(final DOMEntity entity) {
+        if (currentOwners.containsKey(entity)) {
+            // assign empty owner to dd, as we cannot delete data for a key since that would prevent
+            // writes for the same key
+            currentOwners.remove(entity);
+
+            writeNewOwner(entity, "");
+        }
+    }
+
+    private void writeNewOwner(final DOMEntity entity, final String candidate) {
+        ownerReplicator.askUpdate(
+                askReplyTo -> new Replicator.Update<>(
+                        new LWWRegisterKey<>(entity.toString()),
+                        new LWWRegister<>(node.uniqueAddress(), candidate, 0),
+                        Replicator.writeLocal(),
+                        askReplyTo,
+                        register -> register.withValue(node, candidate, clock)),
+                OwnerChanged::new);
+    }
+
+    private Behavior<OwnerSupervisorCommand> onPeerUp(final MemberUpEvent event) {
+        LOG.debug("Received MemberUp : {}", event);
+
+        handleReachableEvent(event.getRoles());
+        return this;
+    }
+
+    private Behavior<OwnerSupervisorCommand> onPeerReachable(final MemberReachableEvent event) {
+        LOG.debug("Received MemberReachable : {}", event);
+
+        handleReachableEvent(event.getRoles());
+        return this;
+    }
+
+    private void handleReachableEvent(final Set<String> roles) {
+        activeMembers.add(extractRole(roles));
+        assignMissingOwners();
+    }
+
+    private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
+        LOG.debug("Received MemberDown : {}", event);
+
+        handleUnreachableEvent(event.getRoles());
+        return this;
+    }
+
+    private Behavior<OwnerSupervisorCommand> onPeerUnreachable(final MemberUnreachableEvent event) {
+        LOG.debug("Received MemberUnreachable : {}", event);
+
+        handleUnreachableEvent(event.getRoles());
+        return this;
+    }
+
+    private void handleUnreachableEvent(final Set<String> roles) {
+        activeMembers.remove(extractRole(roles));
+        reassignUnreachableOwners();
+    }
+
+    private static Set<String> getActiveMembers(final Cluster cluster) {
+        final Set<String> activeMembers = new HashSet<>();
+        cluster.state().getMembers().forEach(member -> activeMembers.add(extractRole(member)));
+        activeMembers.removeAll(cluster.state().getUnreachable().stream()
+                .map(OwnerSupervisor::extractRole).collect(Collectors.toSet()));
+
+        return activeMembers;
+    }
+
+    private static String extractRole(final Member member) {
+        return extractRole(member.getRoles());
+    }
+
+    private static String extractRole(final Set<String> roles) {
+        return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
+                .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java
new file mode 100644 (file)
index 0000000..1a8df09
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Behavior that retrieves current candidates/owners from distributed-data and switches to OwnerSupervisor when the
+ * sync has finished.
+ */
+public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand> {
+    private static final Logger LOG = LoggerFactory.getLogger(OwnerSyncer.class);
+
+    private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
+    private final Map<DOMEntity, Set<String>> currentCandidates = new HashMap<>();
+    private final Map<DOMEntity, String> currentOwners = new HashMap<>();
+
+    // String representation of Entity to DOMEntity
+    private final Map<String, DOMEntity> entityLookup = new HashMap<>();
+
+    private int toSync = -1;
+
+    private OwnerSyncer(final ActorContext<OwnerSupervisorCommand> context) {
+        super(context);
+        LOG.debug("Starting candidate and owner sync");
+
+        final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
+
+        this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+
+        new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
+            Duration.ofSeconds(5)).askGet(
+                askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
+                InitialCandidateSync::new);
+    }
+
+    public static Behavior<OwnerSupervisorCommand> create() {
+        return Behaviors.setup(OwnerSyncer::new);
+    }
+
+    @Override
+    public Receive<OwnerSupervisorCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(InitialCandidateSync.class, this::onInitialCandidateSync)
+                .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
+                .build();
+    }
+
+    private Behavior<OwnerSupervisorCommand> onInitialCandidateSync(final InitialCandidateSync rsp) {
+        final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = rsp.getResponse();
+        if (response instanceof Replicator.GetSuccess) {
+            return doInitialSync((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response);
+        } else if (response instanceof Replicator.NotFound) {
+            LOG.debug("No candidates found switching to supervisor");
+            return switchToSupervisor();
+        } else {
+            LOG.debug("Initial candidate sync failed, switching to supervisor. Sync reply: {}", response);
+            return switchToSupervisor();
+        }
+    }
+
+    private Behavior<OwnerSupervisorCommand> doInitialSync(
+            final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response) {
+
+        final ORMap<DOMEntity, ORSet<String>> candidates = response.get(CandidateRegistry.KEY);
+        candidates.getEntries().entrySet().forEach(entry -> {
+            currentCandidates.put(entry.getKey(), new HashSet<>(entry.getValue().getElements()));
+        });
+
+        toSync = candidates.keys().size();
+        for (final DOMEntity entity : candidates.keys().getElements()) {
+            entityLookup.put(entity.toString(), entity);
+
+            ownerReplicator.askGet(
+                    askReplyTo -> new Replicator.Get<>(
+                            new LWWRegisterKey<>(entity.toString()),
+                            Replicator.readLocal(),
+                            askReplyTo),
+                    InitialOwnerSync::new);
+        }
+
+        return this;
+    }
+
+    private Behavior<OwnerSupervisorCommand> onInitialOwnerSync(final InitialOwnerSync rsp) {
+        final Replicator.GetResponse<LWWRegister<String>> response = rsp.getResponse();
+        if (response instanceof Replicator.GetSuccess) {
+            handleOwnerRsp((Replicator.GetSuccess<LWWRegister<String>>) response);
+        } else if (response instanceof Replicator.NotFound) {
+            handleNotFoundOwnerRsp((Replicator.NotFound<LWWRegister<String>>) response);
+        } else {
+            LOG.debug("Initial sync failed response: {}", response);
+        }
+
+        // count the responses, on last switch behaviors
+        toSync--;
+        if (toSync == 0) {
+            return switchToSupervisor();
+        }
+
+        return this;
+    }
+
+    private Behavior<OwnerSupervisorCommand> switchToSupervisor() {
+        LOG.debug("Initial sync done, switching to supervisor. candidates: {}, owners: {}",
+                currentCandidates, currentOwners);
+        return Behaviors.setup(ctx ->
+                OwnerSupervisor.create(currentCandidates, currentOwners));
+    }
+
+    private void handleOwnerRsp(final Replicator.GetSuccess<LWWRegister<String>> rsp) {
+        final DOMEntity entity = entityLookup.get(rsp.key().id());
+        final String owner = rsp.get(rsp.key()).getValue();
+
+        currentOwners.put(entity, owner);
+    }
+
+    private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
+        LOG.debug("Owner not found. {}", rsp);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/CandidatesChanged.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/CandidatesChanged.java
new file mode 100644 (file)
index 0000000..6334d97
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse;
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class CandidatesChanged extends OwnerSupervisorCommand {
+    private final @NonNull SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response;
+
+    public CandidatesChanged(final SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> subscribeResponse) {
+        this.response = requireNonNull(subscribeResponse);
+    }
+
+    public @NonNull SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+        return response;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("response", response).toString();
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialCandidateSync.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialCandidateSync.java
new file mode 100644 (file)
index 0000000..53bf10a
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class InitialCandidateSync extends OwnerSupervisorCommand {
+    private final @Nullable GetResponse<ORMap<DOMEntity, ORSet<String>>> response;
+
+    public InitialCandidateSync(final GetResponse<ORMap<DOMEntity, ORSet<String>>> response) {
+        this.response = response;
+    }
+
+    public @Nullable GetResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+        return response;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialOwnerSync.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InitialOwnerSync.java
new file mode 100644 (file)
index 0000000..e734c66
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+
+public final class InitialOwnerSync extends OwnerSupervisorCommand {
+    private final @NonNull GetResponse<LWWRegister<String>> response;
+
+    public InitialOwnerSync(final GetResponse<LWWRegister<String>> response) {
+        this.response = requireNonNull(response);
+    }
+
+    public @NonNull GetResponse<LWWRegister<String>> getResponse() {
+        return response;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InternalClusterEvent.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/InternalClusterEvent.java
new file mode 100644 (file)
index 0000000..0825c32
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.Address;
+import com.google.common.base.MoreObjects;
+import java.util.Set;
+import org.eclipse.jdt.annotation.NonNull;
+
+public abstract class InternalClusterEvent extends OwnerSupervisorCommand {
+    private final @NonNull Set<String> roles;
+    private final @NonNull Address address;
+
+    InternalClusterEvent(final Address address, final Set<String> roles) {
+        this.address = requireNonNull(address);
+        this.roles = Set.copyOf(roles);
+    }
+
+    public final @NonNull Address getAddress() {
+        return address;
+    }
+
+    public final @NonNull Set<String> getRoles() {
+        return roles;
+    }
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("address", address).add("roles", roles).toString();
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberDownEvent.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberDownEvent.java
new file mode 100644 (file)
index 0000000..9ff7789
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.Address;
+import java.util.Set;
+
+public final class MemberDownEvent extends InternalClusterEvent {
+    public MemberDownEvent(final Address address, final Set<String> roles) {
+        super(address, roles);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberReachableEvent.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberReachableEvent.java
new file mode 100644 (file)
index 0000000..dc6d798
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.Address;
+import java.util.Set;
+
+public final class MemberReachableEvent extends InternalClusterEvent {
+    public MemberReachableEvent(final Address address, final Set<String> roles) {
+        super(address, roles);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUnreachableEvent.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUnreachableEvent.java
new file mode 100644 (file)
index 0000000..24999fb
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.Address;
+import java.util.Set;
+
+public final class MemberUnreachableEvent extends InternalClusterEvent {
+    public MemberUnreachableEvent(final Address address, final Set<String> roles) {
+        super(address, roles);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUpEvent.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/MemberUpEvent.java
new file mode 100644 (file)
index 0000000..18eb765
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.Address;
+import java.util.Set;
+
+public final class MemberUpEvent extends InternalClusterEvent {
+    public MemberUpEvent(final Address address, final Set<String> roles) {
+        super(address, roles);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerChanged.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerChanged.java
new file mode 100644 (file)
index 0000000..b7ce5b2
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.UpdateResponse;
+import org.eclipse.jdt.annotation.NonNull;
+
+public final class OwnerChanged extends OwnerSupervisorCommand {
+    private final @NonNull UpdateResponse<LWWRegister<String>> rsp;
+
+    public OwnerChanged(final UpdateResponse<LWWRegister<String>> rsp) {
+        this.rsp = requireNonNull(rsp);
+    }
+
+    public @NonNull UpdateResponse<LWWRegister<String>> getResponse() {
+        return rsp;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorCommand.java
new file mode 100644 (file)
index 0000000..ba6ca1d
--- /dev/null
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+public abstract class OwnerSupervisorCommand {
+    OwnerSupervisorCommand() {
+        // Hidden on purpose
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistry.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistry.java
new file mode 100644 (file)
index 0000000..16c2ab6
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate;
+
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.Key;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORMapKey;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.SelfUniqueAddress;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Actor responsible for handling registrations of candidates into distributed-data.
+ */
+public final class CandidateRegistry extends AbstractBehavior<CandidateRegistryCommand> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistry.class);
+
+    public static final Key<ORMap<DOMEntity, ORSet<String>>> KEY = new ORMapKey<>("candidateRegistry");
+
+    private final ReplicatorMessageAdapter<CandidateRegistryCommand, ORMap<DOMEntity, ORSet<String>>> replicatorAdapter;
+    private final SelfUniqueAddress node;
+
+    private CandidateRegistry(final ActorContext<CandidateRegistryCommand> context,
+                              final ReplicatorMessageAdapter<CandidateRegistryCommand,
+                                      ORMap<DOMEntity, ORSet<String>>> replicatorAdapter) {
+        super(context);
+        this.replicatorAdapter = replicatorAdapter;
+
+        this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();
+
+        LOG.debug("Candidate registry started");
+    }
+
+    public static Behavior<CandidateRegistryCommand> create() {
+        return Behaviors.setup(ctx ->
+                DistributedData.withReplicatorMessageAdapter(
+                        (ReplicatorMessageAdapter<CandidateRegistryCommand,
+                                ORMap<DOMEntity,ORSet<String>>> replicatorAdapter) ->
+                                        new CandidateRegistry(ctx, replicatorAdapter)));
+    }
+
+    @Override
+    public Receive<CandidateRegistryCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(RegisterCandidate.class, this::onRegisterCandidate)
+                .onMessage(UnregisterCandidate.class, this::onUnregisterCandidate)
+                .onMessage(InternalUpdateResponse.class, this::onInternalUpdateResponse)
+                .build();
+    }
+
+    private Behavior<CandidateRegistryCommand> onRegisterCandidate(final RegisterCandidate registerCandidate) {
+        LOG.debug("Registering candidate({}) for entity: {}",
+                registerCandidate.getCandidate(), registerCandidate.getEntity());
+        replicatorAdapter.askUpdate(
+                askReplyTo -> new Replicator.Update<>(
+                        KEY,
+                        ORMap.empty(),
+                        Replicator.writeLocal(),
+                        askReplyTo,
+                        map -> map.update(node, registerCandidate.getEntity(), ORSet.empty(),
+                                value -> value.add(node, registerCandidate.getCandidate()))),
+                InternalUpdateResponse::new);
+        return this;
+    }
+
+    private Behavior<CandidateRegistryCommand> onUnregisterCandidate(final UnregisterCandidate unregisterCandidate) {
+        LOG.debug("Removing candidate({}) from entity: {}",
+                unregisterCandidate.getCandidate(), unregisterCandidate.getEntity());
+        replicatorAdapter.askUpdate(
+                askReplyTo -> new Replicator.Update<>(
+                        KEY,
+                        ORMap.empty(),
+                        Replicator.writeLocal(),
+                        askReplyTo,
+                        map -> map.update(node, unregisterCandidate.getEntity(), ORSet.empty(),
+                                value -> value.remove(node, unregisterCandidate.getCandidate()))),
+                InternalUpdateResponse::new);
+        return this;
+    }
+
+    private Behavior<CandidateRegistryCommand> onInternalUpdateResponse(final InternalUpdateResponse updateResponse) {
+        LOG.debug("Received update response: {}", updateResponse.getRsp());
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/AbstractCandidateCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/AbstractCandidateCommand.java
new file mode 100644 (file)
index 0000000..5949f4d
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public abstract class AbstractCandidateCommand extends CandidateRegistryCommand {
+    private final @NonNull DOMEntity entity;
+    private final @NonNull String candidate;
+
+    AbstractCandidateCommand(final DOMEntity entity, final String candidate) {
+        this.entity = requireNonNull(entity);
+        this.candidate = requireNonNull(candidate);
+    }
+
+    public final @NonNull DOMEntity getEntity() {
+        return entity;
+    }
+
+    public final @NonNull String getCandidate() {
+        return candidate;
+    }
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("entity", entity).add("candidate", candidate).toString();
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRegistryCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRegistryCommand.java
new file mode 100644 (file)
index 0000000..1cd96a4
--- /dev/null
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+public abstract class CandidateRegistryCommand {
+    CandidateRegistryCommand() {
+        // Hidden on purpose
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InternalUpdateResponse.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InternalUpdateResponse.java
new file mode 100644 (file)
index 0000000..1759615
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator.UpdateResponse;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class InternalUpdateResponse extends CandidateRegistryCommand {
+    private final @NonNull UpdateResponse<ORMap<DOMEntity, ORSet<String>>> rsp;
+
+    public InternalUpdateResponse(final UpdateResponse<ORMap<DOMEntity, ORSet<String>>> rsp) {
+        this.rsp = requireNonNull(rsp);
+    }
+
+    public @NonNull UpdateResponse<ORMap<DOMEntity, ORSet<String>>> getRsp() {
+        return rsp;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/RegisterCandidate.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/RegisterCandidate.java
new file mode 100644 (file)
index 0000000..b76a203
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+/**
+ * Sent to Candidate registry to register the candidate for a given entity.
+ */
+public final class RegisterCandidate extends AbstractCandidateCommand {
+    public RegisterCandidate(final DOMEntity entity, final String candidate) {
+        super(entity, candidate);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/UnregisterCandidate.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/UnregisterCandidate.java
new file mode 100644 (file)
index 0000000..a39f3d2
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+/**
+ * Sent to CandidateRegistry to unregister the candidate for a given entity.
+ */
+public final class UnregisterCandidate extends AbstractCandidateCommand {
+    public UnregisterCandidate(final DOMEntity entity, final String candidate) {
+        super(entity, candidate);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/SingleEntityListenerActor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/SingleEntityListenerActor.java
new file mode 100644 (file)
index 0000000..279ee8f
--- /dev/null
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.owner;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.command.InitialOwnerSync;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.command.OwnerChanged;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Keeps track of owners for a single entity, which is mapped to a single LWWRegister in distributed-data.
+ * Notifies the listener responsible for tracking the whole entity-type of changes.
+ */
+public class SingleEntityListenerActor extends AbstractBehavior<ListenerCommand> {
+    private static final Logger LOG = LoggerFactory.getLogger(SingleEntityListenerActor.class);
+
+    private final String localMember;
+    private final DOMEntity entity;
+    private final ActorRef<TypeListenerCommand> toNotify;
+    private final ReplicatorMessageAdapter<ListenerCommand, LWWRegister<String>> ownerReplicator;
+
+    private String currentOwner = "";
+
+    public SingleEntityListenerActor(final ActorContext<ListenerCommand> context, final String localMember,
+                                     final DOMEntity entity, final ActorRef<TypeListenerCommand> toNotify) {
+        super(context);
+        this.localMember = localMember;
+        this.entity = entity;
+        this.toNotify = toNotify;
+
+        final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
+        ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+
+        ownerReplicator.askGet(
+            replyTo -> new Replicator.Get<>(new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
+            InitialOwnerSync::new);
+        LOG.debug("OwnerListenerActor for {} started", entity.toString());
+    }
+
+    public static Behavior<ListenerCommand> create(final String localMember, final DOMEntity entity,
+                                                   final ActorRef<TypeListenerCommand> toNotify) {
+        return Behaviors.setup(ctx -> new SingleEntityListenerActor(ctx, localMember, entity, toNotify));
+    }
+
+    @Override
+    public Receive<ListenerCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(OwnerChanged.class, this::onOwnerChanged)
+                .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
+                .build();
+    }
+
+    private Behavior<ListenerCommand> onInitialOwnerSync(final InitialOwnerSync ownerSync) {
+        final Replicator.GetResponse<LWWRegister<String>> response = ownerSync.getResponse();
+        LOG.debug("Received initial sync response for: {}, response: {}", entity, response);
+
+        // only trigger initial notification when there is no owner present as we wont get a subscription callback
+        // when distributed-data does not have any data for a key
+        if (response instanceof Replicator.NotFound) {
+
+            // no data is present, trigger initial notification with no owner
+            triggerNoOwnerNotification();
+        } else if (response instanceof Replicator.GetSuccess) {
+
+            // when we get a success just let subscribe callback handle the initial notification
+            LOG.debug("Owner present for entity: {} at the time of initial sync.", entity);
+        } else {
+            LOG.warn("Get has failed for entity: {}", response);
+        }
+
+        // make sure to subscribe AFTER initial notification
+        ownerReplicator.subscribe(new LWWRegisterKey<>(entity.toString()), OwnerChanged::new);
+
+        return this;
+    }
+
+    private void triggerNoOwnerNotification() {
+        LOG.debug("Triggering initial notification without an owner for: {}", entity);
+
+        toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
+                entity, EntityOwnershipChangeState.REMOTE_OWNERSHIP_LOST_NO_OWNER)));
+    }
+
+    private Behavior<ListenerCommand> onOwnerChanged(final OwnerChanged ownerChanged) {
+
+        final Replicator.SubscribeResponse<LWWRegister<String>> response = ownerChanged.getResponse();
+        if (response instanceof Replicator.Changed) {
+
+            final Replicator.Changed<LWWRegister<String>> registerChanged =
+                    (Replicator.Changed<LWWRegister<String>>) response;
+            LOG.debug("Owner changed for: {}, prevOwner: {}, newOwner: {}",
+                    entity, currentOwner, registerChanged.get(registerChanged.key()).getValue());
+            handleOwnerChange(registerChanged);
+        } else if (response instanceof Replicator.Deleted) {
+            handleOwnerLost((Replicator.Deleted<LWWRegister<String>>) response);
+        }
+
+        return this;
+    }
+
+    private void handleOwnerChange(final Replicator.Changed<LWWRegister<String>> changed) {
+        final String newOwner = changed.get(changed.key()).getValue();
+
+        final boolean wasOwner = currentOwner.equals(localMember);
+        final boolean isOwner = newOwner.equals(localMember);
+        final boolean hasOwner = !newOwner.equals("");
+
+        LOG.debug("Owner changed for entity:{}, currentOwner: {}, wasOwner: {}, isOwner: {}, hasOwner:{}",
+                entity, currentOwner, wasOwner, isOwner, hasOwner);
+
+        currentOwner = newOwner;
+
+        toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
+                entity, EntityOwnershipChangeState.from(wasOwner, isOwner, hasOwner))));
+    }
+
+    private void handleOwnerLost(final Replicator.Deleted<LWWRegister<String>> changed) {
+        final boolean wasOwner = currentOwner.equals(localMember);
+
+        LOG.debug("Owner lost for entity:{}, currentOwner: {}, wasOwner: {}", entity, currentOwner, wasOwner);
+
+        currentOwner = "";
+        toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
+                entity, EntityOwnershipChangeState.from(wasOwner, false, false))));
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/InitialOwnerSync.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/InitialOwnerSync.java
new file mode 100644 (file)
index 0000000..402389d
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.owner.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+
+public final class InitialOwnerSync extends ListenerCommand {
+    private final @NonNull GetResponse<LWWRegister<String>> response;
+
+    public InitialOwnerSync(final GetResponse<LWWRegister<String>> response) {
+        this.response = requireNonNull(response);
+    }
+
+    public @NonNull GetResponse<LWWRegister<String>> getResponse() {
+        return response;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/ListenerCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/ListenerCommand.java
new file mode 100644 (file)
index 0000000..b0502f5
--- /dev/null
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.owner.command;
+
+public abstract class ListenerCommand {
+    ListenerCommand() {
+        // Hidden on purpose
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/OwnerChanged.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/command/OwnerChanged.java
new file mode 100644 (file)
index 0000000..4e1298a
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.owner.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Notification from distributed-data sent to the SingleEntityListenerActor when owner changes for the tracked entity.
+ */
+public final class OwnerChanged extends ListenerCommand {
+    private final @NonNull SubscribeResponse<LWWRegister<String>> response;
+
+    public OwnerChanged(final SubscribeResponse<LWWRegister<String>> response) {
+        this.response = requireNonNull(response);
+    }
+
+    public @NonNull SubscribeResponse<LWWRegister<String>> getResponse() {
+        return response;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerActor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerActor.java
new file mode 100644 (file)
index 0000000..81772cf
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator.Changed;
+import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.SingleEntityListenerActor;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EntityTypeListenerActor extends AbstractBehavior<TypeListenerCommand> {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityTypeListenerActor.class);
+
+    private final Map<DOMEntity, ActorRef<ListenerCommand>> activeListeners = new HashMap<>();
+    private final String localMember;
+    private final String entityType;
+    private final DOMEntityOwnershipListener listener;
+
+    public EntityTypeListenerActor(final ActorContext<TypeListenerCommand> context, final String localMember,
+                                   final String entityType, final DOMEntityOwnershipListener listener) {
+        super(context);
+        this.localMember = localMember;
+        this.entityType = entityType;
+        this.listener = listener;
+
+        new ReplicatorMessageAdapter<TypeListenerCommand, ORMap<DOMEntity, ORSet<String>>>(context,
+            DistributedData.get(context.getSystem()).replicator(), Duration.ofSeconds(5))
+                .subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
+    }
+
+    public static Behavior<TypeListenerCommand> create(final String localMember, final String entityType,
+                                                       final DOMEntityOwnershipListener listener) {
+        return Behaviors.setup(ctx -> new EntityTypeListenerActor(ctx, localMember, entityType, listener));
+    }
+
+    @Override
+    public Receive<TypeListenerCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
+                .onMessage(EntityOwnerChanged.class, this::onOwnerChanged)
+                .build();
+    }
+
+    private Behavior<TypeListenerCommand> onCandidatesChanged(final CandidatesChanged notification) {
+        final SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response = notification.getResponse();
+        if (response instanceof Changed) {
+            processCandidates(((Changed<ORMap<DOMEntity, ORSet<String>>>) response).get(response.key()).getEntries());
+        } else {
+            LOG.warn("Unexpected notification from replicator: {}", response);
+        }
+        return this;
+    }
+
+    private void processCandidates(final Map<DOMEntity, ORSet<String>> entries) {
+        final Map<DOMEntity, ORSet<String>> filteredCandidates = entries.entrySet().stream()
+            .filter(entry -> entry.getKey().getType().equals(entityType))
+            .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+        LOG.debug("Entity-type: {} current candidates: {}", entityType, filteredCandidates);
+
+        final Set<DOMEntity> removed =
+                ImmutableSet.copyOf(Sets.difference(activeListeners.keySet(), filteredCandidates.keySet()));
+        if (!removed.isEmpty()) {
+            LOG.debug("Stopping listeners for {}", removed);
+            // kill actors for the removed
+            removed.forEach(removedEntity -> getContext().stop(activeListeners.remove(removedEntity)));
+        }
+
+        for (final Entry<DOMEntity, ORSet<String>> entry : filteredCandidates.entrySet()) {
+            activeListeners.computeIfAbsent(entry.getKey(), key -> {
+                // spawn actor for this entity
+                LOG.debug("Starting listener for {}", key);
+                return getContext().spawn(SingleEntityListenerActor.create(localMember, key, getContext().getSelf()),
+                    "SingleEntityListener-" + encodeEntityToActorName(key));
+            });
+        }
+    }
+
+    private Behavior<TypeListenerCommand> onOwnerChanged(final EntityOwnerChanged rsp) {
+        LOG.debug("Entity-type: {} listener, owner change: {}", entityType, rsp);
+
+        listener.ownershipChanged(rsp.getOwnershipChange());
+        return this;
+    }
+
+    private static String encodeEntityToActorName(final DOMEntity entity) {
+        return "type=" + entity.getType() + ",entity="
+                + entity.getIdentifier().getLastPathArgument().getNodeType().getLocalName() + "-" + UUID.randomUUID();
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerRegistry.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerRegistry.java
new file mode 100644 (file)
index 0000000..c853124
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EntityTypeListenerRegistry extends AbstractBehavior<TypeListenerRegistryCommand> {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityTypeListenerRegistry.class);
+
+    private final Map<DOMEntityOwnershipListener, ActorRef<TypeListenerCommand>> spawnedListenerActors =
+            new HashMap<>();
+    private final String localMember;
+
+    public EntityTypeListenerRegistry(final ActorContext<TypeListenerRegistryCommand> context,
+                                      final String localMember) {
+        super(context);
+        this.localMember = requireNonNull(localMember);
+    }
+
+    public static Behavior<TypeListenerRegistryCommand> create(final String role) {
+        return Behaviors.setup(ctx -> new EntityTypeListenerRegistry(ctx, role));
+    }
+
+    @Override
+    public Receive<TypeListenerRegistryCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(RegisterListener.class, this::onRegisterListener)
+                .onMessage(UnregisterListener.class, this::onUnregisterListener)
+                .build();
+    }
+
+    private Behavior<TypeListenerRegistryCommand> onRegisterListener(final RegisterListener command) {
+        LOG.debug("Spawning entity type listener actor for: {}", command.getEntityType());
+
+        final ActorRef<TypeListenerCommand> listenerActor =
+                getContext().spawn(EntityTypeListenerActor.create(localMember,
+                        command.getEntityType(), command.getDelegateListener()),
+                        "TypeListener:" + encodeEntityToActorName(command.getEntityType()));
+        spawnedListenerActors.put(command.getDelegateListener(), listenerActor);
+        return this;
+    }
+
+    private Behavior<TypeListenerRegistryCommand> onUnregisterListener(final UnregisterListener command) {
+        LOG.debug("Stopping entity type listener actor for: {}", command.getEntityType());
+
+        getContext().stop(spawnedListenerActors.remove(command.getDelegateListener()));
+        return this;
+    }
+
+    private static String encodeEntityToActorName(final String entityType) {
+        return "type=" + entityType + "-" + UUID.randomUUID();
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/CandidatesChanged.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/CandidatesChanged.java
new file mode 100644 (file)
index 0000000..07a4994
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse;
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+/**
+ * Adapted notification from distributed-data sent to EntityTypeListenerActor when candidates change.
+ */
+public final class CandidatesChanged extends TypeListenerCommand {
+    private final @NonNull SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response;
+
+    public CandidatesChanged(final SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response) {
+        this.response = requireNonNull(response);
+    }
+
+    public @NonNull SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+        return response;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("response", response).toString();
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/EntityOwnerChanged.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/EntityOwnerChanged.java
new file mode 100644 (file)
index 0000000..02d0e2f
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type.command;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
+
+/**
+ * Notification sent to EntityTypeListenerActor when there is an owner change for an Entity of a given type.
+ */
+public final class EntityOwnerChanged extends TypeListenerCommand {
+    private final @NonNull DOMEntityOwnershipChange ownershipChange;
+
+    public EntityOwnerChanged(final DOMEntityOwnershipChange ownershipChange) {
+        this.ownershipChange = requireNonNull(ownershipChange);
+    }
+
+    public @NonNull DOMEntityOwnershipChange getOwnershipChange() {
+        return ownershipChange;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("ownershipChange", ownershipChange).toString();
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/RegisterListener.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/RegisterListener.java
new file mode 100644 (file)
index 0000000..ffa3d47
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type.command;
+
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+
+/**
+ * Register a DOMEntityOwnershipListener for a given entity-type.
+ */
+public final class RegisterListener extends TypeListenerRegistryCommand {
+    public RegisterListener(final String entityType, final DOMEntityOwnershipListener delegateListener) {
+        super(entityType, delegateListener);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerCommand.java
new file mode 100644 (file)
index 0000000..939d4a2
--- /dev/null
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type.command;
+
+public abstract class TypeListenerCommand {
+    TypeListenerCommand() {
+        // Hidden on purpose
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerRegistryCommand.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/TypeListenerRegistryCommand.java
new file mode 100644 (file)
index 0000000..3bfce42
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type.command;
+
+import static java.util.Objects.requireNonNull;
+
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+
+public abstract class TypeListenerRegistryCommand {
+    private final @NonNull String entityType;
+    private final @NonNull DOMEntityOwnershipListener delegateListener;
+
+    TypeListenerRegistryCommand(final String entityType, final DOMEntityOwnershipListener delegateListener) {
+        this.entityType = requireNonNull(entityType);
+        this.delegateListener = requireNonNull(delegateListener);
+    }
+
+    public final @NonNull String getEntityType() {
+        return entityType;
+    }
+
+    public final @NonNull DOMEntityOwnershipListener getDelegateListener() {
+        return delegateListener;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/UnregisterListener.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/command/UnregisterListener.java
new file mode 100644 (file)
index 0000000..4bd09c8
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.type.command;
+
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+
+/**
+ * Unregister a listener from the EntityTypeListenerRegistry.
+ */
+public final class UnregisterListener extends TypeListenerRegistryCommand {
+    public UnregisterListener(final String entityType, final DOMEntityOwnershipListener delegateListener) {
+        super(entityType, delegateListener);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java
new file mode 100644 (file)
index 0000000..8d10119
--- /dev/null
@@ -0,0 +1,291 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.Adapter;
+import akka.actor.typed.javadsl.AskPattern;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
+import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
+import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
+import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractNativeEosTest {
+
+    public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
+    public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
+
+    protected static final List<String> TWO_NODE_SEED_NODES =
+            List.of("akka://ClusterSystem@127.0.0.1:2550",
+                    "akka://ClusterSystem@127.0.0.1:2551");
+
+    protected static final List<String> THREE_NODE_SEED_NODES =
+            List.of("akka://ClusterSystem@127.0.0.1:2550",
+                    "akka://ClusterSystem@127.0.0.1:2551",
+                    "akka://ClusterSystem@127.0.0.1:2552");
+
+    private static final String REMOTE_PROTOCOL = "akka";
+    private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
+    private static final String ROLE_PARAM = "akka.cluster.roles";
+    private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
+
+
+    protected static ClusterNode startupRemote(final int port, final List<String> roles)
+            throws ExecutionException, InterruptedException {
+        return startup(port, roles, THREE_NODE_SEED_NODES);
+    }
+
+    protected static ClusterNode startupRemote(final int port, final List<String> roles, final List<String> seedNodes)
+            throws ExecutionException, InterruptedException {
+        return startup(port, roles, seedNodes);
+    }
+
+    protected static ClusterNode startup(final int port, final List<String> roles)
+            throws ExecutionException, InterruptedException {
+        return startup(port, roles, List.of());
+    }
+
+    protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes)
+            throws ExecutionException, InterruptedException {
+
+        return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior);
+    }
+
+    protected static ClusterNode startup(final int port, final List<String> roles, final List<String> seedNodes,
+                                         final Supplier<Behavior<BootstrapCommand>> bootstrap)
+            throws ExecutionException, InterruptedException {
+        // Override the configuration
+        final Map<String, Object> overrides = new HashMap<>(4);
+        overrides.put(PORT_PARAM, port);
+        overrides.put(ROLE_PARAM, roles);
+        if (!seedNodes.isEmpty()) {
+            overrides.put(SEED_NODES_PARAM, seedNodes);
+        }
+
+        final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load());
+
+        // Create a classic Akka system since thats what we will have in osgi
+        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
+        final ActorRef<BootstrapCommand> eosBootstrap =
+                Adapter.spawn(system, bootstrap.get(), "EOSBootstrap");
+
+        final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
+                GetRunningContext::new,
+                Duration.ofSeconds(5),
+                Adapter.toTyped(system.scheduler()));
+        final RunningContext runningContext = ask.toCompletableFuture().get();
+
+        return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
+                runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+    }
+
+    private static Behavior<BootstrapCommand> rootBehavior() {
+        return Behaviors.setup(context -> EOSMain.create());
+    }
+
+    protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
+        final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
+        registerCandidates(candidateRegistry, entity, members);
+    }
+
+    protected static void registerCandidates(final ActorRef<CandidateRegistryCommand> candidateRegistry,
+                                             final DOMEntity entity, final String... members) {
+        for (final String member : members) {
+            candidateRegistry.tell(new RegisterCandidate(entity, member));
+        }
+    }
+
+    protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity,
+                                               final String... members) {
+        final ActorRef<CandidateRegistryCommand> candidateRegistry = node.getCandidateRegistry();
+        for (final String member : members) {
+            candidateRegistry.tell(new UnregisterCandidate(entity, member));
+        }
+    }
+
+    protected static  MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) {
+        final ActorRef<TypeListenerRegistryCommand> listenerRegistry = node.getListenerRegistry();
+        final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0));
+        listenerRegistry.tell(new RegisterListener(entity.getType(), listener));
+
+        return listener;
+    }
+
+    protected static void reachableMember(final ClusterNode node, final String role) {
+        reachableMember(node.getOwnerSupervisor(), role);
+    }
+
+    protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor, final String role) {
+        ownerSupervisor.tell(new MemberReachableEvent(
+                new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
+    }
+
+    protected static void unreachableMember(final ClusterNode node, final String role) {
+        unreachableMember(node.getOwnerSupervisor(), role);
+    }
+
+    protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor, final String role) {
+        ownerSupervisor.tell(new MemberUnreachableEvent(
+                new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
+    }
+
+    protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
+        await().until(() -> {
+            final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
+            final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
+                    AskPattern.ask(distributedData.replicator(),
+                            replyTo -> new Replicator.Get<>(
+                                    new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
+                            Duration.ofSeconds(5),
+                            clusterNode.getActorSystem().scheduler());
+
+            final Replicator.GetResponse<LWWRegister<String>> response =
+                    ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+            if (response instanceof Replicator.GetSuccess) {
+                final String owner = ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue();
+                return !owner.isEmpty();
+            }
+
+            return false;
+        });
+    }
+
+    protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
+                                              final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
+        await().until(() -> !listener.getChanges().isEmpty());
+
+        await().untilAsserted(() -> {
+            final List<DOMEntityOwnershipChange> changes = listener.getChanges();
+            final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
+            assertEquals(entity, domEntityOwnershipChange.getEntity());
+
+            assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner());
+            assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner());
+            assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner());
+        });
+    }
+
+    protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
+        await().pollDelay(2, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
+    }
+
+    protected static final class ClusterNode {
+        private final int port;
+        private final List<String> roles;
+        private final akka.actor.typed.ActorSystem<Void> actorSystem;
+        private final ActorRef<BootstrapCommand> eosBootstrap;
+        private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
+        private final ActorRef<CandidateRegistryCommand> candidateRegistry;
+        private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+
+        private ClusterNode(final int port,
+                            final List<String> roles,
+                            final ActorSystem actorSystem,
+                            final ActorRef<BootstrapCommand> eosBootstrap,
+                            final ActorRef<TypeListenerRegistryCommand> listenerRegistry,
+                            final ActorRef<CandidateRegistryCommand> candidateRegistry,
+                            final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
+            this.port = port;
+            this.roles = roles;
+            this.actorSystem = Adapter.toTyped(actorSystem);
+            this.eosBootstrap = eosBootstrap;
+            this.listenerRegistry = listenerRegistry;
+            this.candidateRegistry = candidateRegistry;
+            this.ownerSupervisor = ownerSupervisor;
+        }
+
+        public int getPort() {
+            return port;
+        }
+
+        public akka.actor.typed.ActorSystem<Void> getActorSystem() {
+            return actorSystem;
+        }
+
+        public ActorRef<BootstrapCommand> getEosBootstrap() {
+            return eosBootstrap;
+        }
+
+        public ActorRef<TypeListenerRegistryCommand> getListenerRegistry() {
+            return listenerRegistry;
+        }
+
+        public ActorRef<CandidateRegistryCommand> getCandidateRegistry() {
+            return candidateRegistry;
+        }
+
+        public ActorRef<OwnerSupervisorCommand> getOwnerSupervisor() {
+            return ownerSupervisor;
+        }
+
+        public List<String> getRoles() {
+            return roles;
+        }
+    }
+
+    protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
+
+        private final Logger log = LoggerFactory.getLogger(MockEntityOwnershipListener.class);
+
+        private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
+        private final String member;
+
+        public MockEntityOwnershipListener(final String member) {
+
+            this.member = member;
+        }
+
+        @Override
+        public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) {
+            log.info("{} Received ownershipCHanged: {}", member, ownershipChange);
+            log.info("{} changes: {}", member, changes.size());
+            changes.add(ownershipChange);
+        }
+
+        public List<DOMEntityOwnershipChange> getChanges() {
+            return changes;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java
new file mode 100644 (file)
index 0000000..0c4b5ea
--- /dev/null
@@ -0,0 +1,245 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import akka.actor.ActorSystem;
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.javadsl.Adapter;
+import akka.actor.typed.javadsl.AskPattern;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import com.typesafe.config.ConfigFactory;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import org.awaitility.Durations;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public class AkkaEntityOwnershipServiceTest extends AbstractNativeEosTest {
+    static final String ENTITY_TYPE = "test";
+    static final String ENTITY_TYPE2 = "test2";
+    static final QName QNAME = QName.create("test", "2015-08-11", "foo");
+    static int ID_COUNTER = 1;
+
+    private ActorSystem system;
+    private akka.actor.typed.ActorSystem<Void> typedSystem;
+    private AkkaEntityOwnershipService service;
+    private ActorRef<Replicator.Command> replicator;
+
+    @Before
+    public void setUp() throws Exception {
+        system = ActorSystem.create("ClusterSystem", ConfigFactory.load());
+        typedSystem = Adapter.toTyped(this.system);
+        replicator = DistributedData.get(typedSystem).replicator();
+
+        service = new AkkaEntityOwnershipService(system);
+    }
+
+    @After
+    public void tearDown() throws InterruptedException, ExecutionException {
+        service.close();
+        ActorTestKit.shutdown(Adapter.toTyped(system));
+    }
+
+    @Test
+    public void testRegisterCandidate() throws Exception {
+        final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
+        final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
+
+        final DOMEntityOwnershipCandidateRegistration reg = service.registerCandidate(entity);
+
+        verifyEntityOwnershipCandidateRegistration(entity, reg);
+        verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1");
+
+        try {
+            service.registerCandidate(entity);
+            fail("Expected CandidateAlreadyRegisteredException");
+        } catch (final CandidateAlreadyRegisteredException e) {
+            // expected
+            assertEquals("getEntity", entity, e.getEntity());
+        }
+
+        final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE2, entityId);
+        final DOMEntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2);
+
+        verifyEntityOwnershipCandidateRegistration(entity2, reg2);
+        verifyEntityCandidateRegistered(ENTITY_TYPE2, entityId, "member-1");
+    }
+
+    @Test
+    public void testUnregisterCandidate() throws Exception {
+        final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
+        final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
+
+        final DOMEntityOwnershipCandidateRegistration reg = service.registerCandidate(entity);
+
+        verifyEntityOwnershipCandidateRegistration(entity, reg);
+        verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1");
+
+        reg.close();
+        verifyEntityCandidateMissing(ENTITY_TYPE, entityId, "member-1");
+
+        service.registerCandidate(entity);
+        verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1");
+    }
+
+    @Test
+    public void testListenerRegistration() throws Exception {
+
+        final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
+        final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
+        final MockEntityOwnershipListener listener = new MockEntityOwnershipListener("member-1");
+
+        final DOMEntityOwnershipListenerRegistration reg = service.registerListener(entity.getType(), listener);
+
+        assertNotNull("EntityOwnershipListenerRegistration null", reg);
+        assertEquals("getEntityType", entity.getType(), reg.getEntityType());
+        assertEquals("getInstance", listener, reg.getInstance());
+
+        final DOMEntityOwnershipCandidateRegistration candidate = service.registerCandidate(entity);
+
+        verifyListenerState(listener, entity, true, true, false);
+        final int changes = listener.getChanges().size();
+
+        reg.close();
+        candidate.close();
+
+        verifyEntityCandidateMissing(ENTITY_TYPE, entityId, "member-1");
+
+        service.registerCandidate(entity);
+        // check listener not called when listener registration closed
+        await().pollDelay(Durations.TWO_SECONDS).until(() -> listener.getChanges().size() == changes);
+    }
+
+    @Test
+    public void testGetOwnershipState() throws Exception {
+        final DOMEntity entity = new DOMEntity(ENTITY_TYPE, "one");
+
+        final DOMEntityOwnershipCandidateRegistration registration = service.registerCandidate(entity);
+        verifyGetOwnershipState(service, entity, EntityOwnershipState.IS_OWNER);
+
+        final RunningContext runningContext = service.getRunningContext();
+        registerCandidates(runningContext.getCandidateRegistry(), entity, "member-2");
+
+        final ActorRef<OwnerSupervisorCommand> ownerSupervisor = runningContext.getOwnerSupervisor();
+        reachableMember(ownerSupervisor, "member-2");
+        unreachableMember(ownerSupervisor, "member-1");
+        verifyGetOwnershipState(service, entity, EntityOwnershipState.OWNED_BY_OTHER);
+
+        final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, "two");
+        final Optional<EntityOwnershipState> state = service.getOwnershipState(entity2);
+        assertFalse(state.isPresent());
+
+        unreachableMember(ownerSupervisor, "member-2");
+        verifyGetOwnershipState(service, entity, EntityOwnershipState.NO_OWNER);
+    }
+
+    @Test
+    public void testIsCandidateRegistered() throws Exception {
+        final DOMEntity test = new DOMEntity("test-type", "test");
+
+        assertFalse(service.isCandidateRegistered(test));
+
+        service.registerCandidate(test);
+
+        assertTrue(service.isCandidateRegistered(test));
+    }
+
+    private static void verifyGetOwnershipState(final DOMEntityOwnershipService service, final DOMEntity entity,
+                                                final EntityOwnershipState expState) {
+        await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
+            final Optional<EntityOwnershipState> state = service.getOwnershipState(entity);
+            assertTrue("getOwnershipState present", state.isPresent());
+            assertEquals("EntityOwnershipState", expState, state.get());
+        });
+    }
+
+    private void verifyEntityCandidateRegistered(final String entityType,
+                                                 final YangInstanceIdentifier entityId,
+                                                 final String candidateName) {
+        await().atMost(Duration.ofSeconds(5))
+                .untilAsserted(() -> doVerifyEntityCandidateRegistered(entityType, entityId, candidateName));
+    }
+
+    private void doVerifyEntityCandidateRegistered(final String entityType,
+                                                   final YangInstanceIdentifier entityId,
+                                                   final String candidateName)
+            throws ExecutionException, InterruptedException {
+        final Map<DOMEntity, ORSet<String>> entries = getCandidateData();
+        final DOMEntity entity = new DOMEntity(entityType, entityId);
+        assertTrue(entries.containsKey(entity));
+        assertTrue(entries.get(entity).getElements().contains(candidateName));
+    }
+
+    private void verifyEntityCandidateMissing(final String entityType,
+                                              final YangInstanceIdentifier entityId,
+                                              final String candidateName) {
+        await().atMost(Duration.ofSeconds(5))
+                .untilAsserted(() -> doVerifyEntityCandidateMissing(entityType, entityId, candidateName));
+    }
+
+    private void doVerifyEntityCandidateMissing(final String entityType,
+                                                final YangInstanceIdentifier entityId,
+                                                final String candidateName)
+            throws ExecutionException, InterruptedException {
+        final Map<DOMEntity, ORSet<String>> entries = getCandidateData();
+        final DOMEntity entity = new DOMEntity(entityType, entityId);
+        assertTrue(entries.containsKey(entity));
+        assertFalse(entries.get(entity).getElements().contains(candidateName));
+    }
+
+    private Map<DOMEntity, ORSet<String>> getCandidateData() throws ExecutionException, InterruptedException {
+        final CompletionStage<Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>>> ask =
+                AskPattern.ask(replicator, replyTo ->
+                                new Replicator.Get<>(
+                                        CandidateRegistry.KEY,
+                                        Replicator.readLocal(),
+                                        replyTo),
+                        Duration.ofSeconds(5),
+                        typedSystem.scheduler());
+
+        final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = ask.toCompletableFuture().get();
+        assertTrue(response instanceof Replicator.GetSuccess);
+
+        final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> success =
+                (Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response;
+
+        return success.get(CandidateRegistry.KEY).getEntries();
+    }
+
+    private static void verifyEntityOwnershipCandidateRegistration(final DOMEntity entity,
+                                                                   final DOMEntityOwnershipCandidateRegistration reg) {
+        assertNotNull("EntityOwnershipCandidateRegistration null", reg);
+        assertEquals("getInstance", entity, reg.getInstance());
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/SingleNodeTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/SingleNodeTest.java
new file mode 100644 (file)
index 0000000..043af99
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class SingleNodeTest extends AbstractNativeEosTest {
+
+    public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
+    public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
+
+    private ClusterNode clusterNode;
+
+    @Before
+    public void setUp() throws Exception {
+        clusterNode = startup(2550, List.of("member-1"));
+
+        reachableMember(clusterNode, "member-2");
+        reachableMember(clusterNode, "member-3");
+    }
+
+    @After
+    public void tearDown() {
+        ActorTestKit.shutdown(clusterNode.getActorSystem());
+    }
+
+    @Test
+    public void testNotificationPriorToCandidateRegistration() {
+        final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1);
+        verifyNoNotifications(listener);
+
+        registerCandidates(clusterNode, ENTITY_1, "member-1");
+        verifyListenerState(listener, ENTITY_1, true, true, false);
+    }
+
+    @Test
+    public void testListenerPriorToAddingCandidates() {
+        final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1);
+
+        registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
+        verifyListenerState(listener, ENTITY_1, true, true, false);
+
+        unregisterCandidates(clusterNode, ENTITY_1, "member-1");
+        verifyListenerState(listener, ENTITY_1, true, false, true);
+    }
+
+    @Test
+    public void testListenerRegistrationAfterCandidates() {
+        registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
+        waitUntillOwnerPresent(clusterNode, ENTITY_1);
+
+        final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1);
+        verifyListenerState(listener, ENTITY_1, true, true, false);
+
+        unregisterCandidates(clusterNode, ENTITY_1, "member-1", "member-2");
+        verifyListenerState(listener, ENTITY_1, true, false, true);
+    }
+
+    @Test
+    public void testMultipleEntities() {
+        registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
+        waitUntillOwnerPresent(clusterNode, ENTITY_1);
+
+        final MockEntityOwnershipListener listener1 = registerListener(clusterNode, ENTITY_1);
+        final MockEntityOwnershipListener listener2 = registerListener(clusterNode, ENTITY_2);
+
+        verifyListenerState(listener1, ENTITY_1, true, true, false);
+        verifyNoNotifications(listener2);
+
+        unregisterCandidates(clusterNode, ENTITY_1, "member-1");
+        verifyListenerState(listener1, ENTITY_1, true, false, true);
+        verifyNoNotifications(listener2);
+
+        registerCandidates(clusterNode, ENTITY_2, "member-2");
+        verifyListenerState(listener1, ENTITY_1, true, false, true);
+        verifyListenerState(listener2, ENTITY_2, true, false, false);
+
+        unregisterCandidates(clusterNode, ENTITY_2, "member-2");
+
+        verifyListenerState(listener1, ENTITY_1, true, false, true);
+        verifyListenerState(listener2, ENTITY_2, false, false, false);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeBaseTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeBaseTest.java
new file mode 100644 (file)
index 0000000..352a842
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.typed.Cluster;
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.List;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class ThreeNodeBaseTest extends AbstractNativeEosTest {
+    public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
+    public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
+
+    private ClusterNode node1;
+    private ClusterNode node2;
+    private ClusterNode node3;
+
+    @Before
+    public void setUp() throws Exception {
+        node1 = startupRemote(2550, List.of("member-1"));
+        node2 = startupRemote(2551, List.of("member-2"));
+        node3 = startupRemote(2552, List.of("member-3"));
+
+        // need to wait until all nodes are ready
+        final Cluster cluster = Cluster.get(node3.getActorSystem());
+        // need a longer timeout with classic remoting, artery.tcp doesnt need to wait as long for init
+        Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+            final List<Member> members = ImmutableList.copyOf(cluster.state().getMembers());
+            if (members.size() != 3) {
+                return false;
+            }
+
+            for (final Member member : members) {
+                if (!member.status().equals(MemberStatus.up())) {
+                    return false;
+                }
+            }
+
+            return true;
+        });
+    }
+
+    @After
+    public void tearDown() {
+        // same issue with classic remoting as in setup
+        ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
+        ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
+        ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20));
+    }
+
+    @Test
+    public void testInitialNotificationsWithoutOwner() throws Exception {
+        final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
+        verifyNoNotifications(listener1);
+
+        final MockEntityOwnershipListener listener2 = registerListener(node2, ENTITY_1);
+        verifyNoNotifications(listener2);
+
+        final MockEntityOwnershipListener listener3 = registerListener(node3, ENTITY_1);
+        verifyNoNotifications(listener3);
+    }
+
+    @Test
+    public void testInitialNotificationsWithOwner() {
+        registerCandidates(node1, ENTITY_1, "member-1");
+        // make sure we register other candidates after the first is seen everywhere to prevent different results due
+        // to timing
+        waitUntillOwnerPresent(node3, ENTITY_1);
+
+        registerCandidates(node2, ENTITY_1, "member-2");
+        registerCandidates(node3, ENTITY_1, "member-3");
+
+        final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
+        verifyListenerState(listener1, ENTITY_1, true, true, false);
+
+        final MockEntityOwnershipListener listener2 = registerListener(node2, ENTITY_1);
+        verifyListenerState(listener2, ENTITY_1, true, false, false);
+
+        final MockEntityOwnershipListener listener3 = registerListener(node3, ENTITY_1);
+        verifyListenerState(listener3, ENTITY_1, true, false, false);
+    }
+
+    @Test
+    public void testMultipleEntities() {
+        registerCandidates(node1, ENTITY_1, "member-1");
+        registerCandidates(node2, ENTITY_1, "member-2");
+        registerCandidates(node3, ENTITY_1, "member-3");
+
+        waitUntillOwnerPresent(node3, ENTITY_1);
+
+        registerCandidates(node2, ENTITY_2, "member-2");
+        waitUntillOwnerPresent(node2, ENTITY_2);
+        registerCandidates(node1, ENTITY_2, "member-1");
+
+        final MockEntityOwnershipListener firstEntityListener1 = registerListener(node1, ENTITY_1);
+        final MockEntityOwnershipListener firstEntityListener2 = registerListener(node2, ENTITY_1);
+        final MockEntityOwnershipListener firstEntityListener3 = registerListener(node3, ENTITY_1);
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
+        verifyListenerState(firstEntityListener2, ENTITY_1, true, false, false);
+        verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+
+        final MockEntityOwnershipListener secondEntityListener1 = registerListener(node1, ENTITY_2);
+        final MockEntityOwnershipListener secondEntityListener2 = registerListener(node2, ENTITY_2);
+        final MockEntityOwnershipListener secondEntityListener3 = registerListener(node3, ENTITY_2);
+
+        verifyListenerState(secondEntityListener1, ENTITY_2, true, false, false);
+        verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false);
+        verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
+
+        unregisterCandidates(node1, ENTITY_1, "member-1");
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, true, false, true);
+        verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false);
+        verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+
+        unregisterCandidates(node2, ENTITY_1, "member-2");
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false);
+        verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true);
+        verifyListenerState(firstEntityListener3, ENTITY_1, true, true, false);
+
+        unregisterCandidates(node3, ENTITY_1, "member-3");
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, false, false, false);
+        verifyListenerState(firstEntityListener2, ENTITY_1, false, false, false);
+        verifyListenerState(firstEntityListener3, ENTITY_1, false, false, true);
+
+        // check second listener hasnt moved
+        verifyListenerState(secondEntityListener1, ENTITY_2, true, false, false);
+        verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false);
+        verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
+
+        registerCandidates(node1, ENTITY_1, "member-1");
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
+        verifyListenerState(firstEntityListener2, ENTITY_1, true, false, false);
+        verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeReachabilityTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeReachabilityTest.java
new file mode 100644 (file)
index 0000000..871bc00
--- /dev/null
@@ -0,0 +1,251 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import static org.awaitility.Awaitility.await;
+
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.typed.Cluster;
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class ThreeNodeReachabilityTest extends AbstractNativeEosTest {
+    public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
+    public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
+
+    private ClusterNode node1 = null;
+    private ClusterNode node2 = null;
+    private ClusterNode node3 = null;
+
+    @Before
+    public void setUp() throws Exception {
+        node1 = startupRemote(2550, List.of("member-1"), TWO_NODE_SEED_NODES);
+        node2 = startupRemote(2551, List.of("member-2"), TWO_NODE_SEED_NODES);
+
+        // need to wait until all nodes are ready
+        final Cluster cluster = Cluster.get(node2.getActorSystem());
+        await().atMost(Duration.ofSeconds(20)).until(() -> {
+            final List<Member> members = ImmutableList.copyOf(cluster.state().getMembers());
+            if (members.size() != 2) {
+                return false;
+            }
+
+            for (final Member member : members) {
+                if (!member.status().equals(MemberStatus.up())) {
+                    return false;
+                }
+            }
+
+            return true;
+        });
+    }
+
+    @After
+    public void tearDown() {
+        ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
+        ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
+
+
+        if (node3 != null) {
+            ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20));
+        }
+    }
+
+    @Test
+    public void testNodeLateStart() throws Exception {
+        registerCandidates(node1, ENTITY_1, "member-1");
+        registerCandidates(node2, ENTITY_1, "member-2");
+
+        registerCandidates(node2, ENTITY_2, "member-2");
+        waitUntillOwnerPresent(node2, ENTITY_2);
+        registerCandidates(node1, ENTITY_2, "member-1");
+
+        final MockEntityOwnershipListener firstEntityListener1 = registerListener(node1, ENTITY_1);
+        final MockEntityOwnershipListener firstEntityListener2 = registerListener(node2, ENTITY_1);
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
+        verifyListenerState(firstEntityListener2, ENTITY_1, true, false, false);
+
+        final MockEntityOwnershipListener secondEntityListener1 = registerListener(node1, ENTITY_2);
+        final MockEntityOwnershipListener secondEntityListener2 = registerListener(node2, ENTITY_2);
+
+        verifyListenerState(secondEntityListener1, ENTITY_2, true, false, false);
+        verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false);
+
+        unregisterCandidates(node1, ENTITY_1, "member-1");
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, true, false, true);
+        verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false);
+
+        unregisterCandidates(node2, ENTITY_1, "member-2");
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, false, false, false);
+        verifyListenerState(firstEntityListener2, ENTITY_1, false, false, true);
+
+        startNode3();
+
+        final MockEntityOwnershipListener firstEntityListener3 = registerListener(node3, ENTITY_1);
+        verifyListenerState(firstEntityListener3, ENTITY_1, false, false, false);
+
+        final MockEntityOwnershipListener secondEntityListener3 = registerListener(node3, ENTITY_2);
+        verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
+
+        registerCandidates(node3, ENTITY_1, "member-3");
+        waitUntillOwnerPresent(node3, ENTITY_1);
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false);
+        verifyListenerState(firstEntityListener2, ENTITY_1, true, false, false);
+
+        verifyListenerState(firstEntityListener3, ENTITY_1, true, true, false);
+    }
+
+    @Test
+    public void testReachabilityChangesDuringRuntime() throws Exception {
+        startNode3();
+
+        registerCandidates(node2, ENTITY_1, "member-2");
+        // we want singleton on node1 but owner on node2
+        waitUntillOwnerPresent(node2, ENTITY_1);
+
+        registerCandidates(node1, ENTITY_1, "member-1");
+        registerCandidates(node3, ENTITY_1, "member-3");
+
+        registerCandidates(node2, ENTITY_2, "member-2");
+        waitUntillOwnerPresent(node2, ENTITY_2);
+        registerCandidates(node1, ENTITY_2, "member-1");
+
+        final MockEntityOwnershipListener firstEntityListener1 = registerListener(node1, ENTITY_1);
+        final MockEntityOwnershipListener firstEntityListener2 = registerListener(node2, ENTITY_1);
+        final MockEntityOwnershipListener firstEntityListener3 = registerListener(node3, ENTITY_1);
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false);
+        verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false);
+        verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+
+        final MockEntityOwnershipListener secondEntityListener1 = registerListener(node1, ENTITY_2);
+        final MockEntityOwnershipListener secondEntityListener2 = registerListener(node2, ENTITY_2);
+        final MockEntityOwnershipListener secondEntityListener3 = registerListener(node3, ENTITY_2);
+
+        verifyListenerState(secondEntityListener1, ENTITY_2, true, false, false);
+        verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false);
+        verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
+
+        unreachableMember(node1, "member-2");
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
+        verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true);
+        verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+
+        verifyListenerState(secondEntityListener1, ENTITY_2, true, true, false);
+        verifyListenerState(secondEntityListener2, ENTITY_2, true, false, true);
+        verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
+
+        unreachableMember(node1, "member-3");
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
+        verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true);
+        verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+
+        unregisterCandidates(node1, ENTITY_1, "member-1");
+        unregisterCandidates(node1, ENTITY_2, "member-1");
+
+        verifyListenerState(firstEntityListener1, ENTITY_1, false, false, true);
+        verifyListenerState(firstEntityListener2, ENTITY_1, false, false, false);
+        verifyListenerState(firstEntityListener3, ENTITY_1, false, false, false);
+
+        verifyListenerState(secondEntityListener1, ENTITY_2, false, false, true);
+        verifyListenerState(secondEntityListener2, ENTITY_2, false, false, false);
+        verifyListenerState(secondEntityListener3, ENTITY_2, false, false, false);
+
+        reachableMember(node1, "member-2");
+        verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false);
+        verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false);
+        verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
+    }
+
+    @Test
+    public void testSingletonMoving() throws Exception {
+        final MockEntityOwnershipListener listener1 = registerListener(node2, ENTITY_1);
+        final MockEntityOwnershipListener listener2 = registerListener(node2, ENTITY_2);
+        verifyNoNotifications(listener1);
+        verifyNoNotifications(listener2);
+
+        registerCandidates(node1, ENTITY_1, "member-1");
+        registerCandidates(node2, ENTITY_1, "member-2");
+
+        registerCandidates(node2, ENTITY_2, "member-2");
+        waitUntillOwnerPresent(node2, ENTITY_2);
+        registerCandidates(node1, ENTITY_2, "member-1");
+        // end up with node1 - member-1, node2 - member-2 owners
+        verifyListenerState(listener1, ENTITY_1, true, false, false);
+        verifyListenerState(listener2, ENTITY_2, true, true, false);
+
+        ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
+
+        verifyListenerState(listener1, ENTITY_1, true, true, false);
+        verifyListenerState(listener2, ENTITY_2, true, true, false);
+
+        startNode3(2);
+
+        final MockEntityOwnershipListener listener3 = registerListener(node3, ENTITY_2);
+        verifyListenerState(listener3, ENTITY_2, true, false, false);
+
+        node1 = startupRemote(2550, List.of("member-1"));
+
+        final Cluster cluster = Cluster.get(node2.getActorSystem());
+        await().atMost(Duration.ofSeconds(20)).until(() -> {
+            final List<Member> members = ImmutableList.copyOf(cluster.state().getMembers());
+            if (members.size() != 3) {
+                return false;
+            }
+
+            for (final Member member : members) {
+                if (!member.status().equals(MemberStatus.up())) {
+                    return false;
+                }
+            }
+
+            return true;
+        });
+
+        final MockEntityOwnershipListener node1Listener = registerListener(node1, ENTITY_1);
+        verifyListenerState(node1Listener, ENTITY_1, true, false, false);
+    }
+
+    private void startNode3() throws Exception {
+        startNode3(3);
+    }
+
+    private void startNode3(final int membersPresent) throws Exception {
+        node3 = startupRemote(2552, List.of("member-3"), THREE_NODE_SEED_NODES);
+
+        // need to wait until all nodes are ready
+        final Cluster cluster = Cluster.get(node2.getActorSystem());
+        await().atMost(Duration.ofSeconds(20)).until(() -> {
+            final List<Member> members = ImmutableList.copyOf(cluster.state().getMembers());
+            if (members.size() != membersPresent) {
+                return false;
+            }
+
+            for (final Member member : members) {
+                if (!member.status().equals(MemberStatus.up())) {
+                    return false;
+                }
+            }
+
+            return true;
+        });
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java
new file mode 100644 (file)
index 0000000..6174762
--- /dev/null
@@ -0,0 +1,193 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor;
+
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.typed.Cluster;
+import akka.cluster.typed.ClusterSingleton;
+import akka.cluster.typed.SingletonActor;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Test;
+import org.opendaylight.controller.eos.akka.AbstractNativeEosTest;
+import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
+import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
+import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.owner.checker.OwnerStateChecker;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.type.EntityTypeListenerRegistry;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class OwnerSupervisorTest extends AbstractNativeEosTest {
+
+    @Test
+    public void testCandidatePickingWhenUnreachableCandidates() throws Exception {
+
+        final ClusterNode node = startup(2550, Collections.singletonList("member-1"));
+        try {
+            reachableMember(node, "member-2");
+            reachableMember(node, "member-3");
+            registerCandidates(node, ENTITY_1, "member-1", "member-2", "member-3");
+
+            final MockEntityOwnershipListener listener = registerListener(node, ENTITY_1);
+            verifyListenerState(listener, ENTITY_1,true, true, false);
+
+            unreachableMember(node, "member-1");
+            verifyListenerState(listener, ENTITY_1, true, false, true);
+
+            unreachableMember(node, "member-2");
+            verifyListenerState(listener, ENTITY_1, true, false, false);
+
+            unreachableMember(node, "member-3");
+            verifyListenerState(listener, ENTITY_1, false, false, false);
+
+            reachableMember(node, "member-2");
+            verifyListenerState(listener, ENTITY_1, true, false, false);
+
+            // no notification here as member-2 is already the owner
+            reachableMember(node, "member-1");
+
+            unreachableMember(node, "member-2");
+            verifyListenerState(listener, ENTITY_1,true, true, false);
+        } finally {
+            ActorTestKit.shutdown(node.getActorSystem());
+        }
+    }
+
+    @Test
+    public void testSupervisorInitWithMissingOwners() throws Exception {
+        final Map<DOMEntity, Set<String>> candidates = new HashMap<>();
+        candidates.put(ENTITY_1, Set.of("member-1"));
+        candidates.put(ENTITY_2, Set.of("member-2"));
+
+        final ClusterNode node = startup(2550, Collections.singletonList("member-1"), Collections.emptyList(),
+                () -> mockedBootstrap(candidates, new HashMap<>()));
+
+        try {
+            waitUntillOwnerPresent(node, ENTITY_1);
+
+            // also do a proper register so the listener from the type lister actor are spawned
+            registerCandidates(node, ENTITY_1, "member-1");
+            registerCandidates(node, ENTITY_2, "member-2");
+
+            final MockEntityOwnershipListener listener1 = registerListener(node, ENTITY_1);
+            final MockEntityOwnershipListener listener2 = registerListener(node, ENTITY_2);
+
+            // first entity should have correctly assigned owner as its reachable
+            verifyListenerState(listener1, ENTITY_1, true, true, false);
+            // this one could not be assigned during init as we dont have member-2 thats reachable
+            verifyListenerState(listener2, ENTITY_2, false, false, false);
+
+            reachableMember(node, "member-2");
+            verifyListenerState(listener2, ENTITY_2, true, false, false);
+        } finally {
+            ActorTestKit.shutdown(node.getActorSystem());
+        }
+    }
+
+    private static Behavior<BootstrapCommand> mockedBootstrap(final Map<DOMEntity, Set<String>> currentCandidates,
+                                                              final Map<DOMEntity, String> currentOwners) {
+        return Behaviors.setup(context -> MockBootstrap.create(currentCandidates, currentOwners));
+    }
+
+    /**
+     * Initial behavior that skips initial sync and instead initializes OwnerSupervisor with provided values.
+     */
+    private static final class MockSyncer extends AbstractBehavior<OwnerSupervisorCommand> {
+
+        private final Map<DOMEntity, Set<String>> currentCandidates;
+        private final Map<DOMEntity, String> currentOwners;
+
+        private MockSyncer(final ActorContext<OwnerSupervisorCommand> context,
+                          final Map<DOMEntity, Set<String>> currentCandidates,
+                          final Map<DOMEntity, String> currentOwners) {
+            super(context);
+            this.currentCandidates = currentCandidates;
+            this.currentOwners = currentOwners;
+
+            context.getSelf().tell(new InitialCandidateSync(null));
+        }
+
+        public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
+                                                              final Map<DOMEntity, String> currentOwners) {
+            return Behaviors.setup(ctx -> new MockSyncer(ctx, currentCandidates, currentOwners));
+        }
+
+        @Override
+        public Receive<OwnerSupervisorCommand> createReceive() {
+            return newReceiveBuilder()
+                    .onMessage(InitialCandidateSync.class, this::switchToSupervisor)
+                    .build();
+        }
+
+        private Behavior<OwnerSupervisorCommand> switchToSupervisor(final InitialCandidateSync message) {
+            return OwnerSupervisor.create(currentCandidates, currentOwners);
+        }
+    }
+
+    /**
+     * Bootstrap with OwnerSyncer replaced with the testing syncer behavior.
+     */
+    private static final class MockBootstrap extends AbstractBehavior<BootstrapCommand> {
+
+        private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
+        private final ActorRef<CandidateRegistryCommand> candidateRegistry;
+        private final ActorRef<StateCheckerCommand> ownerStateChecker;
+        private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+
+        private MockBootstrap(final ActorContext<BootstrapCommand> context,
+                              final Map<DOMEntity, Set<String>> currentCandidates,
+                              final Map<DOMEntity, String> currentOwners) {
+            super(context);
+
+            final Cluster cluster = Cluster.get(context.getSystem());
+            final String role = cluster.selfMember().getRoles().iterator().next();
+
+            listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry");
+            candidateRegistry = context.spawn(CandidateRegistry.create(), "CandidateRegistry");
+            ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker");
+
+            final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
+            // start the initial sync behavior that switches to the regular one after syncing
+            ownerSupervisor = clusterSingleton.init(SingletonActor.of(
+                    MockSyncer.create(currentCandidates, currentOwners), "OwnerSupervisor"));
+        }
+
+        public static Behavior<BootstrapCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
+                                                        final Map<DOMEntity, String> currentOwners) {
+            return Behaviors.setup(ctx -> new MockBootstrap(ctx, currentCandidates, currentOwners));
+        }
+
+        @Override
+        public Receive<BootstrapCommand> createReceive() {
+            return newReceiveBuilder()
+                    .onMessage(GetRunningContext.class, this::onGetRunningContext)
+                    .build();
+        }
+
+        private Behavior<BootstrapCommand> onGetRunningContext(final GetRunningContext request) {
+            request.getReplyTo().tell(
+                    new RunningContext(listenerRegistry, candidateRegistry,ownerStateChecker, ownerSupervisor));
+            return this;
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/resources/application.conf b/opendaylight/md-sal/eos-dom-akka/src/test/resources/application.conf
new file mode 100644 (file)
index 0000000..71c983a
--- /dev/null
@@ -0,0 +1,25 @@
+akka {
+  loglevel = debug
+  actor {
+    warn-about-java-serializer-usage = off
+    allow-java-serialization = on
+    provider = cluster
+  }
+
+  remote {
+    artery {
+      enabled = on
+      canonical.hostname = "127.0.0.1"
+      canonical.port = 2550
+    }
+  }
+  cluster {
+    seed-nodes = [
+      "akka://ClusterSystem@127.0.0.1:2550"]
+    roles = [
+      "member-1"
+    ]
+    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
+  }
+}
+
diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/resources/simplelogger.properties b/opendaylight/md-sal/eos-dom-akka/src/test/resources/simplelogger.properties
new file mode 100644 (file)
index 0000000..942eeee
--- /dev/null
@@ -0,0 +1,7 @@
+org.slf4j.simpleLogger.defaultLogLevel=info
+org.slf4j.simpleLogger.showDateTime=true
+org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
+org.slf4j.simpleLogger.logFile=System.out
+org.slf4j.simpleLogger.showShortLogName=true
+org.slf4j.simpleLogger.levelInBrackets=true
+org.slf4j.simpleLogger.log.org.opendaylight.controller.eos.akka=debug
index 976d74b..6b63828 100644 (file)
@@ -47,6 +47,7 @@
     <module>sal-cluster-admin-api</module>
     <module>sal-cluster-admin-impl</module>
     <module>sal-distributed-eos</module>
+    <module>eos-dom-akka</module>
 
     <!-- Yang Test Models for MD-SAL -->
     <module>sal-test-model</module>