/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.pattern.Patterns;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Future;
31 * Special Shard for EntityOwnership.
33 * @author Thomas Pantelis
35 class EntityOwnershipShard extends Shard {
36 private final String localMemberName;
37 private final EntityOwnershipShardCommitCoordinator commitCoordinator;
39 private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
40 return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
43 protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
44 DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
45 super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
46 this.localMemberName = localMemberName;
47 this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
51 protected void onDatastoreContext(DatastoreContext context) {
52 super.onDatastoreContext(noPersistenceDatastoreContext(context));
56 public void onReceiveCommand(final Object message) throws Exception {
57 if(message instanceof RegisterCandidateLocal) {
58 onRegisterCandidateLocal((RegisterCandidateLocal)message);
59 } else if(message instanceof UnregisterCandidateLocal) {
60 onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
61 } else if(!commitCoordinator.handleMessage(message, this)) {
62 super.onReceiveCommand(message);
66 private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
67 LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
69 // TODO - add the listener locally.
71 NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
72 registerCandidate.getEntity().getId(), localMemberName);
73 commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
75 getSender().tell(SuccessReply.INSTANCE, getSelf());
78 void tryCommitModifications(final BatchedModifications modifications) {
80 LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
82 // Note that it's possible the commit won't get consensus and will timeout and not be applied
83 // to the state. However we don't need to retry it in that case b/c it will be committed to
84 // the journal first and, once a majority of followers come back on line and it is replicated,
85 // it will be applied at that point.
86 handleBatchedModificationsLocal(modifications, self());
88 final ActorSelection leader = getLeader();
90 LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
92 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
93 getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
95 Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
100 boolean hasLeader() {
101 return getLeader() != null && !isIsolatedLeader();
105 protected void onStateChanged() {
106 super.onStateChanged();
108 commitCoordinator.onStateChanged(this, isLeader());
111 private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
113 getSender().tell(SuccessReply.INSTANCE, getSelf());
116 public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
117 final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
118 return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
121 private static class Creator extends AbstractShardCreator {
122 private static final long serialVersionUID = 1L;
124 private final String localMemberName;
126 Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
127 final DatastoreContext datastoreContext, final SchemaContext schemaContext,
128 final String localMemberName) {
129 super(name, peerAddresses, datastoreContext, schemaContext);
130 this.localMemberName = localMemberName;
134 public Shard create() throws Exception {
135 return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);