2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka.owner.checker;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.typed.Behavior;
13 import akka.actor.typed.javadsl.AbstractBehavior;
14 import akka.actor.typed.javadsl.ActorContext;
15 import akka.actor.typed.javadsl.Behaviors;
16 import akka.actor.typed.javadsl.Receive;
17 import akka.cluster.ddata.LWWRegister;
18 import akka.cluster.ddata.LWWRegisterKey;
19 import akka.cluster.ddata.typed.javadsl.DistributedData;
20 import akka.cluster.ddata.typed.javadsl.Replicator.Get;
21 import akka.cluster.ddata.typed.javadsl.Replicator.GetFailure;
22 import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
23 import akka.cluster.ddata.typed.javadsl.Replicator.GetSuccess;
24 import akka.cluster.ddata.typed.javadsl.Replicator.NotFound;
25 import akka.cluster.ddata.typed.javadsl.Replicator.ReadMajority;
26 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
27 import java.time.Duration;
28 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
29 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
30 import org.opendaylight.controller.eos.akka.owner.checker.command.InternalGetReply;
31 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
32 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
36 public final class OwnerStateChecker extends AbstractBehavior<StateCheckerCommand> {
37 private static final Logger LOG = LoggerFactory.getLogger(OwnerStateChecker.class);
38 private static final Duration GET_OWNERSHIP_TIMEOUT = Duration.ofSeconds(5);
39 private static final Duration UNEXPECTED_ASK_TIMEOUT = Duration.ofSeconds(5);
41 private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> replicatorAdapter;
42 private final String localMember;
44 private OwnerStateChecker(final ActorContext<StateCheckerCommand> context, final String localMember) {
46 this.localMember = requireNonNull(localMember);
47 replicatorAdapter = new ReplicatorMessageAdapter<>(context,
48 DistributedData.get(context.getSystem()).replicator(), UNEXPECTED_ASK_TIMEOUT);
51 public static Behavior<StateCheckerCommand> create(final String localMember) {
52 return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember));
56 public Receive<StateCheckerCommand> createReceive() {
57 return newReceiveBuilder()
58 .onMessage(GetOwnershipState.class, this::onGetOwnershipState)
59 .onMessage(InternalGetReply.class, this::respondWithState)
63 private Behavior<StateCheckerCommand> onGetOwnershipState(final GetOwnershipState message) {
64 replicatorAdapter.askGet(
65 askReplyTo -> new Get<>(
66 new LWWRegisterKey<>(message.getEntity().toString()),
67 new ReadMajority(GET_OWNERSHIP_TIMEOUT),
69 reply -> new InternalGetReply(reply, message.getEntity(), message.getReplyTo()));
73 private Behavior<StateCheckerCommand> respondWithState(final InternalGetReply reply) {
74 final GetResponse<LWWRegister<String>> response = reply.getResponse();
75 if (response instanceof NotFound) {
76 LOG.debug("Data for owner not found, most likely no owner has beed picked for entity: {}",
78 reply.getReplyTo().tell(new GetOwnershipStateReply(null));
79 } else if (response instanceof GetFailure) {
80 LOG.warn("Failure retrieving data for entity: {}", reply.getEntity());
81 reply.getReplyTo().tell(new GetOwnershipStateReply(null));
82 } else if (response instanceof GetSuccess) {
83 final String owner = ((GetSuccess<LWWRegister<String>>) response).get(response.key()).getValue();
84 LOG.debug("Data for owner received. {}, owner: {}", response, owner);
86 final boolean isOwner = localMember.equals(owner);
87 final boolean hasOwner = !owner.isEmpty();
89 reply.getReplyTo().tell(new GetOwnershipStateReply(EntityOwnershipState.from(isOwner, hasOwner)));