/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.impl.role;
10 import javax.annotation.Nullable;
11 import java.util.concurrent.Future;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Verify;
16 import com.google.common.util.concurrent.AsyncFunction;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.JdkFutureAdapters;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import io.netty.util.Timeout;
22 import io.netty.util.TimerTask;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.Semaphore;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nullable;
27 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
28 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
29 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
30 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
31 import org.opendaylight.openflowplugin.api.OFConstants;
32 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
35 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
36 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
37 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
38 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
45 import org.opendaylight.yangtools.yang.common.RpcResult;
46 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
51 * Created by kramesha on 9/12/15.
53 public class RoleContextImpl implements RoleContext {
54 private static final Logger LOG = LoggerFactory.getLogger(RoleContextImpl.class);
56 private final EntityOwnershipService entityOwnershipService;
57 private EntityOwnershipCandidateRegistration entityOwnershipCandidateRegistration;
58 private EntityOwnershipCandidateRegistration txEntityOwnershipCandidateRegistration;
60 private final DeviceContext deviceContext;
62 private final Entity entity;
63 private final Entity txEntity;
65 private SalRoleService salRoleService;
67 private final Semaphore mainCandidateGuard = new Semaphore(1, true);
68 private final Semaphore txCandidateGuard = new Semaphore(1, true);
70 public RoleContextImpl(final DeviceContext deviceContext, final EntityOwnershipService entityOwnershipService,
71 final Entity entity, final Entity txEntity) {
72 this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
73 this.deviceContext = Preconditions.checkNotNull(deviceContext);
74 this.entity = Preconditions.checkNotNull(entity);
75 this.txEntity = Preconditions.checkNotNull(txEntity);
76 salRoleService = new SalRoleServiceImpl(this, deviceContext);
80 public void initialization() throws CandidateAlreadyRegisteredException {
81 LOG.debug("Initialization RoleContext for Node {}", deviceContext.getDeviceState().getNodeId());
82 final AsyncFunction<RpcResult<SetRoleOutput>, Void> initFunction = new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
84 public ListenableFuture<Void> apply(final RpcResult<SetRoleOutput> input) throws Exception {
85 LOG.debug("Initialization request OpenflowEntityOwnership for entity {}", entity);
86 getDeviceState().setRole(OfpRole.BECOMESLAVE);
87 entityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity);
88 LOG.debug("RoleContextImpl : Candidate registered with ownership service for device :{}", deviceContext
89 .getPrimaryConnectionContext().getNodeId().getValue());
90 return Futures.immediateFuture(null);
93 final ListenableFuture<Void> roleChange = sendRoleChangeToDevice(OfpRole.BECOMESLAVE, initFunction);
94 Futures.addCallback(roleChange, new FutureCallback<Void>() {
97 public void onSuccess(final Void result) {
98 LOG.debug("Initial RoleContext for Node {} is successful", deviceContext.getDeviceState().getNodeId());
102 public void onFailure(final Throwable t) {
103 LOG.warn("Initial RoleContext for Node {} fail", deviceContext.getDeviceState().getNodeId(), t);
104 deviceContext.close();
110 public ListenableFuture<Void> onRoleChanged(final OfpRole oldRole, final OfpRole newRole) {
111 LOG.trace("onRoleChanged method call for Entity {}", entity);
113 if (!isDeviceConnected()) {
114 // this can happen as after the disconnect, we still get a last message from EntityOwnershipService.
115 LOG.info("Device {} is disconnected from this node. Hence not attempting a role change.",
116 deviceContext.getPrimaryConnectionContext().getNodeId());
117 LOG.debug("SetRole cancelled for entity [{}], reason = device disconnected.", entity);
118 return Futures.immediateFailedFuture(new Exception(
119 "Device disconnected - stopped by setRole: " + deviceContext
120 .getPrimaryConnectionContext().getNodeId()));
123 LOG.debug("Role change received from ownership listener from {} to {} for device:{}", oldRole, newRole,
124 deviceContext.getPrimaryConnectionContext().getNodeId());
126 final AsyncFunction<RpcResult<SetRoleOutput>, Void> roleChangeFunction = new AsyncFunction<RpcResult<SetRoleOutput>, Void>() {
128 public ListenableFuture<Void> apply(final RpcResult<SetRoleOutput> setRoleOutputRpcResult) throws Exception {
129 LOG.debug("Role change {} successful made on switch :{}", newRole, deviceContext.getDeviceState()
131 getDeviceState().setRole(newRole);
132 return deviceContext.onClusterRoleChange(oldRole, newRole);
135 return sendRoleChangeToDevice(newRole, roleChangeFunction);
139 public void setupTxCandidate() throws CandidateAlreadyRegisteredException {
140 LOG.debug("setupTxCandidate for entity {} and Transaction entity {}", entity, txEntity);
141 Verify.verify(txEntity != null);
143 txEntityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(txEntity);
147 public void close() {
148 if (entityOwnershipCandidateRegistration != null) {
149 LOG.debug("Closing EntityOwnershipCandidateRegistration for {}", entity);
150 entityOwnershipCandidateRegistration.close();
155 public Entity getEntity() {
160 public Entity getTxEntity() {
164 private boolean isDeviceConnected() {
165 return ConnectionContext.CONNECTION_STATE.WORKING.equals(
166 deviceContext.getPrimaryConnectionContext().getConnectionState());
171 public <T> RequestContext<T> createRequestContext() {
172 final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
174 public void close() {
181 void setSalRoleService(final SalRoleService salRoleService) {
182 this.salRoleService = salRoleService;
186 public DeviceState getDeviceState() {
187 return deviceContext.getDeviceState();
191 public void suspendTxCandidate() {
192 if (txEntityOwnershipCandidateRegistration != null) {
193 txEntityOwnershipCandidateRegistration.close();
194 txEntityOwnershipCandidateRegistration = null;
199 public DeviceContext getDeviceContext() {
200 return deviceContext;
204 public Semaphore getMainCandidateGuard() {
205 return mainCandidateGuard;
209 public Semaphore getTxCandidateGuard() {
210 return txCandidateGuard;
213 private ListenableFuture<Void> sendRoleChangeToDevice(final OfpRole newRole, final AsyncFunction<RpcResult<SetRoleOutput>, Void> function) {
214 LOG.debug("Send new role {} to device {}", newRole, deviceContext.getDeviceState().getNodeId());
215 final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
216 if (deviceContext.getDeviceState().getFeatures().getVersion() < OFConstants.OFP_VERSION_1_3) {
217 LOG.debug("Device OF version {} not support ROLE", deviceContext.getDeviceState().getFeatures().getVersion());
218 setRoleOutputFuture = Futures.immediateFuture(RpcResultBuilder.<SetRoleOutput> success().build());
220 final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
221 .setNode(new NodeRef(deviceContext.getDeviceState().getNodeInstanceIdentifier())).build();
222 setRoleOutputFuture = salRoleService.setRole(setRoleInput);
223 final TimerTask timerTask = new TimerTask() {
226 public void run(final Timeout timeout) throws Exception {
227 if (!setRoleOutputFuture.isDone()) {
228 LOG.info("New role {} was not propagated to device {} during 10 sec. Close connection immediately.",
229 newRole, deviceContext.getDeviceState().getNodeId());
230 deviceContext.close();
234 deviceContext.getTimer().newTimeout(timerTask, 10, TimeUnit.SECONDS);
236 return Futures.transform(JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture), function);