import static java.util.Objects.requireNonNull;
+import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.cluster.ddata.LWWRegister;
import akka.cluster.ddata.LWWRegisterKey;
import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.Replicator.Get;
import akka.cluster.ddata.typed.javadsl.Replicator.GetFailure;
import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
import akka.cluster.ddata.typed.javadsl.Replicator.ReadMajority;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
import java.time.Duration;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
import org.opendaylight.controller.eos.akka.owner.checker.command.InternalGetReply;
import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Duration GET_OWNERSHIP_TIMEOUT = Duration.ofSeconds(5);
private static final Duration UNEXPECTED_ASK_TIMEOUT = Duration.ofSeconds(5);
- private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> replicatorAdapter;
+ private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> ownerReplicator;
+ private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+ private final BindingInstanceIdentifierCodec iidCodec;
+ private final ActorRef<Replicator.Command> replicator;
private final String localMember;
- private OwnerStateChecker(final ActorContext<StateCheckerCommand> context, final String localMember) {
+ private OwnerStateChecker(final ActorContext<StateCheckerCommand> context,
+ final String localMember,
+ final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+ final BindingInstanceIdentifierCodec iidCodec) {
super(context);
this.localMember = requireNonNull(localMember);
- replicatorAdapter = new ReplicatorMessageAdapter<>(context,
- DistributedData.get(context.getSystem()).replicator(), UNEXPECTED_ASK_TIMEOUT);
+ this.ownerSupervisor = requireNonNull(ownerSupervisor);
+ this.iidCodec = requireNonNull(iidCodec);
+ replicator = DistributedData.get(context.getSystem()).replicator();
+ ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, UNEXPECTED_ASK_TIMEOUT);
}
- public static Behavior<StateCheckerCommand> create(final String localMember) {
- return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember));
+ public static Behavior<StateCheckerCommand> create(final String localMember,
+ final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+ final BindingInstanceIdentifierCodec iidCodec) {
+ return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember, ownerSupervisor, iidCodec));
}
@Override
return newReceiveBuilder()
.onMessage(GetOwnershipState.class, this::onGetOwnershipState)
.onMessage(InternalGetReply.class, this::respondWithState)
+ .onMessage(GetEntitiesRequest.class, this::executeEntityRpc)
+ .onMessage(GetEntityRequest.class, this::executeEntityRpc)
+ .onMessage(GetEntityOwnerRequest.class, this::executeEntityRpc)
.build();
}
private Behavior<StateCheckerCommand> onGetOwnershipState(final GetOwnershipState message) {
- replicatorAdapter.askGet(
+ ownerReplicator.askGet(
askReplyTo -> new Get<>(
new LWWRegisterKey<>(message.getEntity().toString()),
new ReadMajority(GET_OWNERSHIP_TIMEOUT),
}
return this;
}
+
+ private Behavior<StateCheckerCommand> executeEntityRpc(final StateCheckerRequest request) {
+ final ActorRef<StateCheckerCommand> rpcHandler =
+ getContext().spawnAnonymous(EntityRpcHandler.create(ownerSupervisor, iidCodec));
+
+ LOG.debug("Executing entity rpc: {} in actor: {}", request, rpcHandler);
+ rpcHandler.tell(request);
+ return this;
+ }
}