Get Even More Visitors To Your Blog, Upgrade To A Business Listing >>

ts封装websocket,支持失败重连、心跳、事件订阅

前言

一直很想体验下websocket,苦于一直没有机会,乘着这次优化,封装了一个原生的websocket处理类,本来是想用Socket.io的,但是它只能和它配到的server端使用,对于一些非封装的服务端,没法直接使用,于是只能自己封装了。

功能:

  1. 支持失败重连
  2. 支持心跳
  3. 支持重新初始化
  4. 事件订阅
  5. ts愉悦的类型推断,传入事件名和回调函数,能自动推断出结果类型
  6. 支持手动断卡,重新初始化即可重新连接

为什么需要重新初始化,因为我们的spa项目中,用户退出登录需要断开socket连接,用户登录后又需要重新连接,所以重新初始化承接的是切换用户重连的功能。

事件订阅是由于socket传递消息全靠原生的onmessage事件,在通过这个事件的event对象里面的一些属性去区分具体事件,所以我们需要一个事件分发机制,用于通知对应事件。

再者,有些事件是长期的,订阅一次后可能没法重新订阅,比如app.vue中的处理,所以我设计的事件订阅会一直持有所有订阅,除非你手动off移除,这样的话,哪怕重新初始化连接socket,相对于的事件通知也不会丢失。

源码

[hide]

首先是类型声明文件: types.ts

  /** socket实例化参数 */
export interface WebSocketOptions {
    /** 链接的url */
    url: string | URL;
    /** 协议字符串或一个协议字符串数组 */
    protocols?: string | string[];
}

/** 监听事件回调 */
export type EventCallback = (data: D) => void;

/** socket事件对应的回调参数类型 */
export interface SocketEventMap {
    /** 默认事件:连接成功 */
    onopen: Event;
    /** 默认事件:连接关闭 */
    onclose: CloseEvent;
    /** 默认事件:message,只有在json解析数据发生错误时触发 */
    onmessage: MessageEvent;
    /** 心跳事件 */
    event_ping: {
        /** 与服务器时间差 */
        delay: number;
        /** 当前与服务器连接的用户id */
        id: number;
        /** 连接类型 */
        type: string;
    };
    /** 用户充值成功通知 */
    event_user_recharge: {
        id: number;
        user_id: number;
        /** 当前充值的金额 */
        pay_amount: string;
        amount: number;
        pay_type_id: number;
        create_time: number;
        update_time: number;
        transaction_id: string;
        ip: string;
        pay_day: number;
        pay_time: number;
        notify_data: string;
        status: number;
        order_id: string;
        pay_name: string;
        bank_code: string;
        pay_times: number;
        pay_email: string;
        pay_phone: number;
        pay_way: number;
        pay_ratio: string;
        bonus: string;
        recharge_times: number;
        gift: string;
        success_ymd: number;
        user_merge_id: number;
    };
    /** 连接成功 */
    event_connect: { client_id: string };
    /** 连接的用户 */
    event_bind: {};
    /** 登录用户的数据 */
    event_real_user: {};
}

/** 客户端socket发送消息参数 */
export interface SocketSendData {
    data: {
        /** 事件名 */
        event: keyof SocketEventMap;
        /** 数据 */
        data: Record | Array | string;
    };
}

SocketEventMap里存放着socket事件和对应的事件参数,如果有新的事件,直接填入对应数据即可。

主体文件: index.ts

  import type { SocketEventMap, EventCallback, WebSocketOptions, SocketSendData } from "./types";
import { store } from "@/store";

export class Socket {
    /** 实例 */
    private static instance: Socket;
    /** store */
    private store: typeof store;
    /** 事件map */
    private eventMap: Map> = new Map();
    /** 是否开发模式 */
    private isDev: boolean = import.meta.env.VITE_ENV === "development";
    /** 是否已经初始化 */
    private isInit: boolean = false;
    /** webSocket实例 */
    private socket?: WebSocket;
    /** webSocket实例options:用于失败重连 */
    private socketOptions: WebSocketOptions = {
        url: ""
    };
    /** WebSocket非正常关闭code码 */
    public static CLOSE_ABNORMAL = 1006;
    /** 当前失败重连次数 */
    private reconnectCount: number = 0;
    /** 最大运行重连次数 */
    private reconnectLimit: number = 3;
    /** 心跳定时器 */
    private heartCheckTimer: NodeJS.Timeout | null = null;
    /** 心跳延迟,后端定的30s,怕赶不上,调整20s */
    private heartCheckDelay: number = 20 * 1000;

    private constructor() {
        this.store = store;
    }

    /** 获取实例 */
    public static getInstance(): Socket {
        if (!Socket.instance) {
            Socket.instance = new Socket();
        }
        return Socket.instance;
    }

    /** 订阅事件 */
    public on(event: T, callback: EventCallback): void {
        let eventList = this.eventMap.get(event);
        if (!eventList) {
            eventList = [];
            this.eventMap.set(event, eventList);
        }
        eventList.push(callback);
    }

    /** 取消订阅事件 */
    public off(event: T, callback: EventCallback): void {
        const eventList = this.eventMap.get(event);
        if (!eventList) return;
        const index = eventList.findIndex((item) => item === callback);
        if (index > -1) {
            eventList.splice(index, 1);
        }
    }

