2 * Copyright (c) 2017 Ericsson Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.concurrent.atomic.AtomicLong;
18 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
19 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
20 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
24 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
28 public class TableWriter implements FlowCounterMBean {
29 private static final Logger LOG = LoggerFactory.getLogger(TableWriter.class);
31 private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
32 private final AtomicLong taskCompletionTime = new AtomicLong(BulkOMaticUtils.DEFAULT_COMPLETION_TIME);
33 private final AtomicInteger successfulWrites = new AtomicInteger();
34 private final AtomicInteger failedWrites = new AtomicInteger();
35 private final DataBroker dataBroker;
36 private final ExecutorService tablePusher;
38 public TableWriter(final DataBroker dataBroker, final ExecutorService tablePusher) {
39 this.dataBroker = dataBroker;
40 this.tablePusher = tablePusher;
43 public void addTables(final int dpnCount, final short startTableId, final short endTableId) {
44 LOG.info("Starting to add tables: {} to {} on each of {}", startTableId, endTableId, dpnCount);
45 TableHandlerTask task = new TableHandlerTask(dpnCount, startTableId, endTableId, true);
46 tablePusher.execute(task);
49 public void deleteTables(int dpnCount, short startTableId, short endTableId) {
50 LOG.info("Starting to delete tables: {} to {} on each of {}", startTableId, endTableId, dpnCount);
51 TableHandlerTask task = new TableHandlerTask(dpnCount, startTableId, endTableId, false);
52 tablePusher.execute(task);
56 public int getWriteOpStatus() {
57 return writeOpStatus.get();
61 public long getTaskCompletionTime() {
62 return taskCompletionTime.get();
66 public long getTableCount() {
67 return successfulWrites.get();
70 private class TableHandlerTask implements Runnable {
72 private final short startTableId;
73 private final short endTableId;
74 private final int dpnCount;
75 private final boolean isAdd;
77 TableHandlerTask(int dpnCount, short startTableId, short endTableId, boolean isAdd) {
78 this.dpnCount = dpnCount;
79 this.startTableId = startTableId;
80 this.endTableId = endTableId;
86 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
87 int totalTables = dpnCount * (endTableId - startTableId + 1);
89 for (int dpn = 1; dpn <= dpnCount; dpn++) {
90 String dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + String.valueOf(dpn);
91 for (short tableId = startTableId; tableId <= endTableId; tableId++) {
92 WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
93 Table table = new TableBuilder().withKey(new TableKey(tableId)).setId(tableId).build();
94 InstanceIdentifier<Table> tableIId = BulkOMaticUtils.getTableId(tableId, dpId);
97 wtx.put(LogicalDatastoreType.CONFIGURATION, tableIId, table, true);
99 wtx.delete(LogicalDatastoreType.CONFIGURATION, tableIId);
102 Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
104 public void onSuccess(Void voidParameter) {
105 if (successfulWrites.incrementAndGet() == totalTables) {
106 if (failedWrites.get() > 0) {
107 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
109 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
115 public void onFailure(Throwable throwable) {
116 LOG.error("Table addition Failed. Error: {}", throwable);
117 if (failedWrites.incrementAndGet() == totalTables) {
118 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
121 }, MoreExecutors.directExecutor());