Bug 4105: Implement RegisterCandidate in EntityOwnershipShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / DistributedEntityOwnershipService.java
index ca315435cf9035abb3d68e956cc81323d3eda8bf..f51f57944321ce14887518097de7d3f4d8f2d907 100644 (file)
@@ -10,8 +10,13 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
 import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
@@ -35,6 +40,8 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     private static final Timeout MESSAGE_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
 
     private final DistributedDataStore datastore;
+    private final ConcurrentMap<Entity, EntityOwnershipCandidate> registeredEntities = new ConcurrentHashMap<>();
+    private volatile ActorRef localEntityOwnershipShard;
 
     public DistributedEntityOwnershipService(DistributedDataStore datastore) {
         this.datastore = datastore;
@@ -45,7 +52,7 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
 
         CreateShard createShard = new CreateShard(ENTITY_OWNERSHIP_SHARD_NAME,
                 datastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards(),
-                new EntityOwnershipShardPropsCreator(), null);
+                newShardPropsCreator(), null);
 
         Future<Object> createFuture = datastore.getActorContext().executeOperationAsync(shardManagerActor,
                 createShard, MESSAGE_TIMEOUT);
@@ -62,11 +69,63 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
         }, datastore.getActorContext().getClientDispatcher());
     }
 
+    private void executeEntityOwnershipShardOperation(final ActorRef shardActor, final Object message) {
+        Future<Object> future = datastore.getActorContext().executeOperationAsync(shardActor, message, MESSAGE_TIMEOUT);
+        future.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if(failure != null) {
+                    LOG.debug("Error sending message {} to {}", message, shardActor, failure);
+                    // TODO - queue for retry
+                } else {
+                    LOG.debug("{} message to {} succeeded", message, shardActor, failure);
+                }
+            }
+        }, datastore.getActorContext().getClientDispatcher());
+    }
+
+    private void executeLocalEntityOwnershipShardOperation(final Object message) {
+        if(localEntityOwnershipShard == null) {
+            Future<ActorRef> future = datastore.getActorContext().findLocalShardAsync(ENTITY_OWNERSHIP_SHARD_NAME);
+            future.onComplete(new OnComplete<ActorRef>() {
+                @Override
+                public void onComplete(Throwable failure, ActorRef shardActor) {
+                    if(failure != null) {
+                        LOG.error("Failed to find local {} shard", ENTITY_OWNERSHIP_SHARD_NAME, failure);
+                    } else {
+                        localEntityOwnershipShard = shardActor;
+                        executeEntityOwnershipShardOperation(localEntityOwnershipShard, message);
+                    }
+                }
+            }, datastore.getActorContext().getClientDispatcher());
+
+        } else {
+            executeEntityOwnershipShardOperation(localEntityOwnershipShard, message);
+        }
+    }
+
     @Override
     public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate)
             throws CandidateAlreadyRegisteredException {
-        // TODO Auto-generated method stub
-        return null;
+
+        EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity, candidate);
+        if(currentCandidate != null) {
+            throw new CandidateAlreadyRegisteredException(entity, currentCandidate);
+        }
+
+        RegisterCandidateLocal registerCandidate = new RegisterCandidateLocal(candidate, entity);
+
+        LOG.debug("Registering candidate with message: {}", registerCandidate);
+
+        executeLocalEntityOwnershipShardOperation(registerCandidate);
+        return new DistributedEntityOwnershipCandidateRegistration(candidate, entity, this);
+    }
+
+    void unregisterCandidate(Entity entity) {
+        LOG.debug("Unregistering candidate for {}", entity);
+
+        executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entity));
+        registeredEntities.remove(entity);
     }
 
     @Override
@@ -78,4 +137,13 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     @Override
     public void close() {
     }
+
+    protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
+        return new EntityOwnershipShardPropsCreator(datastore.getActorContext().getCurrentMemberName());
+    }
+
+    @VisibleForTesting
+    ActorRef getLocalEntityOwnershipShard() {
+        return localEntityOwnershipShard;
+    }
 }