源本科技 | 码上会

Java TCP 多发多收

2026/03/04
28
0

引言

从“一发一收”到“多发多收”

在基础的 TCP 入门中,我们通常只演示了“客户端发送一条消息 -> 服务端接收一条消息 -> 连接关闭”的流程。但在实际应用中(如即时通讯、文件传输),我们需要更复杂的交互模式:

  1. 多发(客户端):客户端需要在一次连接中,循环向服务端发送多条消息,而不是发完即断。

  2. 多收(服务端):服务端需要在一个连接中,循环读取客户端发来的所有数据,直到客户端主动断开。

  3. 多客户端并发:服务端不能因为处理一个客户端的消息而阻塞其他客户端的连接请求。单线程模型无法满足此需求,必须引入多线程机制。

核心结论

  • 客户端:使用 while 循环包裹发送逻辑,实现连续发送。

  • 服务端:使用 while(true) 循环监听端口,并为每一个新连接的客户端启动一个独立的子线程进行处理。


客户端升级

实现循环发送

客户端需要保持连接存活,允许用户多次输入并发送数据。

关键改进点

  • 循环结构:使用 whilefor 循环控制发送流程。

  • 缓冲流优化:使用 BufferedOutputStream 提高写入效率,减少系统调用次数。

  • 资源管理:确保在循环结束或发生异常时正确关闭 Socket。

支持多发模式的客户端

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;

public class TCPClientMultiSend {
    public static void main(String[] args) {
        String address = "127.0.0.1";
        int port = 12345;

        // 使用 try-with-resources 自动管理 Socket 和 Scanner
        try (Socket socket = new Socket(address, port);
             OutputStream outputStream = socket.getOutputStream();
             BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream);
             Scanner scanner = new Scanner(System.in)) {

            System.out.println("已连接到服务器,请输入消息(输入 'exit' 退出):");

            // 死循环:持续等待用户输入并发送
            while (true) {
                System.out.print(">> ");
                String message = scanner.nextLine();

                // 退出条件
                if ("exit".equalsIgnoreCase(message)) {
                    System.out.println("正在退出...");
                    break;
                }

                // 发送数据
                byte[] data = message.getBytes();
                bufferedOutputStream.write(data);
                // 重要:每次写入后刷新缓冲区,确保数据立即发送
                bufferedOutputStream.flush();

                System.out.println("已发送:" + message);
            }

        } catch (IOException e) {
            System.err.println("通信发生错误:" + e.getMessage());
            e.printStackTrace();
        }
    }
}

注意

  • bufferedOutputStream.flush() 至关重要。如果没有它,数据可能停留在内存缓冲区中,直到缓冲区满或流关闭才发送,导致服务端收不到即时消息。

  • 增加了退出机制(输入 exit),避免死循环无法停止。


服务端升级

多线程并发模型

原始的单线程服务端在处理一个客户端时,会阻塞 accept(),导致其他客户端无法连接。

主线程 vs 子线程

  • 主线程

    • 职责:专门负责监听端口 (serverSocket.accept())。

    • 逻辑:一旦发现有新客户端连接,立即创建一个新的子线程来处理该客户端的后续通信,然后主线程立刻回到 accept() 继续监听下一个连接。

  • 子线程

    • 职责:专门负责与特定客户端进行数据收发。

    • 逻辑:循环读取该客户端的数据,直到客户端断开连接。

