-final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
- private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
-
- private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
- private final AtomicLong nextHistoryId = new AtomicLong(1);
- private final ModuleShardBackendResolver resolver;
- private final SingleClientHistory singleHistory;
-
- private volatile Throwable aborted;
-
- DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
- super(context);
- resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext);
- singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
- }
-
- //
- //
- // Methods below are invoked from the client actor thread
- //
- //
-
- @Override
- protected void haltClient(final Throwable cause) {
- // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up
- // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
- // thread.
- if (aborted != null) {
- abortOperations(cause);
- }
- }
-
- private void abortOperations(final Throwable cause) {
- // This acts as a barrier, application threads check this after they have added an entry in the maps,
- // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
- aborted = cause;
-
- for (ClientLocalHistory h : histories.values()) {
- h.localAbort(cause);
- }
- histories.clear();
- }
-
- private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
- abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
- return null;
- }