+ actorContext.acquireTxCreationPermit();
+ return new TransactionProxy(txContextFactory, TransactionType.READ_WRITE);
+ }
+
+ @Override
+ public void onGlobalContextUpdated(final SchemaContext schemaContext) {
+ actorContext.setSchemaContext(schemaContext);
+ }
+
+ @Override
+ public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) {
+ LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreName());
+
+ actorContext.setDatastoreContext(contextFactory);
+ datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext());
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void close() {
+ LOG.info("Closing data store {}", identifier);
+
+ if (datastoreConfigMXBean != null) {
+ datastoreConfigMXBean.unregisterMBean();
+ }
+ if (datastoreInfoMXBean != null) {
+ datastoreInfoMXBean.unregisterMBean();
+ }
+
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ LOG.debug("Error closing instance", e);
+ }
+ }
+
+ txContextFactory.close();
+ actorContext.shutdown();
+
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
+ public ActorContext getActorContext() {
+ return actorContext;
+ }
+
+ public void waitTillReady() {
+ LOG.info("Beginning to wait for data store to become ready : {}", identifier);
+
+ try {
+ if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
+ LOG.debug("Data store {} is now ready", identifier);
+ } else {
+ LOG.error("Shard leaders failed to settle in {} seconds, giving up",
+ TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for shards to settle", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
+ final String shardDispatcher, final String shardManagerId) {
+ Exception lastException = null;
+
+ for (int i = 0; i < 100; i++) {
+ try {
+ return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox(
+ ActorContext.BOUNDED_MAILBOX), shardManagerId);
+ } catch (Exception e) {
+ lastException = e;
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying "
+ + "(retry count = {})", shardManagerId, e.getMessage(), i);
+ }
+ }
+
+ throw new IllegalStateException("Failed to create Shard Manager", lastException);