BUG-5280: introduce DistributedDataStoreClientActor
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DistributedDataStore.java
index 0244eb37407d53a7de46a44427e4ef571cd5bf49..fe321d4cd2808bff7c6be29b67222b4876c8b2fe 100644 (file)
@@ -10,11 +10,17 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
@@ -51,7 +57,6 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
         DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
-    private static final String UNKNOWN_TYPE = "unknown";
 
     private static final long READY_WAIT_FACTOR = 3;
 
@@ -66,7 +71,8 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
     private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
 
-    private final String type;
+    private final ClientIdentifier identifier;
+    private final DistributedDataStoreClient client;
 
     private final TransactionContextFactory txContextFactory;
 
@@ -78,9 +84,22 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
 
-        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
+        final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
+            datastoreContextFactory.getBaseDatastoreContext().getDataStoreName());
+        final ActorRef clientActor = actorSystem.actorOf(clientProps);
+        try {
+            client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            LOG.error("Failed to get actor for {}", clientProps, e);
+            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            throw Throwables.propagate(e);
+        }
+
+        identifier = client.getIdentifier();
+        LOG.debug("Distributed data store client {} started", identifier);
 
-        String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
+        String shardManagerId = ShardManagerIdentifier.builder()
+                .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
 
         LOG.info("Creating ShardManager : {}", shardManagerId);
 
@@ -112,10 +131,11 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     }
 
     @VisibleForTesting
-    DistributedDataStore(ActorContext actorContext) {
+    DistributedDataStore(ActorContext actorContext, ClientIdentifier identifier) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+        this.client = null;
+        this.identifier = Preconditions.checkNotNull(identifier);
         this.txContextFactory = TransactionContextFactory.create(actorContext);
-        this.type = UNKNOWN_TYPE;
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
     }
@@ -214,7 +234,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
     @Override
     public void close() {
-        LOG.info("Closing data store {}", type);
+        LOG.info("Closing data store {}", identifier);
 
         if (datastoreConfigMXBean != null) {
             datastoreConfigMXBean.unregisterMBean();
@@ -233,6 +253,10 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
         txContextFactory.close();
         actorContext.shutdown();
+
+        if (client != null) {
+            client.close();
+        }
     }
 
     @Override
@@ -241,11 +265,11 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     }
 
     public void waitTillReady(){
-        LOG.info("Beginning to wait for data store to become ready : {}", type);
+        LOG.info("Beginning to wait for data store to become ready : {}", identifier);
 
         try {
             if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
-                LOG.debug("Data store {} is now ready", type);
+                LOG.debug("Data store {} is now ready", identifier);
             } else {
                 LOG.error("Shared leaders failed to settle in {} seconds, giving up", TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
             }