在之前的“多发多收”模型中,我们为每个客户端连接创建了一个新的 Thread 对象。这种模式在客户端数量较少时工作良好,但在高并发场景下存在严重隐患:
资源消耗巨大:每个线程都需要占用一定的栈内存(默认约 1MB)。如果有 10,000 个客户端同时连接,仅线程栈就需要消耗约 10GB 内存,极易导致 OutOfMemoryError。
CPU 上下文切换开销:操作系统需要在成千上万个线程之间频繁切换,消耗大量 CPU 时间片,导致实际处理业务逻辑的时间减少,系统响应变慢甚至“瘫痪”。
启动延迟:频繁创建和销毁线程会带来额外的时间开销。
引入 线程池 (Thread Pool) 是解决上述问题的标准方案:
复用线程:预先创建一定数量的线程,重复利用,避免频繁创建 / 销毁。
控制并发量:限制系统中同时运行的线程总数,保护服务器资源不被耗尽。
任务队列管理:当所有线程都在忙时,新任务进入队列等待,提供缓冲机制。
统一管理:便于监控线程状态、设置超时策略和优雅关闭。
结论:在生产环境中,严禁在
accept()循环中直接new Thread(),必须使用线程池来管理客户端处理任务。
Java 提供了 java.util.concurrent.ExecutorService 接口和 ThreadPoolExecutor 实现类来构建线程池。
new ThreadPoolExecutor(
int corePoolSize, // 核心线程数:即使空闲也不会被回收的线程数
int maximumPoolSize, // 最大线程数:允许创建的最大线程数量
long keepAliveTime, // 非核心线程的空闲存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列:存放等待执行的任务
RejectedExecutionHandler handler // 拒绝策略:当队列满且线程达最大值时的处理方式
)针对 TCP 服务端
CorePoolSize: 建议设置为 CPU 核数的 2-4 倍,或者根据业务 IO 密集程度调整。
MaximumPoolSize: 根据服务器内存和业务响应时间要求设定上限,防止无限膨胀。
WorkQueue:
LinkedBlockingQueue (无界队列):慎用。如果任务堆积过快,可能导致内存溢出。
ArrayBlockingQueue (有界队列):推荐。指定容量,配合合理的拒绝策略,更能保护系统。
RejectedExecutionHandler:
AbortPolicy (默认): 抛出异常,快速失败。
CallerRunsPolicy: 由调用者线程(即主线程)执行该任务,起到限流作用,适合服务端。
DiscardPolicy: 直接丢弃任务。
基于线程池的高并发模型
我们将之前的“每连接一线程”模型重构为“线程池处理模型”。
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;
public class TCPServerWithPool {
public static void main(String[] args) {
int serverPort = 12345;
// 【关键】配置线程池
// 核心线程数:5,最大线程数:20,空闲存活 60 秒
// 使用有界队列 (容量 100),防止内存溢出
// 拒绝策略:CallerRunsPolicy (由主线程处理,起到背压作用)
ExecutorService executorService = new ThreadPoolExecutor(
5,
20,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy()
);
System.out.println("服务端已启动,监听端口:" + serverPort);
System.out.println("线程池配置 -> 核心:" + 5 + ", 最大:" + 20 + ", 队列容量:" + 100);
try (ServerSocket serverSocket = new ServerSocket(serverPort)) {
while (true) {
// 1. 接受连接 (阻塞)
Socket clientSocket = serverSocket.accept();
System.out.println("新客户端连接:" + clientSocket.getInetAddress());
// 2. 将处理任务提交给线程池
try {
executorService.execute(() -> handleClient(clientSocket));
} catch (RejectedExecutionException e) {
// 如果线程池已满且队列已满,执行拒绝策略
System.err.println("服务器负载过高,拒绝连接:" + clientSocket.getInetAddress());
try {
clientSocket.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
} catch (IOException e) {
System.err.println("服务端异常:" + e.getMessage());
e.printStackTrace();
} finally {
// 优雅关闭线程池 (通常在接收到停止信号时调用)
executorService.shutdown();
}
}
/**
* 具体的客户端业务处理逻辑
*/
private static void handleClient(Socket clientSocket) {
// 使用 try-with-resources 确保当前连接的资源释放
try (BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true)) {
String message;
// 循环读取客户端消息
while ((message = reader.readLine()) != null) {
System.out.println("[" + Thread.currentThread().getName() + "] 收到: " + message);
// 模拟业务处理耗时
// Thread.sleep(100);
// 回显响应
writer.println("Server Echo: " + message);
}
System.out.println("客户端断开:" + clientSocket.getInetAddress());
} catch (IOException e) {
System.err.println("通信错误:" + e.getMessage());
} finally {
try {
if (!clientSocket.isClosed()) {
clientSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}有界队列 (ArrayBlockingQueue):替代了原始的无界 LinkedBlockingQueue。当并发请求超过处理能力时,队列满了会触发拒绝策略,而不是无限消耗内存。
合理的拒绝策略 (CallerRunsPolicy):当服务器过载时,让主线程(accept 线程)自己去处理任务。这会减慢 accept 的速度,从而自然地降低客户端的连接速率,形成背压 (Backpressure) 机制,保护系统不崩溃。
异常处理隔离:在 execute 调用处捕获 RejectedExecutionException,防止因拒绝任务导致主线程崩溃。
资源封闭:handleClient 方法内部严格管理 Socket 和流的关闭。
多线程压力测试
为了验证服务端的并发能力,我们编写一个多线程客户端,模拟多个用户同时发起请求。
import java.io.*;
import java.net.Socket;
import java.util.concurrent.*;
public class TCPClientStressTest {
public static void main(String[] args) {
String serverAddress = "127.0.0.1";
int serverPort = 12345;
int totalRequests = 10; // 模拟发送的请求总数
// 客户端也使用线程池来模拟并发用户
ExecutorService clientPool = new ThreadPoolExecutor(
2,
5,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
System.out.println("开始压力测试,发送 " + totalRequests + " 个请求...");
CountDownLatch latch = new CountDownLatch(totalRequests);
for (int i = 0; i < totalRequests; i++) {
final int taskId = i;
clientPool.execute(() -> {
try {
// 模拟网络延迟,让请求错落有致
Thread.sleep((long) (Math.random() * 500));
try (Socket socket = new Socket(serverAddress, serverPort);
PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
String message = "Request from Client-" + taskId;
writer.println(message);
System.out.println("发送:" + message);
String response = reader.readLine();
System.out.println("收到响应:" + response);
} catch (IOException e) {
System.err.println("客户端 " + taskId + " 通信失败:" + e.getMessage());
} finally {
latch.countDown();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 等待所有任务完成
try {
latch.await();
System.out.println("所有测试完成。");
} catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭客户端线程池
clientPool.shutdown();
}
}CountDownLatch: 用于主线程等待所有子线程任务执行完毕,确保程序不会提前退出。
随机延迟: Thread.sleep 模拟真实用户操作的不确定性,避免所有请求完全同步到达。
自动资源关闭: 同样使用了 try-with-resources。
在实际部署中,线程池参数不能硬编码,应根据服务器性能动态调整:
CPU 密集型任务:corePoolSize = CPU 核数 + 1。
IO 密集型任务 (如 TCP 网络读写):corePoolSize = CPU 核数 * 2 或更多 (因为线程大部分时间在等待 IO)。
监控:集成监控系统(如 Prometheus + Grafana),实时观察线程池的 ActiveCount, QueueSize, CompletedTaskCount 等指标。
替代方案:对于超高并发(如 C10K, C100K 问题),传统的 BIO (Blocking IO) + 线程池可能仍显吃力,此时应考虑 NIO (Non-blocking IO) 框架,如 Netty 或 Java 原生的 Selector 机制。
线程泄露:如果任务中发生异常且未捕获,可能导致线程终止,线程池需配置 ThreadFactory 来处理未捕获异常。
死锁:在线程池任务中提交任务到同一个线程池(且队列满时),可能导致死锁。
长时间阻塞:如果 handleClient 中有耗时极长的操作(如复杂计算、外部 API 慢响应),会占用线程池名额,导致其他正常请求被拒绝。解决方案:将耗时操作异步化或放入专门的独立线程池。