2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.serialization.Serialization;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Strings;
15 import java.util.HashSet;
16 import java.util.Iterator;
18 import java.util.Objects;
19 import java.util.Optional;
21 import javax.annotation.Nullable;
22 import org.opendaylight.controller.cluster.access.concepts.MemberName;
23 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
24 import org.opendaylight.controller.cluster.datastore.Shard;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
27 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
28 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
29 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardInitialized;
30 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardReady;
31 import org.opendaylight.controller.cluster.raft.RaftState;
32 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 final class ShardInformation {
38 private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class);
40 private final Set<OnShardInitialized> onShardInitializedSet = new HashSet<>();
41 private final Map<String, String> initialPeerAddresses;
42 private final ShardPeerAddressResolver addressResolver;
43 private final ShardIdentifier shardId;
44 private final String shardName;
45 private ActorRef actor;
46 private Optional<DataTree> localShardDataTree;
47 private boolean leaderAvailable = false;
49 // flag that determines if the actor is ready for business
50 private boolean actorInitialized = false;
52 private boolean followerSyncStatus = false;
55 private String leaderId;
56 private short leaderVersion;
58 private DatastoreContext datastoreContext;
59 private Shard.AbstractBuilder<?, ?> builder;
60 private boolean isActiveMember = true;
62 ShardInformation(String shardName, ShardIdentifier shardId,
63 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
64 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
65 this.shardName = shardName;
66 this.shardId = shardId;
67 this.initialPeerAddresses = initialPeerAddresses;
68 this.datastoreContext = datastoreContext;
69 this.builder = builder;
70 this.addressResolver = addressResolver;
73 Props newProps(SchemaContext schemaContext) {
74 Preconditions.checkNotNull(builder);
75 Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
76 schemaContext(schemaContext).props();
81 String getShardName() {
90 void setActor(ActorRef actor) {
94 ShardIdentifier getShardId() {
98 void setLocalDataTree(Optional<DataTree> localShardDataTree) {
99 this.localShardDataTree = localShardDataTree;
102 Optional<DataTree> getLocalShardDataTree() {
103 return localShardDataTree;
106 DatastoreContext getDatastoreContext() {
107 return datastoreContext;
110 void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
111 this.datastoreContext = datastoreContext;
113 LOG.debug("Sending new DatastoreContext to {}", shardId);
114 actor.tell(this.datastoreContext, sender);
118 void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
119 LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
122 if(LOG.isDebugEnabled()) {
123 LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
124 peerId, peerAddress, actor.path());
127 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
130 notifyOnShardInitializedCallbacks();
133 void peerDown(MemberName memberName, String peerId, ActorRef sender) {
135 actor.tell(new PeerDown(memberName, peerId), sender);
139 void peerUp(MemberName memberName, String peerId, ActorRef sender) {
141 actor.tell(new PeerUp(memberName, peerId), sender);
145 boolean isShardReady() {
146 return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
149 boolean isShardReadyWithLeaderId() {
150 return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
151 !RaftState.PreLeader.name().equals(role) &&
152 (isLeader() || addressResolver.resolve(leaderId) != null);
155 boolean isShardInitialized() {
156 return getActor() != null && actorInitialized;
160 return Objects.equals(leaderId, shardId.toString());
163 String getSerializedLeaderActor() {
165 return Serialization.serializedActorPath(getActor());
167 return addressResolver.resolve(leaderId);
171 void setActorInitialized() {
172 LOG.debug("Shard {} is initialized", shardId);
174 this.actorInitialized = true;
176 notifyOnShardInitializedCallbacks();
179 private void notifyOnShardInitializedCallbacks() {
180 if(onShardInitializedSet.isEmpty()) {
184 boolean ready = isShardReadyWithLeaderId();
186 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
187 ready ? "ready" : "initialized", onShardInitializedSet.size());
189 Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
190 while(iter.hasNext()) {
191 OnShardInitialized onShardInitialized = iter.next();
192 if (!(onShardInitialized instanceof OnShardReady) || ready) {
194 onShardInitialized.getTimeoutSchedule().cancel();
195 onShardInitialized.getReplyRunnable().run();
200 void addOnShardInitialized(OnShardInitialized onShardInitialized) {
201 onShardInitializedSet.add(onShardInitialized);
204 void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
205 onShardInitializedSet.remove(onShardInitialized);
208 void setRole(String newRole) {
211 notifyOnShardInitializedCallbacks();
214 void setFollowerSyncStatus(boolean syncStatus){
215 this.followerSyncStatus = syncStatus;
219 if(RaftState.Follower.name().equals(this.role)){
220 return followerSyncStatus;
221 } else if(RaftState.Leader.name().equals(this.role)){
228 boolean setLeaderId(String leaderId) {
229 boolean changed = !Objects.equals(this.leaderId, leaderId);
230 this.leaderId = leaderId;
231 if(leaderId != null) {
232 this.leaderAvailable = true;
234 notifyOnShardInitializedCallbacks();
239 String getLeaderId() {
243 void setLeaderAvailable(boolean leaderAvailable) {
244 this.leaderAvailable = leaderAvailable;
246 if(leaderAvailable) {
247 notifyOnShardInitializedCallbacks();
251 short getLeaderVersion() {
252 return leaderVersion;
255 void setLeaderVersion(short leaderVersion) {
256 this.leaderVersion = leaderVersion;
259 boolean isActiveMember() {
260 return isActiveMember;
263 void setActiveMember(boolean isActiveMember) {
264 this.isActiveMember = isActiveMember;