百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

聊聊分布式下的WebSocket解决方案

off999 2025-03-06 18:32 25 浏览 0 评论

前言

最近自己搭建了个项目,项目本身很简单,但是里面有使用WebSocket进行消息提醒的功能,大体情况是这样的。

发布消息者在系统中发送消息,实时的把消息推送给对应的一个部门下的所有人。

这里面如果是单机应用的情况时,我们可以通过部门的id和用户的id组成一个唯一的key,与应用服务器建立WebSocket长连接,然后就可以接收到发布消息者发送的消息了。

但是真正把项目应用于生产环境中时,我们是不可能就部署一个单机应用的,而是要部署一个集群。

所以我通过Nginx+两台Tomcat搭建了一个简单的负载均衡集群,作为测试使用

但是问题出现了,我们的客户端浏览器只会与一台服务器建立WebSocket长连接,所以发布消息者在发送消息时,就没法保证所有目标部门的人都能接收到消息(因为这些人连接的可能不是一个服务器)。

本篇文章就是针对于这么一个问题展开讨论,提出一种解决方案,当然解决方案不止一种,那我们开始吧。

WebSocket单体应用介绍

在介绍分布式集群之前,我们先来看一下王子的WebSocket代码实现,先来看java后端代码如下:

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@ServerEndpoint("/webSocket/{key}")
public class WebSocket {
    private static int onlineCount = 0;
    /**
     * 存储连接的客户端
     */
    private static Map clients = new ConcurrentHashMap();
    private Session session;
    /**
     * 发送的目标科室code
     */
    private String key;

    @OnOpen
    public void onOpen(@PathParam("key") String key, Session session) throws IOException {
        this.key = key;
        this.session = session;
        if (!clients.containsKey(key)) {
            addOnlineCount();
        }
        clients.put(key, this);
        Log.info(key+"已连接消息服务!");
    }

    @OnClose
    public void onClose() throws IOException {
        clients.remove(key);
        subOnlineCount();
    }

    @OnMessage
    public void onMessage(String message) throws IOException {
        if(message.equals("ping")){
            return ;
        }
        JSONObject jsonTo = JSON.parseObject(message);
        String mes = (String) jsonTo.get("message");
        if (!jsonTo.get("to").equals("All")){
            sendMessageTo(mes, jsonTo.get("to").toString());
        }else{
            sendMessageAll(mes);
        }
    }

    @OnError
    public void onError(Session session, Throwable error) {
        error.printStackTrace();
    }

    private void sendMessageTo(String message, String To) throws IOException {
        for (WebSocket item : clients.values()) {
            if (item.key.contains(To) )
                item.session.getAsyncRemote().sendText(message);
        }
    }

    private void sendMessageAll(String message) throws IOException {
        for (WebSocket item : clients.values()) {
            item.session.getAsyncRemote().sendText(message);
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocket.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocket.onlineCount--;
    }

    public static synchronized Map getClients() {
        return clients;
    }
}

示例代码中并没有使用Spring,用的是原生的java web编写的,简单和大家介绍一下里面的方法。

onOpen:在客户端与WebSocket服务连接时触发方法执行

onClose:在客户端与WebSocket连接断开的时候触发执行

onMessage:在接收到客户端发送的消息时触发执行

onError:在发生错误时触发执行

可以看到,在onMessage方法中,我们直接根据客户端发送的消息,进行消息的转发功能,这样在单体消息服务中是没有问题的。

再来看一下js代码

var host = document.location.host;

    // 获得当前登录科室
    var deptCodes='${sessionScope.$UserContext.departmentID}';
    deptCodes=deptCodes.replace(/[\[|\]|\s]+/g, "");
    var key = '${sessionScope.$UserContext.userID}'+deptCodes;
    var lockReconnect = false;  //避免ws重复连接
    var ws = null;          // 判断当前浏览器是否支持WebSocket
    var wsUrl = 'ws://' + host + '/webSocket/'+ key;
    createWebSocket(wsUrl);   //连接ws

