Connmix 基于 Go 与 Lua 的分布式可编程协议平台即将上线

2021-12-20 11:45

Connmix 是我至今为止最令人兴奋的作品,与之前我创建的 mixphp, mixgo 框架不同,这是一种全新的开发思路,并且支持非常广泛的应用场景。

Connmix 是什么

基于 golang + lua 开发的面向消息编程的分布式自定义协议互联平台,通讯协议可以像 openresty 一样使用 lua 编程;该平台可快速接入到 java,php,go,py,nodejs,c# 等任意编程语言开发的现有系统;可以开发:分布式Web系统、APP实时消息推送、即时IM服务端、视频弹幕、游戏服务端、物联网、智能家居、安防监控等等。

可编程协议的能力让 Connmix 拥有无限的想象力,同时官方会开源 websocket, http, mqtt 等众多常用协议供用户直接使用。

简单一点描述就是:

  • Connmix 程序可以通过配置文件启动很多个 server ,每个 server 由一个 lua 入口文件驱动。
  • 根据 Connmix API,用户可以快速编写自己的通讯协议,如果是常用协议可以直接使用官方开源的 lua 协议包。
  • 同时 Connmix API 中包含内存队列,分布式订阅发布等功能,因此可以与分布式网格的其他任何连接通讯。
  • Connmix Mesh 是一个可以随意扩容缩容的服务网格,0依赖的设计,扩容只需启动一个新的Node即可
  • Connmix Client 使用 websocket 协议驱动,支持 java,php,go,py,nodejs,c# 等任何语言都可以和 Connmix Mesh 通讯来消费任务,并将消费结果通知到网格中的具体连接。

到底怎么用

启动一个 center 节点

% bin/connmix center -f conf/connmix.yaml

________/\\\\\\\\\___________________________________________________________________________________        
 _____/\\\////////____________________________________________________________________________________       
  ___/\\\/__________________________________________________________________________/\\\_______________      
   __/\\\_________________/\\\\\_____/\\/\\\\\\____/\\/\\\\\\______/\\\\\__/\\\\\___\///___/\\\____/\\\_     
    _\/\\\_______________/\\\///\\\__\/\\\////\\\__\/\\\////\\\___/\\\///\\\\\///\\\__/\\\_\///\\\/\\\/__    
     _\//\\\_____________/\\\__\//\\\_\/\\\__\//\\\_\/\\\__\//\\\_\/\\\_\//\\\__\/\\\_\/\\\___\///\\\/____   
      __\///\\\__________\//\\\__/\\\__\/\\\___\/\\\_\/\\\___\/\\\_\/\\\__\/\\\__\/\\\_\/\\\____/\\\/\\\___  
       ____\////\\\\\\\\\__\///\\\\\/___\/\\\___\/\\\_\/\\\___\/\\\_\/\\\__\/\\\__\/\\\_\/\\\__/\\\/\///\\\_ 
        _______\/////////_____\/////_____\///____\///__\///____\///__\///___\///___\///__\///__\///____\///__

        connmix0.0.0-alpha, go1.17.5, lua5.1+bit64, darwin, arm64

2021-12-20 13:34:05.552 INFO    registry/server.go:42   start the registry server (0.0.0.0:6786)

启动一个 normal 节点,如果你觉得性能不够,增加节点即可

% bin/connmix normal -f conf/connmix.yaml

________/\\\\\\\\\___________________________________________________________________________________        
 _____/\\\////////____________________________________________________________________________________       
  ___/\\\/__________________________________________________________________________/\\\_______________      
   __/\\\_________________/\\\\\_____/\\/\\\\\\____/\\/\\\\\\______/\\\\\__/\\\\\___\///___/\\\____/\\\_     
    _\/\\\_______________/\\\///\\\__\/\\\////\\\__\/\\\////\\\___/\\\///\\\\\///\\\__/\\\_\///\\\/\\\/__    
     _\//\\\_____________/\\\__\//\\\_\/\\\__\//\\\_\/\\\__\//\\\_\/\\\_\//\\\__\/\\\_\/\\\___\///\\\/____   
      __\///\\\__________\//\\\__/\\\__\/\\\___\/\\\_\/\\\___\/\\\_\/\\\__\/\\\__\/\\\_\/\\\____/\\\/\\\___  
       ____\////\\\\\\\\\__\///\\\\\/___\/\\\___\/\\\_\/\\\___\/\\\_\/\\\__\/\\\__\/\\\_\/\\\__/\\\/\///\\\_ 
        _______\/////////_____\/////_____\///____\///__\///____\///__\///___\///___\///__\///__\///____\///__

        connmix0.0.0-alpha, go1.17.5, lua5.1+bit64, darwin, arm64

2021-12-20 13:35:17.592 INFO    ws/server.go:155        start the control server (websocket) (0.0.0.0:6789)
2021-12-20 13:35:17.592 INFO    mesh/server.go:41       start the mesh node (0.0.0.0:6788)
2021-12-20 13:35:17.594 INFO    protocol/registrycli.go:36      center registry 127.0.0.1:6786 connect successful
2021-12-20 13:35:17.594 INFO    protocol/registrycli.go:75      register normal node
2021-12-20 13:35:17.596 INFO    protocol/servers.go:95  start the protocol server /Users/***/connmix/lua/entry.lua (0.0.0.0:6790)

