import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.WritableIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
/**
* An actor context associated with this {@link AbstractClientActor}.
*
+ * <p>
* Time-keeping in a client actor is based on monotonic time. The precision of this time can be expected to be the
* same as {@link System#nanoTime()}, but it is not tied to that particular clock. Actor clock is exposed as
* a {@link Ticker}, which can be obtained via {@link #ticker()}.
@Beta
@ThreadSafe
public class ClientActorContext extends AbstractClientActorContext implements Identifiable<ClientIdentifier> {
- private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class);
-
- private final Map<Long, SequencedQueue> queues = new ConcurrentHashMap<>();
- private final ClientIdentifier identifier;
private final ExecutionContext executionContext;
+ private final ClientIdentifier identifier;
private final Scheduler scheduler;
// Hidden to avoid subclassing
}
@Override
- public @Nonnull ClientIdentifier getIdentifier() {
+ @Nonnull
+ public ClientIdentifier getIdentifier() {
return identifier;
}
*
* @return Client actor time source
*/
- public @Nonnull Ticker ticker() {
+ @Nonnull
+ public Ticker ticker() {
return Ticker.systemTicker();
}
* Execute a command in the context of the client actor.
*
* @param command Block of code which needs to be execute
+ * @param <T> BackendInfo type
*/
- public void executeInActor(final @Nonnull InternalCommand command) {
+ public <T extends BackendInfo> void executeInActor(@Nonnull final InternalCommand<T> command) {
self().tell(Preconditions.checkNotNull(command), ActorRef.noSender());
}
- public Cancellable executeInActor(final @Nonnull InternalCommand command, final FiniteDuration delay) {
+ public <T extends BackendInfo> Cancellable executeInActor(@Nonnull final InternalCommand<T> command,
+ final FiniteDuration delay) {
return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command),
executionContext, ActorRef.noSender());
}
-
- SequencedQueue queueFor(final Long cookie) {
- return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker()));
- }
-
- void removeQueue(final SequencedQueue queue) {
- queues.remove(queue.getCookie(), queue);
- }
-
- ClientActorBehavior completeRequest(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
- final WritableIdentifier id = response.getMessage().getTarget();
-
- // FIXME: this will need to be updated for other Request/Response types to extract cookie
- Preconditions.checkArgument(id instanceof TransactionIdentifier);
- final TransactionIdentifier txId = (TransactionIdentifier) id;
-
- final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie());
- if (queue == null) {
- LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
- return current;
- } else {
- return queue.complete(current, response);
- }
- }
-
- void poison(final RequestException cause) {
- for (SequencedQueue q : queues.values()) {
- q.poison(cause);
- }
-
- queues.clear();
- }
}