Bug4105: Implement DistributedEntityOwnershipService#registerCandidate
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / DistributedEntityOwnershipService.java
index ca315435cf9035abb3d68e956cc81323d3eda8bf..90720eead6f180b4037356e2723a4f58910723a5 100644 (file)
@@ -10,8 +10,11 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
 import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.util.Timeout;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
@@ -35,6 +38,8 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     private static final Timeout MESSAGE_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
 
     private final DistributedDataStore datastore;
+    private final ConcurrentMap<Entity, EntityOwnershipCandidate> registeredEntities = new ConcurrentHashMap<>();
+    private volatile ActorRef localEntityOwnershipShard;
 
     public DistributedEntityOwnershipService(DistributedDataStore datastore) {
         this.datastore = datastore;
@@ -45,7 +50,7 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
 
         CreateShard createShard = new CreateShard(ENTITY_OWNERSHIP_SHARD_NAME,
                 datastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards(),
-                new EntityOwnershipShardPropsCreator(), null);
+                newShardPropsCreator(), null);
 
         Future<Object> createFuture = datastore.getActorContext().executeOperationAsync(shardManagerActor,
                 createShard, MESSAGE_TIMEOUT);
@@ -62,11 +67,56 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
         }, datastore.getActorContext().getClientDispatcher());
     }
 
+    private void executeEntityOwnershipShardOperation(final ActorRef shardActor, final Object message) {
+        Future<Object> future = datastore.getActorContext().executeOperationAsync(shardActor, message, MESSAGE_TIMEOUT);
+        future.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if(failure != null) {
+                    LOG.debug("Error sending message {} to {}", message, shardActor, failure);
+                    // TODO - queue for retry
+                } else {
+                    LOG.debug("{} message to {} succeeded", message, shardActor, failure);
+                }
+            }
+        }, datastore.getActorContext().getClientDispatcher());
+    }
+
+    private void executeLocalEntityOwnershipShardOperation(final Object message) {
+        if(localEntityOwnershipShard == null) {
+            Future<ActorRef> future = datastore.getActorContext().findLocalShardAsync(ENTITY_OWNERSHIP_SHARD_NAME);
+            future.onComplete(new OnComplete<ActorRef>() {
+                @Override
+                public void onComplete(Throwable failure, ActorRef shardActor) {
+                    if(failure != null) {
+                        LOG.error("Failed to find local {} shard", ENTITY_OWNERSHIP_SHARD_NAME, failure);
+                    } else {
+                        localEntityOwnershipShard = shardActor;
+                        executeEntityOwnershipShardOperation(localEntityOwnershipShard, message);
+                    }
+                }
+            }, datastore.getActorContext().getClientDispatcher());
+
+        } else {
+            executeEntityOwnershipShardOperation(localEntityOwnershipShard, message);
+        }
+    }
+
     @Override
     public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate)
             throws CandidateAlreadyRegisteredException {
-        // TODO Auto-generated method stub
-        return null;
+
+        EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity,candidate);
+        if(currentCandidate != null) {
+            throw new CandidateAlreadyRegisteredException(entity, currentCandidate);
+        }
+
+        RegisterCandidateLocal registerCandidate = new RegisterCandidateLocal(candidate, entity);
+
+        LOG.debug("Registering candidate with message: " + registerCandidate);
+
+        executeLocalEntityOwnershipShardOperation(registerCandidate);
+        return new DistributedEntityOwnershipCandidateRegistration(candidate, entity);
     }
 
     @Override
@@ -78,4 +128,8 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     @Override
     public void close() {
     }
+
+    protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
+        return new EntityOwnershipShardPropsCreator();
+    }
 }