源本科技 | 码上会

Java TCP 线程池优化

2026/03/04
19
0

为什么要引入线程池

在之前的“多发多收”模型中,我们为每个客户端连接创建了一个新的 Thread 对象。这种模式在客户端数量较少时工作良好,但在高并发场景下存在严重隐患:

传统模式的弊端

  • 资源消耗巨大:每个线程都需要占用一定的栈内存(默认约 1MB)。如果有 10,000 个客户端同时连接,仅线程栈就需要消耗约 10GB 内存,极易导致 OutOfMemoryError

  • CPU 上下文切换开销:操作系统需要在成千上万个线程之间频繁切换,消耗大量 CPU 时间片,导致实际处理业务逻辑的时间减少,系统响应变慢甚至“瘫痪”。

  • 启动延迟:频繁创建和销毁线程会带来额外的时间开销。

线程池的优势

引入 线程池 (Thread Pool) 是解决上述问题的标准方案:

  • 复用线程:预先创建一定数量的线程,重复利用,避免频繁创建 / 销毁。

  • 控制并发量:限制系统中同时运行的线程总数,保护服务器资源不被耗尽。

  • 任务队列管理:当所有线程都在忙时,新任务进入队列等待,提供缓冲机制。

  • 统一管理:便于监控线程状态、设置超时策略和优雅关闭。

结论:在生产环境中,严禁accept() 循环中直接 new Thread(),必须使用线程池来管理客户端处理任务。


核心组件

Java 提供了 java.util.concurrent.ExecutorService 接口和 ThreadPoolExecutor 实现类来构建线程池。

构造函数参数

new ThreadPoolExecutor(
    int corePoolSize,              // 核心线程数:即使空闲也不会被回收的线程数
    int maximumPoolSize,           // 最大线程数:允许创建的最大线程数量
    long keepAliveTime,            // 非核心线程的空闲存活时间
    TimeUnit unit,                 // 时间单位
    BlockingQueue<Runnable> workQueue, // 任务队列:存放等待执行的任务
    RejectedExecutionHandler handler   // 拒绝策略:当队列满且线程达最大值时的处理方式
)

参数配置建议

针对 TCP 服务端

  • CorePoolSize: 建议设置为 CPU 核数的 2-4 倍,或者根据业务 IO 密集程度调整。

  • MaximumPoolSize: 根据服务器内存和业务响应时间要求设定上限,防止无限膨胀。

  • WorkQueue:

    • LinkedBlockingQueue (无界队列):慎用。如果任务堆积过快,可能导致内存溢出。

    • ArrayBlockingQueue (有界队列):推荐。指定容量,配合合理的拒绝策略,更能保护系统。

  • RejectedExecutionHandler:

    • AbortPolicy (默认): 抛出异常,快速失败。

    • CallerRunsPolicy: 由调用者线程(即主线程)执行该任务,起到限流作用,适合服务端

    • DiscardPolicy: 直接丢弃任务。


服务端重构

基于线程池的高并发模型

我们将之前的“每连接一线程”模型重构为“线程池处理模型”。

优化后的服务端代码

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;