    function createWebSocket(url) {
        try{
            if('WebSocket' in window){
                ws = new WebSocket(url);
            }else if('MozWebSocket' in window){  
                ws = new MozWebSocket(url);
            }else{
                  layer.alert("您的浏览器不支持websocket协议,建议使用新版谷歌、火狐等浏览器,请勿使用IE10以下浏览器,360浏览器请使用极速模式,不要使用兼容模式!"); 
            }
            initEventHandle();
        }catch(e){
            reconnect(url);
            console.log(e);
        }     
    }

    function initEventHandle() {
        ws.onclose = function () {
            reconnect(wsUrl);
            console.log("llws连接关闭!"+new Date().toUTCString());
        };
        ws.onerror = function () {
            reconnect(wsUrl);
            console.log("llws连接错误!");
        };
        ws.onopen = function () {
            heartCheck.reset().start();      //心跳检测重置
            console.log("llws连接成功!"+new Date().toUTCString());
        };
        ws.onmessage = function (event) {    //如果获取到消息,心跳检测重置
            heartCheck.reset().start();      //拿到任何消息都说明当前连接是正常的//接收到消息实际业务处理
        ...
        };
    }
    // 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
    window.onbeforeunload = function() {
        ws.close();
    }  

    function reconnect(url) {
        if(lockReconnect) return;
        lockReconnect = true;
        setTimeout(function () {     //没连接上会一直重连,设置延迟避免请求过多
            createWebSocket(url);
            lockReconnect = false;
        }, 2000);
    }

    //心跳检测
    var heartCheck = {
        timeout: 300000,        //5分钟发一次心跳
        timeoutObj: null,
        serverTimeoutObj: null,
        reset: function(){
            clearTimeout(this.timeoutObj);
            clearTimeout(this.serverTimeoutObj);
            return this;
        },
        start: function(){
            var self = this;
            this.timeoutObj = setTimeout(function(){
                //这里发送一个心跳,后端收到后,返回一个心跳消息,
                //onmessage拿到返回的心跳就说明连接正常
                ws.send("ping");
                console.log("ping!")
                self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了
                    ws.close();     //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
                }, self.timeout)
            }, this.timeout)
        }
  }

js部分使用的是原生H5编写的,如果为了更好的兼容浏览器,也可以使用SockJS,有兴趣小伙伴们可以自行百度。

接下来我们就手动的优化代码,实现WebSocket对分布式架构的支持。

解决方案的思考

现在我们已经了解单体应用下的代码结构,也清楚了WebSocket在分布式环境下面临的问题,那么是时候思考一下如何能够解决这个问题了。

我们先来看一看发生这个问题的根本原因是什么。

简单思考一下就能明白,单体应用下只有一台服务器,所有的客户端连接的都是这一台消息服务器,所以当发布消息者发送消息时,所有的客户端其实已经全部与这台服务器建立了连接,直接群发消息就可以了。

换成分布式系统后,假如我们有两台消息服务器,那么客户端通过Nginx负载均衡后,就会有一部分连接到其中一台服务器,另一部分连接到另一台服务器,所以发布消息者发送消息时,只会发送到其中的一台服务器上,而这台消息服务器就可以执行群发操作,但问题是,另一台服务器并不知道这件事,也就无法发送消息了。

现在我们知道了根本原因是生产消息时,只有一台消息服务器能够感知到,所以我们只要让另一台消息服务器也能感知到就可以了,这样感知到之后,它就可以群发消息给连接到它上边的客户端了。

那么什么方法可以实现这种功能呢,王子很快想到了引入消息中间件,并使用它的发布订阅模式来通知所有消息服务器就可以了。

引入RabbitMQ解决分布式下的WebSocket问题

在消息中间件的选择上,王子选择了RabbitMQ,原因是它的搭建比较简单,功能也很强大,而且我们只是用到它群发消息的功能。

RabbitMQ有一个广播模式(fanout),我们使用的就是这种模式。

