import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.BiPredicate;
import java.util.stream.Collectors;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
- private static final String DATACENTER_PREFIX = "dc";
+ private static final String DATACENTER_PREFIX = "dc-";
private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
private final Cluster cluster;
private final SelfUniqueAddress node;
+ private final String dataCenter;
private final Set<String> activeMembers;
// reverse lookup of owner to entity
private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
+ // only reassign owner for those entities that lost this candidate or is not reachable
+ private final BiPredicate<DOMEntity, String> reassignPredicate = (entity, candidate) ->
+ !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate);
+
private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
final Map<DOMEntity, Set<String>> currentCandidates,
final Map<DOMEntity, String> currentOwners) {
final DistributedData distributedData = DistributedData.get(context.getSystem());
final ActorRef<Replicator.Command> replicator = distributedData.replicator();
- this.cluster = Cluster.get(context.getSystem());
- this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+ cluster = Cluster.get(context.getSystem());
+ ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+ dataCenter = extractDatacenterRole(cluster.selfMember());
- this.node = distributedData.selfUniqueAddress();
- this.activeMembers = getActiveMembers(cluster);
+ node = distributedData.selfUniqueAddress();
+ activeMembers = getActiveMembers();
this.currentCandidates = currentCandidates;
this.currentOwners = currentOwners;
public Receive<OwnerSupervisorCommand> createReceive() {
return newReceiveBuilder()
.onMessage(CandidatesChanged.class, this::onCandidatesChanged)
+ .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
+ .onMessage(OwnerChanged.class, this::onOwnerChanged)
.onMessage(MemberUpEvent.class, this::onPeerUp)
.onMessage(MemberDownEvent.class, this::onPeerDown)
.onMessage(MemberReachableEvent.class, this::onPeerReachable)
.onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
+ .onMessage(GetEntitiesRequest.class, this::onGetEntities)
+ .onMessage(GetEntityRequest.class, this::onGetEntity)
+ .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
.build();
}
+ private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
+ LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
+ command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
+ return IdleSupervisor.create();
+ }
+
+ private Behavior<OwnerSupervisorCommand> onOwnerChanged(final OwnerChanged command) {
+ LOG.debug("Owner has changed for {}", command.getResponse().key());
+ return this;
+ }
+
private void reassignUnreachableOwners() {
final Set<String> ownersToReassign = new HashSet<>();
for (final String owner : ownerToEntity.keys()) {
}
for (final String owner : ownersToReassign) {
- reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)));
+ reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)), reassignPredicate);
}
}
// then reassign those that need new owners
for (final String toReassign : ownersToReassign) {
- reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)));
+ reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)),
+ reassignPredicate);
}
if (currentCandidates.get(entity) == null) {
}
}
- private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities) {
+ private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities,
+ final BiPredicate<DOMEntity, String> predicate) {
LOG.debug("Reassigning owners for {}", entities);
for (final DOMEntity entity : entities) {
-
- // only reassign owner for those entities that lost this candidate or is not reachable
- if (!activeMembers.contains(oldOwner)
- || !currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(oldOwner)) {
+ if (predicate.test(entity, oldOwner)) {
ownerToEntity.remove(oldOwner, entity);
assignOwnerFor(entity);
}
}
}
+ private boolean isActiveCandidate(final String candidate) {
+ return activeMembers.contains(candidate);
+ }
+
+ private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
+ return currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(candidate);
+ }
+
private void assignOwnerFor(final DOMEntity entity) {
final Set<String> candidatesForEntity = currentCandidates.get(entity);
if (candidatesForEntity.isEmpty()) {
return this;
}
+ private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesRequest request) {
+ request.getReplyTo().tell(new GetEntitiesReply(currentOwners, currentCandidates));
+ return this;
+ }
+
+ private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityRequest request) {
+ final DOMEntity entity = extractEntity(request);
+ request.getReplyTo().tell(new GetEntityReply(currentOwners.get(entity), currentCandidates.get(entity)));
+ return this;
+ }
+
+ private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
+ request.getReplyTo().tell(new GetEntityOwnerReply(currentOwners.get(extractEntity(request))));
+ return this;
+ }
+
private void handleReachableEvent(final Set<String> roles) {
- activeMembers.add(extractRole(roles));
- assignMissingOwners();
+ if (roles.contains(dataCenter)) {
+ activeMembers.add(extractRole(roles));
+ assignMissingOwners();
+ } else {
+ LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
+ }
}
private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
}
private void handleUnreachableEvent(final Set<String> roles) {
- activeMembers.remove(extractRole(roles));
- reassignUnreachableOwners();
+ if (roles.contains(dataCenter)) {
+ activeMembers.remove(extractRole(roles));
+ reassignUnreachableOwners();
+ } else {
+ LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
+ }
}
- private static Set<String> getActiveMembers(final Cluster cluster) {
- final Set<String> activeMembers = new HashSet<>();
- cluster.state().getMembers().forEach(member -> activeMembers.add(extractRole(member)));
- activeMembers.removeAll(cluster.state().getUnreachable().stream()
- .map(OwnerSupervisor::extractRole).collect(Collectors.toSet()));
+ private Set<String> getActiveMembers() {
+ final Set<String> members = new HashSet<>();
+ cluster.state().getMembers().forEach(member -> members.add(extractRole(member)));
+ // filter out unreachable
+ members.removeAll(cluster.state().getUnreachable().stream()
+ .map(OwnerSupervisor::extractRole)
+ .collect(Collectors.toSet()));
+
+ // filter out members not from our datacenter
+ cluster.state().getMembers().forEach(member -> {
+ if (!member.roles().contains(dataCenter)) {
+ members.remove(extractRole(member));
+ }
+ });
+
+ return members;
+ }
- return activeMembers;
+ private static DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
+ return new DOMEntity(request.getType().getValue(), request.getName().getValue());
}
private static String extractRole(final Member member) {
}
private static String extractRole(final Set<String> roles) {
- return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
+ return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
+ .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+ }
+
+ private String extractDatacenterRole(final Member member) {
+ return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
.findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
}
}