支持多客户端并发的服务端

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class TCPServerMultiClient {
    public static void main(String[] args) {
        int port = 12345;

        try (ServerSocket serverSocket = new ServerSocket(port)) {
            System.out.println("服务端已启动,监听端口:" + port);
            System.out.println("等待客户端连接...(支持多客户端并发)");

            // 主循环:一直监听
            while (true) {
                // 阻塞等待,直到有客户端连接
                Socket clientSocket = serverSocket.accept();
                System.out.println("新客户端已连接:" + clientSocket.getInetAddress());

                // 【核心】为每个客户端启动一个独立的子线程
                Thread clientThread = new Thread(() -> {
                    handleClient(clientSocket);
                });

                // 启动线程
                clientThread.start();
            }

        } catch (IOException e) {
            System.err.println("服务端启动失败:" + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 处理单个客户端通信的逻辑(运行在子线程中)
     */
    private static void handleClient(Socket clientSocket) {
        // 使用 try-with-resources 确保当前客户端的 Socket 和流被正确关闭
        try (InputStream inputStream = clientSocket.getInputStream();
             BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream)) {

            byte[] buffer = new byte[1024];
            int bytesRead;

            // 循环读取该客户端发送的所有消息
            while ((bytesRead = bufferedInputStream.read(buffer)) != -1) {
                String message = new String(buffer, 0, bytesRead);
                System.out.println("[" + Thread.currentThread().getName() + "] 收到消息:" + message);
                
                // 这里可以添加业务逻辑,比如回显给客户端,或者广播给其他人
            }
            
            System.out.println("客户端断开连接:" + clientSocket.getInetAddress());

        } catch (IOException e) {
            System.err.println("处理客户端通信时出错:" + e.getMessage());
        } finally {
            // 确保即使发生异常也关闭 Socket
            try {
                if (!clientSocket.isClosed()) {
                    clientSocket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

逻辑解析

  1. 为什么需要 while(true)accept() 外层?

    • 如果去掉循环,服务端处理完第一个客户端后程序就会结束,无法服务第二个客户端。

  2. 为什么要把处理逻辑放到 handleClient 方法并在子线程运行?

    • accept() 返回后,如果主线程直接在这里写 while 循环读取数据,主线程会被阻塞在这个客户端上,无法执行下一次 accept()。这意味着同一时间只能有一个客户端能连接成功,其他客户端会一直卡在连接状态。

    • 通过 new Thread(...).start(),主线程瞬间解脱,可以继续去 accept 新的连接,实现了并发

  3. 资源隔离

    • 每个子线程拥有自己独立的 Socket 对象和 InputStream。线程 A 读取的是客户端 A 的数据,不会干扰线程 B。


常见问题

粘包问题的再思考

在“多发”场景下,粘包问题会更加明显。

  • 场景:客户端快速发送 "Hello" 和 "World"。

  • 现象:服务端可能一次读到 "HelloWorld",或者分三次读到 "He", "llo", "World"。

  • 解决建议:在实际项目中,通常会在消息末尾添加换行符 \n,并使用 BufferedReader.readLine() 代替 InputStream.read(),或者自定义协议头(包含消息长度)。

线程池优化

上面的代码为每个客户端创建一个 new Thread()。如果并发量极大(如 1 万个客户端),创建 1 万个线程会导致内存溢出或 CPU 上下文切换过载。

  • 优化方案:使用 ExecutorService 线程池。

// 伪代码示例
ExecutorService pool = Executors.newFixedThreadPool(50); // 限制最大线程数为 50

while (true) {
    Socket clientSocket = serverSocket.accept();
    pool.submit(() -> handleClient(clientSocket)); // 提交任务给线程池
}

优雅停机

当前的 while(true) 很难手动停止服务端。

  • 建议:可以设置一个标志位,或者捕获 SIGINT 信号来关闭 ServerSocket,从而跳出循环并关闭线程池。

双向通信

目前的示例是“客户端发,服务端收”。如果要实现类似微信的聊天:

  • 客户端线程也需要开启一个子线程专门接收服务端的回复。

  • 服务端在 handleClient 中,读完消息处理后,也要通过 clientSocket.getOutputStream() 写回数据。

  • 此时,一个连接需要两个线程(或一个线程内的两个循环)分别处理读和写,或者使用 NIO (Non-blocking IO) 模型。

通过引入死循环实现持续通信,通过多线程实现高并发,这是 Java TCP 网络编程从“玩具”走向“实用”的关键一步。