首先我们写一个RabbitMQ的连接类:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class RabbitMQUtil {
    private static Connection connection;

    /**
     * 与rabbitmq建立连接
     * @return
     */
    public static Connection getConnection() {
        if (connection != null&&connection.isOpen()) {
            return connection;
        }

        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("192.168.220.110"); // 用的是虚拟IP地址
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");

        try {
            connection = factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

        return connection;
    }
}

这个类没什么说的,就是获取MQ连接的一个工厂类。

然后按照我们的思路,就是每次服务器启动的时候,都会创建一个MQ的消费者监听MQ的消息,王子这里测试使用的是Servlet的监听器,如下:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;


public class InitListener implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        WebSocket.init();
    }

    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {

    }
}

记得要在Web.xml中配置监听器信息



    
        InitListener
    

WebSocket中增加init方法,作为MQ消费者部分

public  static void init() {
        try {
            Connection connection = RabbitMQUtil.getConnection();
            Channel channel = connection.createChannel();
            //交换机声明(参数为:交换机名称;交换机类型)
            channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT);
            //获取一个临时队列
            String queueName = channel.queueDeclare().getQueue();
            //队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)
            channel.queueBind(queueName,"fanoutLogs","");


            //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    String message = new String(body,"UTF-8");
                    System.out.println(message);
            //这里可以使用WebSocket通过消息内容发送消息给对应的客户端
                }
            };

            //声明队列中被消费掉的消息(参数为:队列名称;消息是否自动确认;consumer主体)
            channel.basicConsume(queueName,true,consumer);
            //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

同时在接收到消息时,不是直接通过WebSocket发送消息给对应客户端,而是发送消息给MQ,这样如果消息服务器有多个,就都会从MQ中获得消息,之后通过获取的消息内容再使用WebSocket推送给对应的客户端就可以了。

WebSocket的onMessage方法增加内容如下:

try {
            //尝试获取一个连接
            Connection connection = RabbitMQUtil.getConnection();
            //尝试创建一个channel
            Channel channel = connection.createChannel();
            //声明交换机(参数为:交换机名称; 交换机类型,广播模式)
            channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT);
            //消息发布(参数为:交换机名称; routingKey,忽略。在广播模式中,生产者声明交换机的名称和类型即可)
            channel.basicPublish("fanoutLogs","", null,msg.getBytes("UTF-8"));
            System.out.println("发布消息");
            channel.close();
        } catch (IOException |TimeoutException e) {
            e.printStackTrace();
        }

增加后删除掉原来的Websocket推送部分代码。

这样一整套的解决方案就完成了。

总结

到这里,我们就解决了分布式下WebSocket的推送消息问题。

我们主要是引入了RabbitMQ,通过RabbitMQ的发布订阅模式,让每个消息服务器启动的时候都去订阅消息,而无论哪台消息服务器在发送消息的时候都会发送给MQ,这样每台消息服务器就都会感知到发送消息的时间,从而再通过Websocket发送给客户端。

大体流程就是这样,那么小伙伴们有没有想过,如果RabbitMQ挂掉了几分钟,之后重启了,消费者是否可以重新连接到RabbitMQ?是否还能正常接收消息呢?

生产环境下,这个问题是必须考虑的。

这里已经测试过,消费者是支持自动重连的,所以我们可以放心的使用这套架构来解决此问题。

本文到这里就结束了,欢迎各位小伙伴留言讨论,一起学习,一起进步。

相关推荐

路由器登陆账号密码(路由器登陆账号密码忘了)

1、一般路由器的账号为admin,密码也是admin;还有路由器产品出厂时默认登录密码是guest,有点路由器产品的出厂时初始登录密码为【空】,也就是没有登录密码,直接就可以进入设置界面的;  2、您...

新风系统安装步骤(新风系统安装方案图)

1.设计与规划:在安装新风系统之前,首先需要进行设计和规划。根据建筑物的空间布局、通风需求、使用人数等因素,确定新风系统的类型(如全热交换新风系统、单向流新风系统等)和安装位置。2.现场勘查:在设...

