class EntityOwnershipShard extends Shard {
private final String localMemberName;
private final EntityOwnershipShardCommitCoordinator commitCoordinator;
+ private final EntityOwnershipListenerSupport listenerSupport;
private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
this.localMemberName = localMemberName;
this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
+ this.listenerSupport = new EntityOwnershipListenerSupport(getContext());
}
@Override
protected void onRecoveryComplete() {
super.onRecoveryComplete();
- new CandidateListChangeListener(getSelf(), getDataStore());
+ new CandidateListChangeListener(getSelf()).init(getDataStore());
+ new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
}
@Override
private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
- // TODO - add the listener locally.
+ listenerSupport.addEntityOwnershipListener(registerCandidate.getEntity(), registerCandidate.getCandidate());
NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
registerCandidate.getEntity().getId(), localMemberName);