Bug 4105: Implement RegisterCandidate in EntityOwnershipShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShard.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import akka.actor.ActorSelection;
11 import akka.actor.Props;
12 import akka.dispatch.OnComplete;
13 import akka.pattern.AskTimeoutException;
14 import akka.pattern.Patterns;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
20 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
21 import org.opendaylight.controller.cluster.datastore.Shard;
22 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
23 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
24 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
25 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
27 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
28 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
29 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
33 import org.opendaylight.yangtools.yang.common.QName;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
37 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
40 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42 import scala.concurrent.Future;
43
44 /**
45  * Special Shard for EntityOwnership.
46  *
47  * @author Thomas Pantelis
48  */
49 class EntityOwnershipShard extends Shard {
50     static final YangInstanceIdentifier ENTITY_OWNERS_PATH = YangInstanceIdentifier.of(EntityOwners.QNAME);
51     static final  QName ENTITY_QNAME = org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.
52             md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.Entity.QNAME;
53     static final QName CANDIDATE_NAME = QName.create(Candidate.QNAME, "name");
54     static final QName ENTITY_ID = QName.create(ENTITY_QNAME, "id");
55     static final QName ENTITY_TYPE = QName.create(EntityType.QNAME, "type");
56
57     private int transactionIDCounter = 0;
58     private final String localMemberName;
59     private final List<BatchedModifications> retryModifications = new ArrayList<>();
60
61     private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
62         return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
63     }
64
65     protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
66             DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
67         super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
68         this.localMemberName = localMemberName;
69     }
70
71     @Override
72     protected void onDatastoreContext(DatastoreContext context) {
73         super.onDatastoreContext(noPersistenceDatastoreContext(context));
74     }
75
76     @Override
77     public void onReceiveCommand(final Object message) throws Exception {
78         if(message instanceof RegisterCandidateLocal) {
79             onRegisterCandidateLocal((RegisterCandidateLocal)message);
80         } else if(message instanceof UnregisterCandidateLocal) {
81             onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
82         } else {
83             super.onReceiveCommand(message);
84         }
85     }
86
87     private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
88         LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
89
90         // TODO - add the listener locally.
91
92         BatchedModifications modifications = new BatchedModifications(
93                 TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
94                 DataStoreVersions.CURRENT_VERSION, "");
95         modifications.setDoCommitOnReady(true);
96         modifications.setReady(true);
97         modifications.setTotalMessagesSent(1);
98
99         NormalizedNode<?, ?> entityOwners = createEntityOwnersWithCandidate(registerCandidate.getEntity(), localMemberName);
100         modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
101
102         tryCommitModifications(modifications);
103
104         getSender().tell(SuccessReply.INSTANCE, getSelf());
105     }
106
107     private NormalizedNode<?, ?> createEntityOwnersWithCandidate(Entity entity, String memberName) {
108         MapNode candidateNode = ImmutableNodes.mapNodeBuilder(Candidate.QNAME).addChild(
109                         ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME, memberName)).build();
110
111         MapEntryNode entityNode = ImmutableNodes.mapEntryBuilder(ENTITY_QNAME, ENTITY_ID, entity.getId()).
112                 addChild(candidateNode).build();
113
114         MapEntryNode entityTypeNode = ImmutableNodes.mapEntryBuilder(EntityType.QNAME, ENTITY_TYPE, entity.getType()).
115                 addChild(ImmutableNodes.mapNodeBuilder(ENTITY_QNAME).addChild(entityNode).build()).build();
116
117         return ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(EntityOwners.QNAME)).
118                 addChild(ImmutableNodes.mapNodeBuilder(EntityType.QNAME).addChild(entityTypeNode).build()).build();
119     }
120
121     private void tryCommitModifications(final BatchedModifications modifications) {
122         if(isLeader()) {
123             if(isIsolatedLeader()) {
124                 LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
125
126                 retryModifications.add(modifications);
127             } else {
128                 LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
129
130                 // Note that it's possible the commit won't get consensus and will timeout and not be applied
131                 // to the state. However we don't need to retry it in that case b/c it will be committed to
132                 // the journal first and, once a majority of followers come back on line and it is replicated,
133                 // it will be applied at that point.
134                 handleBatchedModificationsLocal(modifications, self());
135             }
136         } else {
137             final ActorSelection leader = getLeader();
138             if (leader != null) {
139                 LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
140
141                 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
142                         getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
143                 future.onComplete(new OnComplete<Object>() {
144                     @Override
145                     public void onComplete(Throwable failure, Object response) {
146                         if(failure != null) {
147                             if(failure instanceof AskTimeoutException) {
148                                 LOG.debug("BatchedModifications {} to leader {} timed out - retrying",
149                                         modifications.getTransactionID(), leader);
150                                 tryCommitModifications(modifications);
151                             } else {
152                                 LOG.error("BatchedModifications {} to leader {} failed",
153                                         modifications.getTransactionID(), leader, failure);
154                             }
155                         } else {
156                             LOG.debug("BatchedModifications {} to leader {} succeeded",
157                                     modifications.getTransactionID(), leader);
158                         }
159                     }
160                 }, getContext().dispatcher());
161             } else {
162                 LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID());
163
164                 retryModifications.add(modifications);
165             }
166         }
167     }
168
169     @Override
170     protected void onStateChanged() {
171         super.onStateChanged();
172
173         if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) {
174             LOG.debug("# BatchedModifications to retry {}", retryModifications.size());
175
176             List<BatchedModifications> retryModificationsCopy = new ArrayList<>(retryModifications);
177             retryModifications.clear();
178             for(BatchedModifications mods: retryModificationsCopy) {
179                 tryCommitModifications(mods);
180             }
181         }
182     }
183
184     private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
185         // TODO - implement
186         getSender().tell(SuccessReply.INSTANCE, getSelf());
187     }
188
189     public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
190             final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
191         return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
192     }
193
194     private static class Creator extends AbstractShardCreator {
195         private static final long serialVersionUID = 1L;
196
197         private final String localMemberName;
198
199         Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
200                 final DatastoreContext datastoreContext, final SchemaContext schemaContext,
201                 final String localMemberName) {
202             super(name, peerAddresses, datastoreContext, schemaContext);
203             this.localMemberName = localMemberName;
204         }
205
206         @Override
207         public Shard create() throws Exception {
208             return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
209         }
210     }
211 }