BUG-2399: take into account new ModificationTypes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / DistributedEntityOwnershipService.java
index f51f57944321ce14887518097de7d3f4d8f2d907..e272b80a67893ba927f42099c090e1817ce77282 100644 (file)
@@ -11,20 +11,27 @@ import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -40,7 +47,7 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     private static final Timeout MESSAGE_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
 
     private final DistributedDataStore datastore;
-    private final ConcurrentMap<Entity, EntityOwnershipCandidate> registeredEntities = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Entity, Entity> registeredEntities = new ConcurrentHashMap<>();
     private volatile ActorRef localEntityOwnershipShard;
 
     public DistributedEntityOwnershipService(DistributedDataStore datastore) {
@@ -50,9 +57,11 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     public void start() {
         ActorRef shardManagerActor = datastore.getActorContext().getShardManager();
 
-        CreateShard createShard = new CreateShard(ENTITY_OWNERSHIP_SHARD_NAME,
-                datastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards(),
-                newShardPropsCreator(), null);
+        Configuration configuration = datastore.getActorContext().getConfiguration();
+        Collection<String> entityOwnersMemberNames = configuration.getUniqueMemberNamesForAllShards();
+        CreateShard createShard = new CreateShard(new ModuleShardConfiguration(EntityOwners.QNAME.getNamespace(),
+                "entity-owners", ENTITY_OWNERSHIP_SHARD_NAME, ModuleShardStrategy.NAME, entityOwnersMemberNames),
+                        newShardPropsCreator(), null);
 
         Future<Object> createFuture = datastore.getActorContext().executeOperationAsync(shardManagerActor,
                 createShard, MESSAGE_TIMEOUT);
@@ -76,7 +85,6 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
             public void onComplete(Throwable failure, Object response) {
                 if(failure != null) {
                     LOG.debug("Error sending message {} to {}", message, shardActor, failure);
-                    // TODO - queue for retry
                 } else {
                     LOG.debug("{} message to {} succeeded", message, shardActor, failure);
                 }
@@ -105,20 +113,20 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     }
 
     @Override
-    public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate)
+    public EntityOwnershipCandidateRegistration registerCandidate(Entity entity)
             throws CandidateAlreadyRegisteredException {
+        Preconditions.checkNotNull(entity, "entity cannot be null");
 
-        EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity, candidate);
-        if(currentCandidate != null) {
-            throw new CandidateAlreadyRegisteredException(entity, currentCandidate);
+        if(registeredEntities.putIfAbsent(entity, entity) != null) {
+            throw new CandidateAlreadyRegisteredException(entity);
         }
 
-        RegisterCandidateLocal registerCandidate = new RegisterCandidateLocal(candidate, entity);
+        RegisterCandidateLocal registerCandidate = new RegisterCandidateLocal(entity);
 
         LOG.debug("Registering candidate with message: {}", registerCandidate);
 
         executeLocalEntityOwnershipShardOperation(registerCandidate);
-        return new DistributedEntityOwnershipCandidateRegistration(candidate, entity, this);
+        return new DistributedEntityOwnershipCandidateRegistration(entity, this);
     }
 
     void unregisterCandidate(Entity entity) {
@@ -129,9 +137,22 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     }
 
     @Override
-    public EntityOwnershipListenerRegistration registerListener(Entity entity, EntityOwnershipListener listener) {
-        // TODO Auto-generated method stub
-        return null;
+    public EntityOwnershipListenerRegistration registerListener(String entityType, EntityOwnershipListener listener) {
+        Preconditions.checkNotNull(entityType, "entityType cannot be null");
+        Preconditions.checkNotNull(listener, "listener cannot be null");
+
+        RegisterListenerLocal registerListener = new RegisterListenerLocal(listener, entityType);
+
+        LOG.debug("Registering listener with message: {}", registerListener);
+
+        executeLocalEntityOwnershipShardOperation(registerListener);
+        return new DistributedEntityOwnershipListenerRegistration(listener, entityType, this);
+    }
+
+    void unregisterListener(String entityType, EntityOwnershipListener listener) {
+        LOG.debug("Unregistering listener {} for entity type {}", listener, entityType);
+
+        executeLocalEntityOwnershipShardOperation(new UnregisterListenerLocal(listener, entityType));
     }
 
     @Override