2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.serialization.Serialization;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Preconditions;
17 import com.google.common.base.Strings;
18 import java.util.HashSet;
19 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.controller.cluster.access.concepts.MemberName;
26 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
27 import org.opendaylight.controller.cluster.datastore.Shard;
28 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
29 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
30 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
31 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
32 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardInitialized;
33 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardReady;
34 import org.opendaylight.controller.cluster.raft.RaftState;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
40 final class ShardInformation {
41 private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class);
43 private final Set<OnShardInitialized> onShardInitializedSet = new HashSet<>();
44 private final Map<String, String> initialPeerAddresses;
45 private final ShardPeerAddressResolver addressResolver;
46 private final ShardIdentifier shardId;
47 private final String shardName;
49 // This reference indirection is required to have the ability to update the SchemaContext
50 // inside actor props. Otherwise we would be keeping an old SchemaContext there, preventing
51 // it from becoming garbage.
52 private final AtomicShardContextProvider schemaContextProvider = new AtomicShardContextProvider();
53 private ActorRef actor;
55 private Optional<ReadOnlyDataTree> localShardDataTree;
56 private boolean leaderAvailable = false;
58 // flag that determines if the actor is ready for business
59 private boolean actorInitialized = false;
61 private boolean followerSyncStatus = false;
64 private String leaderId;
65 private short leaderVersion;
67 private DatastoreContext datastoreContext;
68 private Shard.AbstractBuilder<?, ?> builder;
69 private boolean activeMember = true;
/**
 * Creates the bookkeeping record for one shard. All arguments are stored as given; nothing is
 * validated or copied here.
 *
 * @param shardName name of the shard
 * @param shardId identifier of the shard
 * @param initialPeerAddresses peer addresses stored for later use
 * @param datastoreContext datastore context in effect for the shard
 * @param builder shard builder stored for later use
 * @param addressResolver resolver used to look up peer addresses
 */
ShardInformation(final String shardName, final ShardIdentifier shardId,
        final Map<String, String> initialPeerAddresses, final DatastoreContext datastoreContext,
        final Shard.AbstractBuilder<?, ?> builder, final ShardPeerAddressResolver addressResolver) {
    // Shard identity.
    this.shardId = shardId;
    this.shardName = shardName;

    // Peer addressing.
    this.addressResolver = addressResolver;
    this.initialPeerAddresses = initialPeerAddresses;

    // Inputs kept for building the shard.
    this.builder = builder;
    this.datastoreContext = datastoreContext;
}
83 Props props = requireNonNull(builder).id(shardId).peerAddresses(initialPeerAddresses)
84 .datastoreContext(datastoreContext).schemaContextProvider(schemaContextProvider).props();
89 String getShardName() {
93 @Nullable ActorRef getActor() {
97 void setActor(final ActorRef actor) {
101 ShardIdentifier getShardId() {
105 void setLocalDataTree(final Optional<ReadOnlyDataTree> dataTree) {
106 this.localShardDataTree = dataTree;
109 Optional<ReadOnlyDataTree> getLocalShardDataTree() {
110 return localShardDataTree;
113 DatastoreContext getDatastoreContext() {
114 return datastoreContext;
117 void setDatastoreContext(final DatastoreContext newDatastoreContext, final ActorRef sender) {
118 this.datastoreContext = newDatastoreContext;
120 LOG.debug("Sending new DatastoreContext to {}", shardId);
121 actor.tell(this.datastoreContext, sender);
125 void updatePeerAddress(final String peerId, final String peerAddress, final ActorRef sender) {
126 LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
129 LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", peerId,
130 peerAddress, actor.path());
132 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
135 notifyOnShardInitializedCallbacks();
138 void peerDown(final MemberName memberName, final String peerId, final ActorRef sender) {
140 actor.tell(new PeerDown(memberName, peerId), sender);
144 void peerUp(final MemberName memberName, final String peerId, final ActorRef sender) {
146 actor.tell(new PeerUp(memberName, peerId), sender);
150 boolean isShardReady() {
151 return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
154 boolean isShardReadyWithLeaderId() {
155 return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role)
156 && !RaftState.PreLeader.name().equals(role)
157 && (isLeader() || addressResolver.resolve(leaderId) != null);
160 boolean isShardInitialized() {
161 return getActor() != null && actorInitialized;
165 return Objects.equals(leaderId, shardId.toString());
168 String getSerializedLeaderActor() {
170 return Serialization.serializedActorPath(getActor());
172 return addressResolver.resolve(leaderId);
176 void setActorInitialized() {
177 LOG.debug("Shard {} is initialized", shardId);
179 this.actorInitialized = true;
181 notifyOnShardInitializedCallbacks();
184 private void notifyOnShardInitializedCallbacks() {
185 if (onShardInitializedSet.isEmpty()) {
189 boolean ready = isShardReadyWithLeaderId();
191 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
192 ready ? "ready" : "initialized", onShardInitializedSet.size());
194 Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
195 while (iter.hasNext()) {
196 OnShardInitialized onShardInitialized = iter.next();
197 if (!(onShardInitialized instanceof OnShardReady) || ready) {
199 onShardInitialized.getTimeoutSchedule().cancel();
200 onShardInitialized.getReplyRunnable().run();
205 void addOnShardInitialized(final OnShardInitialized onShardInitialized) {
206 onShardInitializedSet.add(onShardInitialized);
209 void removeOnShardInitialized(final OnShardInitialized onShardInitialized) {
210 onShardInitializedSet.remove(onShardInitialized);
213 void setRole(final String newRole) {
216 notifyOnShardInitializedCallbacks();
223 void setFollowerSyncStatus(final boolean syncStatus) {
224 this.followerSyncStatus = syncStatus;
228 if (RaftState.Follower.name().equals(this.role)) {
229 return followerSyncStatus;
230 } else if (RaftState.Leader.name().equals(this.role)) {
237 boolean setLeaderId(final String newLeaderId) {
238 final boolean changed = !Objects.equals(this.leaderId, newLeaderId);
239 this.leaderId = newLeaderId;
240 if (newLeaderId != null) {
241 this.leaderAvailable = true;
243 notifyOnShardInitializedCallbacks();
248 String getLeaderId() {
252 void setLeaderAvailable(final boolean leaderAvailable) {
253 this.leaderAvailable = leaderAvailable;
255 if (leaderAvailable) {
256 notifyOnShardInitializedCallbacks();
260 short getLeaderVersion() {
261 return leaderVersion;
264 void setLeaderVersion(final short leaderVersion) {
265 this.leaderVersion = leaderVersion;
268 boolean isActiveMember() {
272 void setActiveMember(final boolean isActiveMember) {
273 this.activeMember = isActiveMember;
276 SchemaContext getSchemaContext() {
277 return schemaContextProvider.getSchemaContext();
280 void setSchemaContext(final SchemaContext schemaContext) {
281 schemaContextProvider.set(Preconditions.checkNotNull(schemaContext));
285 Shard.AbstractBuilder<?, ?> getBuilder() {
290 public String toString() {
291 return "ShardInformation [shardId=" + shardId + ", leaderAvailable=" + leaderAvailable + ", actorInitialized="
292 + actorInitialized + ", followerSyncStatus=" + followerSyncStatus + ", role=" + role + ", leaderId="
293 + leaderId + ", activeMember=" + activeMember + "]";