线程池的学习和使用

什么是线程池

线程池的作用是初始化一些线程,当有任务的时候,就从中启动一个来执行相关任务,执行完后,线程资源重新回收到线程池中,达到复用的效果,从而减少资源的开销;

本文将介绍线程池的创建方式、优化和实际使用这几个方面进行讲解;

创建线程池

在JDK中,Executors类已经帮我们封装了创建线程池的方法。

1
2
3
Executors.newFixedThreadPool();
Executors.newCachedThreadPool();
Executors.newScheduledThreadPool();

但是点进去看的话,

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

它的内部实现还是基于ThreadPoolExecutor来实现的。通过阿里代码规范插件扫描会提示我们用ThreadPoolExecutor去实现线程池。通过查看ThreadPoolExecutor的构造方法

1
2
3
4
5
6
7
8
9
10
11
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
do something
...
}

我觉得有以下几方面的原因。

  1. 可以灵活设置keepAliveTime(当线程池中线程数大于corePoolSize的数m, 为这m个线程设置的最长等待时间 ),节约系统资源。
  2. workQueue:线程等待队列,在Executors中默认的是LinkedBlockingQueue。可以理解是一种无界的数组,当有不断有线程来的时候,可能会撑爆机器内存。
  3. 可以设线程工厂,里面添加自己想要的一些元素,只需要实现JDK的ThreadFactory类。
  4. 按照自己的业务设置合适的拒绝策略。策略有以下几种
    1. AbortPolicy:直接抛出拒绝异常(继承自RuntimeException),会中断调用者的处理过程,所以除非有明确需求,一般不推荐
    2. DiscardPolicy:默默丢弃无法加载的任务。
    3. DiscardOldestPolicy:丢弃队列中最老的,然后再次尝试提交新任务。
    4. CallerRunsPolicy:在调用者线程中(也就是说谁把 r 这个任务甩来的),运行当前被丢弃的任务。只会用调用者所在线程来运行任务,也就是说任务不会进入线程池。如果线程池已经被关闭,则直接丢弃该任务。

使用线程池

声明ThreadFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class NacosSyncThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger(1);
private String threadPrefix = null;
private ThreadGroup threadGroup;

public NacosSyncThreadFactory(String prefix) {
this.threadPrefix = "thread" + "-" + prefix + "-" ;
threadGroup = Thread.currentThread().getThreadGroup();

}

public NacosSyncThreadFactory() {
this("pool");
}

@Override
public Thread newThread(Runnable r) {
String name = threadPrefix + threadNum.incrementAndGet();
Thread thread = new Thread(threadGroup, r, name);
return thread;
}
}

创建线程池类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MyThreadPool {
private ThreadFactory threadFactory;
private int threadNum;
private BlockingQueue blockingQueue;
private RejectedExecutionHandler handler;

public MyThreadPool(ThreadFactory threadFactory, int threadNum,
BlockingQueue blockingQueue,
RejectedExecutionHandler handler ) {
this.threadFactory = threadFactory;
this.threadNum = threadNum;
this.blockingQueue = blockingQueue;
this.handler = handler;
}

public MyThreadPool() {
this(Executors.defaultThreadFactory(), 10,
new ArrayBlockingQueue(10), new ThreadPoolExecutor.AbortPolicy());
}

public ThreadPoolExecutor initThreadPool(ThreadFactory threadFactory, int threadNum, BlockingQueue blockingQueue, RejectedExecutionHandler handler) {
if (handler == null) {
handler = new ThreadPoolExecutor.AbortPolicy();
}


return new ThreadPoolExecutor(1, threadNum, 5, TimeUnit.SECONDS, blockingQueue, threadFactory, handler);
}

}

调用线程池

  1. 初始化线程池类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
      MyThreadPool myThreadPool = new MyThreadPool();

    threadPoolExecutor = myThreadPool.initThreadPool(
    new NacosSyncThreadFactory("nacos-sync"),
    threadNum,
    new ArrayBlockingQueue(10),
    new ThreadPoolExecutor.DiscardPolicy()

    );

    }
  2. 创建Callable(FutureTask)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
     /**
    * 分页获取task信息
    * @return
    */
    private List<Task> getTask(int pageNum) {
    IPage<Task> page = new Page(pageNum, 25);
    IPage<Task> taskIPage = this.taskService.page(page);
    if (null == taskIPage || CollectionUtils.isEmpty(taskIPage.getRecords())) {
    return null;
    }

    return taskIPage.getRecords();

    }

    // 执行任务
    private FutureTask<String> assembleTaskFuture(Task task) {
    FutureTask<String> futureTask = new FutureTask(() -> {

    // 执行任务
    this.doSyncWork(task);
    return "success";
    });

    return futureTask;
    }
  3. 执行任务(FutureTask)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public void zkSync() {
    // 获取数据总数,得到线程数
    int count = this.taskService.count();
    int pageSize = 25;
    int num = count / pageSize;
    int pageTotal = count % pageSize == 0 ? num : num + 1;
    log.info("========总记录数:{}=====总页数:{}", count, pageTotal);

    for (int i = 1; i <= pageTotal; i++) {
    List<Task> taskList = this.getTask(i);
    if (CollectionUtils.isEmpty(taskList)) {
    break;
    }
    List<Integer> collect = taskList.stream().map(task -> task.getId()).collect(Collectors.toList());
    taskList.forEach(task -> {
    FutureTask<String> futureTask = this.assembleTaskFuture(task);
    threadPoolExecutor.execute(futureTask);
    });

    }

    threadPoolExecutor.shutdown();

    }