BUG-5280: add AbstractClientConnection
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / DistributedDataStoreClientBehavior.java
index 9940ae57f32858f042084440341d56a660dcb89b..bc393a4c0f9cdffe61e0a49525edebcef92a24e1 100644 (file)
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import com.google.common.base.Throwables;
-import com.google.common.base.Verify;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
+import java.util.function.Function;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
-import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 /**
- * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
- * frontend.
- *
- * <p>
- * This class is not visible outside of this package because it breaks the actor containment. Services provided to
- * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
- *
- * <p>
- * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
- *            When touching internal state, be mindful of the execution context from which execution context, Actor
- *            or POJO, is the state being accessed or modified.
- *
- * <p>
- * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
- *                threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
- *                doubt, feel free to synchronize on this object.
- *
- * <p>
- * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
- *              performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
- *              for correctness and performance impact.
- *
- * <p>
- * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
- *             for performing work and charging applications for it. That has two positive effects:
- *             - CPU usage is distributed across applications, minimizing work done in the actor thread
- *             - CPU usage provides back-pressure towards the application.
+ * {@link AbstractDataStoreClientBehavior} which performs module-based sharding.
  *
  * @author Robert Varga
  */
-final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
-    private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
-
-    private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
-    private final AtomicLong nextHistoryId = new AtomicLong(1);
-    private final ModuleShardBackendResolver resolver;
-    private final SingleClientHistory singleHistory;
-
-    private volatile Throwable aborted;
-
-    DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
-        super(context);
-        resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext);
-        singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
-    }
-
-    //
-    //
-    // Methods below are invoked from the client actor thread
-    //
-    //
-
-    @Override
-    protected void haltClient(final Throwable cause) {
-        // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up
-        // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
-        // thread.
-        if (aborted != null) {
-            abortOperations(cause);
-        }
-    }
-
-    private void abortOperations(final Throwable cause) {
-        // This acts as a barrier, application threads check this after they have added an entry in the maps,
-        // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
-        aborted = cause;
-
-        for (ClientLocalHistory h : histories.values()) {
-            h.localAbort(cause);
-        }
-        histories.clear();
-    }
-
-    private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
-        abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
-        return null;
-    }
+final class DistributedDataStoreClientBehavior extends AbstractDataStoreClientBehavior {
+    private final Function<YangInstanceIdentifier, Long> pathToShard;
 
-    @Override
-    protected DistributedDataStoreClientBehavior onCommand(final Object command) {
-        if (command instanceof GetClientRequest) {
-            ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
-        } else {
-            LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
-        }
-
-        return this;
-    }
-
-    //
-    //
-    // Methods below are invoked from application threads
-    //
-    //
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private static <K, V extends LocalAbortable> V returnIfOperational(final Map<K , V> map, final K key, final V value,
-            final Throwable aborted) {
-        Verify.verify(map.put(key, value) == null);
-
-        if (aborted != null) {
-            try {
-                value.localAbort(aborted);
-            } catch (Exception e) {
-                LOG.debug("Close of {} failed", value, e);
-            }
-            map.remove(key, value);
-            throw Throwables.propagate(aborted);
-        }
-
-        return value;
+    private DistributedDataStoreClientBehavior(final ClientActorContext context,
+            final ModuleShardBackendResolver resolver) {
+        super(context, resolver);
+        pathToShard = resolver::resolveShardForPath;
     }
 
-    @Override
-    public ClientLocalHistory createLocalHistory() {
-        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
-            nextHistoryId.getAndIncrement());
-        final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
-        LOG.debug("{}: creating a new local history {}", persistenceId(), history);
-
-        return returnIfOperational(histories, historyId, history, aborted);
-    }
-
-    @Override
-    public ClientTransaction createTransaction() {
-        return singleHistory.createTransaction();
-    }
-
-    @Override
-    public void close() {
-        context().executeInActor(this::shutdown);
+    DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
+        this(context, new ModuleShardBackendResolver(context.getIdentifier(), actorContext));
     }
 
     @Override
-    protected ModuleShardBackendResolver resolver() {
-        return resolver;
-    }
-
-    void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
-        sendRequest(request, response -> {
-            completer.accept(response);
-            return this;
-        });
+    Long resolveShardForPath(final YangInstanceIdentifier path) {
+        return pathToShard.apply(path);
     }
-
 }