+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
+ return ask(actorRef, message, timeout);
+ }
+
+ protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
+ return ask(actorRef, message, timeout);
+ }
+
+ @VisibleForTesting
+ Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
+ return primaryShardInfoCache;
+ }
+
+ public <T extends ShardInfoListener> ShardInfoListenerRegistration<T> registerShardInfoListener(final T listener) {
+ final ShardInfoListenerRegistration<T> reg = new ShardInfoListenerRegistration<T>(listener, this);
+
+ synchronized (shardInfoListeners) {
+ shardInfoListeners.add(reg);
+ }
+ return reg;
+ }
+
+ protected void removeShardInfoListener(final ShardInfoListenerRegistration<?> registration) {
+ synchronized (shardInfoListeners) {
+ shardInfoListeners.remove(registration);
+ }
+ }
+
+ @Override
+ public void onRemoval(final RemovalNotification<String, Future<PrimaryShardInfo>> notification) {
+ synchronized (shardInfoListeners) {
+ for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
+ reg.getInstance().onShardInfoUpdated(notification.getKey(), null);
+ }
+ }
+ }