entry.lua 入口文件代码如下:

  • 代码中我们使用已经封装好的 websocket 协议包 (会开源)
  • 注册了 '/' 路径的 websocket 服务,并在收到消息后,将消息和当前连接的上下文 push 到了 foo 内存队列中
local mix_log = mix.log
local mix_DEBUG = mix.DEBUG
local websocket = require("protocols/websocket")
local queue_name = "foo"

function init()
    mix.queue.new(queue_name, 100)
end

function on_connect(conn)
end

function on_close(err, conn)
    --print(err)
end

function protocol_input(bufcopy, conn)
    return websocket.input(bufcopy, conn, "/")
end

function protocol_decode(str, conn)
    return websocket.decode(str, conn)
end

function protocol_encode(str, conn)
    return websocket.encode(str)
end

function on_message(data, conn)
    s, err = mix.json_encode({ msg = data, ctx = conn:context() })
    if err then
        mix_log(mix_DEBUG, "json_encode error: " .. err)
        return
    end

    n, err = mix.queue.push(queue_name, s)
    if err then
        mix_log(mix_DEBUG, "queue push error: " .. err)
        return
    end
end

接下来我们使用 control server (websocket) (0.0.0.0:6789) 这个端口来消费这个内存队列的数据

  1. 可以使用任意语言连接到 ws://127.0.0.1:6789/ws/v1
  2. 发送消息 {"method":"queue.consume","params":["foo"],"id":1}
  3. 接收到消息 {"error":null,"result":[{"success":true}],"id":1}
  4. protocol server /Users/***/connmix/lua/entry.lua (0.0.0.0:6790) 接收到连接发送的消息时,会收到以下推送:

觉得 headers 很多无用字段,可以自行修改 entry.lua 代码删减

{
  "method":"queue.pop",
  "params":[
    {
      "client_id":"1472844646504726528",
      "queue":"foo",
      "data":{
        "ctx":{
          "headers":{
            "Accept-Encoding":"gzip,
             deflate,
             br",
            "Accept-Language":"zh-CN,
            zh;q=0.9",
            "Cache-Control":"no-cache",
            "Connection":"Upgrade",
            "Host":"127.0.0.1:6790",
            "Method":"GET",
            "Origin":"http://www.connmix.com",
            "Pragma":"no-cache",
            "RequestUri":"/",
            "Sec-WebSocket-Extensions":"permessage-deflate; client_max_window_bits",
            "Sec-WebSocket-Key":"EVgmSmg+tUU0qEcdKgcn5g==",
            "Sec-WebSocket-Version":"13",
            "ServerProtocol":"HTTP/1.1",
            "Upgrade":"websocket",
            "User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML,
             like Gecko) Chrome/96.0.4664.93 Safari/537.36"
          }
        },
        "msg":{
          "data":"hello",
          "finish":true,
          "type":"text"
        }
      }
    }
  ],
  "id":null
}
  1. 接下来我们可以对 msg.data 的消息进行处理,然后进行消息回复
  • mesh.send 可以在网格的任意节点进行发送
{"method":"mesh.send","params":[{"client_id":"1472844646504726528","data":"123456789"}],"id":2}
  1. 我还提供了 PubSub 的方式,能更加方便抽象业务逻辑,并且该 PubSub 是自研0依赖。
  • 给某个连接订阅频道
{"method":"conn.call","params":[{"client_id":"1472844646504726528","method":"subscribe","params":["user_10001"]}],"id":3}
  • mesh.publish 可以在网格的任意节点进行发送
{"method":"mesh.publish","params":[{"channel":"user_10001","data":"test_pubsub"}],"id":4}

以上只是最常用的玩法,还有更多使用方式,几乎可以使用在任何与连接相关的场景。

特点总结

当我们使用 Connmix 编写高并发连接系统时,我们将获得以下收益:

  • 无需面对高并发复杂的连接异常处理
  • 无需处理复杂的分布式扩容缩容,支持千万并发连接只取决于需要多少个节点
  • 无需使用第3方内存数据库,真正的0依赖
  • 协议解析和server业务真正脱离,业务可以使用任何语言作为技术栈
  • 支持任何语言开发,只要有ws客户端的语言都可以立即开发,并且团队可以选择自己擅长的语言一起多语言协作
  • 支持任何协议,用户可以自己编写协议,这对游戏开发、物联网开发来说非常好用
  • 真正的连接不断线代码热更新,重启消费进程用户连接并不中断,这对于游戏开发非常重要
  • 可以通过 mix.queue 的队列名称来实现微服务拆分

授权协议

  • 很遗憾,介于之前 openmix.org 系列开源的经验,connmix 项目将选择闭源授权,不过会和 JDK 中的 OracleJDK 类似,Connmix 本身是闭源的,但是基于 Connmix API 开发的 Lua 协议代码和 Connmix 各个语言的 SDK 是开源的。
  • 闭源会采取 vCPU 授权,当然这个价格会非常便宜,就和你买个腾讯、优酷视频会员类似的价格,通过会员的收入来维持 Connmix 的可持续发展,反哺使用的企业。

个人公众号