004df590fe2f2c7fe0c9d75a4d9a2cfd9975c918
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ModuleShardBackendResolver.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static akka.pattern.Patterns.ask;
11
12 import akka.dispatch.ExecutionContexts;
13 import akka.dispatch.OnComplete;
14 import akka.util.Timeout;
15 import com.google.common.collect.BiMap;
16 import com.google.common.collect.ImmutableBiMap;
17 import com.google.common.collect.ImmutableBiMap.Builder;
18 import java.util.concurrent.CompletionStage;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.TimeUnit;
22 import javax.annotation.concurrent.GuardedBy;
23 import javax.annotation.concurrent.ThreadSafe;
24 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
25 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
26 import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges;
27 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
28 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
29 import org.opendaylight.yangtools.concepts.Registration;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import scala.concurrent.Future;
34
35 /**
36  * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
37  * shard is assigned a single cookie and this mapping is stored in a bidirectional map. Information about corresponding
38  * shard leader is resolved via {@link ActorContext}. The product of resolution is {@link ShardBackendInfo}.
39  *
40  * @author Robert Varga
41  */
42 @ThreadSafe
43 final class ModuleShardBackendResolver extends AbstractShardBackendResolver {
44     private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
45
46     private final ConcurrentMap<Long, ShardState> backends = new ConcurrentHashMap<>();
47
48     private final Future<Registration> shardAvailabilityChangesRegFuture;
49
50     @GuardedBy("this")
51     private long nextShard = 1;
52
53     private volatile BiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
54
55     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
56     ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
57         super(clientId, actorContext);
58
59         shardAvailabilityChangesRegFuture = ask(actorContext.getShardManager(), new RegisterForShardAvailabilityChanges(
60             this::onShardAvailabilityChange), Timeout.apply(60, TimeUnit.MINUTES))
61                 .map(reply -> (Registration)reply, ExecutionContexts.global());
62
63         shardAvailabilityChangesRegFuture.onComplete(new OnComplete<Registration>() {
64             @Override
65             public void onComplete(Throwable failure, Registration reply) {
66                 if (failure != null) {
67                     LOG.error("RegisterForShardAvailabilityChanges failed", failure);
68                 }
69             }
70         }, ExecutionContexts.global());
71     }
72
73     private void onShardAvailabilityChange(String shardName) {
74         LOG.debug("onShardAvailabilityChange for {}", shardName);
75
76         Long cookie = shards.get(shardName);
77         if (cookie == null) {
78             LOG.debug("No shard cookie found for {}", shardName);
79             return;
80         }
81
82         notifyStaleBackendInfoCallbacks(cookie);
83     }
84
85     Long resolveShardForPath(final YangInstanceIdentifier path) {
86         final String shardName = actorContext().getShardStrategyFactory().getStrategy(path).findShard(path);
87         Long cookie = shards.get(shardName);
88         if (cookie == null) {
89             synchronized (this) {
90                 cookie = shards.get(shardName);
91                 if (cookie == null) {
92                     cookie = nextShard++;
93
94                     Builder<String, Long> builder = ImmutableBiMap.builder();
95                     builder.putAll(shards);
96                     builder.put(shardName, cookie);
97                     shards = builder.build();
98                 }
99             }
100         }
101
102         return cookie;
103     }
104
105     @Override
106     public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
107         /*
108          * We cannot perform a simple computeIfAbsent() here because we need to control sequencing of when the state
109          * is inserted into the map and retired from it (based on the stage result).
110          *
111          * We do not want to hook another stage one processing completes and hooking a removal on failure from a compute
112          * method runs the inherent risk of stage completing before the insertion does (i.e. we have a removal of
113          * non-existent element.
114          */
115         final ShardState existing = backends.get(cookie);
116         if (existing != null) {
117             return existing.getStage();
118         }
119
120         final String shardName = shards.inverse().get(cookie);
121         if (shardName == null) {
122             LOG.warn("Failing request for non-existent cookie {}", cookie);
123             throw new IllegalArgumentException("Cookie " + cookie + " does not have a shard assigned");
124         }
125
126         LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
127         final ShardState toInsert = resolveBackendInfo(shardName, cookie);
128
129         final ShardState raced = backends.putIfAbsent(cookie, toInsert);
130         if (raced != null) {
131             // We have had a concurrent insertion, return that
132             LOG.debug("Race during insertion of state for cookie {} shard {}", cookie, shardName);
133             return raced.getStage();
134         }
135
136         // We have succeeded in populating the map, now we need to take care of pruning the entry if it fails to
137         // complete
138         final CompletionStage<ShardBackendInfo> stage = toInsert.getStage();
139         stage.whenComplete((info, failure) -> {
140             if (failure != null) {
141                 LOG.debug("Resolution of cookie {} shard {} failed, removing state", cookie, shardName, failure);
142                 backends.remove(cookie, toInsert);
143
144                 // Remove cache state in case someone else forgot to invalidate it
145                 flushCache(shardName);
146             }
147         });
148
149         return stage;
150     }
151
152     @Override
153     public CompletionStage<ShardBackendInfo> refreshBackendInfo(final Long cookie,
154             final ShardBackendInfo staleInfo) {
155         final ShardState existing = backends.get(cookie);
156         if (existing != null) {
157             if (!staleInfo.equals(existing.getResult())) {
158                 return existing.getStage();
159             }
160
161             LOG.debug("Invalidating backend information {}", staleInfo);
162             flushCache(staleInfo.getShardName());
163
164             LOG.trace("Invalidated cache {}", staleInfo);
165             backends.remove(cookie, existing);
166         }
167
168         return getBackendInfo(cookie);
169     }
170
171     @Override
172     public void close() {
173         shardAvailabilityChangesRegFuture.onComplete(new OnComplete<Registration>() {
174             @Override
175             public void onComplete(Throwable failure, Registration reply) {
176                 reply.close();
177             }
178         }, ExecutionContexts.global());
179     }
180 }