系统win8下载(win8下载软件)

可以按照以下步骤在Win8上下载和安装Word:1.通过微软官网下载购买,或者通过MicrosoftStore应用商店进行购买和下载。2.下载完成后,打开文件夹,双击setup进行安装。3.安...

电脑的详细参数在哪里看(电脑详细参数怎么看)

要查看电脑参数,可以通过以下几种方式:1.使用操作系统提供的系统信息工具:大多数操作系统都会提供一个系统信息工具,可以显示电脑的基本参数。在Windows操作系统中,可以打开"控制面板...

windows无法连接打印机拒绝访问
  • windows无法连接打印机拒绝访问
  • windows无法连接打印机拒绝访问
  • windows无法连接打印机拒绝访问
  • windows无法连接打印机拒绝访问
oppo解除安全模式(oppp取消安全模式)

一般来说,关机重启手机即可退出安全模式。如果重启手机安全模式还没有解除的话,可以使用手机杀毒软件对手机进行全盘查杀,杀完毒再进行重启试试。如果还是不行的话,那就只能备份手机里的重要数据,刷机或者恢复出...

电脑更新后无法正常启动(电脑更新后无法正常使用)

电脑更新无法开机可能有多种原因。首先,可能是由于更新过程中出现了错误或中断,导致系统文件损坏或丢失,从而无法正常启动。其次,可能是更新过程中出现了兼容性问题,导致某些硬件或驱动程序无法与新的系统版本兼...

office免费版下载win7(office2007官方免费版)

office2007官方下载免费完整版win7/win8/win8.1/win10通用的。office2007官方下载免费完整版win7/win8/win8.1/win10通用版本包括Word、Exc...

hp笔记本重装系统按哪个键(hp笔记本win10重装系统详细步骤)

恢复系统重装,在开机时n1SC键,在弹出的菜单中按f2或者是f9键,选项中选择如果是从光盘重装系统的就选择从光盘启动系统,如果使用U盘,启动系统就选择从U盘进入到系统中在开机时按ESC调出启动菜单,有...

磁盘重新分区(磁盘重新分区怎么操作)

您好,以下是给磁盘重新分区的步骤:1.打开磁盘管理工具。在Windows中,可以按Win键+X,然后点击磁盘管理来打开。在Mac中,可以打开磁盘工具来进行分区。2.选择需要分区的磁盘。在磁盘管理工...

如何用手机设置无线路由器密码

首先打开你的手机,在设置中确认连接上你的wifi之后,任意打开一个浏览器,输入“tplogin.cn”;进入页面可以看见一个输入框,输入你设置过的宽带密码,如果你不知道可以打电话给宽带的客服查询;进入...

台式电脑用usb重装系统步骤(台式电脑怎样usb装机)

1.U盘WinPE系统制作先到网上去下载一个u盘启动盘制作工具的工具,常用的有u大师U盘启动盘制作工具。再把U盘插入电脑中,运行软件将u盘制作成启动盘。2.准备系统GHO镜像文件在网上下载GHOST系...

hotmail邮箱格式(hotmail邮箱登录后缀)

电子邮箱的正确格式为user@mail.server.name,其中user是收件人的用户名,mail.server.name是收件人的电子邮件服务器名。具体详情如下:1、QQ邮箱的标准格式为:112...

windows11主题包(win10 win11主题)

1."梦幻星空"是ColorOS11中最好看的主题。2.这是因为"梦幻星空"主题采用了炫酷的星空元素,配合流畅的动画效果,给人一种梦幻般的感觉。同时,主题的配色也...

戴尔官方商城官网(戴尔官方商城官网首页)

https://www.dell.com/戴尔公司于1992年进入《财富》杂志500强之列,戴尔因此成为其中最年轻的首席执行官。戴尔公司名列《财富》杂志500强的第48位。自1995年起,戴尔公司一直...

取消回复欢迎 发表评论: