BUG-5280: fix compilation after unrebased merge
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DistributedDataStore.java
index 0244eb37407d53a7de46a44427e4ef571cd5bf49..a925e93566e9dcff73fd33f1dab458dc2028cfdf 100644 (file)
@@ -10,11 +10,17 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
@@ -51,7 +57,6 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
         DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
-    private static final String UNKNOWN_TYPE = "unknown";
 
     private static final long READY_WAIT_FACTOR = 3;
 
@@ -66,21 +71,21 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
     private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
 
-    private final String type;
+    private final ClientIdentifier identifier;
+    private final DistributedDataStoreClient client;
 
     private final TransactionContextFactory txContextFactory;
 
-    public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
-            Configuration configuration, DatastoreContextFactory datastoreContextFactory,
-            DatastoreSnapshot restoreFromSnapshot) {
+    public DistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
+            final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
+            final DatastoreSnapshot restoreFromSnapshot) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
 
-        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
-
-        String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
+        String shardManagerId = ShardManagerIdentifier.builder()
+                .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
 
         LOG.info("Creating ShardManager : {}", shardManagerId);
 
@@ -96,10 +101,24 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
         actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
                 shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
 
+        final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
+            datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext);
+        final ActorRef clientActor = actorSystem.actorOf(clientProps);
+        try {
+            client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            LOG.error("Failed to get actor for {}", clientProps, e);
+            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            throw Throwables.propagate(e);
+        }
+
+        identifier = client.getIdentifier();
+        LOG.debug("Distributed data store client {} started", identifier);
+
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
 
-        this.txContextFactory = TransactionContextFactory.create(actorContext);
+        this.txContextFactory = new TransactionContextFactory(actorContext, identifier);
 
         datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
@@ -112,15 +131,16 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     }
 
     @VisibleForTesting
-    DistributedDataStore(ActorContext actorContext) {
+    DistributedDataStore(final ActorContext actorContext, final ClientIdentifier identifier) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
-        this.txContextFactory = TransactionContextFactory.create(actorContext);
-        this.type = UNKNOWN_TYPE;
+        this.client = null;
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.txContextFactory = new TransactionContextFactory(actorContext, identifier);
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
     }
 
-    public void setCloseable(AutoCloseable closeable) {
+    public void setCloseable(final AutoCloseable closeable) {
         this.closeable = closeable;
     }
 
@@ -128,8 +148,8 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     @Override
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
                                               ListenerRegistration<L> registerChangeListener(
-        final YangInstanceIdentifier path, L listener,
-        AsyncDataBroker.DataChangeScope scope) {
+        final YangInstanceIdentifier path, final L listener,
+        final AsyncDataBroker.DataChangeScope scope) {
 
         Preconditions.checkNotNull(path, "path should not be null");
         Preconditions.checkNotNull(listener, "listener should not be null");
@@ -146,7 +166,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     }
 
     @Override
-    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(YangInstanceIdentifier treeId, L listener) {
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
         Preconditions.checkNotNull(treeId, "treeId should not be null");
         Preconditions.checkNotNull(listener, "listener should not be null");
 
@@ -163,7 +183,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
     @Override
     public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
-            DOMDataTreeIdentifier subtree, C cohort) {
+            final DOMDataTreeIdentifier subtree, final C cohort) {
         YangInstanceIdentifier treeId =
                 Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier();
         Preconditions.checkNotNull(cohort, "listener should not be null");
@@ -200,12 +220,12 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     }
 
     @Override
-    public void onGlobalContextUpdated(SchemaContext schemaContext) {
+    public void onGlobalContextUpdated(final SchemaContext schemaContext) {
         actorContext.setSchemaContext(schemaContext);
     }
 
     @Override
-    public void onDatastoreContextUpdated(DatastoreContextFactory contextFactory) {
+    public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) {
         LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreName());
 
         actorContext.setDatastoreContext(contextFactory);
@@ -214,7 +234,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
     @Override
     public void close() {
-        LOG.info("Closing data store {}", type);
+        LOG.info("Closing data store {}", identifier);
 
         if (datastoreConfigMXBean != null) {
             datastoreConfigMXBean.unregisterMBean();
@@ -233,6 +253,10 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
         txContextFactory.close();
         actorContext.shutdown();
+
+        if (client != null) {
+            client.close();
+        }
     }
 
     @Override
@@ -241,11 +265,11 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     }
 
     public void waitTillReady(){
-        LOG.info("Beginning to wait for data store to become ready : {}", type);
+        LOG.info("Beginning to wait for data store to become ready : {}", identifier);
 
         try {
             if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
-                LOG.debug("Data store {} is now ready", type);
+                LOG.debug("Data store {} is now ready", identifier);
             } else {
                 LOG.error("Shared leaders failed to settle in {} seconds, giving up", TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
             }
@@ -254,8 +278,8 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
         }
     }
 
-    private static ActorRef createShardManager(ActorSystem actorSystem, ShardManagerCreator creator,
-            String shardDispatcher, String shardManagerId) {
+    private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
+            final String shardDispatcher, final String shardManagerId) {
         Exception lastException = null;
 
         for(int i=0;i<100;i++) {