+ public void setSchemaContext(SchemaContext schemaContext) {
+ this.schemaContext = schemaContext;
+
+ if(shardManager != null) {
+ shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+ }
+ }
+
+ public void setDatastoreContext(DatastoreContextFactory contextFactory) {
+ this.datastoreContext = contextFactory.getBaseDatastoreContext();
+ setCachedProperties();
+
+ // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
+ // will be published immediately even though they may not be immediately visible to other
+ // threads due to unsynchronized reads. That's OK though - we're going for eventual
+ // consistency here as immediately visible updates to these members aren't critical. These
+ // members could've been made volatile but wanted to avoid volatile reads as these are
+ // accessed often and updates will be infrequent.
+
+ updated = true;
+
+ if(shardManager != null) {
+ shardManager.tell(contextFactory, ActorRef.noSender());
+ }
+ }
+
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
+
+ public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
+ Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
+ if(ret != null){
+ return ret;
+ }
+ Future<Object> future = executeOperationAsync(shardManager,
+ new FindPrimary(shardName, true), shardInitializationTimeout);
+
+ return future.transform(new Mapper<Object, PrimaryShardInfo>() {
+ @Override
+ public PrimaryShardInfo checkedApply(Object response) throws Exception {
+ if(response instanceof RemotePrimaryShardFound) {
+ LOG.debug("findPrimaryShardAsync received: {}", response);
+ RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
+ return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
+ } else if(response instanceof LocalPrimaryShardFound) {
+ LOG.debug("findPrimaryShardAsync received: {}", response);
+ LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
+ return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
+ found.getLocalShardDataTree());
+ } else if(response instanceof NotInitializedException) {
+ throw (NotInitializedException)response;
+ } else if(response instanceof PrimaryNotFoundException) {
+ throw (PrimaryNotFoundException)response;
+ } else if(response instanceof NoShardLeaderException) {
+ throw (NoShardLeaderException)response;
+ }
+
+ throw new UnknownMessageException(String.format(
+ "FindPrimary returned unkown response: %s", response));
+ }
+ }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
+ }
+
+ private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
+ short primaryVersion, DataTree localShardDataTree) {
+ ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
+ PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, primaryVersion,
+ Optional.fromNullable(localShardDataTree));
+ primaryShardInfoCache.putSuccessful(shardName, info);
+ return info;
+ }