目标
- 实现okhttp请求阿里云流式接口数据,然后以sse格式返回前端;
集成依赖
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-sse</artifactId>
<version>4.10.0</version>
</dependency>
controller代码
final ScheduledExecutorService timeoutScheduler = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
@GetMapping(value = "/chat")
public SseEmitter licenseInformation(@RequestParam String msg) throws Exception {
//解决中文乱码
SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(30000L);
//接入定时器,定时进行超时返回
timeoutScheduler.schedule(() -> {
try {
sseEmitter.send("服务器响应超时,请稍后再试~", org.springframework.http.MediaType.APPLICATION_JSON);
sseEmitter.complete();
} catch (Exception e) {
}
}, 30000 - 3000, TimeUnit.MILLISECONDS);
Request request = new Request.Builder()
.post(body)
.url(url)
.build();
EventSource.Factory factory = EventSources.createFactory(okhttp);
// 自定义监听器
EventSourceListener eventSourceListener = new ConsoleEventSourceListener(sseEmitter);
// 创建事件
EventSource eventSource = factory.newEventSource(request, eventSourceListener);
return sseEmitter;
}
class SseEmitterUTF8 extends SseEmitter {
public SseEmitterUTF8(Long timeout) {
super(timeout);
}
@Override
protected void extendResponse(ServerHttpResponse outputMessage) {
super.extendResponse(outputMessage);
HttpHeaders headers = outputMessage.getHeaders();
headers.setContentType(new org.springframework.http.MediaType(org.springframework.http.MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
}
}
//监听器
class ConsoleEventSourceListener extends EventSourceListener {
private SseEmitter sseEmitter;
public ConsoleEventSourceListener(SseEmitter sseEmitter) {
this.sseEmitter = sseEmitter;
}
@Override
public void onOpen(EventSource eventSource, Response response) {
System.out.println("连接成功");
}
@Override
public void onEvent(EventSource eventSource, String id, String type, String data) {
sendUtil(data, sseEmitter);
}
@Override
public void onClosed(EventSource eventSource) {
sendUtil("关闭连接...", sseEmitter);
sseEmitter.complete();
eventSource.cancel();
}
@SneakyThrows
@Override
public void onFailure(EventSource eventSource, Throwable t, Response response) {
if (Objects.isNull(response)) {
sendUtil("异常->无返回", sseEmitter);
sseEmitter.complete();
eventSource.cancel();
return;
}
ResponseBody body = response.body();
if (Objects.nonNull(body)) {
sendUtil("异常->无内容", sseEmitter);
sseEmitter.complete();
} else {
sendUtil(body.string(), sseEmitter);
sseEmitter.complete();
}
eventSource.cancel();
}
}
踩坑
- 当使用到Nginx的时候,前端无法得到实时的流式数据,而是等待全部返回
- 解决办法就是加入配置 proxy_buffering off;
本文由 GY 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:
2024/09/26 11:09