2 * Copyright (c) 2017 Ericsson Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.atomic.AtomicInteger;
15 import java.util.concurrent.atomic.AtomicLong;
17 import org.opendaylight.mdsal.binding.api.DataBroker;
18 import org.opendaylight.mdsal.binding.api.WriteTransaction;
19 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
23 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 public class TableWriter implements FlowCounterMBean {
28 private static final Logger LOG = LoggerFactory.getLogger(TableWriter.class);
30 private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
31 private final AtomicLong taskCompletionTime = new AtomicLong(BulkOMaticUtils.DEFAULT_COMPLETION_TIME);
32 private final AtomicInteger successfulWrites = new AtomicInteger();
33 private final AtomicInteger failedWrites = new AtomicInteger();
34 private final DataBroker dataBroker;
35 private final ExecutorService tablePusher;
/**
 * Creates a writer that keeps the given collaborators for later use.
 *
 * @param dataBroker broker stored in the {@code dataBroker} field
 * @param tablePusher executor stored in the {@code tablePusher} field; the add and delete
 *                    requests hand their tasks to it
 */
public TableWriter(final DataBroker dataBroker, final ExecutorService tablePusher) {
    // The two assignments are independent of each other.
    this.tablePusher = tablePusher;
    this.dataBroker = dataBroker;
}
42 public void addTables(final int dpnCount, final short startTableId, final short endTableId) {
43 LOG.info("Starting to add tables: {} to {} on each of {}", startTableId, endTableId, dpnCount);
44 TableHandlerTask task = new TableHandlerTask(dpnCount, startTableId, endTableId, true);
45 tablePusher.execute(task);
48 public void deleteTables(int dpnCount, short startTableId, short endTableId) {
49 LOG.info("Starting to delete tables: {} to {} on each of {}", startTableId, endTableId, dpnCount);
50 TableHandlerTask task = new TableHandlerTask(dpnCount, startTableId, endTableId, false);
51 tablePusher.execute(task);
55 public int getWriteOpStatus() {
56 return writeOpStatus.get();
60 public long getTaskCompletionTime() {
61 return taskCompletionTime.get();
65 public long getTableCount() {
66 return successfulWrites.get();
69 private class TableHandlerTask implements Runnable {
71 private final short startTableId;
72 private final short endTableId;
73 private final int dpnCount;
74 private final boolean isAdd;
76 TableHandlerTask(int dpnCount, short startTableId, short endTableId, boolean isAdd) {
77 this.dpnCount = dpnCount;
78 this.startTableId = startTableId;
79 this.endTableId = endTableId;
85 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
86 int totalTables = dpnCount * (endTableId - startTableId + 1);
88 for (int dpn = 1; dpn <= dpnCount; dpn++) {
89 String dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + String.valueOf(dpn);
90 for (short tableId = startTableId; tableId <= endTableId; tableId++) {
91 WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
92 Table table = new TableBuilder().withKey(new TableKey(tableId)).setId(tableId).build();
93 InstanceIdentifier<Table> tableIId = BulkOMaticUtils.getTableId(tableId, dpId);
96 wtx.put(LogicalDatastoreType.CONFIGURATION, tableIId, table, true);
98 wtx.delete(LogicalDatastoreType.CONFIGURATION, tableIId);
101 wtx.commit().addCallback(new FutureCallback<Object>() {
103 public void onSuccess(Object voidParameter) {
104 if (successfulWrites.incrementAndGet() == totalTables) {
105 if (failedWrites.get() > 0) {
106 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
108 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
114 public void onFailure(Throwable throwable) {
115 LOG.error("Table addition Failed.", throwable);
116 if (failedWrites.incrementAndGet() == totalTables) {
117 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
120 }, MoreExecutors.directExecutor());