Bug 4105: Remove candidates on PeerDown
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / DistributedEntityOwnershipService.java
index bfdda4ce706aab635cc5c2fe7ed68c289b8e9910..cd45ef58a9660c331aae11c1f691b62580039b2f 100644 (file)
@@ -11,13 +11,20 @@ import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
@@ -25,6 +32,7 @@ import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipC
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -50,9 +58,11 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     public void start() {
         ActorRef shardManagerActor = datastore.getActorContext().getShardManager();
 
-        CreateShard createShard = new CreateShard(ENTITY_OWNERSHIP_SHARD_NAME,
-                datastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards(),
-                newShardPropsCreator(), null);
+        Configuration configuration = datastore.getActorContext().getConfiguration();
+        Collection<String> entityOwnersMemberNames = configuration.getUniqueMemberNamesForAllShards();
+        CreateShard createShard = new CreateShard(new ModuleShardConfiguration(EntityOwners.QNAME.getNamespace(),
+                "entity-owners", ENTITY_OWNERSHIP_SHARD_NAME, ModuleShardStrategy.NAME, entityOwnersMemberNames),
+                        newShardPropsCreator(), null);
 
         Future<Object> createFuture = datastore.getActorContext().executeOperationAsync(shardManagerActor,
                 createShard, MESSAGE_TIMEOUT);
@@ -106,6 +116,8 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     @Override
     public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate)
             throws CandidateAlreadyRegisteredException {
+        Preconditions.checkNotNull(entity, "entity cannot be null");
+        Preconditions.checkNotNull(candidate, "candidate cannot be null");
 
         EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity, candidate);
         if(currentCandidate != null) {
@@ -121,16 +133,29 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     }
 
     void unregisterCandidate(Entity entity, EntityOwnershipCandidate entityOwnershipCandidate) {
-        LOG.debug("Unregistering candidate for {}", entity);
+        LOG.debug("Unregistering candidate {} for {}", entityOwnershipCandidate, entity);
 
         executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entityOwnershipCandidate, entity));
         registeredEntities.remove(entity);
     }
 
     @Override
-    public EntityOwnershipListenerRegistration registerListener(Entity entity, EntityOwnershipListener listener) {
-        // TODO Auto-generated method stub
-        return null;
+    public EntityOwnershipListenerRegistration registerListener(String entityType, EntityOwnershipListener listener) {
+        Preconditions.checkNotNull(entityType, "entityType cannot be null");
+        Preconditions.checkNotNull(listener, "listener cannot be null");
+
+        RegisterListenerLocal registerListener = new RegisterListenerLocal(listener, entityType);
+
+        LOG.debug("Registering listener with message: {}", registerListener);
+
+        executeLocalEntityOwnershipShardOperation(registerListener);
+        return new DistributedEntityOwnershipListenerRegistration(listener, entityType, this);
+    }
+
+    void unregisterListener(String entityType, EntityOwnershipListener listener) {
+        LOG.debug("Unregistering listener {} for entity type {}", listener, entityType);
+
+        executeLocalEntityOwnershipShardOperation(new UnregisterListenerLocal(listener, entityType));
     }
 
     @Override