3800413eb1b9964d00207f526177862eb69c5885
[controller.git] / opendaylight / md-sal / topology-manager / src / main / java / org / opendaylight / md / controller / topology / manager / OperationProcessor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.md.controller.topology.manager;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 final class OperationProcessor implements Runnable {
22     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
23     private static final int MAX_TRANSACTION_OPERATIONS = 100;
24     private static final int OPERATION_QUEUE_DEPTH = 500;
25
26     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
27     private final DataBroker dataBroker;
28
29     OperationProcessor(final DataBroker dataBroker) {
30         this.dataBroker = Preconditions.checkNotNull(dataBroker);
31     }
32
33     void enqueueOperation(final TopologyOperation task) {
34         try {
35             queue.put(task);
36         } catch (InterruptedException e) {
37             LOG.warn("Interrupted while submitting task {}", task, e);
38         }
39     }
40
41     @Override
42     public void run() {
43         try {
44             for (; ; ) {
45                 TopologyOperation op = queue.take();
46
47                 LOG.debug("New operations available, starting transaction");
48                 final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
49
50                 int ops = 0;
51                 do {
52                     op.applyOperation(tx);
53
54                     ops++;
55                     if (ops < MAX_TRANSACTION_OPERATIONS) {
56                         op = queue.poll();
57                     } else {
58                         op = null;
59                     }
60                 } while (op != null);
61
62                 LOG.debug("Processed {} operations, submitting transaction", ops);
63
64                 final CheckedFuture txResultFuture = tx.submit();
65                 Futures.addCallback(txResultFuture, new FutureCallback() {
66                     @Override
67                     public void onSuccess(Object o) {
68                         LOG.debug("Topology export successful for tx :{}", tx.getIdentifier());
69                     }
70
71                     @Override
72                     public void onFailure(Throwable throwable) {
73                         LOG.error("Topology export transaction {} failed", tx.getIdentifier(), throwable.getCause());
74                     }
75                 });
76             }
77         } catch (InterruptedException e) {
78             LOG.info("Interrupted processing, terminating", e);
79         }
80
81         // Drain all events, making sure any blocked threads are unblocked
82         while (!queue.isEmpty()) {
83             queue.poll();
84         }
85     }
86 }