百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

Spring WebFlux基于反应式WebSocket的应用

off999 2025-03-19 01:02 36 浏览 0 评论

环境:Springboot2.4.13


WebSocket介绍

WebSocket协议RFC 6455提供了一种标准化的方式,通过一个TCP连接在客户端和服务器之间建立全双工、双向的通信通道。它是一个不同于HTTP的TCP协议,但设计为在HTTP之上工作,使用80和443端口,并允许重用现有的防火墙规则。

WebSocket交互开始于一个HTTP请求,使用HTTP Upgrade Header进行升级,在本例中是切换到WebSocket协议。下面的例子展示了这种交互:

GET /spring-websocket-portfolio/portfolio HTTP/1.1

Host: localhost:8080

Upgrade: websocket // The Upgrade header.
Connection: Upgrade // Using the Upgrade connection.

Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==

Sec-WebSocket-Protocol: v10.stomp, v11.stomp

Sec-WebSocket-Version: 13 Origin: http://localhost:8080

支持WebSocket的服务器会返回类似下面的输出,而不是通常的200状态码:

HTTP/1.1 101 Switching Protocols

Upgrade: websocket

Connection: Upgrade

Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=

Sec-WebSocket-Protocol: v10.stomp

握手成功后,HTTP upgrade请求的TCP套接字保持打开,客户端和服务器可以继续发送和接收消息。

对WebSockets工作原理的完整介绍超出了本文档的范围。请参阅RFC 6455、HTML5中有关WebSocket的章节,或者网上的任何介绍和教程。

注意,如果WebSocket服务器运行在web服务器(例如nginx)后面,你可能需要配置它来将 WebSocket升级请求传递给WebSocket服务器。

自定义HandlerMapping

自定义HandlerMapping是为了在项目中能够自动的失败0到N的不同请求的WebSocket连接

public class WebSocketHandlerMapping extends SimpleUrlHandlerMapping {
  
  @Override
  public void initApplicationContext() throws BeansException {
    Map handlers = new HashMap<>();
    ApplicationContext context = getApplicationContext() ;
    Map beans = context.getBeansOfType(WebSocketHandler.class) ;
    for (WebSocketHandler handler : beans.values()) {
      WebSocketMapping webSocketMapping = AnnotatedElementUtils.findMergedAnnotation(handler.getClass(), WebSocketMapping.class) ;
      if (webSocketMapping != null) {
        String value = webSocketMapping.value() ;
        if (StringUtils.hasLength(value)) {
          handlers.put(value, handler) ;
        }
      }
    }
    if (handlers.size() > 0) {
      this.setUrlMap(handlers) ;
      super.initApplicationContext();
    }
  }

  @Override
  public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE ;
  }
  
}

在这HandlerMapping中使用了自定义的Mapping注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface WebSocketMapping {
  /**请求路径*/
  String value() default "" ;
}

通过上面的HandlerMapping处理能够识别出当前环境下所有带有@WebSocketMapping注解的Bean,然后进行注册到当前的URL集合中。

@Component
@WebSocketMapping("/chat2/{name}")
public class ChatWebSocketHandler2 implements WebSocketHandler {
  private static final Logger logger = LoggerFactory.getLogger(ChatWebSocketHandler2.class) ;
  public static final Map sessions = new ConcurrentHashMap<>() ;

  @Override
  public Mono handle(WebSocketSession session) {
    System.out.println(session) ;
    URI uri = session.getHandshakeInfo().getUri() ;
    String path = uri.getPath() ;
    String username = path.split("/")[2] ;
    logger.info("Client id: {} Connected, Request URI: {}", session.getId(), uri) ;
    HttpHeaders headers = session.getHandshakeInfo().getHeaders() ;
    logger.info("Request Headers: {}", headers) ;
    Mono receive = session.receive()
        .doOnNext(message -> {
          // 这里如果header中没有to,那么返回null,所以要做好判断,不然默认异常是不会被抛出的
          // 导致连接即关闭,只有加了下面的onErrorMap才能看到异常信息
          List tos = headers.get("to") ;
          if (tos !=null && !tos.isEmpty()) {
            String to = tos.get(0) ;
            WebSocketWrapper wsw = sessions.get(to) ;
            if (wsw != null) {
              String msg = message.getPayloadAsText() ;
              logger.info("给 {} 发送消息: {}", tos, msg) ;
              wsw.send(msg) ;
            }
          } else {
            logger.info("Chat 接收到消息: {}", message.getPayloadAsText());
          }
        }).onErrorMap(ex -> {
          ex.printStackTrace();
          return ex ;
        }).then() ;
    Mono sender = session.send(Flux.create(sink -> sessions.put(username, new WebSocketWrapper(session, sink)))) ;
    return Mono.zip(receive, sender).doFinally(signalType -> {
            logger.info("Client id: {}, 断开连接. 信号: {}", session.getId(), signalType.name());
            sessions.remove(username) ;
            session.close() ;
          }).then() ;
  }
}

WebSocketWrapper

public class WebSocketWrapper {
  private WebSocketSession session ;
  private FluxSink sink ; 
  public void send(String payload) {
    this.sink.next(session.textMessage(payload)) ;
  }
}

测试:

点对点消息

完毕!!!

长期创作关注不迷路!!!

