socket.io的集群使用

/ 后端 / 没有评论 / 351浏览

一. 服务器篇

1.引入maven依赖,编写配置信息

由于是集群启动,则需要引入redisson,这个是官方建议,也可以自己实现分发消息

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.19.3</version>
        </dependency>
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>1.7.23</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.15</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

由于需要启动两个服务,则需要指定不同的server端口和socket端口

8080

server:
  port: 8080

socketio:
  host: 127.0.0.1
  port: 18888
  maxFramePayloadLength: 1048576
  maxHttpContentLength: 1048576
  bossCount: 1
  workCount: 100
  allowCustomRequests: true
  upgradeTimeout: 1000000
  pingTimeout: 60000
  pingInterval: 25000

redisson:
  hostname: 127.0.0.1
  password: 123123
  port: 6379

8081

server:
  port: 8081

socketio:
  host: 127.0.0.1
  port: 18887
  maxFramePayloadLength: 1048576
  maxHttpContentLength: 1048576
  bossCount: 1
  workCount: 100
  allowCustomRequests: true
  upgradeTimeout: 1000000
  pingTimeout: 60000
  pingInterval: 25000

redisson:
  hostname: 127.0.0.1
  password: 123123
  port: 6379

2.编写配置类

@Configuration
public class SocketIOConfig {

    @Value("${socketio.host}")
    private String host;

    @Value("${socketio.port}")
    private Integer port;

    @Value("${socketio.bossCount}")
    private int bossCount;

    @Value("${socketio.workCount}")
    private int workCount;

    @Value("${socketio.allowCustomRequests}")
    private boolean allowCustomRequests;

    @Value("${socketio.upgradeTimeout}")
    private int upgradeTimeout;

    @Value("${socketio.pingTimeout}")
    private int pingTimeout;

    @Value("${socketio.pingInterval}")
    private int pingInterval;

    @Value("${redisson.hostname}")
    private String redissonHostname;

    @Value("${redisson.password}")
    private String redissonPassword;

    @Value("${redisson.port}")
    private String redissonPort;

    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        config.setCodec(new JsonJacksonCodec());
        config.useSingleServer().
                setDatabase(10).
                setAddress("redis://" + redissonHostname + ":" + redissonPort).
                setPassword(redissonPassword).
                setRetryInterval(5000).
                setTimeout(10000).
                setConnectionMinimumIdleSize(8).
                setConnectionPoolSize(16).
                setConnectTimeout(10000);
        return Redisson.create(config);
    }

    @ConditionalOnBean(RedissonClient.class)
    @Bean
    public SocketIOServer socketIOServer(RedissonClient redissonClient) {
        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        config.setSocketConfig(socketConfig);
        config.setHostname(host);
        config.setPort(port);
        config.setBossThreads(bossCount);
        config.setWorkerThreads(workCount);
        config.setAllowCustomRequests(allowCustomRequests);
        config.setUpgradeTimeout(upgradeTimeout);
        config.setPingTimeout(pingTimeout);
        config.setPingInterval(pingInterval);

        // 校验token的合法性,实际业务需要校验token是否过期
        // 如果认证不通过会返回一个 Socket.EVENT_CONNECT_ERROR事件
        // 配合前端CONNECT_ERROR事件,避免前端循环请求浪费资源
        config.setAuthorizationListener(data -> {
            String uid = data.getSingleUrlParam("UID");
            if (!"1".equals(uid)) {
                return false;
            }
            return true;
        });

        // 服务端集群推荐使用redisson
        RedissonStoreFactory redissonStoreFactory = new RedissonStoreFactory((Redisson) redissonClient);
        config.setStoreFactory(redissonStoreFactory);

        //异常处理-默认打印错误日志
        config.setExceptionListener(new DefaultExceptionListener());

        return new SocketIOServer(config);
    }


    //用于扫描netty-socketIo的注解,比如 @OnConnect、@OnEvent
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer server) {
        return new SpringAnnotationScanner(server);
    }

    //事件管理
    //@Bean
    public PubSubStore pubSubStore(SocketIOServer socketServer) {
        return socketServer.getConfiguration().getStoreFactory().pubSubStore();
    }
}

3.编写监听及发布业务类

@Service
public class SocketServerAnnoDemo {
    
    //本地保存用户id对应的session连接
    private static Map<String, SocketIOClient> clientMap = new ConcurrentHashMap<>();

    @Autowired
    private SocketIOServer socketIOServer;
    
    //回调启动socket服务
    @PostConstruct
    public void start() {
        socketIOServer.start();
        System.out.println("socket.io初始化服务完成");
    }

    //回调关闭socket服务
    @PreDestroy
    public void stop() {
        if (socketIOServer != null) {
            socketIOServer.stop();
            socketIOServer = null;
        }
    }

    //监听socket服务新连接
    @OnConnect
    public void onConnect(SocketIOClient client) {
        String uid = getParamsByClient(client);
        if (uid != null) {
            clientMap.put(uid, client);
            System.out.println("有新的客户端连接UID:" + uid);
            //加入房间
            client.joinRoom("im");
        }
    }

