技术文章
o2server内置消息处理机制,用于发送各类通知消息,也可以用于进行数据同步和事件发送.
比如流程中的待办消息提醒:
是使用ws(websocket)消息处理器来实现的,钉钉上的待办消息处理是使用dingding(钉钉消息推送)实现的.
内置消息处理由系统自动处理,内置消息处理大多是用来实现消息推送的,一般情况下无需干预.
ws: websocket消息.
pmsinner:移动端推送消息
calendar:推送到日历事件
dingding:钉钉消息推送
welink:welink消息推送
zhengwudingding:政务钉消息推送
qiyeweixin:企业微信消息推送
mpweixin:微信公众号消息推送
外置消息处理类型可以将消息推送到系统外部,需要进行配置和干预以完成特定功能.
kafka:将消息推送到kafka队列
activemq:将消息推送到activemq队列
restful:调用外部的restful请求来发送消息.
mail:将消息发送到指定的邮件地址
api:以restful方式调用o2server服务接口
jdbc:将消息内容insert到指定的数据库表中.
table:将消息内容写入到自建表.
hadoop:将消息内容写入到hadoop dfs
custom_xxxx:自定义消息处理器.
处理器本质上是一个队列,先入先出的处理进入到队列的消息,这些消息来源于o2server事件,我们以流程为例逐一说明目前支持的消息类型.
application_create:创建应用,message.body消息体是json格式的application对象.
application_update:更新应用,message.body消息体是json格式的application对象.
application_delete:删除应用,message.body消息体是json格式的application对象.
process_create:创建流程,message.body消息体是json格式的process对象.
process_update:更新流程,message.body消息体是json格式的process对象.
process_delete:删除流程,message.body消息体是json格式的process对象.
activity_message:有新的工作通过消息节点,message.body消息体是json格式的work对象.
work_to_workCompleted:工作完成转已完成工作,message.body消息体是json格式的work对象.
work_create:创建工作,message.body消息体是json格式的work对象.
work_delete:删除工作,message.body消息体是json格式的work对象.
workCompleted_create:创建已完成工作,message.body消息体是json格式的work对象.
workCompleted_delete:删除已完成工作,message.body消息体是json格式的work对象.
task_to_taskCompleted:待办完成转已办,message.body消息体是json格式的taskCompleted对象.
task_create:创建待办,message.body消息体是json格式的task对象.
task_delete:删除待办,message.body消息体是json格式的task对象.
task_urge:待办即将过期催办,message.body消息体是json格式的task对象.
task_expire:待办过期,message.body消息体是json格式的task对象.
task_press:待办提醒,message.body消息体是json格式的task对象.
taskCompleted_create:已办创建,message.body消息体是json格式的taskCompleted对象.
taskCompleted_delete:删除已办,message.body消息体是json格式的taskCompleted对象.
read_to_readCompleted:待阅转已阅,message.body消息体是json格式的read对象.
read_create:待阅创建,message.body消息体是json格式的read对象.
read_delete:待阅删除,message.body消息体是json格式的read对象.
readCompleted_create:已阅创建,message.body消息体是json格式的readCompleted对象.
readCompleted_delete:已阅删除,message.body消息体是json格式的readCompleted对象.
review_create:创建参阅,message.body消息体是json格式的review对象.
o2server可以通过配置来设置使用特定的消息处理器来处理特定的消息.
config/messages.json
默认情况下config目录下没有messages.json文件,系统使用默认的配置来处理消息.系统提供配置样例在configSample目录下,需要手工将默认配置文件拷贝到config目录下进行配置.修改messages.json需要重启服务器使之生效.
configSample/messages.json
后续版本将删除consumersV2配置项.
我们以对待办进行提醒进行配置项说明,默认情况下系统会通过websocket进行待办消息提醒,如果接入移动端比如(微信,钉钉等)也会自动进行待办消息提醒.只是因为默认情况下使用的配置项如下:
{ "task_create": { "description": "创建待办", "consumersV2": { "calendar": "", "activemq": "", "pmsinner": "", "restful": "", "mail": "", "jdbc": "", "mpweixin": "", "dingding": "", "qiyeweixin": "", "kafka": "", "zhengwudingding": "", "welink": "", "api": "", "hadoop": "", "ws": "", "table": "" }, "consumers": [ { "type": "ws", "enable": true, "loader": "", "filter": "" }, { "type": "pmsinner", "enable": true, "loader": "", "filter": "" }, { "type": "calendar", "enable": false, "loader": "", "filter": "" }, { "type": "dingding", "enable": true, "loader": "", "filter": "" }, { "type": "welink", "enable": true, "loader": "", "filter": "" }, { "type": "zhengwudingding", "enable": true, "loader": "", "filter": "" }, { "type": "qiyeweixin", "enable": true, "loader": "", "filter": "" }, { "type": "mpweixin", "enable": true, "loader": "", "filter": "" }, { "bootstrapServers": "", "topic": "o2oa", "securityProtocol": "SASL_PLAINTEXT", "saslMechanism": "PLAIN", "username": "", "password": "", "type": "kafka", "enable": false, "loader": "", "filter": "" }, { "username": "", "password": "", "url": "", "queueName": "", "type": "activemq", "enable": false, "loader": "", "filter": "" }, { "url": "", "method": "get", "type": "restful", "enable": false, "loader": "", "filter": "" }, { "host": "", "port": 465.0, "sslEnable": true, "auth": true, "from": "admin@o2oa.net", "password": "password", "type": "mail", "enable": false, "loader": "", "filter": "" }, { "application": "", "path": "", "method": "get", "type": "api", "enable": false, "loader": "", "filter": "" }, { "driverClass": "com.mysql.cj.jdbc.Driver", "url": "jdbc:mysql://", "username": "root", "password": "password", "catalog": "", "schema": "", "table": "NEWTABLE", "type": "jdbc", "enable": false, "loader": "", "filter": "" }, { "table": "", "type": "table", "enable": false, "loader": "", "filter": "" }, { "fsDefaultFS": "hdfs://", "username": "", "path": "", "type": "hadoop", "enable": false, "loader": "", "filter": "" } ] }, }
默认情况下ws,pmsinner,dingding,welink,zhengwudingding,qiyeweixin,mpweixin,是启用的(enable= true),这里我对配置项进行说明:
{ "task_create": { "description": "创建待办", "consumers": [ { "type": "ws", "enable": true, "loader": "", "filter": "" }, { "url": "", "method": "get", "type": "restful", "enable": false, "loader": "", "filter": "" } ] }, }
messages.json是一个json格式对象类型配置文件
task_create:消息类型
description:说明
consumers:使用的消息处理器,这里是一个数组类型,可以使用多个消息处理器,同一类型的消息处理器也可以反复使用.
{ "type": "ws", "enable": true, "loader": "", "filter": "" }
ws是一个内置的消息处理器,配置项比较简单
type:类型说明这个消息处理器的类型,ws代表websocket消息.
enable:是否启用
loader:加载器,加载器是一个javascript脚本文件的文件名,通过加载器可以通过程序对消息体进行修改,可以实现对消息内容的修改和定制.
filter:过滤器,过滤器是一个javascript脚本文件的文件名,通过过滤器可以通过程序来判断是否要过滤掉消息.
{ "url": "", "method": "get", "type": "restful", "enable": false, "loader": "", "filter": "" }
restful是一个外置消息处理,使用该类型消息处理器将在收到消息后调用外部的restful请求发送消息.
type:类型说明这个消息处理器的类型,restful代表调用外部的restful服务.
enable:是否启用
loader:加载器,加载器是一个javascript脚本文件的文件名,通过加载器可以通过程序对消息体进行修改,可以实现对消息内容的修改和定制.
filter:过滤器,过滤器是一个javascript脚本文件的文件名,通过过滤器可以通过程序来判断是否要过滤掉消息.
url:http服务地址.
method:调用http请求类型,get,post,put,delete.
加载器配置的是一个javascript脚本文件地址比如,为空则不调用脚本.
"loader":"message_ws_loader_taskCreate.js"
系统读到这个配置将会从o2server的程序目录下的config/message_ws_loader_taskCreate.js文件装载脚本.
下面给出一个样例说明如何修改消息内容:秘密文件将通知消息的标题进行模糊处理.
/* *message 对象是消息体,有脚本执行上下文环境环境自动注入,其中有四个字段 message.title: 标题 message.person: 发送对象 message.type: 消息类型,task_create message.body: 消息体,类型是task_create的消息中消息体是json格式存储的task(待办)数据 return 返回的message对象 */ if ((message.body.secret == '秘密') && (essage.title.length > 4)) { message.title = message.title.substring(0, 4) + '......'; } return message;
过滤器配置的是一个javascript脚本文件地址比如,为空则不调用脚本.
"loader":"message_ws_filter_taskCreate.js"
系统读到这个配置将会从o2server的程序目录下的config/message_ws_filter_taskCreate.js文件装载脚本.
下面给出一个样例说明如何过滤消息内容:如果通知人是张三则不发送消息.
/* *message 对象是消息体,有脚本执行上下文环境环境自动注入,其中有四个字段 message.title: 标题 message.person: 发送对象 message.type: 消息类型,task_create message.body: 消息体,类型是task_create的消息中消息体是json格式存储的task(待办)数据 return true 标识此消息要处理,false表示忽略此消息 */ return message.person != '张三@zhangsan@p';
{ "bootstrapServers": "", "topic": "o2oa", "securityProtocol": "SASL_PLAINTEXT", "saslMechanism": "PLAIN", "username": "", "password": "", "type": "kafka", "enable": false, "loader": "", "filter": "" }
bootstrapServers:kafka服务地址
topic:队列名称
securityProtocol:认证协议
saslMechanism:加密机制
username:用户名
password:密码
type:kafka
enable:是否启用
loader:装载器
filter:过滤器
{ "username": "", "password": "", "url": "", "queueName": "", "type": "activemq", "enable": false, "loader": "", "filter": "" }
url:服务地址
queueName:队列名称
username:用户名
password:密码
type:activemq
enable:是否启用
loader:装载器
filter:过滤器
{ "url": "", "method": "get", "type": "restful", "enable": false, "loader": "", "filter": "" }
url:http请求地址
method:http请求方法
type:restful
enable:是否启用
loader:装载器
filter:过滤器
{ "host": "", "port": 465.0, "sslEnable": true, "auth": true, "from": "admin@o2oa.net", "password": "password", "type": "mail", "enable": false, "loader": "", "filter": "" }
host:smtp服务器地址
port:smtp服务器端口
sslEnable:使用ssl加密
auth:启用认证
from:邮件发送者
password:邮件发送者smtp密码
type:mail
enable:是否启用
loader:装载器
filter:过滤器
{ "application": "", "path": "", "method": "get", "type": "api", "enable": false, "loader": "", "filter": "" }
application:应用标识
path:接口路径
method:restful方法get,post,delete,put
type:api
enable:是否启用
loader:装载器
filter:过滤器
{ "driverClass": "com.mysql.cj.jdbc.Driver", "url": "jdbc:mysql://", "username": "root", "password": "password", "catalog": "", "schema": "", "table": "NEWTABLE", "type": "jdbc", "enable": false, "loader": "", "filter": "" }
driverClass:jdbc驱动类
url:jdbc服务器地址
username:数据库用户名
password:数据库密码
catalog:数据库编目
schema:数据库模式
table:数据库表
type:jdbc
enable:是否启用
loader:装载器
filter:过滤器
{ "table": "", "type": "table", "enable": false, "loader": "", "filter": "" }
table:自建表标识
type:table
enable:是否启用
loader:装载器
filter:过滤器
{ "fsDefaultFS": "hdfs://", "username": "", "path": "", "type": "hadoop", "enable": false, "loader": "", "filter": "" }
fsDefaultFS:hadoop dfs 服务地址
username:链接hadoop系统用户名,需要事先完成ssh免密设置.
path:hadoop dfs 存储目录路径
type:hadoop
enable:是否启用
loader:装载器
filter:过滤器
{ "custom_message_type":[ { "type": "custom_", "enable": false, "loader": "", "filter": "" } ] }
可以通过上面的配置样例进行自定义消息和自定义消息处理器的配置,自定义消息系统不会进行任何处理,接收到自定义消息后系统将消息存放在数据库中等待处理.
可以通过下面的接口进行获取:
http://127.0.0.1:20020/x_message_assemble_communicate/jaxrs/consume/list/{consume}/count/{count}
消费完成后需要调用接口回写消费完成标志.
http://127.0.0.1:20020/x_message_assemble_communicate/jaxrs/consume/{id}/type/{type}
消息处理器在处理消息过程中发生错误,那么本次处理失败,消息处理器在失败的情况下记录错误信息,并且在后续将自动尝试重试.
消息数据存储在MSG_MESSAGE数据库表中,定时器定时清除过期的数据以避免表存储过多数据,默认情况下message数据保存期限是7天.