private static final long serialVersionUID = 1L;
private final Entity entity;
- private final EntityOwnershipCandidate registeredCandidate;
- public CandidateAlreadyRegisteredException(@Nonnull Entity entity,
- @Nonnull EntityOwnershipCandidate registeredCandidate) {
- super(String.format("Candidate %s has already been registered for %s",
- Preconditions.checkNotNull(registeredCandidate, "registeredCandidate should not be null"),
+ public CandidateAlreadyRegisteredException(@Nonnull Entity entity) {
+ super(String.format("Candidate has already been registered for %s",
Preconditions.checkNotNull(entity, "entity should not be null")));
this.entity = entity;
- this.registeredCandidate = registeredCandidate;
}
/**
public Entity getEntity() {
return entity;
}
-
- /**
- *
- * @return the currently registered candidate
- */
- @Nonnull
- public EntityOwnershipCandidate getRegisteredCandidate() {
- return registeredCandidate;
- }
}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.md.sal.common.api.clustering;
-
-/**
- * <p>
- * An EntityOwnershipCandidate represents a component which would like to own a given Entity.
- * The EntityOwnershipCandidate will be notified of changes in ownership as it is also an EntityOwnershipListener.
- * </p>
- */
-public interface EntityOwnershipCandidate extends EntityOwnershipListener {
-}
package org.opendaylight.controller.md.sal.common.api.clustering;
-import javax.annotation.Nonnull;
import org.opendaylight.yangtools.concepts.ObjectRegistration;
/**
* An EntityOwnershipCandidateRegistration records a request to register a Candidate for a given Entity. Calling
* close on the EntityOwnershipCandidateRegistration will remove the Candidate from any future ownership considerations
- * for that Entity and will also remove it as a Listener for ownership status changes.
+ * for that Entity.
*/
-public interface EntityOwnershipCandidateRegistration extends ObjectRegistration<EntityOwnershipCandidate> {
+public interface EntityOwnershipCandidateRegistration extends ObjectRegistration<Entity> {
/**
- * Returns the entity that the listener was registered for
- */
- @Nonnull Entity getEntity();
-
- /**
- * Unregister the listener
+ * Unregister the candidate
*/
@Override
void close();
* per process. If multiple requests for registering a candidate for a given entity are received in the
* current process a CandidateAlreadyRegisteredException will be thrown.
* <p>
- * The registration is performed asynchronously and the {@link EntityOwnershipCandidate} instance is
- * notified whenever its process instance is granted ownership of the entity and also whenever it loses
- * ownership. Note that the {@link EntityOwnershipCandidate} is not notified when another process instance
- * is granted ownership.
+ * The registration is performed asynchronously and any registered {@link EntityOwnershipListener} is
+ * notified of ownership status changes for the entity.
*
* @param entity the entity which the Candidate wants to own
- * @param candidate the Candidate that wants to own the entity
* @return a registration object that can be used to unregister the Candidate
* @throws org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException
*/
- EntityOwnershipCandidateRegistration registerCandidate(@Nonnull Entity entity, @Nonnull EntityOwnershipCandidate candidate)
+ EntityOwnershipCandidateRegistration registerCandidate(@Nonnull Entity entity)
throws CandidateAlreadyRegisteredException;
/**
import com.google.common.base.Preconditions;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
*
* @author Thomas Pantelis
*/
-public abstract class AbstractEntityOwnershipCandidateRegistration extends AbstractObjectRegistration<EntityOwnershipCandidate>
+public abstract class AbstractEntityOwnershipCandidateRegistration extends AbstractObjectRegistration<Entity>
implements EntityOwnershipCandidateRegistration {
- private final Entity entity;
- protected AbstractEntityOwnershipCandidateRegistration(@Nonnull EntityOwnershipCandidate candidate,
- @Nonnull Entity entity) {
- super(candidate);
- this.entity = Preconditions.checkNotNull(entity, "entity cannot be null");
- }
-
- @Override
- public Entity getEntity() {
- return entity;
+ protected AbstractEntityOwnershipCandidateRegistration(@Nonnull Entity entity) {
+ super(Preconditions.checkNotNull(entity, "entity cannot be null"));
}
}
package org.opendaylight.controller.cluster.datastore.entityownership;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.impl.clustering.AbstractEntityOwnershipCandidateRegistration;
/**
class DistributedEntityOwnershipCandidateRegistration extends AbstractEntityOwnershipCandidateRegistration {
private final DistributedEntityOwnershipService service;
- DistributedEntityOwnershipCandidateRegistration(EntityOwnershipCandidate candidate, Entity entity,
- DistributedEntityOwnershipService service) {
- super(candidate, entity);
+ DistributedEntityOwnershipCandidateRegistration(Entity entity, DistributedEntityOwnershipService service) {
+ super(entity);
this.service = service;
}
@Override
protected void removeRegistration() {
- service.unregisterCandidate(getEntity(), getInstance());
+ service.unregisterCandidate(getInstance());
}
@Override
public String toString() {
- return "DistributedEntityOwnershipCandidateRegistration [entity=" + getEntity() + ", candidate="
- + getInstance() + "]";
+ return "DistributedEntityOwnershipCandidateRegistration [entity=" + getInstance() + "]";
}
import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
private static final Timeout MESSAGE_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
private final DistributedDataStore datastore;
- private final ConcurrentMap<Entity, EntityOwnershipCandidate> registeredEntities = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Entity, Entity> registeredEntities = new ConcurrentHashMap<>();
private volatile ActorRef localEntityOwnershipShard;
public DistributedEntityOwnershipService(DistributedDataStore datastore) {
}
@Override
- public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate)
+ public EntityOwnershipCandidateRegistration registerCandidate(Entity entity)
throws CandidateAlreadyRegisteredException {
Preconditions.checkNotNull(entity, "entity cannot be null");
- Preconditions.checkNotNull(candidate, "candidate cannot be null");
- EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity, candidate);
- if(currentCandidate != null) {
- throw new CandidateAlreadyRegisteredException(entity, currentCandidate);
+ if(registeredEntities.putIfAbsent(entity, entity) != null) {
+ throw new CandidateAlreadyRegisteredException(entity);
}
- RegisterCandidateLocal registerCandidate = new RegisterCandidateLocal(candidate, entity);
+ RegisterCandidateLocal registerCandidate = new RegisterCandidateLocal(entity);
LOG.debug("Registering candidate with message: {}", registerCandidate);
executeLocalEntityOwnershipShardOperation(registerCandidate);
- return new DistributedEntityOwnershipCandidateRegistration(candidate, entity, this);
+ return new DistributedEntityOwnershipCandidateRegistration(entity, this);
}
- void unregisterCandidate(Entity entity, EntityOwnershipCandidate entityOwnershipCandidate) {
- LOG.debug("Unregistering candidate {} for {}", entityOwnershipCandidate, entity);
+ void unregisterCandidate(Entity entity) {
+ LOG.debug("Unregistering candidate for {}", entity);
- executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entityOwnershipCandidate, entity));
+ executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entity));
registeredEntities.remove(entity);
}
import com.google.common.collect.Multimap;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
+import java.util.Set;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
import org.slf4j.Logger;
private final String logId;
private final ActorContext actorContext;
private final Map<EntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
- private final Multimap<Entity, EntityOwnershipListener> entityListenerMap = HashMultimap.create();
+ private final Set<Entity> entitiesWithCandidateSet = new HashSet<>();
private final Multimap<String, EntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();
EntityOwnershipListenerSupport(ActorContext actorContext, String logId) {
}
boolean hasCandidateForEntity(Entity entity) {
- for(EntityOwnershipListener listener: entityListenerMap.get(entity)) {
- if(listener instanceof EntityOwnershipCandidate) {
- return true;
- }
- }
-
- return false;
+ return entitiesWithCandidateSet.contains(entity);
}
- void addEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
- LOG.debug("{}: Adding EntityOwnershipListener {} for {}", logId, listener, entity);
+ void setHasCandidateForEntity(Entity entity) {
+ entitiesWithCandidateSet.add(entity);
+ }
- addListener(listener, entity, entityListenerMap);
+ void unsetHasCandidateForEntity(Entity entity) {
+ entitiesWithCandidateSet.remove(entity);
}
void addEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
addListener(listener, entityType, entityTypeListenerMap);
}
- void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
- LOG.debug("{}: Removing EntityOwnershipListener {} for {}", logId, listener, entity);
-
- removeListener(listener, entity, entityListenerMap);
- }
-
void removeEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
LOG.debug("{}: Removing EntityOwnershipListener {} for entity type {}", logId, listener, entityType);
}
void notifyEntityOwnershipListeners(Entity entity, boolean wasOwner, boolean isOwner, boolean hasOwner) {
- notifyListeners(entity, entity, wasOwner, isOwner, hasOwner, entityListenerMap);
notifyListeners(entity, entity.getType(), wasOwner, isOwner, hasOwner, entityTypeListenerMap);
}
private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
LOG.debug("{}: onRegisterCandidateLocal: {}", persistenceId(), registerCandidate);
- listenerSupport.addEntityOwnershipListener(registerCandidate.getEntity(), registerCandidate.getCandidate());
+ listenerSupport.setHasCandidateForEntity(registerCandidate.getEntity());
NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
registerCandidate.getEntity().getId(), localMemberName);
LOG.debug("{}: onUnregisterCandidateLocal: {}", persistenceId(), unregisterCandidate);
Entity entity = unregisterCandidate.getEntity();
- listenerSupport.removeEntityOwnershipListener(entity, unregisterCandidate.getCandidate());
+ listenerSupport.unsetHasCandidateForEntity(entity);
YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getId(), localMemberName);
commitCoordinator.commitModification(new DeleteModification(candidatePath), this);
package org.opendaylight.controller.cluster.datastore.entityownership.messages;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
/**
* Message sent to the local EntityOwnershipShard to register a candidate.
* @author Thomas Pantelis
*/
public class RegisterCandidateLocal {
- private final EntityOwnershipCandidate candidate;
private final Entity entity;
- public RegisterCandidateLocal(EntityOwnershipCandidate candidate, Entity entity) {
- this.candidate = candidate;
+ public RegisterCandidateLocal(Entity entity) {
this.entity = entity;
}
- public EntityOwnershipCandidate getCandidate() {
- return candidate;
- }
-
public Entity getEntity() {
return entity;
}
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("RegisterCandidateLocal [entity=").append(entity).append(", candidate=").append(candidate)
- .append("]");
- return builder.toString();
+ return "RegisterCandidateLocal [entity=" + entity + "]";
}
}
package org.opendaylight.controller.cluster.datastore.entityownership.messages;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
/**
* Message sent to the local EntityOwnershipShard to unregister a candidate.
* @author Thomas Pantelis
*/
public class UnregisterCandidateLocal {
- private final EntityOwnershipCandidate candidate;
private final Entity entity;
- public UnregisterCandidateLocal(EntityOwnershipCandidate candidate, Entity entity) {
- this.candidate = candidate;
+ public UnregisterCandidateLocal(Entity entity) {
this.entity = entity;
}
- public EntityOwnershipCandidate getCandidate() {
- return candidate;
- }
-
public Entity getEntity() {
return entity;
}
@Override
public String toString() {
- return "UnregisterCandidateLocal [entity=" + entity + ", candidate=" + candidate + "]";
+ return "UnregisterCandidateLocal [entity=" + entity + "]";
}
}
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
private IntegrationTestKit follower2TestKit;
@Mock
- private EntityOwnershipCandidate leaderMockCandidate;
+ private EntityOwnershipListener leaderMockListener;
@Mock
- private EntityOwnershipCandidate follower1MockCandidate;
+ private EntityOwnershipListener leaderMockListener2;
@Mock
- private EntityOwnershipCandidate follower2MockCandidate;
+ private EntityOwnershipListener follower1MockListener;
@Mock
- private EntityOwnershipCandidate leaderMockListener;
-
- @Mock
- private EntityOwnershipCandidate leaderMockListener2;
-
- @Mock
- private EntityOwnershipCandidate follower1MockListener;
-
- @Mock
- private EntityOwnershipCandidate follower2MockListener;
+ private EntityOwnershipListener follower2MockListener;
@Before
public void setUp() {
// Register leader candidate for entity1 and verify it becomes owner
- leaderEntityOwnershipService.registerCandidate(ENTITY1, leaderMockCandidate);
+ leaderEntityOwnershipService.registerCandidate(ENTITY1);
verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, true, true));
verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
reset(leaderMockListener, follower1MockListener);
// Register leader candidate for entity1_2 (same id, different type) and verify it becomes owner
- leaderEntityOwnershipService.registerCandidate(ENTITY1_2, leaderMockCandidate);
+ leaderEntityOwnershipService.registerCandidate(ENTITY1_2);
verify(leaderMockListener2, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1_2, false, true, true));
verify(leaderMockListener, timeout(300).never()).ownershipChanged(ownershipChange(ENTITY1_2));
reset(leaderMockListener2);
// Register follower1 candidate for entity1 and verify it gets added but doesn't become owner
- follower1EntityOwnershipService.registerCandidate(ENTITY1, follower1MockCandidate);
+ follower1EntityOwnershipService.registerCandidate(ENTITY1);
verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-1", "member-2");
verifyOwner(leaderDistributedDataStore, ENTITY1, "member-1");
verify(leaderMockListener, timeout(300).never()).ownershipChanged(ownershipChange(ENTITY1));
// Register follower1 candidate for entity2 and verify it becomes owner
- follower1EntityOwnershipService.registerCandidate(ENTITY2, follower1MockCandidate);
- verify(follower1MockCandidate, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
+ follower1EntityOwnershipService.registerCandidate(ENTITY2);
+ verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, true));
reset(leaderMockListener, follower1MockListener);
// Register follower2 candidate for entity2 and verify it gets added but doesn't become owner
follower2EntityOwnershipService.registerListener(ENTITY_TYPE1, follower2MockListener);
- follower2EntityOwnershipService.registerCandidate(ENTITY2, follower2MockCandidate);
+ follower2EntityOwnershipService.registerCandidate(ENTITY2);
verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-2", "member-3");
verifyOwner(leaderDistributedDataStore, ENTITY2, "member-2");
// Unregister follower1 candidate for entity2 and verify follower2 becomes owner
- follower1EntityOwnershipService.unregisterCandidate(ENTITY2, follower1MockCandidate);
+ follower1EntityOwnershipService.unregisterCandidate(ENTITY2);
verifyOwner(leaderDistributedDataStore, ENTITY2, "member-3");
verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, true, false, true));
// Register follower1 candidate for entity3 and verify it becomes owner
- follower1EntityOwnershipService.registerCandidate(ENTITY3, follower1MockCandidate);
+ follower1EntityOwnershipService.registerCandidate(ENTITY3);
verifyOwner(leaderDistributedDataStore, ENTITY3, "member-2");
verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY3, false, true, true));
verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY3, false, false, true));
// Register follower2 candidate for entity4 and verify it becomes owner
- follower2EntityOwnershipService.registerCandidate(ENTITY4, follower2MockCandidate);
+ follower2EntityOwnershipService.registerCandidate(ENTITY4);
verifyOwner(leaderDistributedDataStore, ENTITY4, "member-3");
verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY4, false, true, true));
verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY4, false, false, true));
verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY4, false, false, true));
- reset(follower2MockListener);
+ reset(follower1MockListener, follower2MockListener);
// Register follower1 candidate for entity4 and verify it gets added but doesn't become owner
- reset(follower1MockCandidate);
- follower1EntityOwnershipService.registerCandidate(ENTITY4, follower1MockCandidate);
+ follower1EntityOwnershipService.registerCandidate(ENTITY4);
verifyCandidates(leaderDistributedDataStore, ENTITY4, "member-3", "member-2");
verifyOwner(leaderDistributedDataStore, ENTITY4, "member-3");
// Register leader candidate for entity2 and verify it becomes owner
- leaderEntityOwnershipService.registerCandidate(ENTITY2, leaderMockCandidate);
+ leaderEntityOwnershipService.registerCandidate(ENTITY2);
verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
// Unregister leader candidate for entity2 and verify the owner is cleared
- leaderEntityOwnershipService.unregisterCandidate(ENTITY2, leaderMockCandidate);
+ leaderEntityOwnershipService.unregisterCandidate(ENTITY2);
verifyOwner(leaderDistributedDataStore, ENTITY2, "");
verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, true, false, false));
verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, false));
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
Entity entity = new Entity(ENTITY_TYPE, entityId);
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
- EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
+ EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity);
verifyEntityOwnershipCandidateRegistration(entity, reg);
- verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
+ verifyRegisterCandidateLocal(shardPropsCreator, entity);
verifyEntityCandidate(service.getLocalEntityOwnershipShard(), ENTITY_TYPE, entityId,
dataStore.getActorContext().getCurrentMemberName());
// Register the same entity - should throw exception
- EntityOwnershipCandidate candidate2 = mock(EntityOwnershipCandidate.class);
try {
- service.registerCandidate(entity, candidate2);
+ service.registerCandidate(entity);
fail("Expected CandidateAlreadyRegisteredException");
} catch(CandidateAlreadyRegisteredException e) {
// expected
- assertSame("getCandidate", candidate, e.getRegisteredCandidate());
assertEquals("getEntity", entity, e.getEntity());
}
Entity entity2 = new Entity(ENTITY_TYPE2, entityId);
shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
- EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate);
+ EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2);
verifyEntityOwnershipCandidateRegistration(entity2, reg2);
- verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate);
+ verifyRegisterCandidateLocal(shardPropsCreator, entity2);
verifyEntityCandidate(service.getLocalEntityOwnershipShard(), ENTITY_TYPE2, entityId,
dataStore.getActorContext().getCurrentMemberName());
shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME));
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
- EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
+ EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity);
verifyEntityOwnershipCandidateRegistration(entity, reg);
- verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
+ verifyRegisterCandidateLocal(shardPropsCreator, entity);
shardPropsCreator.expectShardMessage(UnregisterCandidateLocal.class);
UnregisterCandidateLocal unregCandidate = shardPropsCreator.waitForShardMessage();
assertEquals("getEntity", entity, unregCandidate.getEntity());
- assertSame("getCandidate", candidate, unregCandidate.getCandidate());
// Re-register - should succeed.
shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
- service.registerCandidate(entity, candidate);
+ service.registerCandidate(entity);
- verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
+ verifyRegisterCandidateLocal(shardPropsCreator, entity);
service.close();
}
});
}
- private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,
- EntityOwnershipCandidate candidate) {
+ private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity) {
RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage();
- assertSame("getCandidate", candidate, regCandidate.getCandidate());
assertEquals("getEntity", entity, regCandidate.getEntity());
}
private void verifyEntityOwnershipCandidateRegistration(Entity entity, EntityOwnershipCandidateRegistration reg) {
assertNotNull("EntityOwnershipCandidateRegistration null", reg);
- assertEquals("getEntity", entity, reg.getEntity());
+ assertEquals("getInstance", entity, reg.getInstance());
}
static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator {
package org.opendaylight.controller.cluster.datastore.entityownership;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
EntityOwnershipListener mockListener1 = mock(EntityOwnershipListener.class, "EntityOwnershipListener1");
EntityOwnershipListener mockListener2 = mock(EntityOwnershipListener.class, "EntityOwnershipListener2");
- EntityOwnershipListener mockListener3 = mock(EntityOwnershipListener.class, "EntityOwnershipListener3");
- Entity entity1 = new Entity("type1", YangInstanceIdentifier.of(QName.create("test", "id1")));
- Entity entity2 = new Entity("type1", YangInstanceIdentifier.of(QName.create("test", "id2")));
- Entity entity3 = new Entity("type1", YangInstanceIdentifier.of(QName.create("test", "id3")));
- Entity entity4 = new Entity("type2", YangInstanceIdentifier.of(QName.create("test", "id4")));
- Entity entity5 = new Entity("noListener", YangInstanceIdentifier.of(QName.create("test", "id5")));
+ EntityOwnershipListener mockListener1_2 = mock(EntityOwnershipListener.class, "EntityOwnershipListener1_2");
+ String entityType1 = "type1";
+ String entityType2 = "type2";
+ Entity entity1 = new Entity(entityType1, YangInstanceIdentifier.of(QName.create("test", "id1")));
+ Entity entity2 = new Entity(entityType2, YangInstanceIdentifier.of(QName.create("test", "id2")));
+ Entity entity3 = new Entity("noListener", YangInstanceIdentifier.of(QName.create("test", "id5")));
// Add EntityOwnershipListener registrations.
- support.addEntityOwnershipListener(entity1, mockListener1);
- support.addEntityOwnershipListener(entity1, mockListener1); // register again - should be noop
- support.addEntityOwnershipListener(entity2, mockListener1);
- support.addEntityOwnershipListener(entity1, mockListener2);
- support.addEntityOwnershipListener(entity1.getType(), mockListener3);
+ support.addEntityOwnershipListener(entityType1, mockListener1);
+ support.addEntityOwnershipListener(entityType1, mockListener1); // register again - should be noop
+ support.addEntityOwnershipListener(entityType1, mockListener1_2);
+ support.addEntityOwnershipListener(entityType2, mockListener2);
- // Notify entity1 changed and verify listeners are notified.
+ // Notify entity1 changed and verify appropriate listeners are notified.
support.notifyEntityOwnershipListeners(entity1, false, true, true);
verify(mockListener1, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
- verify(mockListener2, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
- verify(mockListener3, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
- assertEquals("# of listener actors", 3, actorContext.children().size());
+ verify(mockListener1_2, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
+ verify(mockListener2, timeout(300).never()).ownershipChanged(any(EntityOwnershipChange.class));
+ assertEquals("# of listener actors", 2, actorContext.children().size());
+ reset(mockListener1, mockListener2, mockListener1_2);
- // Notify entity2 changed and verify only mockListener1 and mockListener3 are notified.
+ // Notify entity2 changed and verify appropriate listeners are notified.
- support.notifyEntityOwnershipListeners(entity2, false, false, false);
+ support.notifyEntityOwnershipListeners(entity2, false, true, true);
- verify(mockListener1, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, false));
- verify(mockListener3, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, false));
- Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
- verify(mockListener2, never()).ownershipChanged(ownershipChange(entity2));
+ verify(mockListener2, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
+ verify(mockListener1, timeout(300).never()).ownershipChanged(any(EntityOwnershipChange.class));
+ verify(mockListener1_2, never()).ownershipChanged(any(EntityOwnershipChange.class));
assertEquals("# of listener actors", 3, actorContext.children().size());
+ reset(mockListener1, mockListener2, mockListener1_2);
- // Notify entity3 changed and verify only mockListener3 is notified.
-
- support.notifyEntityOwnershipListeners(entity3, false, true, true);
-
- verify(mockListener3, timeout(5000)).ownershipChanged(ownershipChange(entity3, false, true, true));
- Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
- verify(mockListener1, never()).ownershipChanged(ownershipChange(entity3));
- verify(mockListener2, never()).ownershipChanged(ownershipChange(entity3));
-
- // Notify entity4 changed and verify no listeners are notified.
-
- support.notifyEntityOwnershipListeners(entity4, true, false, true);
-
- Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
- verify(mockListener1, never()).ownershipChanged(ownershipChange(entity4));
- verify(mockListener2, never()).ownershipChanged(ownershipChange(entity4));
- verify(mockListener3, never()).ownershipChanged(ownershipChange(entity4));
+ // Notify entity3 changed and verify no listeners are notified.
- // Notify entity5 changed and verify no listener is notified.
+ support.notifyEntityOwnershipListeners(entity3, true, false, true);
- support.notifyEntityOwnershipListeners(entity5, true, false, true);
+ verify(mockListener1, timeout(300).never()).ownershipChanged(any(EntityOwnershipChange.class));
+ verify(mockListener2, never()).ownershipChanged(any(EntityOwnershipChange.class));
+ verify(mockListener1_2, never()).ownershipChanged(any(EntityOwnershipChange.class));
+ reset(mockListener1, mockListener2, mockListener1_2);
- Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
- verify(mockListener1, never()).ownershipChanged(ownershipChange(entity4));
- verify(mockListener2, never()).ownershipChanged(ownershipChange(entity4));
- verify(mockListener3, never()).ownershipChanged(ownershipChange(entity4));
-
- reset(mockListener1, mockListener2, mockListener3);
+ Iterable<ActorRef> listenerActors = actorContext.children();
+ assertEquals("# of listener actors", 3, listenerActors.size());
- // Unregister mockListener1 for entity1, issue a change and verify only mockListeners 2 & 3 are notified.
+ // Unregister mockListener1, issue a change for entity1 and verify only remaining listeners are notified.
- support.removeEntityOwnershipListener(entity1, mockListener1);
+ support.removeEntityOwnershipListener(entityType1, mockListener1);
support.notifyEntityOwnershipListeners(entity1, true, false, true);
- verify(mockListener2, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
- verify(mockListener3, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
- Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
- verify(mockListener1, never()).ownershipChanged(ownershipChange(entity1));
-
- // Unregister mockListener3, issue a change for entity1 and verify only mockListeners2 is notified.
-
- reset(mockListener1, mockListener2, mockListener3);
-
- support.removeEntityOwnershipListener(entity1.getType(), mockListener3);
- support.notifyEntityOwnershipListeners(entity1, false, false, false);
+ verify(mockListener1_2, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
+ verify(mockListener1, timeout(300).never()).ownershipChanged(any(EntityOwnershipChange.class));
+ reset(mockListener1, mockListener2, mockListener1_2);
- verify(mockListener2, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, false));
- Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
- verify(mockListener1, never()).ownershipChanged(ownershipChange(entity1));
- verify(mockListener3, never()).ownershipChanged(ownershipChange(entity1));
-
- // Completely unregister all listeners and verify their listener actors are destroyed.
-
- Iterable<ActorRef> listenerActors = actorContext.children();
- assertEquals("# of listener actors", 2, listenerActors.size());
+ // Unregister all listeners and verify their listener actors are destroyed.
List<JavaTestKit> watchers = new ArrayList<>();
for(Iterator<ActorRef> iter = listenerActors.iterator(); iter.hasNext();) {
watchers.add(kit);
}
- support.removeEntityOwnershipListener(entity2, mockListener1);
- support.removeEntityOwnershipListener(entity2, mockListener1); // un-register again - shoild be noop
- support.removeEntityOwnershipListener(entity1, mockListener2);
+ support.removeEntityOwnershipListener(entityType1, mockListener1_2);
+ support.removeEntityOwnershipListener(entityType1, mockListener1_2); // un-register again - should be noop
+ support.removeEntityOwnershipListener(entityType2, mockListener2);
Iterator<ActorRef> iter = listenerActors.iterator();
for(JavaTestKit kit: watchers) {
assertEquals("# of listener actors", 0, actorContext.children().size());
- // Re-register mockListener1 for entity1 and verify it is notified.
+ // Re-register mockListener1 and verify it is notified.
reset(mockListener1, mockListener2);
- support.addEntityOwnershipListener(entity1, mockListener1);
-
+ support.addEntityOwnershipListener(entityType1, mockListener1);
support.notifyEntityOwnershipListeners(entity1, false, false, true);
verify(mockListener1, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
- verify(mockListener2, never()).ownershipChanged(ownershipChange(entity1));
- verify(mockListener3, never()).ownershipChanged(ownershipChange(entity1));
+ verify(mockListener1_2, never()).ownershipChanged(any(EntityOwnershipChange.class));
+ verify(mockListener2, never()).ownershipChanged(any(EntityOwnershipChange.class));
// Quickly register and unregister mockListener2 - expecting no exceptions.
- support.addEntityOwnershipListener(entity1, mockListener2);
- support.removeEntityOwnershipListener(entity1, mockListener2);
+ support.addEntityOwnershipListener(entityType1, mockListener2);
+ support.removeEntityOwnershipListener(entityType1, mockListener2);
}
@Test
assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity));
- support.addEntityOwnershipListener(entity, mock(EntityOwnershipListener.class));
- assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity));
-
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
- support.addEntityOwnershipListener(entity, candidate);
+ support.setHasCandidateForEntity(entity);
+ support.setHasCandidateForEntity(entity); // set again - should be noop
assertEquals("hasCandidateForEntity", true, support.hasCandidateForEntity(entity));
- support.removeEntityOwnershipListener(entity, candidate);
+ support.unsetHasCandidateForEntity(entity);
+ assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity));
+
+ support.unsetHasCandidateForEntity(entity); // unset again - should be noop
assertEquals("hasCandidateForEntity", false, support.hasCandidateForEntity(entity));
+
+ support.setHasCandidateForEntity(entity);
+ assertEquals("hasCandidateForEntity", true, support.hasCandidateForEntity(entity));
}
}
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
import org.opendaylight.yangtools.yang.common.QName;
YangInstanceIdentifier entityId = ENTITY_ID1;
Entity entity = new Entity(ENTITY_TYPE, entityId);
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
- shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
- verify(candidate, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
}
@Test
YangInstanceIdentifier entityId = ENTITY_ID1;
Entity entity = new Entity(ENTITY_TYPE, entityId);
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
- shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
// Now grant the vote so the shard becomes the leader. This should retry the commit.
peer.underlyingActor().grantVote = true;
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
-
verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
-
- verify(candidate, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
}
@Test
YangInstanceIdentifier entityId = ENTITY_ID1;
Entity entity = new Entity(ENTITY_TYPE, entityId);
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
- shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
// Wait enough time for the commit to timeout.
follower.dropAppendEntries = false;
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
-
verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
-
- verify(candidate, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
}
@Test
YangInstanceIdentifier entityId = ENTITY_ID1;
Entity entity = new Entity(ENTITY_TYPE, entityId);
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
- shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
// Resume AppendEntries - the candidate write should now be committed.
follower.dropAppendEntries = false;
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
-
verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
-
- verify(candidate, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
}
@Test
shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
DataStoreVersions.CURRENT_VERSION), peer);
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
-
- shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
MockLeader leader = peer.underlyingActor();
shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
- shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
for(int i = 1; i <= max; i++) {
YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
entityIds.add(id);
- shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, id)), kit.getRef());
}
assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
kit.waitUntilLeader(shard);
Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
// Register
- shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
- verify(candidate, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
// Unregister
- reset(candidate);
-
- shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
+ shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
- //verify(candidate).ownershipChanged(entity, true, false, false);
// Register again
- shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
- verify(candidate, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
}
@Test
kit.waitUntilLeader(shard);
Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
// Add a remote candidate
// Register local
- shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
// Verify the remote candidate becomes owner
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
- verify(candidate, never()).ownershipChanged(any(EntityOwnershipChange.class));
// Add another remote candidate and verify ownership doesn't change
- reset(candidate);
String remoteMemberName2 = "remoteMember2";
writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
- verify(candidate, never()).ownershipChanged(any(EntityOwnershipChange.class));
// Remove the second remote candidate and verify ownership doesn't change
- reset(candidate);
deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
- verify(candidate, never()).ownershipChanged(any(EntityOwnershipChange.class));
// Remove the first remote candidate and verify the local candidate becomes owner
- reset(candidate);
deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
- verify(candidate, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
// Add the second remote candidate back and verify ownership doesn't change
- reset(candidate);
writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
- verify(candidate, never()).ownershipChanged(any(EntityOwnershipChange.class));
// Unregister the local candidate and verify the second remote candidate becomes owner
- shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
+ shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
kit.waitUntilLeader(leader);
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
-
// Send PeerDown and PeerUp with no entities
leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
// Add candidates for entity1 with the local leader as the owner
- leader.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+ leader.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
- leader.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
+ leader.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
shard.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());
Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+ EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
+
+ shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
// Register local candidate
- shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
- verify(candidate, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
- reset(candidate);
+ verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
+ reset(listener);
// Simulate a replicated commit from the leader to remove the local candidate that would occur after a
// network partition is healed.
leader.tell(new PeerDown(LOCAL_MEMBER_NAME, localId.toString()), ActorRef.noSender());
- verify(candidate, timeout(5000)).ownershipChanged(ownershipChange(entity, true, false, false));
+ verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, true, false, false));
// Since the the shard has a local candidate registered, it should re-add its candidate to the entity.
verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
- verify(candidate, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
+ verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
// Unregister the local candidate and verify it's removed and no re-added.
- shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
+ shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
Entity entity3 = new Entity(ENTITY_TYPE, ENTITY_ID3);
Entity entity4 = new Entity(otherEntityType, ENTITY_ID3);
EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
- EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
// Register listener
// Register a couple candidates for the desired entity type and verify listener is notified.
- shard.tell(new RegisterCandidateLocal(candidate, entity1), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(entity1), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
- shard.tell(new RegisterCandidateLocal(candidate, entity2), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(entity2), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
// Register another candidate for another entity type and verify listener is not notified.
- shard.tell(new RegisterCandidateLocal(candidate, entity4), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(entity4), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
// Unregister the local candidate for entity1 and verify listener is notified
- shard.tell(new UnregisterCandidateLocal(candidate, entity1), kit.getRef());
+ shard.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
shard.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
- shard.tell(new RegisterCandidateLocal(candidate, entity3), kit.getRef());
+ shard.tell(new RegisterCandidateLocal(entity3), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
verifyOwner(shard, ENTITY_TYPE, entity3.getId(), LOCAL_MEMBER_NAME);
// Re-register the listener and verify it gets notified of currently owned entities
- reset(listener, candidate);
+ reset(listener);
shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
verify(listener, never()).ownershipChanged(ownershipChange(entity4));
verify(listener, never()).ownershipChanged(ownershipChange(entity1));
- verify(candidate, never()).ownershipChanged(ownershipChange(entity2));
- verify(candidate, never()).ownershipChanged(ownershipChange(entity3));
}
private void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
private static final String ENTITY_TYPE = "cars";
-
- private final CarEntityOwnershipCandidate ownershipCandidate = new CarEntityOwnershipCandidate();
-
+ private final CarEntityOwnershipListener ownershipListener = new CarEntityOwnershipListener();
+ private final AtomicBoolean registeredListener = new AtomicBoolean();
private volatile Thread testThread;
private volatile boolean stopThread;
@Override
public Future<RpcResult<Void>> registerOwnership(RegisterOwnershipInput input) {
+ if(registeredListener.compareAndSet(false, true)) {
+ ownershipService.registerListener(ENTITY_TYPE, ownershipListener);
+ }
+
Entity entity = new Entity(ENTITY_TYPE, input.getCarId());
try {
- ownershipService.registerCandidate(entity, ownershipCandidate);
+ ownershipService.registerCandidate(entity);
} catch (CandidateAlreadyRegisteredException e) {
return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION,
"Could not register for car " + input.getCarId(), e).buildFuture();
return RpcResultBuilder.<Void>success().buildFuture();
}
- private static class CarEntityOwnershipCandidate implements EntityOwnershipCandidate {
+ private static class CarEntityOwnershipListener implements EntityOwnershipListener {
@Override
public void ownershipChanged(EntityOwnershipChange ownershipChange) {
LOG.info("ownershipChanged: {}", ownershipChange);