    /** 订阅一次性事件 */
    public once(event: T, callback: EventCallback): void {
        let eventList = this.eventMap.get(event);
        if (!eventList) {
            eventList = [];
            this.eventMap.set(event, eventList);
        }
        const onceCallback = (data: SocketEventMap[T]) => {
            callback(data);
            this.off(event, onceCallback);
        };
    }

    /** 触发订阅 */
    private emit(event: T, data: SocketEventMap[T]): void {
        const eventList = this.eventMap.get(event);
        if (!eventList) return;
        eventList.forEach((callback) => callback(data));
    }

    /** 清空订阅 */
    public clear() {
        this.eventMap.clear();
    }

    /** 初始化 */
    public init() {
        // 未登录或者已经初始化过,不再初始化
        if (this.isInit || !this.store.getters["config/isLogin"]) return;
        this.isInit = true;
        // 创建socket实例
        this.socket = this.createWebSocket();
        // 首次初始化时订阅心跳事件
        this.on("event_ping", this.onPing);
    }

    /** 重新初始化 */
    public reInit() {
        // 未登录不进行初始化
        if (!this.store.getters["config/isLogin"]) return;
        if (!this.isInit) {
            this.init();
            return;
        }
        // 重新创建新的websocket实例
        this.reconnectCount = 0;
        this.socket = this.createWebSocket();
    }

    /** 获取socket连接地址 */
    private getSocketUrl(): string {
        if (this.isDev) {
            return import.meta.env.VITE_DEV_WS_URL;
        }
        return `wss://${location.host}`;
    }

    /** 获取token */
    private getToken(): string {
        const isLogin = this.store.getters["config/isLogin"];
        if (isLogin) {
            return this.store.state.config.real_token ?? "";
        }
        return "";
    }

    /** 创建websocket */
    private createWebSocket(options?: WebSocketOptions): WebSocket {
        const op: WebSocketOptions = {
            url: ""
        };
        if (options) {
            Object.assign(op, options);
        } else {
            const baseUrl = this.getSocketUrl();
            const token = this.getToken();
            op.url = `${baseUrl}?token=${token}`;
        }
        // 保存options,方便失败重连
        this.socketOptions = op;
        const socket = new WebSocket(op.url, op.protocols);
        // 监听事件
        socket.onopen = this.onOpen;
        socket.onmessage = this.onMessage;
        socket.onclose = this.onClose;

        return socket;
    }

    /** websocket onOpen */
    private onOpen = (event: Event) => {
        if (this.isDev) {
            console.log("socket connection successful");
        }
        // 创建心跳
        this.heartCheck();
        // 触发订阅
        this.emit("onopen", event);
    };

    /** websocket onMessage */
    private onMessage = (event: MessageEvent) => {
        const { data } = event;
        try {
            const parseData = JSON.parse(data as string);
            this.emit(parseData.event as keyof SocketEventMap, parseData.data as SocketEventMap[keyof SocketEventMap]);
        } catch (error) {
            this.emit("onmessage", data);
        }
    };

    /** websocket onClose */
    private onClose = (event: CloseEvent) => {
        // 结束心跳,重连时会重新创建
        this.clearHeartCheck();
        // 如果WebSocket是非正常关闭 则进行重连
        if (event.code === Socket.CLOSE_ABNORMAL) {
            if (this.reconnectCount  {
        if (this.isDev) console.log("socket heart check success");
    };

    /** websocket 重连 */
    private reconnect() {
        this.socket = this.createWebSocket(this.socketOptions);
    }

    /** 发送消息 */
    public sendMessage(data: SocketSendData) {
        this.socket?.send(JSON.stringify(data));
    }

    /** 创建心跳 */
    private heartCheck() {
        this.clearHeartCheck();
        this.heartCheckTimer = setInterval(() => {
            if (this.socket?.readyState === WebSocket.OPEN) {
                this.sendMessage({
                    data: {
                        event: "event_ping",
                        data: { time: Date.now() }
                    }
                });
            }
        }, this.heartCheckDelay);
    }

    /** 结束心跳 */
    private clearHeartCheck() {
        if (this.heartCheckTimer !== null) {
            clearInterval(this.heartCheckTimer);
            this.heartCheckTimer = null;
        }
    }

    /** 结束websocket连接 */
    public close() {
        this.socket?.close();
    }
}

由于我的项目目前需要判断下用户是否登录,所以引入的vuex,大家使用可以根据自己的项目情况调整,就几个判断逻辑。

使用

  import { Socket } from "@/socket";

// 初始化socket
Socket.getInstance().init();

一般来说是在main.ts中进行初始化,注意我使用了vuex,所以初始化要放在use(vuex)后,以防出现问题。

当用户退出登录,我们就可以结束连接:

  import { Socket } from "@/socket";

Socket.getInstance().close();

用户重新登录后,我们就重新连接:

  import { Socket } from "@/socket";

Socket.getInstance().reInit();

用户登录连接,后端是要求传入token的,通过链接参数传递,具体在 createWebSocket处理了,有需要可以自行调整。

[/hide]



This post first appeared on IT瘾 | IT社区推荐资讯, please read the originial post: here

Share the post

ts封装websocket,支持失败重连、心跳、事件订阅

×

Subscribe to It瘾 | It社区推荐资讯

Get updates delivered right to your inbox!

Thank you for your subscription

×