一. 服务器篇
1.引入maven依赖,编写配置信息
由于是集群部署,需要引入redisson在各节点之间分发消息(这是官方建议的方式),也可以自己实现消息分发
<dependencies>
    <!-- Spring Boot web starter (version is not declared here). -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Redisson client; the Socket.IO server uses it as its cluster store factory. -->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.19.3</version>
    </dependency>

    <!-- netty-socketio: the Socket.IO server implementation. -->
    <dependency>
        <groupId>com.corundumstudio.socketio</groupId>
        <artifactId>netty-socketio</artifactId>
        <version>1.7.23</version>
    </dependency>

    <!-- NOTE(review): spring-boot-starter-web normally ships its own SLF4J binding; -->
    <!-- confirm this second binding is really needed. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.7</version>
    </dependency>

    <!-- fastjson2: presumably the source of the JSONObject used for event payloads. -->
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.15</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
由于需要启动两个服务,则需要指定不同的server端口和socket端口
8080端口服务的配置:
# Node 1: HTTP endpoint on 8080, Socket.IO endpoint on 18888.
server:
  port: 8080

# Settings for the embedded Socket.IO server.
socketio:
  host: 127.0.0.1
  port: 18888
  # 1048576 = 1 MiB
  maxFramePayloadLength: 1048576
  maxHttpContentLength: 1048576
  bossCount: 1
  workCount: 100
  allowCustomRequests: true
  upgradeTimeout: 1000000
  pingTimeout: 60000
  pingInterval: 25000

# Redis connection used by Redisson; both nodes point at the same instance.
redisson:
  hostname: 127.0.0.1
  # NOTE(review): demo credential committed in plain text - externalise for real deployments.
  password: 123123
  port: 6379
8081端口服务的配置:
# Node 2: HTTP endpoint on 8081, Socket.IO endpoint on 18887.
server:
  port: 8081

# Settings for the embedded Socket.IO server.
socketio:
  host: 127.0.0.1
  port: 18887
  # 1048576 = 1 MiB
  maxFramePayloadLength: 1048576
  maxHttpContentLength: 1048576
  bossCount: 1
  workCount: 100
  allowCustomRequests: true
  upgradeTimeout: 1000000
  pingTimeout: 60000
  pingInterval: 25000

# Redis connection used by Redisson; both nodes point at the same instance.
redisson:
  hostname: 127.0.0.1
  # NOTE(review): demo credential committed in plain text - externalise for real deployments.
  password: 123123
  port: 6379
2.编写配置类
/**
 * Spring configuration that builds the Redisson client and the netty-socketio
 * server from the {@code socketio.*} and {@code redisson.*} properties.
 */
@Configuration
public class SocketIOConfig {

    @Value("${socketio.host}")
    private String host;

    @Value("${socketio.port}")
    private Integer port;

    // Falls back to 65536 when the property is absent, so existing deployments
    // that never set it keep starting up.
    @Value("${socketio.maxFramePayloadLength:65536}")
    private int maxFramePayloadLength;

    @Value("${socketio.maxHttpContentLength:65536}")
    private int maxHttpContentLength;

    @Value("${socketio.bossCount}")
    private int bossCount;

    @Value("${socketio.workCount}")
    private int workCount;

    @Value("${socketio.allowCustomRequests}")
    private boolean allowCustomRequests;

    @Value("${socketio.upgradeTimeout}")
    private int upgradeTimeout;

    @Value("${socketio.pingTimeout}")
    private int pingTimeout;

    @Value("${socketio.pingInterval}")
    private int pingInterval;

    @Value("${redisson.hostname}")
    private String redissonHostname;

    @Value("${redisson.password}")
    private String redissonPassword;

    @Value("${redisson.port}")
    private String redissonPort;

