import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
+import akka.cluster.Cluster;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.ORMapKey;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.util.Set;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistry.class);
+ private static final String DATACENTER_PREFIX = "dc-";
+
public static final Key<ORMap<DOMEntity, ORSet<String>>> KEY = new ORMapKey<>("candidateRegistry");
private final ReplicatorMessageAdapter<CandidateRegistryCommand, ORMap<DOMEntity, ORSet<String>>> replicatorAdapter;
private final SelfUniqueAddress node;
+ private final String selfRole;
private CandidateRegistry(final ActorContext<CandidateRegistryCommand> context,
final ReplicatorMessageAdapter<CandidateRegistryCommand,
this.replicatorAdapter = replicatorAdapter;
this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();
+ this.selfRole = extractRole(Cluster.get(context.getSystem()).selfMember().getRoles());
- LOG.debug("Candidate registry started");
+ LOG.debug("{} : Candidate registry started", selfRole);
}
public static Behavior<CandidateRegistryCommand> create() {
}
private Behavior<CandidateRegistryCommand> onRegisterCandidate(final RegisterCandidate registerCandidate) {
- LOG.debug("Registering candidate({}) for entity: {}",
+ LOG.debug("{} - Registering candidate({}) for entity: {}", selfRole,
registerCandidate.getCandidate(), registerCandidate.getEntity());
replicatorAdapter.askUpdate(
askReplyTo -> new Replicator.Update<>(
}
private Behavior<CandidateRegistryCommand> onUnregisterCandidate(final UnregisterCandidate unregisterCandidate) {
- LOG.debug("Removing candidate({}) from entity: {}",
+ LOG.debug("{} - Removing candidate({}) from entity: {}", selfRole,
unregisterCandidate.getCandidate(), unregisterCandidate.getEntity());
replicatorAdapter.askUpdate(
askReplyTo -> new Replicator.Update<>(
}
private Behavior<CandidateRegistryCommand> onInternalUpdateResponse(final InternalUpdateResponse updateResponse) {
- LOG.debug("Received update response: {}", updateResponse.getRsp());
+ LOG.debug("{} : Received update response: {}", selfRole, updateResponse.getRsp());
return this;
}
+
+ private static String extractRole(final Set<String> roles) {
+ return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
+ .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+ }
}