소스 검색

feat:客户端使用selector,并实现服务端向客户端广播

yang yi 2 달 전
부모
커밋
639c9d5aee

+ 9 - 0
chat-gwng/chat-client/src/main/java/space/anyi/chatClient/Client.java

@@ -3,6 +3,8 @@ package space.anyi.chatClient;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.Scanner;
@@ -24,6 +26,12 @@ public class Client {
         socketChannel.configureBlocking(false);
         //写
         socketChannel.write(ByteBuffer.wrap("Hello World!".getBytes(StandardCharsets.UTF_8)));
+        //读
+        Selector selector = Selector.open();
+        socketChannel.register(selector, SelectionKey.OP_READ);
+        ReadHandler readHandler = new ReadHandler(selector);
+        new Thread(readHandler).start();
+
         Scanner scanner = new Scanner(System.in);
         while(true){
             String line = scanner.nextLine();
@@ -33,6 +41,7 @@ public class Client {
             socketChannel.write(ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8)));
         }
         //释放资源
+        readHandler.exit();
         socketChannel.close();
     }
 }

+ 96 - 0
chat-gwng/chat-client/src/main/java/space/anyi/chatClient/ReadHandler.java

@@ -0,0 +1,96 @@
+package space.anyi.chatClient;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * @ProjectName: chat-gwng
+ * @FileName: ReadHandler
+ * @Author: 杨逸
+ * @Data:2025/9/22 19:20
+ * @Description:
+ */
+public class ReadHandler implements Runnable, Closeable {
+    private Selector readSelector;
+    private boolean isExit = false;
+
+    public ReadHandler(Selector readSelector) {
+        this.readSelector = readSelector;
+    }
+
+    @Override
+    public void close() throws IOException {
+        exit();
+        System.out.println("close readHandler");
+        if (Objects.nonNull(readSelector) && readSelector.isOpen()) {
+            readSelector.close();
+        }
+    }
+
+    public void exit() {
+        isExit = true;
+    }
+
+    @Override
+    public void run() {
+        System.out.println("readHandler starter");
+        while(true){
+            if (isExit)break;
+            int select = 0;
+            try {
+                select = readSelector.select(100);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            if (select > 0) {
+                Set<SelectionKey> selectionKeys = readSelector.selectedKeys();
+                for (SelectionKey selectionKey : selectionKeys) {
+                    if (selectionKey.isReadable() && selectionKey.isValid()) {
+                        //拿到channel
+                        SelectableChannel channel = selectionKey.channel();
+                        SocketChannel socketChannel = (SocketChannel) channel;
+                        //读取数据
+                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+                        int len = 0;
+                        try {
+                            len = socketChannel.read(byteBuffer);
+                        } catch (SocketException e) {
+                            //连接意外中断导致异常
+                            e.printStackTrace();
+                            selectionKey.cancel();
+                            try {
+                                //关闭相关的channel
+                                socketChannel.close();
+                            } catch (IOException ex) {
+                                ex.printStackTrace();
+                            }
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                        byteBuffer.flip();
+                        if (len > 0) {
+                            String msg = new String(byteBuffer.array(), 0, len);
+                            System.out.println(msg);
+                        }
+                    }
+                    //处理完,移除selectionKey
+                    selectionKeys.remove(selectionKey);
+                }
+            }
+        }
+        //关闭资源
+        try {
+            close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 54 - 0
chat-gwng/chat-server/src/main/java/space/anyi/chatServer/ChatServer.java

@@ -5,10 +5,12 @@ import lombok.Data;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.Set;
 
@@ -59,6 +61,13 @@ public class ChatServer implements Runnable, Closeable {
         }
     }
 
+    /**
+     * @throws IOException
+     * @description: 服务端监听客户端连接
+     * @author: 杨逸
+     * @data:2025/09/22 19:24:34
+     * @since 1.0.0
+     */
     private void listen() throws IOException {
         //注册连接事件
         serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
@@ -86,6 +95,51 @@ public class ChatServer implements Runnable, Closeable {
         this.close();
     }
 
+    /**
+     * @param msg 信息
+     * @description: 服务端广播信息到客户端
+     * @author: 杨逸
+     * @data:2025/09/22 19:25:58
+     * @since 1.0.0
+     */
+    public void broadcast(String msg){
+        //获取所有的key
+        Set<SelectionKey> keys = readSelector.keys();
+        for (SelectionKey key : keys) {
+            SocketChannel channel = (SocketChannel) key.channel();
+            try {
+                //发信息
+                channel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * @param msg 信息
+     * @param target 目标
+     * @description: 发送私聊信息
+     * @author: 杨逸
+     * @data:2025/09/22 19:31:09
+     * @since 1.0.0
+     */
+    public void sendMessageWithPrivate(String msg,String target){
+        //todo
+    }
+
+    /**
+     * @param msg 信息
+     * @param group 群
+     * @description: 群发信息
+     * @author: 杨逸
+     * @data:2025/09/22 19:32:08
+     * @since 1.0.0
+     */
+    public void sendMessageWithGroup(String msg,String group){
+        //todo
+    }
+
     @Override
     public void close() throws IOException {
         exit();

+ 13 - 1
chat-gwng/chat-server/src/main/java/space/anyi/chatServer/ReadHandler.java

@@ -4,6 +4,7 @@ import lombok.Data;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
@@ -37,7 +38,7 @@ public class ReadHandler implements Runnable, Closeable {
         }
     }
 
-    private void exit() {
+    public void exit() {
         isExit = true;
     }
 
@@ -64,6 +65,16 @@ public class ReadHandler implements Runnable, Closeable {
                         int len = 0;
                         try {
                             len = socketChannel.read(byteBuffer);
+                        } catch (SocketException e) {
+                            //连接意外中断导致异常
+                            e.printStackTrace();
+                            selectionKey.cancel();
+                            try {
+                                //关闭相关的channel
+                                socketChannel.close();
+                            } catch (IOException ex) {
+                                ex.printStackTrace();
+                            }
                         } catch (IOException e) {
                             e.printStackTrace();
                         }
@@ -78,6 +89,7 @@ public class ReadHandler implements Runnable, Closeable {
                 }
             }
         }
+        //关闭资源
         try {
             close();
         } catch (IOException e) {

+ 7 - 0
chat-gwng/chat-server/src/main/java/space/anyi/chatServer/Server.java

@@ -1,7 +1,12 @@
 package space.anyi.chatServer;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
 import java.util.Scanner;
+import java.util.Set;
 
 /**
  * @ProjectName: chat-gwng
@@ -23,6 +28,8 @@ public class Server {
                 System.out.println("服务器关闭");
                 break;
             }
+            //服务端广播信息
+            chatServer.broadcast("服务端广播:" + msg);
         }
     }
 }