6d418a80e7d9b873988b4e5f0a97acfe6ca279ee
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / checker / OwnerStateChecker.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.owner.checker;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.typed.ActorRef;
13 import akka.actor.typed.Behavior;
14 import akka.actor.typed.javadsl.AbstractBehavior;
15 import akka.actor.typed.javadsl.ActorContext;
16 import akka.actor.typed.javadsl.Behaviors;
17 import akka.actor.typed.javadsl.Receive;
18 import akka.cluster.ddata.LWWRegister;
19 import akka.cluster.ddata.LWWRegisterKey;
20 import akka.cluster.ddata.typed.javadsl.DistributedData;
21 import akka.cluster.ddata.typed.javadsl.Replicator;
22 import akka.cluster.ddata.typed.javadsl.Replicator.Get;
23 import akka.cluster.ddata.typed.javadsl.Replicator.GetFailure;
24 import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
25 import akka.cluster.ddata.typed.javadsl.Replicator.GetSuccess;
26 import akka.cluster.ddata.typed.javadsl.Replicator.NotFound;
27 import akka.cluster.ddata.typed.javadsl.Replicator.ReadMajority;
28 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
29 import java.time.Duration;
30 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
31 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
32 import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
33 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
34 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
35 import org.opendaylight.controller.eos.akka.owner.checker.command.InternalGetReply;
36 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
37 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerRequest;
38 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
39 import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
40 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 public final class OwnerStateChecker extends AbstractBehavior<StateCheckerCommand> {
45     private static final Logger LOG = LoggerFactory.getLogger(OwnerStateChecker.class);
46     private static final Duration GET_OWNERSHIP_TIMEOUT = Duration.ofSeconds(5);
47     private static final Duration UNEXPECTED_ASK_TIMEOUT = Duration.ofSeconds(5);
48
49     private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> ownerReplicator;
50     private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
51     private final BindingInstanceIdentifierCodec iidCodec;
52     private final ActorRef<Replicator.Command> replicator;
53     private final String localMember;
54
55     private OwnerStateChecker(final ActorContext<StateCheckerCommand> context,
56                               final String localMember,
57                               final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
58                               final BindingInstanceIdentifierCodec iidCodec) {
59         super(context);
60         this.localMember = requireNonNull(localMember);
61         this.ownerSupervisor = requireNonNull(ownerSupervisor);
62         this.iidCodec = requireNonNull(iidCodec);
63         replicator = DistributedData.get(context.getSystem()).replicator();
64         ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, UNEXPECTED_ASK_TIMEOUT);
65     }
66
67     public static Behavior<StateCheckerCommand> create(final String localMember,
68                                                        final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
69                                                        final BindingInstanceIdentifierCodec iidCodec) {
70         return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember, ownerSupervisor, iidCodec));
71     }
72
73     @Override
74     public Receive<StateCheckerCommand> createReceive() {
75         return newReceiveBuilder()
76                 .onMessage(GetOwnershipState.class, this::onGetOwnershipState)
77                 .onMessage(InternalGetReply.class, this::respondWithState)
78                 .onMessage(GetEntitiesRequest.class, this::executeEntityRpc)
79                 .onMessage(GetEntityRequest.class, this::executeEntityRpc)
80                 .onMessage(GetEntityOwnerRequest.class, this::executeEntityRpc)
81                 .build();
82     }
83
84     private Behavior<StateCheckerCommand> onGetOwnershipState(final GetOwnershipState message) {
85         ownerReplicator.askGet(
86                 askReplyTo -> new Get<>(
87                         new LWWRegisterKey<>(message.getEntity().toString()),
88                         new ReadMajority(GET_OWNERSHIP_TIMEOUT),
89                         askReplyTo),
90                 reply -> new InternalGetReply(reply, message.getEntity(), message.getReplyTo()));
91         return this;
92     }
93
94     private Behavior<StateCheckerCommand> respondWithState(final InternalGetReply reply) {
95         final GetResponse<LWWRegister<String>> response = reply.getResponse();
96         if (response instanceof NotFound) {
97             LOG.debug("Data for owner not found, most likely no owner has beed picked for entity: {}",
98                     reply.getEntity());
99             reply.getReplyTo().tell(new GetOwnershipStateReply(null));
100         } else if (response instanceof GetFailure) {
101             LOG.warn("Failure retrieving data for entity: {}", reply.getEntity());
102             reply.getReplyTo().tell(new GetOwnershipStateReply(null));
103         } else if (response instanceof GetSuccess) {
104             final String owner = ((GetSuccess<LWWRegister<String>>) response).get(response.key()).getValue();
105             LOG.debug("Data for owner received. {}, owner: {}", response, owner);
106
107             final boolean isOwner = localMember.equals(owner);
108             final boolean hasOwner = !owner.isEmpty();
109
110             reply.getReplyTo().tell(new GetOwnershipStateReply(EntityOwnershipState.from(isOwner, hasOwner)));
111         }
112         return this;
113     }
114
115     private Behavior<StateCheckerCommand> executeEntityRpc(final StateCheckerRequest request) {
116         final ActorRef<StateCheckerCommand> rpcHandler =
117                 getContext().spawnAnonymous(EntityRpcHandler.create(ownerSupervisor, iidCodec));
118
119         LOG.debug("Executing entity rpc: {} in actor: {}", request, rpcHandler);
120         rpcHandler.tell(request);
121         return this;
122     }
123 }