2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import com.google.common.base.Preconditions;
11 import com.google.common.collect.BiMap;
12 import com.google.common.collect.ImmutableBiMap;
13 import com.google.common.collect.ImmutableBiMap.Builder;
14 import java.util.concurrent.CompletionStage;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import javax.annotation.concurrent.GuardedBy;
18 import javax.annotation.concurrent.ThreadSafe;
19 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
20 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
21 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
22 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
23 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
28 * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
29 * shard is assigned a single cookie and this mapping is stored in a bidirectional map. Information about corresponding
30 * shard leader is resolved via {@link ActorContext}. The product of resolution is {@link ShardBackendInfo}.
32 * @author Robert Varga
35 final class ModuleShardBackendResolver extends AbstractShardBackendResolver {
36 private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
38 private final ConcurrentMap<Long, ShardState> backends = new ConcurrentHashMap<>();
39 private final ActorContext actorContext;
42 private long nextShard = 1;
44 private volatile BiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
46 // FIXME: we really need just ActorContext.findPrimaryShardAsync()
47 ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
48 super(clientId, actorContext);
49 this.actorContext = Preconditions.checkNotNull(actorContext);
52 Long resolveShardForPath(final YangInstanceIdentifier path) {
53 final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
54 Long cookie = shards.get(shardName);
57 cookie = shards.get(shardName);
61 Builder<String, Long> builder = ImmutableBiMap.builder();
62 builder.putAll(shards);
63 builder.put(shardName, cookie);
64 shards = builder.build();
74 public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
76 * We cannot perform a simple computeIfAbsent() here because we need to control sequencing of when the state
77 * is inserted into the map and retired from it (based on the stage result).
79 * We do not want to hook another stage one processing completes and hooking a removal on failure from a compute
80 * method runs the inherent risk of stage completing before the insertion does (i.e. we have a removal of
81 * non-existent element.
83 final ShardState existing = backends.get(cookie);
84 if (existing != null) {
85 return existing.getStage();
88 final String shardName = shards.inverse().get(cookie);
89 if (shardName == null) {
90 LOG.warn("Failing request for non-existent cookie {}", cookie);
91 throw new IllegalArgumentException("Cookie " + cookie + " does not have a shard assigned");
94 LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
95 final ShardState toInsert = resolveBackendInfo(shardName, cookie);
97 final ShardState raced = backends.putIfAbsent(cookie, toInsert);
99 // We have had a concurrent insertion, return that
100 LOG.debug("Race during insertion of state for cookie {} shard {}", cookie, shardName);
101 return raced.getStage();
104 // We have succeeded in populating the map, now we need to take care of pruning the entry if it fails to
106 final CompletionStage<ShardBackendInfo> stage = toInsert.getStage();
107 stage.whenComplete((info, failure) -> {
108 if (failure != null) {
109 LOG.debug("Resolution of cookie {} shard {} failed, removing state", cookie, shardName, failure);
110 backends.remove(cookie, toInsert);
118 public CompletionStage<ShardBackendInfo> refreshBackendInfo(final Long cookie,
119 final ShardBackendInfo staleInfo) {
120 final ShardState existing = backends.get(cookie);
121 if (existing != null) {
122 if (!staleInfo.equals(existing.getResult())) {
123 return existing.getStage();
126 LOG.debug("Invalidating backend information {}", staleInfo);
127 flushCache(staleInfo.getShardName());
129 LOG.trace("Invalidated cache %s", staleInfo);
130 backends.remove(cookie, existing);
133 return getBackendInfo(cookie);