    //监听socket服务连接段开
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        String uid = getParamsByClient(client);
        if (uid != null) {
            clientMap.remove(uid);
            client.disconnect();
            System.out.println("一条客户端连接中断");
        }
    }

    //监听客户端传来信息,字符串类型接收
    @OnEvent(value = "ServerReceiveStr")
    public void onEvent(SocketIOClient client, AckRequest request, String data) throws InterruptedException {
        System.out.println("client发来的消息:" + data);
    }

    //监听客户端传来信息,json类型接收
    @OnEvent(value = "ServerReceive")
    public void onEvent2(SocketIOClient client, AckRequest request, JSONObject data) {
        String uid = getParamsByClient(client);
        String ip = getIpByClient(client);
        if (uid != null) {
            System.out.println("client发来的消息:" + data.toString());
        }
    }

    //获取到客户端连接的参数
    private String getParamsByClient(SocketIOClient client) {
        // 从请求的连接中拿出参数(这里的sid必须是唯一标识)
        Map<String, List<String>> params = client.getHandshakeData().getUrlParams();
        List<String> list = params.get("UID");
        if (list != null && list.size() > 0) {
            return list.get(0);
        }
        return null;
    }

    //获取到客户端连接的ip
    private String getIpByClient(SocketIOClient client) {
        String sa = client.getRemoteAddress().toString();
        return sa.substring(1, sa.indexOf(":"));
    }
}

群发消息

@RestController
public class WebDemo {

    @Autowired
    private SocketIOServer socketIOServer;

    @GetMapping("test")
    public void t(){
        JSONObject jsonObjectNew = new JSONObject();
        jsonObjectNew.put("name", "goat");
        jsonObjectNew.put("message", "server群发信息");
       socketIOServer.getRoomOperations("im").sendEvent("ClientReceive",jsonObjectNew);
    }
}

二.客户端篇

由于是集群测试,也是两个客户端,连接不同的socket服务;

index18887.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>客户端</title>
</head>
<body>
<textarea id="area111" style="height: 500px;width: 300px"></textarea>
<textarea id="area222" style="height: 500px;width: 300px"></textarea>
<div>
    <button id="btn">发送</button>
</div>

</body>
<script src="socket.io.js"></script>
<script type="text/javascript">
    const serverUri = "http://127.0.0.1:18887";
    const sendEvent = "ServerReceive";
    const sendEventStr = "ServerReceiveStr";
    const receiveEvent = "ClientReceive";

    var socket;

    //连接id,进行验证
    connect(1);

    function connect(uid) {
        socket = io.connect(serverUri, {
            'force new connection': true,
            'query': 'UID=' + uid
        });
        socket.on('connect', function () {
            console.log("连接成功");
            send({
                name: "client",
                message: "hello Server"
            });
        });
        socket.on(receiveEvent, function (data) {
            document.getElementById("area222").innerText = document.getElementById("area222").value + data.message;
        });
        socket.on('disconnect', function () {
            console.log("连接断开");
        });
        socket.on('connect_error',function (){
            alert("没有权限");
            socket.close();
        })
    }

    function send(data) {
        socket.emit(sendEvent, data);
    }

    let areaaaa = document.getElementById("area111");
    document.getElementById("btn").onclick = function () {
        socket.emit(sendEventStr, areaaaa.value);
        areaaaa.value = ""
    };
</script>
</html>

index18888.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>客户端</title>
</head>
<body>
<textarea id="area111" style="height: 500px;width: 300px"></textarea>
<textarea id="area222" style="height: 500px;width: 300px"></textarea>
<div>
    <button id="btn">发送</button>
</div>

</body>
<script src="socket.io.js"></script>
<script type="text/javascript">
    const serverUri = "http://127.0.0.1:18888";
    const sendEvent = "ServerReceive";
    const sendEventStr = "ServerReceiveStr";
    const receiveEvent = "ClientReceive";

    var socket;

    //连接id,进行验证
    connect(1);

    function connect(uid) {
        socket = io.connect(serverUri, {
            'force new connection': true,
            'query': 'UID=' + uid
        });
        socket.on('connect', function () {
            console.log("连接成功");
            send({
                name: "client",
                message: "hello Server"
            });
        });
        socket.on(receiveEvent, function (data) {
            document.getElementById("area222").innerText = document.getElementById("area222").value + data.message;
        });
        socket.on('disconnect', function () {
            console.log("连接断开");
        });
        socket.on('connect_error',function (){
            alert("没有权限");
            socket.close();
        })
    }

    function send(data) {
        socket.emit(sendEvent, data);
    }

    let areaaaa = document.getElementById("area111");
    document.getElementById("btn").onclick = function () {
        socket.emit(sendEventStr, areaaaa.value);
        areaaaa.value = ""
    };
</script>
</html>

三.测试篇

请求lcalhost:8080/test或lcalhost:8081/test任意一台服务器进行群发消息,两台客户端皆能够收到消息;