Ver código fonte

feat:服务端改为使用两个selector,一个用于accept,一个用于read,实现客户到服务的数据传输

yang yi 2 meses atrás
pai
commit
7b72b81c06

+ 11 - 6
chat-gwng/chat-client/src/main/java/space/anyi/chatClient/Client.java

@@ -5,6 +5,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
+import java.util.Scanner;
 
 /**
  * @ProjectName: chat-gwng
@@ -14,19 +15,23 @@ import java.nio.charset.StandardCharsets;
  * @Description:
  */
 public class Client {
-    public static void main(String[] args) throws IOException, InterruptedException {
+    public static void main(String[] args) throws IOException {
         String host = "localhost";
         int port = 8000;
         InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);
         //创建连接
         SocketChannel socketChannel = SocketChannel.open(inetSocketAddress);
+        socketChannel.configureBlocking(false);
         //写
         socketChannel.write(ByteBuffer.wrap("Hello World!".getBytes(StandardCharsets.UTF_8)));
-        //读
-        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
-        int len = socketChannel.read(byteBuffer);
-        byte[] array = byteBuffer.array();
-        System.out.println(new String(array, 0, len));
+        Scanner scanner = new Scanner(System.in);
+        while(true){
+            String line = scanner.nextLine();
+            if ("exit".equals(line)){
+                break;
+            }
+            socketChannel.write(ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8)));
+        }
         //释放资源
         socketChannel.close();
     }

+ 7 - 0
chat-gwng/chat-commom/pom.xml

@@ -15,5 +15,12 @@
         <maven.compiler.source>17</maven.compiler.source>
         <maven.compiler.target>17</maven.compiler.target>
     </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.32</version>
+        </dependency>
+    </dependencies>
 
 </project>

+ 110 - 0
chat-gwng/chat-server/src/main/java/space/anyi/chatServer/ChatServer.java

@@ -0,0 +1,110 @@
+package space.anyi.chatServer;
+
+import lombok.Data;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * @ProjectName: chat-gwng
+ * @FileName: ChatServer
+ * @Author: 杨逸
+ * @Data:2025/9/22 15:45
+ * @Description:
+ */
+@Data
+public class ChatServer implements Runnable, Closeable {
+    private InetSocketAddress inetSocketAddress;
+    private ServerSocketChannel serverSocketChannel;
+    private Selector acceptSelector;
+    private Selector readSelector;
+    private ReadHandler readHandler;
+    private boolean isExit = false;
+
+    public ChatServer(int port) throws IOException {
+        System.out.println("server init");
+        //配置服务监听的端口
+        this.inetSocketAddress = new InetSocketAddress(port);
+        this.serverSocketChannel = ServerSocketChannel.open().bind(inetSocketAddress);
+        this.serverSocketChannel.configureBlocking(false);
+        //创建一个监听连接事件的选择器
+        this.acceptSelector = Selector.open();
+        //创建一个监听读事件的选择器
+        this.readSelector = Selector.open();
+        //自定义读事件处理器,用处理客户端发送的消息
+        this.readHandler = new ReadHandler(readSelector);
+    }
+
+
+    @Override
+    public void run() {
+        System.out.println("server listen...");
+        try {
+            listen();
+        } catch (IOException e) {
+            e.printStackTrace();
+            System.err.println("server bad by error");
+            try {
+                this.close();
+            } catch (IOException ex) {
+                ex.printStackTrace();
+            }
+        }
+    }
+
+    private void listen() throws IOException {
+        //注册连接事件
+        serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
+        //启动读事件处理器
+        new Thread(readHandler).start();
+        while(true){
+            if (isExit)break;
+            int select = acceptSelector.select(100);
+            if(select > 0){
+                Set<SelectionKey> selectionKeys = acceptSelector.selectedKeys();
+                for (SelectionKey selectionKey : selectionKeys) {
+                    if (selectionKey.isAcceptable()) {
+                        //有客户端连接
+                        SocketChannel socketChannel = serverSocketChannel.accept();
+                        System.out.println("client connect");
+                        socketChannel.configureBlocking(false);
+                        //连接channel注册到读事件处理器
+                        socketChannel.register(readSelector,SelectionKey.OP_READ);
+                    }
+                    //移除selectionKey,防止重复处理
+                    selectionKeys.remove(selectionKey);
+                }
+            }
+        }
+        this.close();
+    }
+
+    @Override
+    public void close() throws IOException {
+        exit();
+        System.out.println("close chatServer");
+        if (Objects.nonNull(acceptSelector) && acceptSelector.isOpen()) {
+            acceptSelector.close();
+        }
+        if (Objects.nonNull(serverSocketChannel) && serverSocketChannel.isOpen()) {
+            serverSocketChannel.close();
+        }
+        if (Objects.nonNull(readSelector) && readSelector.isOpen()) {
+            readSelector.close();
+        }
+        if (Objects.nonNull(readHandler)) {
+            readHandler.close();
+        }
+    }
+
+    public void exit(){
+        this.isExit = true;
+    }
+}