Spring WebFlux使用函数式编程之HandlerFunction(1)
Spring WebFlux使用函数式编程之RouterFunction(2)
Spring WebFlux中使用WebClient远程接口调用
一文带你彻底理解Spring WebFlux的工作原理
Spring WebFlux请求处理流程
SpringBoot WebFlux整合MongoDB实现CRUD及分页功能
Spring WebFlux核心处理组件DispatcherHandler
SpringBoot WebFlux整合R2DBC实现数据库反应式编程
Spring WebFlux使用函数式编程之Filtering Handler Functions
Spring WebFlux入门实例并整合数据库实现基本的增删改查

相关推荐

微信记录恢复助手(微信记录恢复助手安全吗)
  • 微信记录恢复助手(微信记录恢复助手安全吗)
  • 微信记录恢复助手(微信记录恢复助手安全吗)
  • 微信记录恢复助手(微信记录恢复助手安全吗)
  • 微信记录恢复助手(微信记录恢复助手安全吗)
五笔打字怎么打(曹五笔打字怎么打)

操作步骤:1、按住Ctrl+空格键切换到五笔输入法;2、在输入面板上面单击右键选择“软键盘”——特殊符号;3、在打开的软键盘上单击“☆”即可。五笔打字是指采用五笔字型输入法向电脑中输入汉字。这种输入...

内存不能为written修复工具(一直出现内存不能为written)
  • 内存不能为written修复工具(一直出现内存不能为written)
  • 内存不能为written修复工具(一直出现内存不能为written)
  • 内存不能为written修复工具(一直出现内存不能为written)
  • 内存不能为written修复工具(一直出现内存不能为written)
电脑高手24在线咨询(电脑高手联系方式)

现在的电脑一般都不要重启键了。如果死机的话,按住开关键五秒,自动关机。再开机就行。至于他们说的快速关机CTRL+ALT+ENT快速重启CTRL+ALT+HOME是GOHST版安装后自带的快捷键,你的系...

你的电脑未正确启动自动修复

1、试试“禁止驱动强制签名”能不能进入桌面在“疑难解答”->“高级选项”->“启动模式”中选择“禁止驱动强制签名”,如果这样能正常开启,那么就说明是某个驱动的问题2、把错误驱动删掉如果你安...

电脑开机弹出系统恢复选项(电脑开机经常出现系统恢复界面)

  这种情况一般都是系统引导出现问题,可以【F8】选择【最后一次正确配置】,重启后一般都能恢复;如果不行可以选择进入【安全模式】【恢复我的计算机到一个较早的时间】。以下是详细介绍:  1、开机时多次点...

手机版电脑模拟器下载(手机电脑模拟器下载的文件在哪)
手机版电脑模拟器下载(手机电脑模拟器下载的文件在哪)

  玩家们想要在电脑上畅快地玩真实手机,首先就需要先下载它的电脑版模拟器啦。在这里推荐大家使用的是电脑安卓模拟器,这是一款十分流畅好用的真实手机安卓模拟器,性能强悍,功能完备。  1、下载完真实手机安卓模拟器。  2、在电脑上进行安装,双击...

2026-01-01 16:03 off999

下载计算机到手机(手机下载计算机怎么下载)
  • 下载计算机到手机(手机下载计算机怎么下载)
  • 下载计算机到手机(手机下载计算机怎么下载)
  • 下载计算机到手机(手机下载计算机怎么下载)
  • 下载计算机到手机(手机下载计算机怎么下载)
u盘启动不了怎么回事(u盘启动也启动不了)

原因三:USB传输性能不佳导致;解决三:换个USB插口试试,建议将u盘插入到电脑机箱后置的USB插口处。原因四:u盘自身的质量问题导致;解决四:换一个u盘制作试一试。原因五:电脑系统问题导致;解决五:...

联想笔记本电脑键盘输入没反应

1.首先在可以进行输入的位置,长按下某个按键1秒以上,看看有没有反应。有反应看第2,没反应看第3。2.控制面板~轻松使用~轻松使用设置中心~更改键盘的工作方式~取消筛选键并应用即可。3.打开设备管理器...

软件管家电脑版下载官网(软件管家电脑版下载官网安装)

要下载和安装应用程序,您可以按照以下步骤使用电脑管家:1.打开电脑管家应用程序。2.在主界面上,您可以找到一个名为“应用中心”的选项,点击它。3.在应用中心中,您可以浏览各种应用程序的列表。您可...

台式电脑怎么取消定时关机(台式电脑设置自动关机怎么取消)

电脑设置了每天定时关机,取消的方法有多种,以下提供三种方式:方法一:打开任务计划程序(TaskScheduler)。找到“任务计划程序库”(TaskSchedulerLibrary),找到设置的...

win7怎么截屏快捷键(win7怎样截屏快捷键)

在Win7系统中,自带的截图快捷键是“PrtScn”键,即PrintScreen键。按下这个键后,系统会将当前屏幕的内容复制到剪贴板中,然后用户可以将其粘贴到其他应用程序中进行编辑或保存。此外,Wi...

如何查看笔记本配置(如何查看笔记本配置高低)

两种方法一种你在笔记本背面有个ID号,也就是序列号,你把它抄下来,输到笔记本品牌的官网上,查看他的配置就可以,这是第1种方法,第2种方法,你开机后,我的电脑上单击右键,点属性,就会在出来你的CPU内存...

linux软件(linux软件图标)

Linux是一种自由和开放源代码的类UNIX操作系统。该操作系统的内核由林纳斯·托瓦兹在1991年10月5日首次发布。在加上用户空间的应用程序之后,成为Linux操作系统。Linux也是自由软件和开放...

取消回复欢迎 发表评论: