基于WebSocket 长连接客户端设计文档

1.简介

为了便于业务线开发和使用,公共服务推进整合相关长连接通道,统一规划,提高长连接可维护性和降低开发成本。目前, 各业务线存在长连接及使用情况如下:

  • 病历同步, 目前基于Websocket协议,使用Golang语言开发的服务端,业务服务和长连接服务通过Kafka实现消息通信.
  • 协作群组, 基于SocketCluster开发
  • 小秘书小红点,仅用于判断是否存在新消息
  • 诊疗圈

1.1 目标

易使用: 便于各业务线使用
高可靠: 独立进程,持久化消息,防消息丢失,ACK机制,保证消息到达并被处理
可扩展: 统一的SDK,便于长连接协议及消息无缝升级
安全性: 支持 WSS
可监控: 原生的监控统计支持

1.2解决的问题

► 长连接方案不统一
► 消息丢失无法定位
► 长连接生命周期不可控

2.架构总览

本项目是基于WebSocket 实现的长连接方案,客户端SDK 命名为 Seawater。

nv-websocket: WebSocket协议 java实现方案
socketCluster-java: 长连接消息体定义、重连、认证、Channel订阅实现。
SeaWater: 长连接状态维护(登录、退出)、维护Android 本地IPC,ACK 确认。

服务端使用 SocketCluster 维护长连接服务,通过消息中心使用http请求与业务服务交换数据。

3.实现

3.1 WebSocket协议Java实现

nv-websocket,High-quality WebSocket client implementation in Java which

  • complies with RFC 6455 (The WebSocket Protocol),
  • works on Java SE 1.5+ and Android,
  • supports all the frame types (continuation, binary, text, close, ping and pong),
  • provides a method to send a fragmented frame in addition to methods for unfragmented frames,
  • provides a method to get the underlying raw socket of a WebSocket to configure it,
  • provides a method for Basic Authentication,
  • provides a factory class which utilizes javax.net.SocketFactory interface,
  • provides a rich listener interface to hook WebSocket events, has fine-grained error codes for fine-grained controllability on errors,
  • allows to disable validity checks on RSV1/RSV2/RSV3 bits and opcode of frames,
  • supports HTTP proxy, especially "Secure WebSocket" (wss) through "Secure Proxy" (https), and supports RFC 7692 (Compression Extensions for WebSocket), also known as permessage-deflate (not enabled by default).

3.1.1 WebSocket SSL

WebSocketFactory 工厂类用来设置 Websock 与服务器交互的特性,如:

  • 符合 RFC 6455 ( web socket协议),在 Java 1.5 + 和安卓上工作
  • 支持所有帧类型( 继续,二进制,文本,关闭,ping和 pong ),
  • 提供发送拆分帧的方法,除了unfragmented框架的方法之外,
  • 提供一个方法来获取 web socket的底层原始套接字以对它的进行配置,
  • 提供一个用于基本身份验证插件的方法,
  • 提供一个使用 javax.net.SocketFactory 接口的工厂类,
  • 提供一个丰富的侦听器接口来挂接 web socket事件,有细粒度误差代码,适用于错误的细粒度控制,
  • 允许禁用 RSV1/RSV2/RSV3 位的有效性检查和帧的操作码,
  • 通过"安全代理"( https ) 支持HTTP代理,特别是"安全 web socket"( wss ),
  • 并支持 RFC 7692 ( 。web socket的压缩扩展),也称为 permessage-deflate ( 默认情况下未启用) 。

获取WebSocketFactory 实例 :

    WebSocketFactory factory=socket.getFactorySettings();

为WebSocketFactory实例创建SSL context, 如果使用默认 SSL 配置则不需要调用setSSL*方法。

// Create a custom SSL context.
SSLContext context = NaiveSSLContext.getInstance("TLS");

// Set the custom SSL context.
factory.setSSLContext(context);  

3.1.2 设置 HTTP 代理

设置 Http 代理,需要在创建WebSock 实例前通过ProxySettings 类设置WebSocketFactory, 通过调用 WebSocketFactory.getProxySettings() 方法对代理进行设置.

// Get the associated ProxySettings instance.
ProxySettings settings = factory.getProxySettings();  

ProxySettings 类中有setHost 方法和 setPort 方法.

// Set a proxy server.
settings.setServer("https://proxy.example.com");  

如果代理服务器需要证书,可以通过 setId 方法和setPassword 方法, 或者 setCredentials 方法来设置相关参数。

// Set credentials for authentication at a proxy server.
settings.setCredentials(id, password);  

3.2 SocketCluster

SocketCluster is a fast, highly scalable HTTP + realtime server engine which lets you build multi-process realtime servers that make use of all CPU cores on a machine/instance. It removes the limitations of having to run your Node.js server as a single thread and makes your backend resilient by automatically recovering from worker crashes and aggregating errors into a central log on each host. SC can also auto-scale across multiple hosts on top of Kubernetes; see SCC guide: https://github.com/SocketCluster/socketcluster/blob/master/scc-guide.md.

Follow the project on Twitter: https://twitter.com/SocketCluster Subscribe for updates: http://socketcluster.launchrock.com/

3.3 SeaWater

3.3.1 重连

建立连接有成功就有失败,对于失败情况我们需要重连,下面我们分别说明重连的时机,重连的策略和当前是否应该重连的判断.

重连的场景:

  • 应用网络的切换.具体点就是可用网络状态的切换,比如4g切wifi连接会断开我们需要重连.

  • 应用回到前台的时候,判断如果连接断开我们需要重连,这个是尽量保持当应用再前臺的时候连接的稳定.

  • 收到连接失败或者连接断开事件的时候。

  • 心跳连续n次失败时候。

重连策略:

这里我定义了一个最小重连时间间隔min和一个最大重连时间间隔max,当重连次数小于等于3次的时候都以最小重连时间间隔min去尝试重连,当重连次数大于3次的时候我们将重连地址替换成默认地址DEF_URL,将重连时间间隔按min*(重连次数-2)递增最大不不超过max.

是否应该重连的判断:

  • 用户是否登录,可以通过本地是否有缓存的用户信息来判断.因为重连成功后我们需要将用户信息通过WebSocket发送到服务器进行身份验证所以这里必须登录成功.

  • 当前连接是否可用

  • 当前网络可用.

重连实现
通过handler实现了一定时间间隔的重连,然后在WsListener监听中的onConnectError()和onDisconnected()调用了reconnect()实现重连,onConnected()中调用了cancelReconnect()取消重连并初始化重连次数.

所以当需要重连的时候调用reconnect()方法,如果失败onConnectError()和onDisconnected()回调会再次调用reconnect()实现重连,如果成功onConnected()中会调用cancelReconnect()取消重连并初始化重连次数.

并且这里我们已经实现了需要重连的情景3,收到连接失败或者连接断开事件的时候进行重连.

接下来我们实现情景1和2

应用网络的切换.具体点就是可用网络状态的切换,比如4g切wifi连接会断开我们需要重连.

应用回到前臺的时候,判断如果连接断开我们需要重连,这个是尽量保持当应用再前臺的时候连接的稳定.

3.4长连接维持方案

服务端超过20s未监测到心跳,标识该设备已下线,设备连接后,重新下发未确认消息,基于APP上次启动之后的未确认消息,服务端保存消息条数上限100条。

具体设计与实现:http://98ki.com/wang-luo-bao-huo/

4.API

4.1 对内 API

建立连接

SeaWaterService启动的时候建立连接。

SeaWater.init(context,url,reconnectionSecond).start();  

监听连接

注册 basic listeners

SeaWater.getInstance().setListener(new BasicListener() {

public void onConnected(Socket socket,Map<String, List<String>> headers) {
            System.out.println("Connected to endpoint");
        }

        public void onDisconnected(Socket socket,WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) {
            System.out.println("Disconnected from end-point");
        }

        public void onConnectError(Socket socket,WebSocketException exception) {
            System.out.println("Got connect error "+ exception);
        }

        public void onSetAuthToken(String token, Socket socket) {
            System.out.println("Token is "+ token);
        }

        public void onAuthentication(Socket socket,Boolean status) {
            if (status) {
                System.out.println("socket is authenticated");
            } else {
                System.out.println("Authentication is required (optional)");
            }
        }

    });

获取channel

  • 获取所有 channel
    List <Socket.Channel> channels=Seawater.getInstance().getChannels();
  • 通过 channel name获取channel :
        SeaWater.Channel channel=Seawater.getInstance().getChannelByName("T_test");
        //Returns null if channel of given name is not present

属性设置

  • 关闭重连 :
   SeaWater.getInstance().setReconnection(null); 
  • 日至打印默认开启,关闭日至 :
   SeaWater.getInstance().disableLogging();

4.2 对外 API

订阅Channel

订阅 Channel,并通过传入 Listener 获取服务器消息。

listener =  new Emitter.Listener() {  
            @Override
            public void call(String name, Object data) {
                messageView.setText("Business Line Receiver, channel :"+name+", data:"+data.toString());
            }

        };
        SeaWaterClient.subscribChannel(this,channelView.getText().toString(),listener);

发送消息

  • 向 channel发消息 :
    /**
     * without acknowledgement
     */          
SeaWaterClient.sendMessage(channelView.getText().toString(), object.toString());

    /**
     * with acknowledgement
     */
                SeaWaterClient.sendMessage(DemoActivity.this,channelView.getText().toString(), object.toString(), new Ack() {
                    @Override
                    public void call(String name, Object error, Object data) {
                        if (error == null) {
                            Log.d(TAG,"Published message to channel successfully");
                        }
                    }
                });

附录

完整 push规约:

{
    "event": "#publish",
    "data": {
        "channel": "T_test1",
        "data": {
            "meta": {
                "version": 1,
                "bizType": "",
                "messageId": 1287
            },
            "body": {
                "userid": "1900000000051668013",
                "level": "3",
                "isUpgrade": "true",
                "VipName": "fd"
            }
        }
    }

消息确认规约:

{
    "event": "#publish",
    "data": {
        "channel": "$client-ack",
        "data": {
            "body": {
                "exchange": true,
                "error": ""
            },
            "meta": {
                "bizType": "",
                "messageId": 1283,
                "version": 1
            }
        }
    },
    "cid": 4
}

张鹏宇

继续阅读此作者的更多文章