Skip to content

Latest commit

 

History

History
412 lines (349 loc) · 9.71 KB

0.手写线程池统计耗时与峰值内存框架.md

File metadata and controls

412 lines (349 loc) · 9.71 KB

build.sh:用于编译并运行程序

#!/bin/bash
# Compile main.cc together with <workspace>/thread_test.cc, then run and time
# the resulting binary.
#
# Usage: ./build.sh workspace
set -e

if [[ -z "${1:-}" ]]; then
  printf "Usage:\n"
  printf "  ./build.sh workspace\n"
  exit 1
fi

# Make sure the output directory exists and drop any stale binary first, so a
# failed compile can never be followed by running an outdated executable.
mkdir -p data
rm -f data/a.out
g++ main.cc "$1/thread_test.cc" -pthread -std=c++11 -o data/a.out
time ./data/a.out

main.cc:执行任务,并统计耗时与峰值内存

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#include <chrono>
#include <numeric>
#include <string>
#include <vector>

#define DATA_NUM 100000000

// Scope timer: records the time at construction and, on destruction, prints
// the elapsed time in milliseconds to stdout as "[<tag>] time: <n> ms".
class Timer {
 public:
  // tag: label printed with the measurement; it is copied into the timer.
  explicit Timer(const char* tag)
      : tag_(tag), start_(std::chrono::steady_clock::now()) {}

  // Copying would make a single measurement print twice.
  Timer(const Timer&) = delete;
  Timer& operator=(const Timer&) = delete;

  ~Timer() {
    const auto end = std::chrono::steady_clock::now();
    const size_t dis = static_cast<size_t>(
        std::chrono::duration_cast<std::chrono::milliseconds>(end - start_).count());
    printf("[%s] time: %zu ms\n", tag_.c_str(), dis);
  }

 private:
  std::string tag_;
  // steady_clock is monotonic, so the measured interval cannot go negative or
  // jump if the system clock is adjusted while the timer is running.
  std::chrono::steady_clock::time_point start_;
};

// Fills `data` with raw ints read from ./data/random_data.
//
// Exactly data.size() ints are expected. If the file cannot be opened or
// contains fewer ints than that, an error is printed and the process exits
// with status 1.
void ReadData(std::vector<int> &data)
{
  // Binary mode: the file holds raw ints, not text.
  FILE *fp = fopen("./data/random_data", "rb");
  if (nullptr == fp)
  {
    printf("Error:***\n");
    printf("read fopen error:%s. make sure [./data/random_data] file exists\n", strerror(errno));
    exit(1);
  }

  const size_t expected = data.size();
  const size_t rs = fread(data.data(), sizeof(int), expected, fp);
  fclose(fp);

  // Compare against the buffer actually passed in rather than a global
  // constant, so the check stays correct for any vector size.
  if (rs != expected)
  {
    printf("Error:***\n");
    printf("read size:%zu. But size of file [./data/random_data] should be %zu\n", rs, expected);
    exit(1);
  }
}

// Returns the value of the "VmHWM:" field of /proc/<pid>/status for the
// current process, i.e. the number printed in front of "kB".
//
// Returns -1 if the status file cannot be opened or the field is not found.
int PeakMemory()
{
  char path[32] = {0};
  snprintf(path, sizeof(path), "/proc/%d/status", static_cast<int>(getpid()));

  FILE *fp = fopen(path, "r");
  if (nullptr == fp)
  {
    return -1;
  }

  // Append exactly the bytes that were read. The buffer is not NUL-terminated
  // when fread fills it completely, so it must not be treated as a C string.
  std::string status;
  char buffer[1000];
  size_t size = 0;
  while ((size = fread(buffer, 1, sizeof(buffer), fp)) > 0)
  {
    status.append(buffer, size);
  }
  fclose(fp);

  const std::string key = "VmHWM:";
  const std::string::size_type pos = status.find(key);
  if (pos == std::string::npos)
  {
    return -1;
  }

  const std::string::size_type begin = pos + key.size();
  const std::string::size_type unit = status.find("kB", begin);
  if (unit == std::string::npos)
  {
    return -1;
  }

  // The text between the key and the unit is whitespace followed by the
  // number; atoi skips the leading whitespace and stops at the trailing blank.
  return atoi(status.substr(begin, unit - begin).c_str());
}

// Returns the arithmetic mean of `data`, accumulated in double precision.
double CalAvg(std::vector<int> &data) {
  double total = 0.0;
  for (const int value : data) {
    total = total + value;
  }
  return total / data.size();
}

extern void MyInit();
extern void AddPrev(std::vector<int> &data);
extern void SubOperate(std::vector<int> &data);

// Loads DATA_NUM ints from disk, runs AddPrev and SubOperate on them and
// prints the average of the result.
void DataCalTest() {
  std::vector<int> samples(DATA_NUM, 0);
  ReadData(samples);

  // The stopwatch is created only after loading, so file I/O is not part of
  // the reported "DataCalculate" time; it prints when this scope ends.
  Timer stopwatch("DataCalculate");
  AddPrev(samples);
  SubOperate(samples);

  const double average = CalAvg(samples);
  printf("cal_avg:%g\n", average);
}

extern void ThreadPoolTest();

// Entry point: runs the user initialisation hook, the thread-pool priority
// demo and the data calculation, then reports the value from PeakMemory().
int main()
{
  MyInit();
  ThreadPoolTest();
  DataCalTest();

  const int peak_kb = PeakMemory();
  printf("peak mem:%d kB\n", peak_kb);
  return 0;
}

thread_test.cc:实现线程池,并完成任务

/*
 * Copyright (c) 2021 AnC Technology Co., Ltd. All rights reserved.
 * @Author      : wangyuhang
 * @Date        : 2023-05-26
 * @Description :
 */

#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)

using namespace std;

// 1. A simple thread pool with a configurable number of worker threads and
//    prioritised tasks (a larger priority value runs first), which shuts down
//    cleanly on destruction.
//
// Tasks of equal priority run in the order they were submitted. The destructor
// lets the workers drain every task still queued and then joins them. A task
// that throws is not caught by the pool.
class ThreadPool {
 public:
  // Starts `size` worker threads. With size <= 0 no worker is started and
  // queued tasks never run unless AddThread() is called.
  explicit ThreadPool(int size) {
    for (int i = 0; i < size; i++) {
      AddThread();
    }
  }

  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;

  // Signals shutdown, wakes every worker and waits for all of them to finish.
  ~ThreadPool() {
    {
      std::lock_guard<std::mutex> lck(mtx_);
      is_stop_ = true;
    }

    cv_.notify_all();

    for (auto& worker : pool_)
      if (worker.joinable()) worker.join();
  }

  // Adds one more worker thread. The worker list is modified without taking
  // the lock, so do not call this concurrently with itself or the destructor.
  void AddThread() {
    pool_.emplace_back([this]() {
      while (true) {
        std::function<void()> job;
        {
          std::unique_lock<std::mutex> lck(mtx_);
          cv_.wait(lck, [this]() { return !task_.empty() || is_stop_; });

          // The wait only returns with an empty queue when stop was requested.
          if (task_.empty()) return;

          // top() only hands out a const reference. Moving the callable out is
          // safe because the element is popped immediately and its ordering
          // keys (priority, seq) are left untouched.
          job = std::move(const_cast<Task&>(task_.top()).func);
          task_.pop();
        }
        // Run outside the lock so other workers can dequeue meanwhile.
        job();
      }
    });
  }

  // Queues f(args...) with the given priority and wakes one worker.
  // f and args are bound by value (std::bind semantics).
  template <typename F, typename... Args>
  void PutTask(int priority, F&& f, Args&&... args) {
    std::function<void()> func =
        std::bind(std::forward<F>(f), std::forward<Args>(args)...);
    {
      std::lock_guard<std::mutex> lck(mtx_);
      task_.emplace(priority, next_seq_++, std::move(func));
    }
    cv_.notify_one();
  }

 private:
  struct Task {
    int priority;
    unsigned long long seq;  // submission order, breaks priority ties
    std::function<void()> func;

    Task(int p, unsigned long long s, std::function<void()> f)
        : priority(p), seq(s), func(std::move(f)) {}

    // priority_queue pops the "largest" element: a higher priority wins and,
    // among equal priorities, the earlier submission (smaller seq) wins.
    bool operator<(const Task& other) const {
      if (priority != other.priority) return priority < other.priority;
      return seq > other.seq;
    }
  };

  std::priority_queue<Task> task_;
  std::vector<std::thread> pool_;
  unsigned long long next_seq_{0};
  bool is_stop_{false};
  std::condition_variable cv_;
  std::mutex mtx_;
};

void MyInit() {
  // 2. Hook for any additional user-defined global initialisation
  //    (intentionally empty at the moment).
}

