package org.opendaylight.controller.md.sal.common.api.clustering;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+
/**
* An EntityOwnershipCandidateRegistration records a request to register a Candidate for a given Entity. Calling
* close on the EntityOwnershipCandidateRegistration will remove the Candidate from any future ownership considerations
* for that Entity and will also remove it as a Listener for ownership status changes.
*/
-public interface EntityOwnershipCandidateRegistration extends EntityOwnershipListenerRegistration {
+public interface EntityOwnershipCandidateRegistration extends ObjectRegistration<EntityOwnershipCandidate> {
+ /**
+ * Returns the entity that the listener was registered for
+ */
+ @Nonnull Entity getEntity();
+
+ /**
+ * Unregister the listener
+ */
+ @Override
+ void close();
}
package org.opendaylight.controller.md.sal.common.api.clustering;
+import javax.annotation.Nonnull;
import org.opendaylight.yangtools.concepts.ObjectRegistration;
/**
public interface EntityOwnershipListenerRegistration extends ObjectRegistration<EntityOwnershipListener> {
/**
- *
- * @return the entity that the listener was registered for
+ * Return the entity type that the listener was registered for
*/
- Entity getEntity();
+ @Nonnull String getEntityType();
/**
* Unregister the listener
*/
+ @Override
void close();
}
package org.opendaylight.controller.md.sal.common.api.clustering;
+import javax.annotation.Nonnull;
+
/**
* <p>
* The EntityOwnershipService provides the means for a component/application to request ownership for a given
public interface EntityOwnershipService {
/**
- * Registers as a Candidate that wants to own the given Entity. Only one such request can be made per process.
- * If multiple requests for registering a Candidate for a given Entity are received in the current process a
- * CandidateAlreadyRegisteredException will be thrown
+ * Registers a candidate for ownership of the given entity. Only one such request can be made per entity
+ * per process. If multiple requests for registering a candidate for a given entity are received in the
+ * current process a CandidateAlreadyRegisteredException will be thrown.
+ * <p>
+ * The registration is performed asynchronously and the {@link EntityOwnershipCandidate} instance is
+ * notified whenever its process instance is granted ownership of the entity and also whenever it loses
+ * ownership. Note that the {@link EntityOwnershipCandidate} is not notified when another process instance
+ * is granted ownership.
*
* @param entity the entity which the Candidate wants to own
* @param candidate the Candidate that wants to own the entity
* @return a registration object that can be used to unregister the Candidate
* @throws org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException
*/
- EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate)
+ EntityOwnershipCandidateRegistration registerCandidate(@Nonnull Entity entity, @Nonnull EntityOwnershipCandidate candidate)
throws CandidateAlreadyRegisteredException;
/**
- * Registers a Listener that is interested in the ownership status of the given Entity. On registration the Listener
- * will be notified of the ownership status of the given Entity at the time of registration.
+ * Registers a listener that is interested in ownership changes for entities of the given entity type. The
+ * listener is notified whenever its process instance is granted ownership of the entity and also whenever
+ * it loses ownership. On registration the listener will be notified of all entities its process instance
+ * currently owns at the time of registration.
*
- * @param entity the Entity whose ownership status the Listener is interested in
- * @param listener the Listener that is interested in the entity
+ * @param entityType the type of entities whose ownership status the Listener is interested in
+ * @param listener the listener that is interested in the entities
* @return a registration object that can be used to unregister the Listener
*/
- EntityOwnershipListenerRegistration registerListener(Entity entity, EntityOwnershipListener listener);
+ EntityOwnershipListenerRegistration registerListener(@Nonnull String entityType, @Nonnull EntityOwnershipListener listener);
}
*/
package org.opendaylight.controller.md.sal.common.impl.clustering;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
/**
* Abstract base class for an EntityOwnershipCandidateRegistration.
*
* @author Thomas Pantelis
*/
-public abstract class AbstractEntityOwnershipCandidateRegistration
- extends AbstractEntityOwnershipListenerRegistration<EntityOwnershipCandidate>
+public abstract class AbstractEntityOwnershipCandidateRegistration extends AbstractObjectRegistration<EntityOwnershipCandidate>
implements EntityOwnershipCandidateRegistration {
+ private final Entity entity;
- protected AbstractEntityOwnershipCandidateRegistration(EntityOwnershipCandidate candidate, Entity entity) {
- super(candidate, entity);
+ protected AbstractEntityOwnershipCandidateRegistration(@Nonnull EntityOwnershipCandidate candidate,
+ @Nonnull Entity entity) {
+ super(candidate);
+ this.entity = Preconditions.checkNotNull(entity, "entity cannot be null");
+ }
+
+ @Override
+ public Entity getEntity() {
+ return entity;
}
}
*/
package org.opendaylight.controller.md.sal.common.impl.clustering;
-import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
/**
* Abstract base class for an EntityOwnershipListenerRegistration.
*
* @author Thomas Pantelis
*/
-public abstract class AbstractEntityOwnershipListenerRegistration<T extends EntityOwnershipListener>
+public abstract class AbstractEntityOwnershipListenerRegistration extends AbstractObjectRegistration<EntityOwnershipListener>
implements EntityOwnershipListenerRegistration {
- private final T listener;
- private final Entity entity;
+ private final String entityType;
- protected AbstractEntityOwnershipListenerRegistration(T listener, Entity entity) {
- this.listener = listener;
- this.entity = entity;
+ protected AbstractEntityOwnershipListenerRegistration(@Nonnull EntityOwnershipListener listener,
+ @Nonnull String entityType) {
+ super(listener);
+ this.entityType = Preconditions.checkNotNull(entityType, "entityType cannot be null");
}
@Override
- public T getInstance() {
- return listener;
- }
-
- @Override
- public Entity getEntity() {
- return entity;
+ public String getEntityType() {
+ return entityType;
}
}
}
@Override
- public void close() {
+ protected void removeRegistration() {
service.unregisterCandidate(getEntity(), getInstance());
}
+
+ @Override
+ public String toString() {
+ return "DistributedEntityOwnershipCandidateRegistration [entity=" + getEntity() + ", candidate="
+ + getInstance() + "]";
+ }
+
+
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.impl.clustering.AbstractEntityOwnershipListenerRegistration;
+
+/**
+ * Implementation of EntityOwnershipListenerRegistration.
+ *
+ * @author Thomas Pantelis
+ */
+class DistributedEntityOwnershipListenerRegistration extends AbstractEntityOwnershipListenerRegistration {
+
+ private final DistributedEntityOwnershipService service;
+
+ DistributedEntityOwnershipListenerRegistration(EntityOwnershipListener listener, String entityType,
+ DistributedEntityOwnershipService service) {
+ super(listener, entityType);
+ this.service = service;
+ }
+
+ @Override
+ protected void removeRegistration() {
+ service.unregisterListener(getEntityType(), getInstance());
+ }
+
+ @Override
+ public String toString() {
+ return "DistributedEntityOwnershipListenerRegistration [entityType=" + getEntityType()
+ + ", listener=" + getInstance() + "]";
+ }
+}
import akka.dispatch.OnComplete;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
@Override
public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate)
throws CandidateAlreadyRegisteredException {
+ Preconditions.checkNotNull(entity, "entity cannot be null");
+ Preconditions.checkNotNull(candidate, "candidate cannot be null");
EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity, candidate);
if(currentCandidate != null) {
}
void unregisterCandidate(Entity entity, EntityOwnershipCandidate entityOwnershipCandidate) {
- LOG.debug("Unregistering candidate for {}", entity);
+ LOG.debug("Unregistering candidate {} for {}", entityOwnershipCandidate, entity);
executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entityOwnershipCandidate, entity));
registeredEntities.remove(entity);
}
@Override
- public EntityOwnershipListenerRegistration registerListener(Entity entity, EntityOwnershipListener listener) {
- // TODO Auto-generated method stub
- return null;
+ public EntityOwnershipListenerRegistration registerListener(String entityType, EntityOwnershipListener listener) {
+ Preconditions.checkNotNull(entityType, "entityType cannot be null");
+ Preconditions.checkNotNull(listener, "listener cannot be null");
+
+ RegisterListenerLocal registerListener = new RegisterListenerLocal(listener, entityType);
+
+ LOG.debug("Registering listener with message: {}", registerListener);
+
+ executeLocalEntityOwnershipShardOperation(registerListener);
+ return new DistributedEntityOwnershipListenerRegistration(listener, entityType, this);
+ }
+
+ void unregisterListener(String entityType, EntityOwnershipListener listener) {
+ LOG.debug("Unregistering listener {} for entity type {}", listener, entityType);
+
+ executeLocalEntityOwnershipShardOperation(new UnregisterListenerLocal(listener, entityType));
}
@Override
static final NodeIdentifier ENTITY_OWNERS_NODE_ID = new NodeIdentifier(EntityOwners.QNAME);
static final NodeIdentifier ENTITY_OWNER_NODE_ID = new NodeIdentifier(ENTITY_OWNER_QNAME);
static final NodeIdentifier ENTITY_NODE_ID = new NodeIdentifier(ENTITY_QNAME);
+ static final NodeIdentifier ENTITY_ID_NODE_ID = new NodeIdentifier(ENTITY_ID_QNAME);
+ static final NodeIdentifier ENTITY_TYPE_NODE_ID = new NodeIdentifier(ENTITY_TYPE_QNAME);
static final NodeIdentifier CANDIDATE_NODE_ID = new NodeIdentifier(Candidate.QNAME);
static final NodeIdentifier CANDIDATE_NAME_NODE_ID = new NodeIdentifier(CANDIDATE_NAME_QNAME);
static final YangInstanceIdentifier ENTITY_OWNERS_PATH = YangInstanceIdentifier.of(EntityOwners.QNAME);
import akka.actor.PoisonPill;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import java.util.Arrays;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.Map;
private final ActorContext actorContext;
private final Map<EntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
private final Multimap<Entity, EntityOwnershipListener> entityListenerMap = HashMultimap.create();
+ private final Multimap<String, EntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();
EntityOwnershipListenerSupport(ActorContext actorContext) {
this.actorContext = actorContext;
void addEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
LOG.debug("Adding EntityOwnershipListener {} for {}", listener, entity);
- if(entityListenerMap.put(entity, listener)) {
+ addListener(listener, entity, entityListenerMap);
+ }
+
+ void addEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
+ LOG.debug("Adding EntityOwnershipListener {} for entity type {}", listener, entityType);
+
+ addListener(listener, entityType, entityTypeListenerMap);
+ }
+
+ void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
+ LOG.debug("Removing EntityOwnershipListener {} for {}", listener, entity);
+
+ removeListener(listener, entity, entityListenerMap);
+ }
+
+ void removeEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
+ LOG.debug("Removing EntityOwnershipListener {} for entity type {}", listener, entityType);
+
+ removeListener(listener, entityType, entityTypeListenerMap);
+ }
+
+ void notifyEntityOwnershipListeners(Entity entity, boolean wasOwner, boolean isOwner) {
+ notifyListeners(entity, entity, wasOwner, isOwner, entityListenerMap);
+ notifyListeners(entity, entity.getType(), wasOwner, isOwner, entityTypeListenerMap);
+ }
+
+ void notifyEntityOwnershipListener(Entity entity, boolean wasOwner, boolean isOwner,
+ EntityOwnershipListener listener) {
+ notifyListeners(entity, wasOwner, isOwner, Arrays.asList(listener));
+ }
+
+ private <T> void notifyListeners(Entity entity, T mapKey, boolean wasOwner, boolean isOwner,
+ Multimap<T, EntityOwnershipListener> listenerMap) {
+ Collection<EntityOwnershipListener> listeners = listenerMap.get(mapKey);
+ if(!listeners.isEmpty()) {
+ notifyListeners(entity, wasOwner, isOwner, listeners);
+ }
+ }
+
+ private void notifyListeners(Entity entity, boolean wasOwner, boolean isOwner,
+ Collection<EntityOwnershipListener> listeners) {
+ EntityOwnershipChanged changed = new EntityOwnershipChanged(entity, wasOwner, isOwner);
+ for(EntityOwnershipListener listener: listeners) {
+ ActorRef listenerActor = listenerActorFor(listener);
+
+ LOG.debug("Notifying EntityOwnershipListenerActor {} with {}", listenerActor, changed);
+
+ listenerActor.tell(changed, ActorRef.noSender());
+ }
+ }
+
+ private <T> void addListener(EntityOwnershipListener listener, T mapKey,
+ Multimap<T, EntityOwnershipListener> toListenerMap) {
+ if(toListenerMap.put(mapKey, listener)) {
ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
if(listenerEntry == null) {
listenerActorMap.put(listener, new ListenerActorRefEntry());
}
}
- void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
- LOG.debug("Removing EntityOwnershipListener {} for {}", listener, entity);
-
- if(entityListenerMap.remove(entity, listener)) {
+ private <T> void removeListener(EntityOwnershipListener listener, T mapKey,
+ Multimap<T, EntityOwnershipListener> fromListenerMap) {
+ if(fromListenerMap.remove(mapKey, listener)) {
ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
LOG.debug("Found {}", listenerEntry);
}
}
- void notifyEntityOwnershipListeners(Entity entity, boolean wasOwner, boolean isOwner) {
- Collection<EntityOwnershipListener> listeners = entityListenerMap.get(entity);
- if(listeners.isEmpty()) {
- return;
- }
-
- EntityOwnershipChanged changed = new EntityOwnershipChanged(entity, wasOwner, isOwner);
- for(EntityOwnershipListener listener: listeners) {
- ActorRef listenerActor = listenerActorFor(listener);
-
- LOG.debug("Notifying EntityOwnershipListenerActor {} with {}", listenerActor,changed);
-
- listenerActor.tell(changed, ActorRef.noSender());
- }
- }
-
private ActorRef listenerActorFor(EntityOwnershipListener listener) {
return listenerActorMap.get(listener).actorFor(listener);
}
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
import akka.actor.ActorRef;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
onPeerDown((PeerDown) message);
} else if(message instanceof PeerUp) {
onPeerUp((PeerUp) message);
+ } if(message instanceof RegisterListenerLocal) {
+ onRegisterListenerLocal((RegisterListenerLocal)message);
+ } if(message instanceof UnregisterListenerLocal) {
+ onUnregisterListenerLocal((UnregisterListenerLocal)message);
} else if(!commitCoordinator.handleMessage(message, this)) {
super.onReceiveCommand(message);
}
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
+ private void onRegisterListenerLocal(final RegisterListenerLocal registerListener) {
+ LOG.debug("{}: onRegisterListenerLocal: {}", persistenceId(), registerListener);
+
+ listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+
+ searchForEntitiesOwnedBy(localMemberName, new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+ Optional<DataContainerChild<? extends PathArgument, ?>> possibleType =
+ entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
+ String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
+ if(registerListener.getEntityType().equals(entityType)) {
+ Entity entity = new Entity(entityType,
+ (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
+ listenerSupport.notifyEntityOwnershipListener(entity, false, true, registerListener.getListener());
+ }
+ }
+ });
+ }
+
+ private void onUnregisterListenerLocal(UnregisterListenerLocal unregisterListener) {
+ LOG.debug("{}: onUnregisterListenerLocal: {}", persistenceId(), unregisterListener);
+
+ listenerSupport.removeEntityOwnershipListener(unregisterListener.getEntityType(), unregisterListener.getListener());
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+
void tryCommitModifications(final BatchedModifications modifications) {
if(isLeader()) {
LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(), modifications.getTransactionID());
}
private void selectNewOwnerForEntitiesOwnedBy(String owner) {
+ final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+ searchForEntitiesOwnedBy(owner, new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+ Object newOwner = newOwner(getCandidateNames(entityNode));
+ YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
+ node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
+ node(ENTITY_OWNER_NODE_ID).build();
+
+ LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
+
+ modifications.addModification(new WriteModification(entityPath,
+ ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+ }
+ });
+
+ commitCoordinator.commitModifications(modifications, this);
+ }
+
+ private void searchForEntitiesOwnedBy(String owner, EntityWalker walker) {
DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
if(!possibleEntityTypes.isPresent()) {
LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
- BatchedModifications modifications = commitCoordinator.newBatchedModifications();
for(MapEntryNode entityType: ((MapNode) possibleEntityTypes.get()).getValue()) {
Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
entityType.getChild(ENTITY_NODE_ID);
Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
entity.getChild(ENTITY_OWNER_NODE_ID);
if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
- Object newOwner = newOwner(getCandidateNames(entity));
- YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
- node(entityType.getIdentifier()).node(ENTITY_NODE_ID).node(entity.getIdentifier()).
- node(ENTITY_OWNER_NODE_ID).build();
-
- LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
-
- modifications.addModification(new WriteModification(entityPath,
- ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+ walker.onEntity(entityType, entity);
}
}
}
-
- commitCoordinator.commitModifications(modifications, this);
}
private Collection<String> getCandidateNames(MapEntryNode entity) {
return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
}
}
+
+ private static interface EntityWalker {
+ void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+
+/**
+ * Message sent to the local EntityOwnershipShard to register an EntityOwnershipListener.
+ *
+ * @author Thomas Pantelis
+ */
+public class RegisterListenerLocal {
+ private final EntityOwnershipListener listener;
+ private final String entityType;
+
+ public RegisterListenerLocal(EntityOwnershipListener listener, String entityType) {
+ this.listener = listener;
+ this.entityType = entityType;
+ }
+
+ public EntityOwnershipListener getListener() {
+ return listener;
+ }
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+ @Override
+ public String toString() {
+ return "RegisterListenerLocal [entityType=" + entityType + ", listener=" + listener + "]";
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+
+/**
+ * Message sent to the local EntityOwnershipShard to unregister an EntityOwnershipListener.
+ *
+ * @author Thomas Pantelis
+ */
+public class UnregisterListenerLocal {
+ private final EntityOwnershipListener listener;
+ private final String entityType;
+
+ public UnregisterListenerLocal(EntityOwnershipListener listener, String entityType) {
+ this.listener = listener;
+ this.entityType = entityType;
+ }
+
+ public EntityOwnershipListener getListener() {
+ return listener;
+ }
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+ @Override
+ public String toString() {
+ return "UnregisterListenerLocal [entityType=" + entityType + ", listener=" + listener + "]";
+ }
+}
import org.opendaylight.controller.cluster.datastore.config.ModuleConfig;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfigProvider;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
}
@Test
- public void testRegisterListener() {
- // TODO
+ public void testListenerRegistration() {
+ final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
+ DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
+ @Override
+ protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
+ return shardPropsCreator;
+ }
+ };
+
+ service.start();
+
+ shardPropsCreator.expectShardMessage(RegisterListenerLocal.class);
+
+ YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
+ Entity entity = new Entity(ENTITY_TYPE, entityId);
+ EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
+
+ EntityOwnershipListenerRegistration reg = service.registerListener(entity.getType(), listener);
+
+ assertNotNull("EntityOwnershipListenerRegistration null", reg);
+ assertEquals("getEntityType", entity.getType(), reg.getEntityType());
+ assertEquals("getInstance", listener, reg.getInstance());
+
+ RegisterListenerLocal regListener = shardPropsCreator.waitForShardMessage();
+ assertSame("getListener", listener, regListener.getListener());
+ assertEquals("getEntityType", entity.getType(), regListener.getEntityType());
+
+ shardPropsCreator.expectShardMessage(UnregisterListenerLocal.class);
+
+ reg.close();
+
+ UnregisterListenerLocal unregListener = shardPropsCreator.waitForShardMessage();
+ assertEquals("getEntityType", entity.getType(), unregListener.getEntityType());
+ assertSame("getListener", listener, unregListener.getListener());
+
+ service.close();
}
private void verifyEntityCandidate(ActorRef entityOwnershipShard, String entityType,
package org.opendaylight.controller.cluster.datastore.entityownership;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;
EntityOwnershipListener mockListener1 = mock(EntityOwnershipListener.class, "EntityOwnershipListener1");
EntityOwnershipListener mockListener2 = mock(EntityOwnershipListener.class, "EntityOwnershipListener2");
- Entity entity1 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id1")));
- Entity entity2 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id2")));
+ EntityOwnershipListener mockListener3 = mock(EntityOwnershipListener.class, "EntityOwnershipListener3");
+ Entity entity1 = new Entity("type1", YangInstanceIdentifier.of(QName.create("test", "id1")));
+ Entity entity2 = new Entity("type1", YangInstanceIdentifier.of(QName.create("test", "id2")));
+ Entity entity3 = new Entity("type1", YangInstanceIdentifier.of(QName.create("test", "id3")));
+ Entity entity4 = new Entity("type2", YangInstanceIdentifier.of(QName.create("test", "id4")));
+ Entity entity5 = new Entity("noListener", YangInstanceIdentifier.of(QName.create("test", "id5")));
// Add EntityOwnershipListener registrations.
support.addEntityOwnershipListener(entity1, mockListener1);
+ support.addEntityOwnershipListener(entity1, mockListener1); // register again - should be noop
support.addEntityOwnershipListener(entity2, mockListener1);
support.addEntityOwnershipListener(entity1, mockListener2);
+ support.addEntityOwnershipListener(entity1.getType(), mockListener3);
- // Notify entity1 changed and verify both listeners are notified.
+ // Notify entity1 changed and verify listeners are notified.
support.notifyEntityOwnershipListeners(entity1, false, true);
verify(mockListener1, timeout(5000)).ownershipChanged(entity1, false, true);
verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true);
- assertEquals("# of listener actors", 2, actorContext.children().size());
+ verify(mockListener3, timeout(5000)).ownershipChanged(entity1, false, true);
+ assertEquals("# of listener actors", 3, actorContext.children().size());
- // Notify entity2 changed and verify only mockListener1 is notified.
+ // Notify entity2 changed and verify only mockListener1 and mockListener3 are notified.
support.notifyEntityOwnershipListeners(entity2, false, true);
verify(mockListener1, timeout(5000)).ownershipChanged(entity2, false, true);
- verify(mockListener2, never()).ownershipChanged(entity2, false, true);
- assertEquals("# of listener actors", 2, actorContext.children().size());
+ verify(mockListener3, timeout(5000)).ownershipChanged(entity2, false, true);
+ Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+ verify(mockListener2, never()).ownershipChanged(eq(entity2), anyBoolean(), anyBoolean());
+ assertEquals("# of listener actors", 3, actorContext.children().size());
- // Notify entity3 changed and verify neither listener is notified.
+ // Notify entity3 changed and verify only mockListener3 is notified.
- Entity entity3 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id3")));
support.notifyEntityOwnershipListeners(entity3, false, true);
- Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verify(mockListener3, timeout(5000)).ownershipChanged(entity3, false, true);
+ Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+ verify(mockListener1, never()).ownershipChanged(eq(entity3), anyBoolean(), anyBoolean());
+ verify(mockListener2, never()).ownershipChanged(eq(entity3), anyBoolean(), anyBoolean());
- verify(mockListener1, never()).ownershipChanged(entity3, false, true);
- verify(mockListener2, never()).ownershipChanged(entity3, false, true);
+ // Notify entity4 changed and verify no listeners are notified.
- reset(mockListener1, mockListener2);
+ support.notifyEntityOwnershipListeners(entity4, false, true);
+
+ Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+ verify(mockListener1, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+ verify(mockListener2, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+ verify(mockListener3, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+
+ // Notify entity5 changed and verify no listener is notified.
+
+ support.notifyEntityOwnershipListeners(entity5, false, true);
- // Unregister mockListener1 for entity1, issue a change and verify only mockListener2 is notified.
+ Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+ verify(mockListener1, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+ verify(mockListener2, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+ verify(mockListener3, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+
+ reset(mockListener1, mockListener2, mockListener3);
+
+ // Unregister mockListener1 for entity1, issue a change and verify only mockListeners 2 & 3 are notified.
support.removeEntityOwnershipListener(entity1, mockListener1);
+ support.notifyEntityOwnershipListeners(entity1, false, true);
+
+ verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true);
+ verify(mockListener3, timeout(5000)).ownershipChanged(entity1, false, true);
+ Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+ verify(mockListener1, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
+
+ // Unregister mockListener3, issue a change for entity1 and verify only mockListeners2 is notified.
+ reset(mockListener1, mockListener2, mockListener3);
+
+ support.removeEntityOwnershipListener(entity1.getType(), mockListener3);
support.notifyEntityOwnershipListeners(entity1, false, true);
verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true);
- verify(mockListener1, never()).ownershipChanged(entity1, false, true);
+ Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+ verify(mockListener1, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
+ verify(mockListener3, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
- // Completely unregister both listeners and verify their listener actors are destroyed.
+ // Completely unregister all listeners and verify their listener actors are destroyed.
Iterable<ActorRef> listenerActors = actorContext.children();
assertEquals("# of listener actors", 2, listenerActors.size());
- Iterator<ActorRef> iter = listenerActors.iterator();
- ActorRef listenerActor1 = iter.next();
- ActorRef listenerActor2 = iter.next();
-
- JavaTestKit kit1 = new JavaTestKit(getSystem());
- kit1.watch(listenerActor1);
-
- JavaTestKit kit2 = new JavaTestKit(getSystem());
- kit2.watch(listenerActor2);
+ List<JavaTestKit> watchers = new ArrayList<>();
+ for(Iterator<ActorRef> iter = listenerActors.iterator(); iter.hasNext();) {
+ JavaTestKit kit = new JavaTestKit(getSystem());
+ kit.watch(iter.next());
+ watchers.add(kit);
+ }
support.removeEntityOwnershipListener(entity2, mockListener1);
+ support.removeEntityOwnershipListener(entity2, mockListener1); // un-register again - shoild be noop
support.removeEntityOwnershipListener(entity1, mockListener2);
- kit1.expectTerminated(JavaTestKit.duration("3 seconds"), listenerActor1);
- kit2.expectTerminated(JavaTestKit.duration("3 seconds"), listenerActor2);
+ Iterator<ActorRef> iter = listenerActors.iterator();
+ for(JavaTestKit kit: watchers) {
+ kit.expectTerminated(JavaTestKit.duration("3 seconds"), iter.next());
+ }
+
assertEquals("# of listener actors", 0, actorContext.children().size());
// Re-register mockListener1 for entity1 and verify it is notified.
support.notifyEntityOwnershipListeners(entity1, false, true);
verify(mockListener1, timeout(5000)).ownershipChanged(entity1, false, true);
- verify(mockListener2, never()).ownershipChanged(entity2, false, true);
+ verify(mockListener2, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
+ verify(mockListener3, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
// Quickly register and unregister mockListener2 - expecting no exceptions.
*/
package org.opendaylight.controller.cluster.datastore.entityownership;
+import static org.hamcrest.CoreMatchers.either;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.cluster.datastore.ShardTestKit;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
ShardTestKit kit = new ShardTestKit(getSystem());
- dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
shardBatchedModificationCount(5);
String peerId = newShardId("leader").toString();
TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
- TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
- ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
- withDispatcher(Dispatchers.DefaultDispatcherId()));
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
+ TestEntityOwnershipShard.class, newShardId(LOCAL_MEMBER_NAME),
+ ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build(),
+ dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
DataStoreVersions.CURRENT_VERSION), peer);
verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
LOCAL_MEMBER_NAME);
- shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
-
// Test with initial commit timeout and subsequent retry.
leader.modificationsReceived = new CountDownLatch(1);
verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
}
+ @Test
+ public void testListenerRegistration() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+ kit.waitUntilLeader(shard);
+ ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
+
+ String otherEntityType = "otherEntityType";
+ Entity entity1 = new Entity(ENTITY_TYPE, ENTITY_ID1);
+ Entity entity2 = new Entity(ENTITY_TYPE, ENTITY_ID2);
+ Entity entity3 = new Entity(ENTITY_TYPE, ENTITY_ID3);
+ Entity entity4 = new Entity(otherEntityType, ENTITY_ID3);
+ EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+ // Register listener
+
+ shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ // Register a couple candidates for the desired entity type and verify listener is notified.
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity1), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verify(listener, timeout(5000)).ownershipChanged(entity1, false, true);
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity2), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verify(listener, timeout(5000)).ownershipChanged(entity2, false, true);
+ reset(listener);
+
+ // Register another candidate for another entity type and verify listener is not notified.
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity4), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verify(listener, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+
+ // Register remote candidate for entity1
+
+ String remoteMemberName = "remoteMember";
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, entity1.getId(), remoteMemberName),
+ shardDataTree);
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entity1.getId(), remoteMemberName);
+
+ // Unregister the local candidate for entity1 and verify listener is notified
+
+ shard.tell(new UnregisterCandidateLocal(candidate, entity1), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verify(listener, timeout(5000)).ownershipChanged(entity1, true, false);
+ reset(listener);
+
+ // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
+
+ shard.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity3), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyOwner(shard, ENTITY_TYPE, entity3.getId(), LOCAL_MEMBER_NAME);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verify(listener, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Re-register the listener and verify it gets notified of current locally owned entities
+
+ reset(listener, candidate);
+
+ shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ Matcher<Entity> entityMatcher = either(equalTo(entity2)).or(equalTo(entity3));
+ verify(listener, timeout(5000).times(2)).ownershipChanged(argThat(entityMatcher), eq(false), eq(true));
+ Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+ verify(listener, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+ verify(listener, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
+ verify(candidate, never()).ownershipChanged(eq(entity2), anyBoolean(), anyBoolean());
+ verify(candidate, never()).ownershipChanged(eq(entity3), anyBoolean(), anyBoolean());
+ }
+
private void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
JavaTestKit sender) {
BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
}
+ public static class TestEntityOwnershipShard extends EntityOwnershipShard {
+
+ TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext) {
+ super(name, peerAddresses, datastoreContext, SCHEMA_CONTEXT, LOCAL_MEMBER_NAME);
+ }
+
+ @Override
+ public void onReceiveCommand(Object message) throws Exception {
+ if(!(message instanceof ElectionTimeout)) {
+ super.onReceiveCommand(message);
+ }
+ }
+
+
+ }
+
public static class MockFollower extends UntypedActor {
volatile boolean grantVote;
volatile boolean dropAppendEntries;