import java.util.Set;
import java.util.stream.Collectors;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
- private static final String DATACENTER_PREFIX = "dc";
+ private static final String DATACENTER_PREFIX = "dc-";
private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
private final Cluster cluster;
private final SelfUniqueAddress node;
+ private final String dataCenter;
private final Set<String> activeMembers;
final DistributedData distributedData = DistributedData.get(context.getSystem());
final ActorRef<Replicator.Command> replicator = distributedData.replicator();
- this.cluster = Cluster.get(context.getSystem());
- this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+ cluster = Cluster.get(context.getSystem());
+ ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+ dataCenter = extractDatacenterRole(cluster.selfMember());
- this.node = distributedData.selfUniqueAddress();
- this.activeMembers = getActiveMembers(cluster);
+ node = distributedData.selfUniqueAddress();
+ activeMembers = getActiveMembers();
this.currentCandidates = currentCandidates;
this.currentOwners = currentOwners;
public Receive<OwnerSupervisorCommand> createReceive() {
return newReceiveBuilder()
.onMessage(CandidatesChanged.class, this::onCandidatesChanged)
+ .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
.onMessage(MemberUpEvent.class, this::onPeerUp)
.onMessage(MemberDownEvent.class, this::onPeerDown)
.onMessage(MemberReachableEvent.class, this::onPeerReachable)
.build();
}
+ private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
+ LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
+ return IdleSupervisor.create();
+ }
+
private void reassignUnreachableOwners() {
final Set<String> ownersToReassign = new HashSet<>();
for (final String owner : ownerToEntity.keys()) {
}
private void handleReachableEvent(final Set<String> roles) {
- activeMembers.add(extractRole(roles));
- assignMissingOwners();
+ if (roles.contains(dataCenter)) {
+ activeMembers.add(extractRole(roles));
+ assignMissingOwners();
+ } else {
+ LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
+ }
}
private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
}
private void handleUnreachableEvent(final Set<String> roles) {
- activeMembers.remove(extractRole(roles));
- reassignUnreachableOwners();
+ if (roles.contains(dataCenter)) {
+ activeMembers.remove(extractRole(roles));
+ reassignUnreachableOwners();
+ } else {
+ LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
+ }
}
- private static Set<String> getActiveMembers(final Cluster cluster) {
- final Set<String> activeMembers = new HashSet<>();
- cluster.state().getMembers().forEach(member -> activeMembers.add(extractRole(member)));
- activeMembers.removeAll(cluster.state().getUnreachable().stream()
- .map(OwnerSupervisor::extractRole).collect(Collectors.toSet()));
+ private Set<String> getActiveMembers() {
+ final Set<String> members = new HashSet<>();
+ cluster.state().getMembers().forEach(member -> members.add(extractRole(member)));
+ // filter out unreachable
+ members.removeAll(cluster.state().getUnreachable().stream()
+ .map(OwnerSupervisor::extractRole)
+ .collect(Collectors.toSet()));
+
+ // filter out members not from our datacenter
+ cluster.state().getMembers().forEach(member -> {
+ if (!member.roles().contains(dataCenter)) {
+ members.remove(extractRole(member));
+ }
+ });
- return activeMembers;
+ return members;
}
private static String extractRole(final Member member) {
}
private static String extractRole(final Set<String> roles) {
- return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
+ return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
+ .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+ }
+
+ private String extractDatacenterRole(final Member member) {
+ return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
.findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
}
}