import java.util.Set;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
return newReceiveBuilder()
.onMessage(CandidatesChanged.class, this::onCandidatesChanged)
.onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
+ .onMessage(OwnerChanged.class, this::onOwnerChanged)
.onMessage(MemberUpEvent.class, this::onPeerUp)
.onMessage(MemberDownEvent.class, this::onPeerDown)
.onMessage(MemberReachableEvent.class, this::onPeerReachable)
.onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
+ .onMessage(GetEntitiesRequest.class, this::onGetEntities)
+ .onMessage(GetEntityRequest.class, this::onGetEntity)
+ .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
.build();
}
return IdleSupervisor.create();
}
+ private Behavior<OwnerSupervisorCommand> onOwnerChanged(final OwnerChanged command) {
+ LOG.debug("Owner has changed for {}", command.getResponse().key());
+ return this;
+ }
+
private void reassignUnreachableOwners() {
final Set<String> ownersToReassign = new HashSet<>();
for (final String owner : ownerToEntity.keys()) {
final BiPredicate<DOMEntity, String> predicate) {
LOG.debug("Reassigning owners for {}", entities);
for (final DOMEntity entity : entities) {
-
-
if (predicate.test(entity, oldOwner)) {
ownerToEntity.remove(oldOwner, entity);
assignOwnerFor(entity);
return this;
}
+ private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesRequest request) {
+ request.getReplyTo().tell(new GetEntitiesReply(currentOwners, currentCandidates));
+ return this;
+ }
+
+ private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityRequest request) {
+ final DOMEntity entity = extractEntity(request);
+ request.getReplyTo().tell(new GetEntityReply(currentOwners.get(entity), currentCandidates.get(entity)));
+ return this;
+ }
+
+ private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
+ request.getReplyTo().tell(new GetEntityOwnerReply(currentOwners.get(extractEntity(request))));
+ return this;
+ }
+
private void handleReachableEvent(final Set<String> roles) {
if (roles.contains(dataCenter)) {
activeMembers.add(extractRole(roles));
return members;
}
+ private static DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
+ return new DOMEntity(request.getType().getValue(), request.getName().getValue());
+ }
+
private static String extractRole(final Member member) {
return extractRole(member.getRoles());
}