// 3. Thread-pool priority demo.
//    a. Builds a pool with 2 worker threads.
//    b. Submits 10 tasks; each sleeps 100 ms. The priorities used here are
//       simply 0..9 in submission order.
//    c. Each task prints the id of the thread it ran on and its priority.
void ThreadPoolTest() {
  std::cout << "------------ThreadPoolTest Start------------" << std::endl;

  // Serialises the report lines: without it the two workers can interleave
  // the pieces of their chained << output.
  std::mutex print_mtx;
  {
    ThreadPool tp(2);
    for (int i = 0; i < 10; i++) {
      tp.PutTask(
          i,
          [&print_mtx](int num) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            std::lock_guard<std::mutex> guard(print_mtx);
            std::cout << "Tid: " << std::this_thread::get_id()
                      << " Priority: " << num << std::endl;
          },
          i);
    }
  }  // The pool destructor runs every queued task and joins the workers here,
     // so the closing banner is printed after all task output.

  std::cout << "------------ThreadPoolTest End------------" << std::endl;
}

// 4. Adds to every element the last decimal digit (value % 10) of the
//    ORIGINAL value of its predecessor: data[k] += original(data[k - 1]) % 10.
//    Element 0 has no predecessor and is left unchanged.
//    [multi-threaded]
//
// The range is split into 10 chunks that run on a thread pool; the last chunk
// also takes the data.size() % 10 tail elements. The update is done in place:
// only the value in front of each chunk is snapshotted, so no second copy of
// the whole vector is needed.
void AddPrev(std::vector<int>& data) {
  std::cout << "------------AddPrev Start------------" << std::endl;

  const std::size_t group_count = 10;
  const std::size_t total = data.size();
  const std::size_t each_group_size = total / group_count;

  // Snapshot the original value that precedes each chunk BEFORE any task is
  // submitted: that element belongs to the previous chunk, which may already
  // be rewriting it while this chunk runs.
  std::vector<int> boundary_prev(group_count, 0);
  for (std::size_t g = 1; g < group_count; ++g) {
    const std::size_t begin = g * each_group_size;
    if (begin > 0) boundary_prev[g] = data[begin - 1];
  }

  {
    ThreadPool tp(8);
    for (std::size_t g = 0; g < group_count; ++g) {
      const std::size_t begin = g * each_group_size;
      const std::size_t end =
          (g + 1 == group_count) ? total : begin + each_group_size;
      if (begin == end) continue;

      const int prev = boundary_prev[g];
      tp.PutTask(1, [&data, begin, end, prev]() {
        // Walk backwards so data[k - 1] still holds its original value when it
        // is read; each task touches only indices inside [begin, end).
        for (std::size_t k = end - 1; k > begin; --k) {
          data[k] += data[k - 1] % 10;
        }
        // First element of the chunk uses the snapshotted predecessor.
        if (begin > 0) data[begin] += prev % 10;
      });
    }
  }  // The pool destructor waits for all chunks to finish.

  std::cout << "------------AddPrev End------------" << std::endl;
}

// 5. Splits the data into 10 equal groups numbered 0..9 (any data.size() % 10
//    tail elements are left untouched). Every group except the last is
//    combined element-wise with the ORIGINAL values of the following group:
//      even group g:  data[g][j] = data[g][j] - original(data[g + 1][j])
//      odd  group g:  data[g][j] = data[g][j] + original(data[g + 1][j])
//    Group 9 has no successor and is left unchanged.
//    [multi-threaded]
//
// The work is parallelised over column slices (ranges of j) instead of over
// groups. Within one slice the groups are processed in ascending order, so
// group g + 1 still holds its original values when group g reads it, and
// different slices never touch the same element. This makes the update safe
// in place, without copying the data.
void SubOperate(std::vector<int>& data) {
  std::cout << "------------SubOperate Start------------" << std::endl;

  const std::size_t group_count = 10;
  const std::size_t slice_count = 8;
  const std::size_t each_group_size = data.size() / group_count;
  const std::size_t slice_size = each_group_size / slice_count;

  {
    ThreadPool tp(8);
    for (std::size_t s = 0; s < slice_count; ++s) {
      const std::size_t col_begin = s * slice_size;
      // The last slice also takes the each_group_size % slice_count columns.
      const std::size_t col_end =
          (s + 1 == slice_count) ? each_group_size : col_begin + slice_size;
      if (col_begin == col_end) continue;

      tp.PutTask(1, [&data, group_count, each_group_size, col_begin,
                     col_end]() {
        for (std::size_t g = 0; g + 1 < group_count; ++g) {
          int* cur = data.data() + g * each_group_size;
          const int* next = cur + each_group_size;
          if (g % 2 == 0) {
            for (std::size_t j = col_begin; j < col_end; ++j) cur[j] -= next[j];
          } else {
            for (std::size_t j = col_begin; j < col_end; ++j) cur[j] += next[j];
          }
        }
      });
    }
  }  // The pool destructor waits for all slices to finish.

  std::cout << "------------SubOperate End------------" << std::endl;
}