2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.core.session;
10 import com.google.common.base.Objects;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.math.BigInteger;
17 import java.util.Comparator;
18 import java.util.concurrent.ArrayBlockingQueue;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.PriorityBlockingQueue;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
24 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionManager;
25 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
26 import org.opendaylight.openflowplugin.openflow.md.util.RoleUtil;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.common.config.impl.rev140326.OfpRole;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
32 * manage OF-role propagation to devices
34 public class OFRoleManager implements AutoCloseable {
37 * starting value of generationId
39 public static final BigInteger MAX_GENERATION_ID = new BigInteger("ffffffffffffffff", 16);
41 private static final Logger LOG = LoggerFactory.getLogger(OFRoleManager.class);
43 private static final long TIMEOUT = 2000;
45 private static final TimeUnit TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
47 private static final int RETRY_LIMIT = 42;
49 private final ListeningExecutorService broadcastPool;
51 private final BlockingQueue<RolePushTask> workQueue;
53 private final SessionManager sessionManager;
56 * @param sessionManager
58 public OFRoleManager(SessionManager sessionManager) {
59 Preconditions.checkNotNull("Session manager can not be empty.", sessionManager);
60 this.sessionManager = sessionManager;
61 workQueue = new PriorityBlockingQueue<>(500, new Comparator<RolePushTask>() {
63 public int compare(RolePushTask o1, RolePushTask o2) {
64 return Integer.compare(o1.getPriority(), o2.getPriority());
67 ThreadPoolLoggingExecutor delegate = new ThreadPoolLoggingExecutor(
68 1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "ofRoleBroadcast");
69 broadcastPool = MoreExecutors.listeningDecorator(
74 * change role on each connected device
78 public void manageRoleChange(final OfpRole role) {
79 for (final SessionContext session : sessionManager.getAllSessions()) {
81 workQueue.put(new RolePushTask(role, session));
82 } catch (InterruptedException e) {
83 LOG.warn("Processing of role request failed while enqueueing role task: {}", e.getMessage());
87 while (!workQueue.isEmpty()) {
88 RolePushTask task = workQueue.poll();
89 ListenableFuture<Boolean> rolePushResult = broadcastPool.submit(task);
90 CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
91 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
93 Boolean succeeded = rolePushResultChecked.checkedGet(TIMEOUT, TIMEOUT_UNIT);
94 if (!Objects.firstNonNull(succeeded, Boolean.FALSE)) {
95 if (task.getRetryCounter() < RETRY_LIMIT) {
96 workQueue.offer(task);
99 } catch (RolePushException | TimeoutException e) {
100 LOG.warn("failed to process role request: {}", e);
106 public void close() throws Exception {
107 broadcastPool.shutdown();