    /**
     * Creates a single-server Redisson client against database 10 of the
     * configured Redis instance, using the Jackson JSON codec.
     *
     * @return the Redisson client
     */
    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        config.setCodec(new JsonJacksonCodec());
        config.useSingleServer()
                .setDatabase(10)
                .setAddress("redis://" + redissonHostname + ":" + redissonPort)
                .setPassword(redissonPassword)
                .setRetryInterval(5000)
                .setTimeout(10000)
                .setConnectionMinimumIdleSize(8)
                .setConnectionPoolSize(16)
                .setConnectTimeout(10000);
        return Redisson.create(config);
    }

    /**
     * Builds the Socket.IO server from the injected properties. The server is
     * only constructed here; it is not started by this method.
     *
     * @param redissonClient client backing the cluster store factory
     * @return the configured, not yet started, server
     */
    @ConditionalOnBean(RedissonClient.class)
    @Bean
    public SocketIOServer socketIOServer(RedissonClient redissonClient) {
        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);

        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        config.setSocketConfig(socketConfig);
        config.setHostname(host);
        config.setPort(port);
        config.setBossThreads(bossCount);
        config.setWorkerThreads(workCount);
        config.setAllowCustomRequests(allowCustomRequests);
        config.setUpgradeTimeout(upgradeTimeout);
        config.setPingTimeout(pingTimeout);
        config.setPingInterval(pingInterval);
        // These two limits were present in the configuration files but never
        // applied to the server; apply them so the configured values take effect.
        config.setMaxFramePayloadLength(maxFramePayloadLength);
        config.setMaxHttpContentLength(maxHttpContentLength);

        // Demo authorization: only a handshake carrying UID=1 is accepted.
        // Real code should validate a token here, including its expiry.
        // A rejected handshake is reported to the client as a connect error;
        // the front end should handle that event instead of retrying in a loop.
        config.setAuthorizationListener(data -> "1".equals(data.getSingleUrlParam("UID")));

        // A Redisson-backed store factory is the recommended choice for a server cluster.
        RedissonStoreFactory redissonStoreFactory = new RedissonStoreFactory((Redisson) redissonClient);
        config.setStoreFactory(redissonStoreFactory);

        // Exception handling: the default listener logs the error.
        config.setExceptionListener(new DefaultExceptionListener());
        return new SocketIOServer(config);
    }

    /**
     * Registers the scanner that picks up netty-socketio annotations such as
     * {@code @OnConnect} and {@code @OnEvent}.
     *
     * @param server the Socket.IO server the scanner is bound to
     * @return the annotation scanner
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer server) {
        return new SpringAnnotationScanner(server);
    }

    /**
     * Returns the pub/sub store of the server's store factory (event management).
     * The {@code @Bean} annotation is commented out, so this method is currently
     * not registered as a Spring bean.
     *
     * @param socketServer the Socket.IO server
     * @return the pub/sub store of the configured store factory
     */
    //@Bean
    public PubSubStore pubSubStore(SocketIOServer socketServer) {
        return socketServer.getConfiguration().getStoreFactory().pubSubStore();
    }
}
3.编写监听及发布业务类
/**
 * Starts and stops the Socket.IO server with the Spring context and handles
 * connect, disconnect and message events from clients.
 */
@Service
public class SocketServerAnnoDemo {

    // Connections held by this node, keyed by the UID handshake parameter.
    private static final Map<String, SocketIOClient> clientMap = new ConcurrentHashMap<>();

    @Autowired
    private SocketIOServer socketIOServer;

    /** Starts the Socket.IO server once this bean has been constructed. */
    @PostConstruct
    public void start() {
        socketIOServer.start();
        System.out.println("socket.io初始化服务完成");
    }

    /** Stops the Socket.IO server when this bean is destroyed. */
    @PreDestroy
    public void stop() {
        if (socketIOServer != null) {
            socketIOServer.stop();
            socketIOServer = null;
        }
    }

    /**
     * Handles a new connection: remembers the client under its UID and puts it
     * into the "im" room.
     *
     * @param client the newly connected client
     */
    @OnConnect
    public void onConnect(SocketIOClient client) {
        String uid = getParamsByClient(client);
        if (uid != null) {
            clientMap.put(uid, client);
            System.out.println("有新的客户端连接UID:" + uid);
            // Join the room used for broadcasts.
            client.joinRoom("im");
        }
    }

    /**
     * Handles a dropped connection by forgetting the client.
     *
     * @param client the client whose connection ended
     */
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        String uid = getParamsByClient(client);
        if (uid != null) {
            // Remove the entry only if it still points at THIS client, so a
            // late disconnect of an old connection cannot evict a newer
            // connection that reused the same UID.
            clientMap.remove(uid, client);
            client.disconnect();
            System.out.println("一条客户端连接中断");
        }
    }

    /**
     * Receives a "ServerReceiveStr" event whose payload is a plain string.
     *
     * @param client  the sending client
     * @param request acknowledgement handle (unused)
     * @param data    the message text
     */
    @OnEvent(value = "ServerReceiveStr")
    public void onEvent(SocketIOClient client, AckRequest request, String data) throws InterruptedException {
        System.out.println("client发来的消息:" + data);
    }

    /**
     * Receives a "ServerReceive" event whose payload is a JSON object.
     *
     * @param client  the sending client
     * @param request acknowledgement handle (unused)
     * @param data    the JSON payload
     */
    @OnEvent(value = "ServerReceive")
    public void onEvent2(SocketIOClient client, AckRequest request, JSONObject data) {
        String uid = getParamsByClient(client);
        // Looked up to show how the remote IP is obtained; not used further here.
        String ip = getIpByClient(client);
        if (uid != null) {
            System.out.println("client发来的消息:" + data.toString());
        }
    }

    /**
     * Reads the UID parameter from the client's handshake URL.
     * The UID must uniquely identify the user.
     *
     * @param client the connected client
     * @return the first UID value, or {@code null} when the parameter is absent
     */
    private String getParamsByClient(SocketIOClient client) {
        Map<String, List<String>> params = client.getHandshakeData().getUrlParams();
        List<String> values = params.get("UID");
        if (values != null && !values.isEmpty()) {
            return values.get(0);
        }
        return null;
    }

    /**
     * Returns the client's remote IP address.
     *
     * <p>The address is taken from the socket address object rather than by
     * cutting its string form at the first ':', which gave wrong results for
     * IPv6 addresses and for "host/ip:port" representations.
     *
     * @param client the connected client
     * @return the IP literal; the host string if the address is unresolved;
     *         {@code null} if no remote address is available
     */
    private String getIpByClient(SocketIOClient client) {
        java.net.SocketAddress remote = client.getRemoteAddress();
        if (remote instanceof java.net.InetSocketAddress) {
            java.net.InetSocketAddress inet = (java.net.InetSocketAddress) remote;
            if (inet.getAddress() != null) {
                return inet.getAddress().getHostAddress();
            }
            return inet.getHostString();
        }
        return remote == null ? null : remote.toString();
    }
}
4.编写群发消息接口
/**
 * HTTP entry point used to trigger a broadcast to the "im" room.
 */
@RestController
public class WebDemo {

    @Autowired
    private SocketIOServer socketIOServer;

    /**
     * Builds a small JSON message and sends it as a "ClientReceive" event to
     * the "im" room.
     */
    @GetMapping("test")
    public void t() {
        final JSONObject broadcast = new JSONObject();
        broadcast.put("name", "goat");
        broadcast.put("message", "server群发信息");

        socketIOServer
                .getRoomOperations("im")
                .sendEvent("ClientReceive", broadcast);
    }
}
二.客户端篇
由于是集群测试,也是两个客户端,连接不同的socket服务;
index18887.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>客户端</title>
</head>
<body>
    <!-- area111: outgoing text typed by the user; area222: messages received from the server -->
    <textarea id="area111" style="height: 500px;width: 300px"></textarea>
    <textarea id="area222" style="height: 500px;width: 300px"></textarea>
    <div>
        <button id="btn">发送</button>
    </div>

    <!-- Scripts sit inside <body>, after the elements they look up. -->
    <script src="socket.io.js"></script>
    <script type="text/javascript">
        // This page talks to the Socket.IO endpoint on port 18887.
        const serverUri = "http://127.0.0.1:18887";
        const sendEvent = "ServerReceive";
        const sendEventStr = "ServerReceiveStr";
        const receiveEvent = "ClientReceive";
        var socket;

        // The UID is sent with the handshake and checked by the server.
        connect(1);

        // Opens the connection and wires up the event handlers.
        function connect(uid) {
            socket = io.connect(serverUri, {
                'force new connection': true,
                'query': 'UID=' + uid
            });
            socket.on('connect', function () {
                console.log("连接成功");
                send({
                    name: "client",
                    message: "hello Server"
                });
            });
            socket.on(receiveEvent, function (data) {
                // Append through .value: writing innerText only changes the
                // textarea's default text, which stops being displayed once
                // the textarea's value has been edited.
                var output = document.getElementById("area222");
                output.value = output.value + data.message;
            });
            socket.on('disconnect', function () {
                console.log("连接断开");
            });
            socket.on('connect_error', function () {
                // Close the socket so the client does not keep retrying.
                alert("没有权限");
                socket.close();
            });
        }

        // Sends a JSON payload to the server.
        function send(data) {
            socket.emit(sendEvent, data);
        }

        // The button sends the typed text as a string event and clears the input.
        let areaaaa = document.getElementById("area111");
        document.getElementById("btn").onclick = function () {
            socket.emit(sendEventStr, areaaaa.value);
            areaaaa.value = "";
        };
    </script>
</body>
</html>
index18888.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>客户端</title>
</head>
<body>
    <!-- area111: outgoing text typed by the user; area222: messages received from the server -->
    <textarea id="area111" style="height: 500px;width: 300px"></textarea>
    <textarea id="area222" style="height: 500px;width: 300px"></textarea>
    <div>
        <button id="btn">发送</button>
    </div>

    <!-- Scripts sit inside <body>, after the elements they look up. -->
    <script src="socket.io.js"></script>
    <script type="text/javascript">
        // This page talks to the Socket.IO endpoint on port 18888.
        const serverUri = "http://127.0.0.1:18888";
        const sendEvent = "ServerReceive";
        const sendEventStr = "ServerReceiveStr";
        const receiveEvent = "ClientReceive";
        var socket;

        // The UID is sent with the handshake and checked by the server.
        connect(1);

        // Opens the connection and wires up the event handlers.
        function connect(uid) {
            socket = io.connect(serverUri, {
                'force new connection': true,
                'query': 'UID=' + uid
            });
            socket.on('connect', function () {
                console.log("连接成功");
                send({
                    name: "client",
                    message: "hello Server"
                });
            });
            socket.on(receiveEvent, function (data) {
                // Append through .value: writing innerText only changes the
                // textarea's default text, which stops being displayed once
                // the textarea's value has been edited.
                var output = document.getElementById("area222");
                output.value = output.value + data.message;
            });
            socket.on('disconnect', function () {
                console.log("连接断开");
            });
            socket.on('connect_error', function () {
                // Close the socket so the client does not keep retrying.
                alert("没有权限");
                socket.close();
            });
        }

        // Sends a JSON payload to the server.
        function send(data) {
            socket.emit(sendEvent, data);
        }

        // The button sends the typed text as a string event and clears the input.
        let areaaaa = document.getElementById("area111");
        document.getElementById("btn").onclick = function () {
            socket.emit(sendEventStr, areaaaa.value);
            areaaaa.value = "";
        };
    </script>
</body>
</html>
三.测试篇
请求localhost:8080/test或localhost:8081/test中的任意一台服务器进行群发消息,两个客户端都能够收到消息;
本文由 GY 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:
2023/02/17 07:54