/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.OnComplete;
import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Future;
/**
 * Special Shard for EntityOwnership.
 *
 * @author Thomas Pantelis
 */
49 class EntityOwnershipShard extends Shard {
50 static final YangInstanceIdentifier ENTITY_OWNERS_PATH = YangInstanceIdentifier.of(EntityOwners.QNAME);
51 static final QName ENTITY_QNAME = org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.
52 md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.Entity.QNAME;
53 static final QName CANDIDATE_NAME = QName.create(Candidate.QNAME, "name");
54 static final QName ENTITY_ID = QName.create(ENTITY_QNAME, "id");
55 static final QName ENTITY_TYPE = QName.create(EntityType.QNAME, "type");
57 private int transactionIDCounter = 0;
58 private final String localMemberName;
59 private final List<BatchedModifications> retryModifications = new ArrayList<>();
61 private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
62 return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
/**
 * Creates the entity ownership shard. The supplied datastore context is replaced by a
 * non-persistent copy before it is handed to the {@link Shard} constructor.
 *
 * @param name the shard identifier
 * @param peerAddresses peer addresses, passed through to {@link Shard}
 * @param datastoreContext the configured datastore context
 * @param schemaContext the schema context, passed through to {@link Shard}
 * @param localMemberName name of the local cluster member
 */
protected EntityOwnershipShard(
        ShardIdentifier name,
        Map<String, String> peerAddresses,
        DatastoreContext datastoreContext,
        SchemaContext schemaContext,
        String localMemberName) {
    super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
    this.localMemberName = localMemberName;
}
72 protected void onDatastoreContext(DatastoreContext context) {
73 super.onDatastoreContext(noPersistenceDatastoreContext(context));
77 public void onReceiveCommand(final Object message) throws Exception {
78 if(message instanceof RegisterCandidateLocal) {
79 onRegisterCandidateLocal((RegisterCandidateLocal)message);
80 } else if(message instanceof UnregisterCandidateLocal) {
81 onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
83 super.onReceiveCommand(message);
87 private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
88 LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
90 // TODO - add the listener locally.
92 BatchedModifications modifications = new BatchedModifications(
93 TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
94 DataStoreVersions.CURRENT_VERSION, "");
95 modifications.setDoCommitOnReady(true);
96 modifications.setReady(true);
97 modifications.setTotalMessagesSent(1);
99 NormalizedNode<?, ?> entityOwners = createEntityOwnersWithCandidate(registerCandidate.getEntity(), localMemberName);
100 modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
102 tryCommitModifications(modifications);
104 getSender().tell(SuccessReply.INSTANCE, getSelf());
107 private NormalizedNode<?, ?> createEntityOwnersWithCandidate(Entity entity, String memberName) {
108 MapNode candidateNode = ImmutableNodes.mapNodeBuilder(Candidate.QNAME).addChild(
109 ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME, memberName)).build();
111 MapEntryNode entityNode = ImmutableNodes.mapEntryBuilder(ENTITY_QNAME, ENTITY_ID, entity.getId()).
112 addChild(candidateNode).build();
114 MapEntryNode entityTypeNode = ImmutableNodes.mapEntryBuilder(EntityType.QNAME, ENTITY_TYPE, entity.getType()).
115 addChild(ImmutableNodes.mapNodeBuilder(ENTITY_QNAME).addChild(entityNode).build()).build();
117 return ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(EntityOwners.QNAME)).
118 addChild(ImmutableNodes.mapNodeBuilder(EntityType.QNAME).addChild(entityTypeNode).build()).build();
121 private void tryCommitModifications(final BatchedModifications modifications) {
123 if(isIsolatedLeader()) {
124 LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
126 retryModifications.add(modifications);
128 LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
130 // Note that it's possible the commit won't get consensus and will timeout and not be applied
131 // to the state. However we don't need to retry it in that case b/c it will be committed to
132 // the journal first and, once a majority of followers come back on line and it is replicated,
133 // it will be applied at that point.
134 handleBatchedModificationsLocal(modifications, self());
137 final ActorSelection leader = getLeader();
138 if (leader != null) {
139 LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
141 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
142 getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
143 future.onComplete(new OnComplete<Object>() {
145 public void onComplete(Throwable failure, Object response) {
146 if(failure != null) {
147 if(failure instanceof AskTimeoutException) {
148 LOG.debug("BatchedModifications {} to leader {} timed out - retrying",
149 modifications.getTransactionID(), leader);
150 tryCommitModifications(modifications);
152 LOG.error("BatchedModifications {} to leader {} failed",
153 modifications.getTransactionID(), leader, failure);
156 LOG.debug("BatchedModifications {} to leader {} succeeded",
157 modifications.getTransactionID(), leader);
160 }, getContext().dispatcher());
162 LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID());
164 retryModifications.add(modifications);
170 protected void onStateChanged() {
171 super.onStateChanged();
173 if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) {
174 LOG.debug("# BatchedModifications to retry {}", retryModifications.size());
176 List<BatchedModifications> retryModificationsCopy = new ArrayList<>(retryModifications);
177 retryModifications.clear();
178 for(BatchedModifications mods: retryModificationsCopy) {
179 tryCommitModifications(mods);
184 private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
186 getSender().tell(SuccessReply.INSTANCE, getSelf());
189 public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
190 final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
191 return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
194 private static class Creator extends AbstractShardCreator {
195 private static final long serialVersionUID = 1L;
197 private final String localMemberName;
199 Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
200 final DatastoreContext datastoreContext, final SchemaContext schemaContext,
201 final String localMemberName) {
202 super(name, peerAddresses, datastoreContext, schemaContext);
203 this.localMemberName = localMemberName;
207 public Shard create() throws Exception {
208 return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);