4dfbc87eb9a2b1b7bcb3545e1e67dc2c03304c3a
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShard.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
11 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
12 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
13 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
14 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
15 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
16 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
17 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
18 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
19 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
20 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSelection;
23 import akka.actor.Props;
24 import akka.pattern.Patterns;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Strings;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.TimeUnit;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
35 import org.opendaylight.controller.cluster.datastore.Shard;
36 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
37 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
38 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
39 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
40 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
41 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
42 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
43 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
44 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
45 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
46 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
47 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
48 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
49 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
50 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
53 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
54 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
55 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
56 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
57 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
58 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
59 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
60 import scala.concurrent.Future;
61
62 /**
63  * Special Shard for EntityOwnership.
64  *
65  * @author Thomas Pantelis
66  */
67 class EntityOwnershipShard extends Shard {
68     private final String localMemberName;
69     private final EntityOwnershipShardCommitCoordinator commitCoordinator;
70     private final EntityOwnershipListenerSupport listenerSupport;
71     private final Set<String> downPeerMemberNames = new HashSet<>();
72     private final Map<String, String> peerIdToMemberNames = new HashMap<>();
73
74     private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
75         return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
76     }
77
78     protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
79             DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
80         super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
81         this.localMemberName = localMemberName;
82         this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
83         this.listenerSupport = new EntityOwnershipListenerSupport(getContext());
84
85         for(String peerId: peerAddresses.keySet()) {
86             ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
87             peerIdToMemberNames.put(peerId, shardId.getMemberName());
88         }
89     }
90
91     @Override
92     protected void onDatastoreContext(DatastoreContext context) {
93         super.onDatastoreContext(noPersistenceDatastoreContext(context));
94     }
95
96     @Override
97     protected void onRecoveryComplete() {
98         super.onRecoveryComplete();
99
100         new CandidateListChangeListener(getSelf()).init(getDataStore());
101         new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
102     }
103
104     @Override
105     public void onReceiveCommand(final Object message) throws Exception {
106         if(message instanceof RegisterCandidateLocal) {
107             onRegisterCandidateLocal((RegisterCandidateLocal)message);
108         } else if(message instanceof UnregisterCandidateLocal) {
109             onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
110         } else if(message instanceof CandidateAdded){
111             onCandidateAdded((CandidateAdded) message);
112         } else if(message instanceof CandidateRemoved){
113             onCandidateRemoved((CandidateRemoved) message);
114         } else if(message instanceof PeerDown) {
115             onPeerDown((PeerDown) message);
116         } else if(message instanceof PeerUp) {
117             onPeerUp((PeerUp) message);
118         } if(message instanceof RegisterListenerLocal) {
119             onRegisterListenerLocal((RegisterListenerLocal)message);
120         } if(message instanceof UnregisterListenerLocal) {
121             onUnregisterListenerLocal((UnregisterListenerLocal)message);
122         } else if(!commitCoordinator.handleMessage(message, this)) {
123             super.onReceiveCommand(message);
124         }
125     }
126
127     private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
128         LOG.debug("{}: onRegisterCandidateLocal: {}", persistenceId(), registerCandidate);
129
130         listenerSupport.addEntityOwnershipListener(registerCandidate.getEntity(), registerCandidate.getCandidate());
131
132         NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
133                 registerCandidate.getEntity().getId(), localMemberName);
134         commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
135
136         getSender().tell(SuccessReply.INSTANCE, getSelf());
137     }
138
139     private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
140         LOG.debug("{}: onUnregisterCandidateLocal: {}", persistenceId(), unregisterCandidate);
141
142         Entity entity = unregisterCandidate.getEntity();
143         listenerSupport.removeEntityOwnershipListener(entity, unregisterCandidate.getCandidate());
144
145         YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getId(), localMemberName);
146         commitCoordinator.commitModification(new DeleteModification(candidatePath), this);
147
148         getSender().tell(SuccessReply.INSTANCE, getSelf());
149     }
150
151     private void onRegisterListenerLocal(final RegisterListenerLocal registerListener) {
152         LOG.debug("{}: onRegisterListenerLocal: {}", persistenceId(), registerListener);
153
154         listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());
155
156         getSender().tell(SuccessReply.INSTANCE, getSelf());
157
158         searchForEntitiesOwnedBy(localMemberName, new EntityWalker() {
159             @Override
160             public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
161                 Optional<DataContainerChild<? extends PathArgument, ?>> possibleType =
162                         entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
163                 String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
164                 if(registerListener.getEntityType().equals(entityType)) {
165                     Entity entity = new Entity(entityType,
166                             (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
167                     listenerSupport.notifyEntityOwnershipListener(entity, false, true, registerListener.getListener());
168                 }
169             }
170         });
171     }
172
173     private void onUnregisterListenerLocal(UnregisterListenerLocal unregisterListener) {
174         LOG.debug("{}: onUnregisterListenerLocal: {}", persistenceId(), unregisterListener);
175
176         listenerSupport.removeEntityOwnershipListener(unregisterListener.getEntityType(), unregisterListener.getListener());
177
178         getSender().tell(SuccessReply.INSTANCE, getSelf());
179     }
180
181     void tryCommitModifications(final BatchedModifications modifications) {
182         if(isLeader()) {
183             LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(), modifications.getTransactionID());
184
185             // Note that it's possible the commit won't get consensus and will timeout and not be applied
186             // to the state. However we don't need to retry it in that case b/c it will be committed to
187             // the journal first and, once a majority of followers come back on line and it is replicated,
188             // it will be applied at that point.
189             handleBatchedModificationsLocal(modifications, self());
190         } else {
191             final ActorSelection leader = getLeader();
192             if (leader != null) {
193                 if(LOG.isDebugEnabled()) {
194                     LOG.debug("{}: Sending BatchedModifications {} to leader {}", persistenceId(),
195                             modifications.getTransactionID(), leader);
196                 }
197
198                 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
199                         getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
200
201                 Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
202             }
203         }
204     }
205
206     boolean hasLeader() {
207         return getLeader() != null && !isIsolatedLeader();
208     }
209
210     @Override
211     protected void onStateChanged() {
212         super.onStateChanged();
213
214         commitCoordinator.onStateChanged(this, isLeader());
215     }
216
217     @Override
218     protected void onLeaderChanged(String oldLeader, String newLeader) {
219         super.onLeaderChanged(oldLeader, newLeader);
220
221         LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader,
222                 newLeader, isLeader());
223
224         if(isLeader()) {
225             // We were just elected leader. If the old leader is down, select new owners for the entities
226             // owned by the down leader.
227
228             String oldLeaderMemberName = peerIdToMemberNames.get(oldLeader);
229
230             LOG.debug("{}: oldLeaderMemberName: {}", persistenceId(), oldLeaderMemberName);
231
232             if(downPeerMemberNames.contains(oldLeaderMemberName)) {
233                 selectNewOwnerForEntitiesOwnedBy(oldLeaderMemberName);
234             }
235         }
236     }
237
238     private void onCandidateRemoved(CandidateRemoved message) {
239         if(!isLeader()) {
240             return;
241         }
242
243         LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
244
245         String currentOwner = getCurrentOwner(message.getEntityPath());
246         if(message.getRemovedCandidate().equals(currentOwner)){
247             writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
248         }
249     }
250
251     private void onCandidateAdded(CandidateAdded message) {
252         if(!isLeader()){
253             return;
254         }
255
256         LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
257
258         String currentOwner = getCurrentOwner(message.getEntityPath());
259         if(Strings.isNullOrEmpty(currentOwner)){
260             writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
261         }
262     }
263
264     private void onPeerDown(PeerDown peerDown) {
265         LOG.debug("{}: onPeerDown: {}", persistenceId(), peerDown);
266
267         String downMemberName = peerDown.getMemberName();
268         if(downPeerMemberNames.add(downMemberName) && isLeader()) {
269             // Select new owners for entities owned by the down peer.
270             selectNewOwnerForEntitiesOwnedBy(downMemberName);
271         }
272     }
273
274     private void onPeerUp(PeerUp peerUp) {
275         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
276
277         peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
278
279         if(downPeerMemberNames.remove(peerUp.getMemberName()) && isLeader()) {
280             // This peer was previously down - for its previously owned entities, if there were no other
281             // candidates, the owner would have been cleared so handle that here by trying to re-assign
282             // ownership for entities whose owner is cleared.
283             selectNewOwnerForEntitiesOwnedBy("");
284         }
285     }
286
287     private void selectNewOwnerForEntitiesOwnedBy(String owner) {
288         final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
289         searchForEntitiesOwnedBy(owner, new EntityWalker() {
290             @Override
291             public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
292                 Object newOwner = newOwner(getCandidateNames(entityNode));
293                 YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
294                         node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
295                         node(ENTITY_OWNER_NODE_ID).build();
296
297                 LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
298
299                 modifications.addModification(new WriteModification(entityPath,
300                         ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
301             }
302         });
303
304         commitCoordinator.commitModifications(modifications, this);
305     }
306
307     private void searchForEntitiesOwnedBy(String owner, EntityWalker walker) {
308         DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
309         Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
310         if(!possibleEntityTypes.isPresent()) {
311             return;
312         }
313
314         LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
315
316         for(MapEntryNode entityType:  ((MapNode) possibleEntityTypes.get()).getValue()) {
317             Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
318                     entityType.getChild(ENTITY_NODE_ID);
319             if(!possibleEntities.isPresent()) {
320                 continue; // shouldn't happen but handle anyway
321             }
322
323             for(MapEntryNode entity:  ((MapNode) possibleEntities.get()).getValue()) {
324                 Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
325                         entity.getChild(ENTITY_OWNER_NODE_ID);
326                 if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
327                     walker.onEntity(entityType, entity);
328                 }
329             }
330         }
331     }
332
333     private Collection<String> getCandidateNames(MapEntryNode entity) {
334         Collection<MapEntryNode> candidates = ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getValue();
335         Collection<String> candidateNames = new ArrayList<>(candidates.size());
336         for(MapEntryNode candidate: candidates) {
337             candidateNames.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString());
338         }
339
340         return candidateNames;
341     }
342
343     private void writeNewOwner(YangInstanceIdentifier entityPath, String newOwner) {
344         LOG.debug("{}: Writing new owner {} for entity {}", persistenceId(), newOwner, entityPath);
345
346         commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
347                 ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
348     }
349
350     private String newOwner(Collection<String> candidates) {
351         for(String candidate: candidates) {
352             if(!downPeerMemberNames.contains(candidate)) {
353                 return candidate;
354             }
355         }
356
357         return "";
358     }
359
360     private String getCurrentOwner(YangInstanceIdentifier entityId) {
361         DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
362         Optional<NormalizedNode<?, ?>> optionalEntityOwner = snapshot.readNode(entityId.node(ENTITY_OWNER_QNAME));
363         if(optionalEntityOwner.isPresent()){
364             return optionalEntityOwner.get().getValue().toString();
365         }
366         return null;
367     }
368
369     public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
370             final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
371         return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
372     }
373
374     private static class Creator extends AbstractShardCreator {
375         private static final long serialVersionUID = 1L;
376
377         private final String localMemberName;
378
379         Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
380                 final DatastoreContext datastoreContext, final SchemaContext schemaContext,
381                 final String localMemberName) {
382             super(name, peerAddresses, datastoreContext, schemaContext);
383             this.localMemberName = localMemberName;
384         }
385
386         @Override
387         public Shard create() throws Exception {
388             return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
389         }
390     }
391
392     private static interface EntityWalker {
393         void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
394     }
395 }