9b5942c6283c2a0944a338589ded1220b741aa5e
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractShardBackendResolver.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import akka.util.Timeout;
12 import com.google.common.base.Preconditions;
13 import com.google.common.primitives.UnsignedLong;
14 import java.util.concurrent.CompletableFuture;
15 import java.util.concurrent.CompletionStage;
16 import java.util.concurrent.TimeUnit;
17 import java.util.concurrent.TimeoutException;
18 import java.util.concurrent.atomic.AtomicLong;
19 import javax.annotation.Nonnull;
20 import javax.annotation.Nullable;
21 import javax.annotation.concurrent.GuardedBy;
22 import javax.annotation.concurrent.ThreadSafe;
23 import org.opendaylight.controller.cluster.access.ABIVersion;
24 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
25 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
26 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
27 import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
28 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
29 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
30 import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
31 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
32 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
33 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
34 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import scala.Function1;
39 import scala.compat.java8.FutureConverters;
40
41 /**
42  * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
43  * shard is assigned a single cookie and this mapping is stored in a bidirectional map. Information about corresponding
44  * shard leader is resolved via {@link ActorContext}. The product of resolution is {@link ShardBackendInfo}.
45  *
46  * @author Robert Varga
47  */
48 @ThreadSafe
49 abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
50     static final class ShardState {
51         private final CompletionStage<ShardBackendInfo> stage;
52         @GuardedBy("this")
53         private ShardBackendInfo result;
54
55         ShardState(final CompletionStage<ShardBackendInfo> stage) {
56             this.stage = Preconditions.checkNotNull(stage);
57             stage.whenComplete(this::onStageResolved);
58         }
59
60         @Nonnull CompletionStage<ShardBackendInfo> getStage() {
61             return stage;
62         }
63
64         @Nullable synchronized ShardBackendInfo getResult() {
65             return result;
66         }
67
68         private synchronized void onStageResolved(final ShardBackendInfo info, final Throwable failure) {
69             if (failure == null) {
70                 this.result = Preconditions.checkNotNull(info);
71             } else {
72                 LOG.warn("Failed to resolve shard", failure);
73             }
74         }
75     }
76
77     private static final Logger LOG = LoggerFactory.getLogger(AbstractShardBackendResolver.class);
78
79     /**
80      * Connect request timeout. If the shard does not respond within this interval, we retry the lookup and connection.
81      */
82     // TODO: maybe make this configurable somehow?
83     private static final Timeout CONNECT_TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
84
85     private final AtomicLong nextSessionId = new AtomicLong();
86     private final Function1<ActorRef, ?> connectFunction;
87     private final ActorContext actorContext;
88
89     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
90     AbstractShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
91         this.actorContext = Preconditions.checkNotNull(actorContext);
92         this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON,
93             ABIVersion.current()));
94     }
95
96     protected final void flushCache(final String shardName) {
97         actorContext.getPrimaryShardInfoCache().remove(shardName);
98     }
99
100     protected final ShardState resolveBackendInfo(final String shardName, final long cookie) {
101         LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
102
103         final CompletableFuture<ShardBackendInfo> future = new CompletableFuture<>();
104         FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).whenComplete((info, failure) -> {
105             if (failure == null) {
106                 connectShard(shardName, cookie, info, future);
107                 return;
108             }
109
110             LOG.debug("Shard {} failed to resolve", shardName, failure);
111             if (failure instanceof NoShardLeaderException) {
112                 future.completeExceptionally(wrap("Shard has no current leader", failure));
113             } else if (failure instanceof NotInitializedException) {
114                 // FIXME: this actually is an exception we can retry on
115                 LOG.info("Shard {} has not initialized yet", shardName);
116                 future.completeExceptionally(failure);
117             } else if (failure instanceof PrimaryNotFoundException) {
118                 LOG.info("Failed to find primary for shard {}", shardName);
119                 future.completeExceptionally(failure);
120             } else {
121                 future.completeExceptionally(failure);
122             }
123         });
124
125         return new ShardState(future);
126     }
127
128     private static TimeoutException wrap(final String message, final Throwable cause) {
129         final TimeoutException ret = new TimeoutException(message);
130         ret.initCause(Preconditions.checkNotNull(cause));
131         return ret;
132     }
133
134     private void connectShard(final String shardName, final long cookie, final PrimaryShardInfo info,
135             final CompletableFuture<ShardBackendInfo> future) {
136         LOG.debug("Shard {} resolved to {}, attempting to connect", shardName, info);
137
138         FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
139             .whenComplete((response, failure) -> {
140                 onConnectResponse(shardName, cookie, future, response, failure);
141             });
142     }
143
144     private void onConnectResponse(final String shardName, final long cookie,
145             final CompletableFuture<ShardBackendInfo> future, final Object response, final Throwable failure) {
146         if (failure != null) {
147             LOG.debug("Connect attempt to {} failed, will retry", shardName, failure);
148             future.completeExceptionally(wrap("Connection attempt failed", failure));
149             return;
150         }
151         if (response instanceof RequestFailure) {
152             final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
153             LOG.debug("Connect attempt to {} failed to process", shardName, cause);
154             final Throwable result = cause instanceof NotLeaderException
155                     ? wrap("Leader moved during establishment", cause) : cause;
156             future.completeExceptionally(result);
157             return;
158         }
159
160         LOG.debug("Resolved backend information to {}", response);
161         Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response %s",
162             response);
163         final ConnectClientSuccess success = (ConnectClientSuccess) response;
164         future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(),
165             success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(),
166             success.getMaxMessages()));
167     }
168 }