4356007af249a08ae828e1409581702670660dc1
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / role / RoleContextImpl.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.role;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.JdkFutureAdapters;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import io.netty.util.HashedWheelTimer;
16 import io.netty.util.Timeout;
17 import io.netty.util.TimerTask;
18 import java.util.Collection;
19 import java.util.HashSet;
20 import java.util.concurrent.CancellationException;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicReference;
25 import javax.annotation.Nonnull;
26 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
27 import org.opendaylight.openflowplugin.api.OFConstants;
28 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
29 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
30 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
31 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
32 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
33 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
34 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
42 import org.opendaylight.yangtools.yang.common.RpcResult;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public class RoleContextImpl implements RoleContext {
47     private static final Logger LOG = LoggerFactory.getLogger(RoleContextImpl.class);
48     private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
49
50     // Timeout  after what we will give up on propagating role
51     private static final long SET_ROLE_TIMEOUT = 10000;
52
53     private final DeviceInfo deviceInfo;
54     private final HashedWheelTimer timer;
55     private final AtomicReference<ListenableFuture<RpcResult<SetRoleOutput>>> lastRoleFuture = new AtomicReference<>();
56     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
57     private final Timeout slaveTask;
58     private final OpenflowProviderConfig config;
59     private ContextChainMastershipWatcher contextChainMastershipWatcher;
60     private SalRoleService roleService;
61
62     RoleContextImpl(@Nonnull final DeviceInfo deviceInfo,
63                     @Nonnull final HashedWheelTimer timer,
64                     final long checkRoleMasterTimeout,
65                     final OpenflowProviderConfig config) {
66         this.deviceInfo = deviceInfo;
67         this.timer = timer;
68         this.config = config;
69         slaveTask = timer.newTimeout((timerTask) -> makeDeviceSlave(), checkRoleMasterTimeout, TimeUnit.MILLISECONDS);
70
71         LOG.info("Started timer for setting SLAVE role on device {} if no role will be set in {}s.",
72                 deviceInfo,
73                 checkRoleMasterTimeout / 1000L);
74     }
75
76     @Override
77     public DeviceInfo getDeviceInfo() {
78         return deviceInfo;
79     }
80
81     @Override
82     public void setRoleService(final SalRoleService salRoleService) {
83         roleService = salRoleService;
84     }
85
86     @Override
87     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
88         this.contextChainMastershipWatcher = newWatcher;
89     }
90
91     @Override
92     public void close() {
93         changeLastRoleFuture(null);
94         requestContexts.forEach(requestContext -> RequestContextUtil
95                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
96         requestContexts.clear();
97     }
98
99     @Override
100     public void instantiateServiceInstance() {
101         final ListenableFuture<RpcResult<SetRoleOutput>> future = sendRoleChangeToDevice(OfpRole.BECOMEMASTER);
102         changeLastRoleFuture(future);
103         Futures.addCallback(future, new MasterRoleCallback(), Executors.newSingleThreadExecutor());
104     }
105
106     @Override
107     public ListenableFuture<Void> closeServiceInstance() {
108         changeLastRoleFuture(null);
109         return Futures.immediateFuture(null);
110     }
111
112     @Override
113     public <T> RequestContext<T> createRequestContext() {
114         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
115             @Override
116             public void close() {
117                 requestContexts.remove(this);
118             }
119         };
120
121         requestContexts.add(ret);
122         return ret;
123     }
124
125     @Nonnull
126     @Override
127     public ServiceGroupIdentifier getIdentifier() {
128         return deviceInfo.getServiceIdentifier();
129     }
130
131     private void changeLastRoleFuture(final ListenableFuture<RpcResult<SetRoleOutput>> newFuture) {
132         slaveTask.cancel();
133         lastRoleFuture.getAndUpdate(lastFuture -> {
134             if (lastFuture != null && !lastFuture.isCancelled() && !lastFuture.isDone()) {
135                 lastFuture.cancel(true);
136             }
137
138             return newFuture;
139         });
140     }
141
142     private ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
143         final ListenableFuture<RpcResult<SetRoleOutput>> future = sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
144         changeLastRoleFuture(future);
145         Futures.addCallback(future, new SlaveRoleCallback(), MoreExecutors.directExecutor());
146         return future;
147     }
148
149     private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
150         final Boolean isEqualRole = config.isEnableEqualRole();
151         if (isEqualRole) {
152             LOG.warn("Skip sending role change request to device {} as user enabled"
153                     + " equal role for controller", deviceInfo);
154             return Futures.immediateFuture(null);
155         }
156         LOG.debug("Sending new role {} to device {}", newRole, deviceInfo);
157
158         if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
159             final SetRoleInput setRoleInput = new SetRoleInputBuilder()
160                     .setControllerRole(newRole)
161                     .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier()))
162                     .build();
163
164             final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture = roleService.setRole(setRoleInput);
165
166             final TimerTask timerTask = timeout -> {
167                 if (!setRoleOutputFuture.isDone()) {
168                     LOG.warn("New role {} was not propagated to device {} during {} sec", newRole,
169                             deviceInfo, SET_ROLE_TIMEOUT);
170                     setRoleOutputFuture.cancel(true);
171                 }
172             };
173
174             timer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.MILLISECONDS);
175             return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
176         }
177
178         LOG.info("Device: {} with version: {} does not support role {}", deviceInfo, deviceInfo.getVersion(), newRole);
179         return Futures.immediateFuture(null);
180     }
181
182     private final class MasterRoleCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
183         @Override
184         public void onSuccess(RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
185             contextChainMastershipWatcher.onMasterRoleAcquired(
186                     deviceInfo,
187                     ContextChainMastershipState.MASTER_ON_DEVICE);
188             OF_EVENT_LOG.debug("Master Elected, Node: {}", deviceInfo.getDatapathId());
189             LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo);
190         }
191
192         @Override
193         public void onFailure(final Throwable throwable) {
194             if (!(throwable instanceof CancellationException)) {
195                 contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
196                         deviceInfo,
197                         "Was not able to propagate MASTER role on device. Error: " + throwable.toString());
198             }
199         }
200     }
201
202     private final class SlaveRoleCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
203         @Override
204         public void onSuccess(final RpcResult<SetRoleOutput> result) {
205             contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
206             LOG.debug("Role SLAVE was successfully set on device, node {}", deviceInfo);
207             OF_EVENT_LOG.debug("Role SLAVE was successfully set on device, node {}", deviceInfo);
208         }
209
210         @Override
211         public void onFailure(final Throwable throwable) {
212             if (!(throwable instanceof CancellationException)) {
213                 contextChainMastershipWatcher.onSlaveRoleNotAcquired(deviceInfo,
214                         "Was not able to propagate SLAVE role on device. Error: " + throwable.toString());
215             }
216         }
217     }
218 }