+ 87 - 0
chat-gwng/chat-server/src/main/java/space/anyi/chatServer/ReadHandler.java

@@ -0,0 +1,87 @@
+package space.anyi.chatServer;
+
+import lombok.Data;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * @ProjectName: chat-gwng
+ * @FileName: ReadHandler
+ * @Author: 杨逸
+ * @Data:2025/9/22 16:18
+ * @Description: 专门读取数据的处理器
+ */
+@Data
+public class ReadHandler implements Runnable, Closeable {
+    private Selector readSelector;
+    private boolean isExit = false;
+
+    public ReadHandler(Selector readSelector) {
+        this.readSelector = readSelector;
+    }
+
+    @Override
+    public void close() throws IOException {
+        exit();
+        System.out.println("close readHandler");
+        if (Objects.nonNull(readSelector) && readSelector.isOpen()) {
+            readSelector.close();
+        }
+    }
+
+    private void exit() {
+        isExit = true;
+    }
+
+    @Override
+    public void run() {
+        System.out.println("readHandler starter");
+        while(true){
+            if (isExit)break;
+            int select = 0;
+            try {
+                select = readSelector.select(100);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            if (select > 0) {
+                Set<SelectionKey> selectionKeys = readSelector.selectedKeys();
+                for (SelectionKey selectionKey : selectionKeys) {
+                    if (selectionKey.isReadable() && selectionKey.isValid()) {
+                        //拿到channel
+                        SelectableChannel channel = selectionKey.channel();
+                        SocketChannel socketChannel = (SocketChannel) channel;
+                        //读取数据
+                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+                        int len = 0;
+                        try {
+                            len = socketChannel.read(byteBuffer);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                        byteBuffer.flip();
+                        if (len > 0) {
+                            String msg = new String(byteBuffer.array(), 0, len);
+                            System.out.println(msg);
+                        }
+                    }
+                    //处理完,移除selectionKey
+                    selectionKeys.remove(selectionKey);
+                }
+            }
+        }
+        try {
+            close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 13 - 24
chat-gwng/chat-server/src/main/java/space/anyi/chatServer/Server.java

@@ -1,11 +1,7 @@
 package space.anyi.chatServer;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.StandardCharsets;
+import java.util.Scanner;
 
 /**
  * @ProjectName: chat-gwng
@@ -16,24 +12,17 @@ import java.nio.charset.StandardCharsets;
  */
 public class Server {
     public static void main(String[] args) throws IOException, InterruptedException {
-        int port = 8000;
-        InetSocketAddress inetSocketAddress = new InetSocketAddress(port);
-        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        //监听
-        serverSocketChannel.bind(inetSocketAddress);
-
-        //创建连接
-        SocketChannel socketChannel = serverSocketChannel.accept();
-
-        //读
-        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
-        int len = socketChannel.read(byteBuffer);
-        System.out.println(new String(byteBuffer.array(),0,len));
-        //写
-        ByteBuffer wrap = ByteBuffer.wrap("server send message".getBytes(StandardCharsets.UTF_8));
-        socketChannel.write(wrap);
-        //释放资源
-        socketChannel.close();
-        serverSocketChannel.close();
+        ChatServer chatServer = new ChatServer(8000);
+        Thread thread = new Thread(chatServer);
+        thread.start();
+        Scanner scanner = new Scanner(System.in);
+        while(true){
+            String msg = scanner.nextLine();
+            if ("exit".equals(msg)){
+                chatServer.exit();
+                System.out.println("服务器关闭");
+                break;
+            }
+        }
     }
 }