import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
- private static final String UNKNOWN_TYPE = "unknown";
private static final long READY_WAIT_FACTOR = 3;
private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
- private final String type;
+ private final ClientIdentifier identifier;
+ private final DistributedDataStoreClient client;
private final TransactionContextFactory txContextFactory;
Preconditions.checkNotNull(configuration, "configuration should not be null");
Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
- this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
+ final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreName());
+ final ActorRef clientActor = actorSystem.actorOf(clientProps);
+ try {
+ client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to get actor for {}", clientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ throw Throwables.propagate(e);
+ }
+
+ identifier = client.getIdentifier();
+ LOG.debug("Distributed data store client {} started", identifier);
- String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
+ String shardManagerId = ShardManagerIdentifier.builder()
+ .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
LOG.info("Creating ShardManager : {}", shardManagerId);
}
@VisibleForTesting
- DistributedDataStore(ActorContext actorContext) {
+ DistributedDataStore(ActorContext actorContext, ClientIdentifier identifier) {
this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+ this.client = null;
+ this.identifier = Preconditions.checkNotNull(identifier);
this.txContextFactory = TransactionContextFactory.create(actorContext);
- this.type = UNKNOWN_TYPE;
this.waitTillReadyTimeInMillis =
actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
}
@Override
public void close() {
- LOG.info("Closing data store {}", type);
+ LOG.info("Closing data store {}", identifier);
if (datastoreConfigMXBean != null) {
datastoreConfigMXBean.unregisterMBean();
txContextFactory.close();
actorContext.shutdown();
+
+ if (client != null) {
+ client.close();
+ }
}
@Override
}
public void waitTillReady(){
- LOG.info("Beginning to wait for data store to become ready : {}", type);
+ LOG.info("Beginning to wait for data store to become ready : {}", identifier);
try {
if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
- LOG.debug("Data store {} is now ready", type);
+ LOG.debug("Data store {} is now ready", identifier);
} else {
LOG.error("Shared leaders failed to settle in {} seconds, giving up", TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
}