kafka
生产者
<dependency>
<groupId>com.manatee.manatee-lowcode</groupId>
<artifactId>lowcode-kafka-producer</artifactId>
<version>${lowcode.version}</version>
</dependency>
消费者
<dependency>
<groupId>com.manatee.manatee-lowcode</groupId>
<artifactId>lowcode-kafka-consumer</artifactId>
<version>${lowcode.version}</version>
</dependency>
<dependency>
<groupId>com.manatee.manatee-lowcode</groupId>
<artifactId>lowcode-thread</artifactId>
<version>${lowcode.version}</version>
</dependency>
配置
manatee:
kafka:
topics: manatee-topic //指定监听 topic ,多个 topic 逗号分隔
bootstrap-servers: xx.xxx.xx.xx:9092
consumer:
auto-offset-reset: earliest
group-id: xxxx
初始化消息分发架构模块
在项目数据库执行以下 sql,已初始化过则忽略此步骤
CREATE TABLE `lowcode_msg_distribute` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(45) DEFAULT NULL,
`type` varchar(45) DEFAULT NULL,
`sub_type` varchar(45) DEFAULT NULL,
`filter` varchar(1024) DEFAULT NULL,
`project_id` bigint(10) DEFAULT NULL,
`code` varchar(45) DEFAULT NULL,
`valid` tinyint(4) DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `lowcode_base_module` (`id`, `module_code`, `module_name`, `module_description`, `module_version`, `env`, `package_id`, `project_id`, `module_type`, `request_mode`, `is_login`, `process_conf`, `valid`, `gmt_create`, `gmt_modified`, `params`, `mock`, `chart_url`, `interrupt`, `create_user`, `modified_user`, `system_version`, `conf`, `sort`) VALUES (954, 'msgDistribute', '进行消息分发', '对异步消息,如kafaka、rocketMq等消息分发', '1.0', 'daily', 256, 0, 0, 'POST', 0, '{\"steps\":[{\"lowcodeProjectId\":0,\"lowcodeModule\":\"msgRepeatedFiltering\",\"name\":\"moduleProcess\",\"id\":\"736ddbc5-f8c7-4ddf-8748-da5d1cc5cd48\"},{\"selectOne\":false,\"valueTo\":\"response\",\"queryParams\":\"type = _type;\\nsub_type = _subType;\\nvalid = 0;\",\"name\":\"mysqlQuery\",\"id\":\"954a4be7-22e0-4036-a869-c5a85a7f39e0\",\"returnKey\":\"distributeList\",\"table\":\"lowcode_msg_distribute\"},{\"name\":\"loopProcess\",\"id\":\"222647af-e695-41c1-8813-23aa974300d3\",\"paramsKey\":\"distributeList\",\"steps\":[{\"name\":\"ifProcess\",\"id\":\"9f19f556-66f2-4578-af9d-8978d123bfd4\",\"config\":[{\"expression\":\"filter is not blank\",\"id\":\"7e7ed23b-45ed-4a62-9776-9b5bd36ef349\",\"steps\":[{\"valueTo\":\"response\",\"method\":\"analysis2ManateeExp\",\"service\":\"manateeCodeManager\",\"name\":\"runMethodProcess\",\"id\":\"c8a78beb-86a8-437c-a12b-18fc239222f6\",\"params\":\"param = params;\\nexp = filter\",\"returnKey\":\"filterResult\",\"paramsType\":\"param = java.util.Map;\\nexp = java.lang.String\"},{\"expression\":\"filterResult._result==false\",\"name\":\"breakProcess\",\"id\":\"5dc73d14-f0fe-4622-b729-0641eb7a9aa4\"},{\"valueTo\":\"response\",\"handleType\":\"single\",\"name\":\"setData\",\"dataHandle\":\"projectId=projectId;\\ncode=code;\",\"id\":\"386fd9dc-920d-4eb0-b0fa-31bf011a26d6\"},{\"lowcodeProjectId\":0,\"lowcodeModule\":\"msgHandle\",\"name\":\"moduleProcess\",\"id\":\"347a8718-3874-4075-9bf8-262a0f17657f\"}]},{\"expression\":\"else\",\"id\":\"dffe6d5b-6ae0-46a7-a551-9aca799618cf\",\"steps\":[{\"valueTo\":\"response\",\"handleType\":\"single\",\"name\":\"setData\",\"dataHandle\":\"projectId=projectId;\\ncode=code;\",\"id\":\"2c395410-64b5-4a4b-90ab-984f38a3f5fb\"},{\"lowcodeProjectId\":0,\"lowcodeModule\":\"msgHandle\",\"name\":\"moduleProcess\",\"id\":\"0dc891a7-aba4-49aa-adcd-8f84d9878266\"}]}]}]}]}', 0, '2021-12-31 11:09:59', '2023-05-09 10:19:43', '{\"get\":[{\"VALUE\":\"\",\"DESC\":\"\",\"key\":\"0.5340739310066451\"}],\"post\":{\"_type\":\"trade\",\"_subType\":\"100\",\"_uniqueId1\":\"1001KEY\",\"params\":{\"a\":1,\"b\":2}},\"paramVerify\":{\"_type\":{\"required\":false},\"_subType\":{\"required\":false},\"_uniqueId1\":{\"required\":false},\"params\":{\"required\":false}},\"host\":\"http://127.0.0.1:8104\"}', '{}', '/biz/image/0-msgDistribute.png?time=1683598783218', NULL, 58, 58, '2.1', NULL, 1);
INSERT INTO `lowcode_base_module` (`id`, `module_code`, `module_name`, `module_description`, `module_version`, `env`, `package_id`, `project_id`, `module_type`, `request_mode`, `is_login`, `process_conf`, `valid`, `gmt_create`, `gmt_modified`, `params`, `mock`, `chart_url`, `interrupt`, `create_user`, `modified_user`, `system_version`, `conf`, `sort`) VALUES (959, 'msgHandle', '消息处理', '消息处理', '1.0', 'daily', 257, 0, 0, 'POST', 0, '{\"steps\":[{\"name\":\"asyProcess\",\"id\":\"8ef50166-db76-47a7-a00d-34de3d30da7d\",\"title\":\"异步\",\"steps\":[{\"valueTo\":\"response\",\"method\":\"executionModule\",\"service\":\"modularityManager\",\"name\":\"runMethodProcess\",\"id\":\"f55818d1-927a-4445-8f6e-6519a8348eee\",\"title\":\"调用流程图处理\",\"params\":\"projectId = projectId;\\ncode = code;\\nparams = params\",\"paramsType\":\"projectId = java.lang.Long;\\ncode = java.lang.String;\\nparams = java.util.Map\"}]}]}', 0, '2021-12-31 14:35:01', '2022-07-13 17:53:59', '{\"get\":[{\"VALUE\":\"\",\"DESC\":\"\",\"key\":\"0.32894533238186563\"}],\"post\":{\"params\":{\"a\":1,\"b\":2},\"projectId\":109,\"code\":\"strTest\"},\"paramVerify\":{\"params\":{\"required\":false},\"projectId\":{\"required\":false},\"code\":{\"required\":false}},\"host\":\"http://127.0.0.1:8106\"}', '{}', '/biz/image/cec3b9cf809a0391c319b4ac801ffd94.png', NULL, 58, 58, '2.1', NULL, 1);
INSERT INTO `lowcode_base_module` (`id`, `module_code`, `module_name`, `module_description`, `module_version`, `env`, `package_id`, `project_id`, `module_type`, `request_mode`, `is_login`, `process_conf`, `valid`, `gmt_create`, `gmt_modified`, `params`, `mock`, `chart_url`, `interrupt`, `create_user`, `modified_user`, `system_version`, `conf`, `sort`) VALUES (958, 'msgRepeatedFiltering', '消息去重过滤', '对一段时间的消息去重过滤', '1.0', 'daily', 257, 0, 0, 'POST', 0, '{\"steps\":[{\"name\":\"ifProcess\",\"id\":\"bc36e58f-1069-46b4-b5df-bc8512232c93\",\"title\":\"if else\",\"config\":[{\"expression\":\"_uniqueId is not null\",\"id\":\"f133e5fd-7fde-4d49-9924-b15e9d3a2459\",\"steps\":[{\"valueTo\":\"response\",\"handleType\":\"single\",\"name\":\"setData\",\"dataHandle\":\"uniqueIdData = \\\"un:\\\"+ _uniqueId ;\",\"id\":\"3a69e1a2-eac5-4568-b1af-d7c76a0ff7a5\",\"title\":\"生成唯一key\"},{\"valueTo\":\"response\",\"name\":\"cacheProcess\",\"id\":\"eb546090-3dc0-4b5c-99a9-877349c7039e\",\"title\":\"缓存\",\"steps\":[{\"valueTo\":\"response\",\"handleType\":\"single\",\"name\":\"setData\",\"dataHandle\":\"${uniqueIdData} = \\\"1\\\";\\nhaveHandleMsg = false;\",\"id\":\"855454e6-5a5c-4cd6-a404-70bfc7bbfa24\",\"title\":\"缓存记录数据\"}],\"key\":\"${uniqueIdData}\"}]},{\"expression\":\"else\",\"id\":\"15e9c69c-ea06-47e8-bd89-a4e7a06700d2\",\"steps\":[{\"valueTo\":\"response\",\"handleType\":\"single\",\"name\":\"setData\",\"dataHandle\":\"haveHandleMsg = false\",\"id\":\"171e6e3f-8b9a-4389-b9e0-891740f8fa5d\",\"title\":\"不需要过滤\"}]}]},{\"interruptLevel\":3,\"expression\":\"haveHandleMsg is null\",\"resultSubCode\":\"haveHandle\",\"name\":\"throwsException\",\"resultCode\":\"biz_error\",\"id\":\"760373b9-9515-443c-b30e-3300a05e97e6\",\"title\":\"异常\",\"resultMsg\":\"消息已经处理了\"}]}', 0, '2021-12-31 11:33:54', '2022-07-13 17:53:59', '{\"host\":\"http://dev.manateeai.com\"}', '{}', '/biz/image/2a6803fd6399f7ff284814a9dc81aa3e.png', NULL, 58, 58, '2.1', NULL, 1);
流程图中使用 kafka 的消息内容
消息内容是 json
例如消息内容是:{"a":1,"b":2}, 则 a 和 b 会放入流程图的参数上下文中,直接使用即可
消息内容不是 json
例如消息内容是:xxx(非 json), 则该内容会通过 value = xxx 放入流程图的参数上下文中,使用 value 可得到消息内容
将开发好的流程图配置到 lowcode_msg_distribute
表字段名称 | 说明 |
---|---|
id | 不填则默认自增 |
name | 流程图接口的中文名称 |
type | 要消费的 kafka topic |
sub_type | kafka key |
filter | 过滤条件,语法为低代码数据处理语法 |
project_id | 流程图接口的 project_id |
code | 流程图接口的英文名称 |
valid | 0:正常,-1:删除。默认 0 |