ee549d36118e8f026f1ed6e83c80b842fe5bbf50
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ModuleShardBackendResolver.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.collect.BiMap;
12 import com.google.common.collect.ImmutableBiMap;
13 import com.google.common.collect.ImmutableBiMap.Builder;
14 import java.util.concurrent.CompletionStage;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import javax.annotation.concurrent.GuardedBy;
18 import javax.annotation.concurrent.ThreadSafe;
19 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
20 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
21 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
22 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
23 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
29  * shard is assigned a single cookie and this mapping is stored in a bidirectional map. Information about corresponding
30  * shard leader is resolved via {@link ActorContext}. The product of resolution is {@link ShardBackendInfo}.
31  *
32  * @author Robert Varga
33  */
34 @ThreadSafe
35 final class ModuleShardBackendResolver extends AbstractShardBackendResolver {
36     private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
37
38     private final ConcurrentMap<Long, ShardState> backends = new ConcurrentHashMap<>();
39     private final ActorContext actorContext;
40
41     @GuardedBy("this")
42     private long nextShard = 1;
43
44     private volatile BiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
45
46     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
47     ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
48         super(clientId, actorContext);
49         this.actorContext = Preconditions.checkNotNull(actorContext);
50     }
51
52     Long resolveShardForPath(final YangInstanceIdentifier path) {
53         final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
54         Long cookie = shards.get(shardName);
55         if (cookie == null) {
56             synchronized (this) {
57                 cookie = shards.get(shardName);
58                 if (cookie == null) {
59                     cookie = nextShard++;
60
61                     Builder<String, Long> builder = ImmutableBiMap.builder();
62                     builder.putAll(shards);
63                     builder.put(shardName, cookie);
64                     shards = builder.build();
65                 }
66             }
67         }
68
69         return cookie;
70     }
71
72
73     @Override
74     public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
75         /*
76          * We cannot perform a simple computeIfAbsent() here because we need to control sequencing of when the state
77          * is inserted into the map and retired from it (based on the stage result).
78          *
79          * We do not want to hook another stage one processing completes and hooking a removal on failure from a compute
80          * method runs the inherent risk of stage completing before the insertion does (i.e. we have a removal of
81          * non-existent element.
82          */
83         final ShardState existing = backends.get(cookie);
84         if (existing != null) {
85             return existing.getStage();
86         }
87
88         final String shardName = shards.inverse().get(cookie);
89         if (shardName == null) {
90             LOG.warn("Failing request for non-existent cookie {}", cookie);
91             throw new IllegalArgumentException("Cookie " + cookie + " does not have a shard assigned");
92         }
93
94         LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
95         final ShardState toInsert = resolveBackendInfo(shardName, cookie);
96
97         final ShardState raced = backends.putIfAbsent(cookie, toInsert);
98         if (raced != null) {
99             // We have had a concurrent insertion, return that
100             LOG.debug("Race during insertion of state for cookie {} shard {}", cookie, shardName);
101             return raced.getStage();
102         }
103
104         // We have succeeded in populating the map, now we need to take care of pruning the entry if it fails to
105         // complete
106         final CompletionStage<ShardBackendInfo> stage = toInsert.getStage();
107         stage.whenComplete((info, failure) -> {
108             if (failure != null) {
109                 LOG.debug("Resolution of cookie {} shard {} failed, removing state", cookie, shardName, failure);
110                 backends.remove(cookie, toInsert);
111
112                 // Remove cache state in case someone else forgot to invalidate it
113                 flushCache(shardName);
114             }
115         });
116
117         return stage;
118     }
119
120     @Override
121     public CompletionStage<ShardBackendInfo> refreshBackendInfo(final Long cookie,
122             final ShardBackendInfo staleInfo) {
123         final ShardState existing = backends.get(cookie);
124         if (existing != null) {
125             if (!staleInfo.equals(existing.getResult())) {
126                 return existing.getStage();
127             }
128
129             LOG.debug("Invalidating backend information {}", staleInfo);
130             flushCache(staleInfo.getShardName());
131
132             LOG.trace("Invalidated cache %s", staleInfo);
133             backends.remove(cookie, existing);
134         }
135
136         return getBackendInfo(cookie);
137     }
138 }