public class TCPServerWithPool {
    public static void main(String[] args) {
        int serverPort = 12345;

        // 【关键】配置线程池
        // 核心线程数:5,最大线程数:20,空闲存活 60 秒
        // 使用有界队列 (容量 100),防止内存溢出
        // 拒绝策略:CallerRunsPolicy (由主线程处理,起到背压作用)
        ExecutorService executorService = new ThreadPoolExecutor(
                5, 
                20, 
                60L, 
                TimeUnit.SECONDS, 
                new ArrayBlockingQueue<>(100), 
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        System.out.println("服务端已启动,监听端口:" + serverPort);
        System.out.println("线程池配置 -> 核心:" + 5 + ", 最大:" + 20 + ", 队列容量:" + 100);

        try (ServerSocket serverSocket = new ServerSocket(serverPort)) {
            
            while (true) {
                // 1. 接受连接 (阻塞)
                Socket clientSocket = serverSocket.accept();
                System.out.println("新客户端连接:" + clientSocket.getInetAddress());

                // 2. 将处理任务提交给线程池
                try {
                    executorService.execute(() -> handleClient(clientSocket));
                } catch (RejectedExecutionException e) {
                    // 如果线程池已满且队列已满,执行拒绝策略
                    System.err.println("服务器负载过高,拒绝连接:" + clientSocket.getInetAddress());
                    try {
                        clientSocket.close();
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                }
            }

        } catch (IOException e) {
            System.err.println("服务端异常:" + e.getMessage());
            e.printStackTrace();
        } finally {
            // 优雅关闭线程池 (通常在接收到停止信号时调用)
            executorService.shutdown();
        }
    }

    /**
     * 具体的客户端业务处理逻辑
     */
    private static void handleClient(Socket clientSocket) {
        // 使用 try-with-resources 确保当前连接的资源释放
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
             PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true)) {

            String message;
            // 循环读取客户端消息
            while ((message = reader.readLine()) != null) {
                System.out.println("[" + Thread.currentThread().getName() + "] 收到: " + message);
                
                // 模拟业务处理耗时
                // Thread.sleep(100); 

                // 回显响应
                writer.println("Server Echo: " + message);
            }
            
            System.out.println("客户端断开:" + clientSocket.getInetAddress());

        } catch (IOException e) {
            System.err.println("通信错误:" + e.getMessage());
        } finally {
            try {
                if (!clientSocket.isClosed()) {
                    clientSocket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

改进点

  1. 有界队列 (ArrayBlockingQueue):替代了原始的无界 LinkedBlockingQueue。当并发请求超过处理能力时,队列满了会触发拒绝策略,而不是无限消耗内存。

  2. 合理的拒绝策略 (CallerRunsPolicy):当服务器过载时,让主线程(accept 线程)自己去处理任务。这会减慢 accept 的速度,从而自然地降低客户端的连接速率,形成背压 (Backpressure) 机制,保护系统不崩溃。

  3. 异常处理隔离:在 execute 调用处捕获 RejectedExecutionException,防止因拒绝任务导致主线程崩溃。

  4. 资源封闭handleClient 方法内部严格管理 Socket 和流的关闭。


客户端模拟

多线程压力测试

为了验证服务端的并发能力,我们编写一个多线程客户端,模拟多个用户同时发起请求。

客户端代码

import java.io.*;
import java.net.Socket;
import java.util.concurrent.*;

public class TCPClientStressTest {
    public static void main(String[] args) {
        String serverAddress = "127.0.0.1";
        int serverPort = 12345;
        int totalRequests = 10; // 模拟发送的请求总数

        // 客户端也使用线程池来模拟并发用户
        ExecutorService clientPool = new ThreadPoolExecutor(
                2, 
                5, 
                10, 
                TimeUnit.SECONDS, 
                new LinkedBlockingQueue<>()
        );

        System.out.println("开始压力测试,发送 " + totalRequests + " 个请求...");

        CountDownLatch latch = new CountDownLatch(totalRequests);

        for (int i = 0; i < totalRequests; i++) {
            final int taskId = i;
            clientPool.execute(() -> {
                try {
                    // 模拟网络延迟,让请求错落有致
                    Thread.sleep((long) (Math.random() * 500));

                    try (Socket socket = new Socket(serverAddress, serverPort);
                         PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
                         BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {

                        String message = "Request from Client-" + taskId;
                        writer.println(message);
                        System.out.println("发送:" + message);

                        String response = reader.readLine();
                        System.out.println("收到响应:" + response);

                    } catch (IOException e) {
                        System.err.println("客户端 " + taskId + " 通信失败:" + e.getMessage());
                    } finally {
                        latch.countDown();
                    }

                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // 等待所有任务完成
        try {
            latch.await();
            System.out.println("所有测试完成。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 关闭客户端线程池
        clientPool.shutdown();
    }
}

测试要点

  • CountDownLatch: 用于主线程等待所有子线程任务执行完毕,确保程序不会提前退出。

  • 随机延迟: Thread.sleep 模拟真实用户操作的不确定性,避免所有请求完全同步到达。

  • 自动资源关闭: 同样使用了 try-with-resources。


最佳实践

三种模型对比

特性

单线程模型

每连接一线程

线程池模型

并发能力

极低 (1 个)

中等 (受限于内存)

高 (可控)

资源消耗

高 (随连接数线性增长)

稳定 (有上限)

响应速度

慢 (排队)

快 (但上下文切换多)

快且稳

抗冲击性

差 (易 OOM)

强 (有背压机制)

适用场景

学习 Demo

低并发内部工具

生产环境服务器

生产环境配置建议

在实际部署中,线程池参数不能硬编码,应根据服务器性能动态调整:

  1. CPU 密集型任务corePoolSize = CPU 核数 + 1。

  2. IO 密集型任务 (如 TCP 网络读写):corePoolSize = CPU 核数 * 2 或更多 (因为线程大部分时间在等待 IO)。

  3. 监控:集成监控系统(如 Prometheus + Grafana),实时观察线程池的 ActiveCount, QueueSize, CompletedTaskCount 等指标。

  4. 替代方案:对于超高并发(如 C10K, C100K 问题),传统的 BIO (Blocking IO) + 线程池可能仍显吃力,此时应考虑 NIO (Non-blocking IO) 框架,如 Netty 或 Java 原生的 Selector 机制。

常见陷阱

  • 线程泄露:如果任务中发生异常且未捕获,可能导致线程终止,线程池需配置 ThreadFactory 来处理未捕获异常。

  • 死锁:在线程池任务中提交任务到同一个线程池(且队列满时),可能导致死锁。

  • 长时间阻塞:如果 handleClient 中有耗时极长的操作(如复杂计算、外部 API 慢响应),会占用线程池名额,导致其他正常请求被拒绝。解决方案:将耗时操作异步化或放入专门的独立线程池。