2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.role;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.JdkFutureAdapters;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import io.netty.util.HashedWheelTimer;
16 import io.netty.util.Timeout;
17 import io.netty.util.TimerTask;
18 import java.util.Collection;
19 import java.util.HashSet;
20 import java.util.concurrent.CancellationException;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicReference;
25 import javax.annotation.Nonnull;
26 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
27 import org.opendaylight.openflowplugin.api.OFConstants;
28 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
29 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
30 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
31 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
32 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
33 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
34 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
42 import org.opendaylight.yangtools.yang.common.RpcResult;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
/**
 * {@link RoleContext} implementation bound to a single {@link DeviceInfo}. It sends role-change
 * requests through the {@link SalRoleService} and reports the outcome to the registered
 * {@link ContextChainMastershipWatcher}.
 */
46 public class RoleContextImpl implements RoleContext {
47 private static final Logger LOG = LoggerFactory.getLogger(RoleContextImpl.class);
// Second logger, obtained under the fixed name "OfEventLog" rather than the class name.
48 private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
50 // Timeout, in milliseconds, after which we give up on propagating a role to the device.
51 private static final long SET_ROLE_TIMEOUT = 10000;
// Device whose controller role this context manages.
53 private final DeviceInfo deviceInfo;
// Timer used to schedule the deferred SLAVE request and the per-request set-role timeout.
54 private final HashedWheelTimer timer;
// Most recent role-change future; swapped through changeLastRoleFuture().
55 private final AtomicReference<ListenableFuture<RpcResult<SetRoleOutput>>> lastRoleFuture = new AtomicReference<>();
// Request contexts handed out by createRequestContext() that have not been closed yet.
56 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
// Handle of the deferred makeDeviceSlave() call scheduled by the constructor.
57 private final Timeout slaveTask;
// Provider configuration; read for the equal-role flag before a role is sent.
58 private final OpenflowProviderConfig config;
// Receiver of role results; assigned by registerMastershipWatcher().
59 private ContextChainMastershipWatcher contextChainMastershipWatcher;
// RPC service used to send the role; assigned by setRoleService().
60 private SalRoleService roleService;
62 RoleContextImpl(@Nonnull final DeviceInfo deviceInfo,
63 @Nonnull final HashedWheelTimer timer,
64 final long checkRoleMasterTimeout,
65 final OpenflowProviderConfig config) {
66 this.deviceInfo = deviceInfo;
69 slaveTask = timer.newTimeout((timerTask) -> makeDeviceSlave(), checkRoleMasterTimeout, TimeUnit.MILLISECONDS);
71 LOG.info("Started timer for setting SLAVE role on device {} if no role will be set in {}s.",
73 checkRoleMasterTimeout / 1000L);
/**
 * Accessor returning a {@link DeviceInfo}.
 * NOTE(review): the method body is not visible in this view; presumably it returns the
 * {@code deviceInfo} field assigned in the constructor - confirm.
 */
77 public DeviceInfo getDeviceInfo() {
82 public void setRoleService(final SalRoleService salRoleService) {
83 roleService = salRoleService;
87 public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
88 this.contextChainMastershipWatcher = newWatcher;
93 changeLastRoleFuture(null);
94 requestContexts.forEach(requestContext -> RequestContextUtil
95 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
96 requestContexts.clear();
100 public void instantiateServiceInstance() {
101 final ListenableFuture<RpcResult<SetRoleOutput>> future = sendRoleChangeToDevice(OfpRole.BECOMEMASTER);
102 changeLastRoleFuture(future);
103 Futures.addCallback(future, new MasterRoleCallback(), Executors.newSingleThreadExecutor());
107 public ListenableFuture<Void> closeServiceInstance() {
108 changeLastRoleFuture(null);
109 return Futures.immediateFuture(null);
113 public <T> RequestContext<T> createRequestContext() {
114 final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
116 public void close() {
117 requestContexts.remove(this);
121 requestContexts.add(ret);
127 public ServiceGroupIdentifier getIdentifier() {
128 return deviceInfo.getServiceIdentifier();
/**
 * Updates {@code lastRoleFuture} through {@code getAndUpdate}; while doing so, a previous future
 * that is neither cancelled nor done is cancelled.
 *
 * NOTE(review): parts of this method are not visible in this view (a statement ahead of the
 * getAndUpdate call and the tail of the lambda). Presumably the lambda returns {@code newFuture};
 * confirm against the full file before changing anything here.
 *
 * @param newFuture future to track from now on; callers in this file also pass {@code null}
 */
131 private void changeLastRoleFuture(final ListenableFuture<RpcResult<SetRoleOutput>> newFuture) {
// NOTE(review): getAndUpdate may re-apply its function when updates contend, so the
// cancellation below can run more than once - verify this is acceptable.
133 lastRoleFuture.getAndUpdate(lastFuture -> {
// Only a request that is still in flight needs cancelling.
134 if (lastFuture != null && !lastFuture.isCancelled() && !lastFuture.isDone()) {
// 'true' asks for interruption of a running task.
135 lastFuture.cancel(true);
142 private ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
143 final ListenableFuture<RpcResult<SetRoleOutput>> future = sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
144 changeLastRoleFuture(future);
145 Futures.addCallback(future, new SlaveRoleCallback(), MoreExecutors.directExecutor());
149 private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
150 final Boolean isEqualRole = config.isEnableEqualRole();
152 LOG.warn("Skip sending role change request to device {} as user enabled"
153 + " equal role for controller", deviceInfo);
154 return Futures.immediateFuture(null);
156 LOG.debug("Sending new role {} to device {}", newRole, deviceInfo);
158 if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
159 final SetRoleInput setRoleInput = new SetRoleInputBuilder()
160 .setControllerRole(newRole)
161 .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier()))
164 final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture = roleService.setRole(setRoleInput);
166 final TimerTask timerTask = timeout -> {
167 if (!setRoleOutputFuture.isDone()) {
168 LOG.warn("New role {} was not propagated to device {} during {} sec", newRole,
169 deviceInfo, SET_ROLE_TIMEOUT);
170 setRoleOutputFuture.cancel(true);
174 timer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.MILLISECONDS);
175 return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
178 LOG.info("Device: {} with version: {} does not support role {}", deviceInfo, deviceInfo.getVersion(), newRole);
179 return Futures.immediateFuture(null);
/**
 * Callback attached to the MASTER role request; forwards the outcome to
 * {@code contextChainMastershipWatcher}.
 */
182 private final class MasterRoleCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
// Success: notify the watcher with MASTER_ON_DEVICE, then log the event on both loggers.
184 public void onSuccess(RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
// NOTE(review): an argument line appears to be missing between the next two lines in this
// view (presumably the device info, as in the SLAVE callback) - confirm against the full file.
185 contextChainMastershipWatcher.onMasterRoleAcquired(
187 ContextChainMastershipState.MASTER_ON_DEVICE);
188 OF_EVENT_LOG.debug("Master Elected, Node: {}", deviceInfo.getDatapathId());
189 LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo);
// Failure: a cancelled request is ignored; any other error is reported to the watcher.
193 public void onFailure(final Throwable throwable) {
194 if (!(throwable instanceof CancellationException)) {
// NOTE(review): as above, the first argument of this call is not visible in this view.
195 contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
197 "Was not able to propagate MASTER role on device. Error: " + throwable.toString());
202 private final class SlaveRoleCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
204 public void onSuccess(final RpcResult<SetRoleOutput> result) {
205 contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
206 LOG.debug("Role SLAVE was successfully set on device, node {}", deviceInfo);
207 OF_EVENT_LOG.debug("Role SLAVE was successfully set on device, node {}", deviceInfo);
211 public void onFailure(final Throwable throwable) {
212 if (!(throwable instanceof CancellationException)) {
213 contextChainMastershipWatcher.onSlaveRoleNotAcquired(deviceInfo,
214 "Was not able to propagate SLAVE role on device. Error: " + throwable.toString());