diff --git a/demo/fiber/iotdb-sync-stress-demo/README.md b/demo/fiber/iotdb-sync-stress-demo/README.md new file mode 100644 index 00000000000..69ff1bef422 --- /dev/null +++ b/demo/fiber/iotdb-sync-stress-demo/README.md @@ -0,0 +1,39 @@ +dependency: +1. iotdb [v0.12.1](https://www.apache.org/dyn/closer.cgi/iotdb/0.12.1/apache-iotdb-0.12.1-server-bin.zip) + +2. start iotdb server +``` +# Unix/OS X +> nohup sbin/start-server.sh >/dev/null 2>&1 & +or +> nohup sbin/start-server.sh -c -rpc_port >/dev/null 2>&1 & + +# Windows +> sbin\start-server.bat -c -rpc_port +``` +3. start iotdb cli + +``` +# Unix/OS X +> sbin/start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root + +# Windows +> sbin\start-cli.bat -h 127.0.0.1 -p 6667 -u root -pw root +``` + +test steps: +1. create DATABASE zm;(execute "SET STORAGE GROUP TO root.zm") +2. insert data into root.zm;(execute "INSERT INTO root.zm.wf01.wt01(timestamp,status,temperature) values(200,false,20.71)"); +3. execute "java -jar target/iotdb-sync-stress-demo-1.0-SNAPSHOT-jar-with-dependencies.jar 1000 100000 0"(1000 means thread num, 100000 means request count, 0 means use fiber) + + +test result: +| thread count | use thread directly | use thread pool | KonaFiber | async | +| ------------ | ------------ | ------------ | ------------ | ------------ | +| 1000 |30238.88 |38624.95 | 44326.24 | 51440.32 | +| 3000 |2151.46 | 16680.56 |42354.93 | 51786.63 | + +test device: +MacBook Pro (16-inch, 2019) +2.6 GHz 6-core processor Core i7 +16 GB 2667 MHz DDR4 \ No newline at end of file diff --git a/demo/fiber/iotdb-sync-stress-demo/pom.xml b/demo/fiber/iotdb-sync-stress-demo/pom.xml new file mode 100644 index 00000000000..e9b89fdcda0 --- /dev/null +++ b/demo/fiber/iotdb-sync-stress-demo/pom.xml @@ -0,0 +1,65 @@ + + + 4.0.0 + + org.example + iotdb-sync-stress-demo + 1.0-SNAPSHOT + + + 8 + 8 + + + + org.apache.iotdb + iotdb-session + 0.12.0 + + + org.apache.iotdb + iotdb-jdbc + 0.12.0 + + + + + + + src/main/resource + true + + + + + org.apache.maven.plugins + maven-assembly-plugin + 2.4.1 + + + + jar-with-dependencies + + + + + SyncDatabaseDemo + + + + + + make-assembly + + package + + single + + + + + + + + \ No newline at end of file diff --git a/demo/fiber/iotdb-sync-stress-demo/src/main/java/ConnectionNode.java b/demo/fiber/iotdb-sync-stress-demo/src/main/java/ConnectionNode.java new file mode 100644 index 00000000000..bbaa986c174 --- /dev/null +++ b/demo/fiber/iotdb-sync-stress-demo/src/main/java/ConnectionNode.java @@ -0,0 +1,46 @@ +/* + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * DO NOT ALTER OR REMOVE NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. THL A29 Limited designates + * this particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License version 2 for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; + +public class ConnectionNode { + private static String url = "jdbc:iotdb://127.0.0.1:6667/"; + private static String u = "root"; + private static String p = "root"; + + ConnectionNode next = null; + Connection con; + Statement stm; + + public ConnectionNode() { + try { + Class.forName("org.apache.iotdb.jdbc.IoTDBDriver"); + con = DriverManager.getConnection(url, u, p); + + stm = con.createStatement(); + } catch (Exception e) { + } + } +} diff --git a/demo/fiber/iotdb-sync-stress-demo/src/main/java/ConnectionPool.java b/demo/fiber/iotdb-sync-stress-demo/src/main/java/ConnectionPool.java new file mode 100644 index 00000000000..3c6f6b3ce9d --- /dev/null +++ b/demo/fiber/iotdb-sync-stress-demo/src/main/java/ConnectionPool.java @@ -0,0 +1,94 @@ +/* + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * DO NOT ALTER OR REMOVE NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. THL A29 Limited designates + * this particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License version 2 for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class ConnectionPool { + private static Lock lock = new ReentrantLock(); + + static ConnectionNode head = null; + static final int connectionCount = 16; + + public static ConnectionNode getConnection() { + lock.lock(); + if (head != null) { + ConnectionNode target = head; + head = head.next; + lock.unlock(); + return target; + } + + lock.unlock(); + + try { + Thread.sleep(100); + } catch (Exception e) { + } + + return null; + } + + public static void releaseConnection(ConnectionNode node) { + addConnectionNode(node); + } + + public static void addConnectionNode(ConnectionNode current) { + lock.lock(); + + current.next = head; + head = current; + lock.unlock(); + } + + public static void initConnectionPool() { + try { + for (int i = 0; i < connectionCount; i++) { + ConnectionNode current = new ConnectionNode(); + addConnectionNode(current); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void closeConnection() { + try { + for (int i = 0; i < connectionCount; i++) { + ConnectionNode node; + do { + node = getConnection(); + } while (node == null); + + if (node.stm != null) { + node.stm.close(); + } + + if (node.con != null) { + node.con.close(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/demo/fiber/iotdb-sync-stress-demo/src/main/java/SyncDatabaseDemo.java b/demo/fiber/iotdb-sync-stress-demo/src/main/java/SyncDatabaseDemo.java new file mode 100755 index 00000000000..c925a04c875 --- /dev/null +++ b/demo/fiber/iotdb-sync-stress-demo/src/main/java/SyncDatabaseDemo.java @@ -0,0 +1,224 @@ +/* + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * DO NOT ALTER OR REMOVE NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. THL A29 Limited designates + * this particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License version 2 for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +import java.sql.ResultSet; +import java.util.Date; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +public class SyncDatabaseDemo { + private static ExecutorService db_executor; + private static ExecutorService e; + + private static int threadCount; + private static int requestCount; + private static int testOption; + private static int statsInterval; + + private static final int useFiber = 0; + private static final int useThreadDirect = 1; + private static final int useThreadAndThreadPool = 2; + private static final int useAsync = 3; + + public static String execQuery(String sql) throws InterruptedException, ExecutionException { + StringBuilder queryResult = new StringBuilder(); + try { + ConnectionNode node; + do { + node = ConnectionPool.getConnection(); + } while (node == null); + ResultSet rs = node.stm.executeQuery(sql); + + while (rs.next()) { + + Date time = rs.getDate("Time"); + String temperature = rs.getString("root.zm.wf01.wt01.temperature"); + boolean status = rs.getBoolean("root.zm.wf01.wt01.status"); + queryResult.append("time: ").append(time.toString()).append(" temperature:").append(temperature).append(" status: ").append(status).append("\n"); + } + + rs.close(); + ConnectionPool.releaseConnection(node); + } catch (Exception e) { + e.printStackTrace(); + } + + return queryResult.toString(); + } + + public static String submitQuery(String sql) throws InterruptedException, ExecutionException { + CompletableFuture future = new CompletableFuture<>(); + + Runnable r = new Runnable() { + @Override + public void run() { + try { + future.complete(execQuery(sql)); + } catch (Exception e) { + + } + } + }; + db_executor.execute(r); + + return future.get(); + } + + public static void testAsyncQuery() throws Exception { + CountDownLatch startSignal = new CountDownLatch(1); + CountDownLatch doneSignal = new CountDownLatch(requestCount); + AtomicLong count = new AtomicLong(); + AtomicLong statsTimes = new AtomicLong(); + + for (int i = 0; i < requestCount; i++) { + // Execute async operation + CompletableFuture cf = CompletableFuture.supplyAsync(() -> { + String result = null; + try { + startSignal.await(); + result = execQuery("select * from root.zm"); + } catch (Exception e) { + } + + return result; + }, e); + + // async operation is done, update statistics + cf.thenAccept(result -> { + long val = count.addAndGet(1); + if ((val % statsInterval) == 0) { + long time = System.currentTimeMillis(); + long prev = statsTimes.getAndSet(time); + System.out.println("interval " + val + " throughput " + statsInterval / ((time - prev) / 1000.0)); + } + doneSignal.countDown(); + }); + } + + long before = System.currentTimeMillis(); + statsTimes.set(before); + startSignal.countDown(); + doneSignal.await(); + + long after = System.currentTimeMillis(); + long duration = (after - before); + System.out.println("finish " + count.get() + " time " + duration + "ms throughput " + (count.get() / (duration / 1000.0))); + + e.shutdown(); + e.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } + + public static void testSyncQuery() throws Exception { + CountDownLatch startSignal = new CountDownLatch(1); + CountDownLatch doneSignal = new CountDownLatch(requestCount); + AtomicLong count = new AtomicLong(); + AtomicLong statsTimes = new AtomicLong(); + + Runnable r = new Runnable() { + @Override + public void run() { + try { + startSignal.await(); + String sql = "select * from root.zm"; + String result; + if (testOption == useFiber || testOption == useThreadAndThreadPool) { + // submit query to an independent thread pool; + result = submitQuery(sql); + } else { + // execute query direct(use current thread) + result = execQuery(sql); + } + //System.out.println("execute sql result is " + result); + + long val = count.addAndGet(1); + if ((val % statsInterval) == 0) { + long time = System.currentTimeMillis(); + long prev = statsTimes.getAndSet(time); + System.out.println("interval " + val + " throughput " + statsInterval / ((time - prev) / 1000.0)); + } + doneSignal.countDown(); + } catch (Exception e) { + + } + } + }; + + for (int i = 0; i < requestCount; i++) { + e.execute(r); + } + + long before = System.currentTimeMillis(); + statsTimes.set(before); + startSignal.countDown(); + doneSignal.await(); + + long after = System.currentTimeMillis(); + long duration = (after - before); + System.out.println("finish " + count.get() + " time " + duration + "ms throughput " + (count.get() / (duration / 1000.0))); + + e.shutdown(); + e.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + + if (testOption == useFiber || testOption == useThreadAndThreadPool) { + db_executor.shutdown(); + } + } + + public static void initExecutor() { + ThreadFactory factory; + if (testOption == useFiber) { + factory = Thread.ofVirtual().factory(); + } else { + factory = Thread.ofPlatform().factory(); + } + + if (testOption == useAsync) { + // thread count is equal to available processors when useAsync + threadCount = Runtime.getRuntime().availableProcessors(); + e = Executors.newWorkStealingPool(threadCount); + } else { + e = Executors.newFixedThreadPool(threadCount, factory); + } + + if (testOption == useFiber || testOption == useThreadAndThreadPool) { + // an independent thread pool which has 16 threads + db_executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); + } + } + + public static void main(String[] args) throws Exception { + threadCount = Integer.parseInt(args[0]); + requestCount = Integer.parseInt(args[1]); + testOption = Integer.parseInt(args[2]); + statsInterval = requestCount / 10; + + initExecutor(); + + ConnectionPool.initConnectionPool(); + if (testOption == useAsync) { + testAsyncQuery(); + } else { + testSyncQuery(); + } + ConnectionPool.closeConnection(); + } +}