/*
 * +----------------------------------------------------------------------
 * | Copyright (c) 奇特物联 2021-2022 All rights reserved.
 * +----------------------------------------------------------------------
 * | Licensed 未经许可不能去掉「奇特物联」相关版权
 * +----------------------------------------------------------------------
 * | Author: xw2sy@163.com
 * +----------------------------------------------------------------------
 */
  10. package cc.iotkit.ruleengine.rule;
  11. import cc.iotkit.common.api.PageRequest;
  12. import cc.iotkit.common.api.Paging;
  13. import cc.iotkit.common.utils.JsonUtils;
  14. import cc.iotkit.common.utils.StringUtils;
  15. import cc.iotkit.data.manager.*;
  16. import cc.iotkit.message.model.Message;
  17. import cc.iotkit.message.service.MessageService;
  18. import cc.iotkit.model.alert.AlertConfig;
  19. import cc.iotkit.model.notify.Channel;
  20. import cc.iotkit.model.notify.ChannelConfig;
  21. import cc.iotkit.model.notify.ChannelTemplate;
  22. import cc.iotkit.model.rule.FilterConfig;
  23. import cc.iotkit.model.rule.RuleAction;
  24. import cc.iotkit.model.rule.RuleInfo;
  25. import cc.iotkit.ruleengine.action.Action;
  26. import cc.iotkit.ruleengine.action.alert.AlertAction;
  27. import cc.iotkit.ruleengine.action.alert.AlertService;
  28. import cc.iotkit.ruleengine.action.device.DeviceAction;
  29. import cc.iotkit.ruleengine.action.device.DeviceActionService;
  30. import cc.iotkit.ruleengine.action.http.HttpAction;
  31. import cc.iotkit.ruleengine.action.http.HttpService;
  32. import cc.iotkit.ruleengine.action.kafka.KafkaAction;
  33. import cc.iotkit.ruleengine.action.kafka.KafkaService;
  34. import cc.iotkit.ruleengine.action.mqtt.MqttAction;
  35. import cc.iotkit.ruleengine.action.mqtt.MqttService;
  36. import cc.iotkit.ruleengine.action.tcp.TcpAction;
  37. import cc.iotkit.ruleengine.action.tcp.TcpService;
  38. import cc.iotkit.ruleengine.config.RuleConfiguration;
  39. import cc.iotkit.ruleengine.filter.DeviceFilter;
  40. import cc.iotkit.ruleengine.filter.Filter;
  41. import cc.iotkit.ruleengine.link.LinkFactory;
  42. import cc.iotkit.ruleengine.listener.DeviceListener;
  43. import cc.iotkit.ruleengine.listener.Listener;
  44. import cn.hutool.core.collection.CollectionUtil;
  45. import lombok.SneakyThrows;
  46. import lombok.extern.slf4j.Slf4j;
  47. import org.springframework.beans.factory.annotation.Autowired;
  48. import org.springframework.beans.factory.annotation.Qualifier;
  49. import org.springframework.stereotype.Component;
  50. import java.util.ArrayList;
  51. import java.util.List;
  52. import java.util.concurrent.Executors;
  53. import java.util.concurrent.ScheduledExecutorService;
  54. import java.util.concurrent.TimeUnit;
  55. @Component
  56. @Slf4j
  57. public class RuleManager {
  58. @Autowired
  59. private RuleConfiguration ruleConfiguration;
  60. @Autowired
  61. private RuleMessageHandler ruleMessageHandler;
  62. @Autowired
  63. private IRuleInfoData ruleInfoData;
  64. @Autowired
  65. @Qualifier("deviceInfoPropertyDataCache")
  66. private IDeviceInfoData deviceInfoData;
  67. @Autowired
  68. private DeviceActionService deviceActionService;
  69. @Autowired
  70. private IAlertConfigData alertConfigData;
  71. @Autowired
  72. private IChannelTemplateData channelTemplateData;
  73. @Autowired
  74. private IChannelConfigData channelConfigData;
  75. @Autowired
  76. private IChannelData channelData;
  77. @Autowired
  78. private MessageService messageService;
  79. public RuleManager() {
  80. ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
  81. executorService.schedule(this::initRules, 1, TimeUnit.SECONDS);
  82. }
  83. @SneakyThrows
  84. public void initRules() {
  85. int idx = 1;
  86. while (true) {
  87. PageRequest<RuleInfo> pageRequest = new PageRequest<>();
  88. pageRequest.setPageNum(idx);
  89. pageRequest.setPageSize(100);
  90. Paging<RuleInfo> all = ruleInfoData.findAll(pageRequest);
  91. List<RuleInfo> rules = all.getRows();
  92. if (CollectionUtil.isEmpty(rules)) {
  93. return;
  94. }
  95. rules.forEach(rule -> {
  96. try {
  97. //不添加停止的规则
  98. if (RuleInfo.STATE_STOPPED.equals(rule.getState())) {
  99. return;
  100. }
  101. log.info("got rule {} to init", rule.getId());
  102. add(rule);
  103. } catch (Throwable e) {
  104. log.error("add rule error", e);
  105. }
  106. });
  107. idx++;
  108. }
  109. }
  110. public void add(RuleInfo ruleInfo) {
  111. Rule rule = parseRule(ruleInfo);
  112. ruleMessageHandler.putRule(rule);
  113. }
  114. public void remove(String ruleId) {
  115. ruleMessageHandler.removeRule(ruleId);
  116. // 移出link连接
  117. LinkFactory.ruleClose(ruleId);
  118. }
  119. public void pause(String ruleId) {
  120. remove(ruleId);
  121. }
  122. public void resume(RuleInfo ruleInfo) {
  123. add(ruleInfo);
  124. }
  125. private Rule parseRule(RuleInfo ruleInfo) {
  126. List<Listener<?>> listeners = new ArrayList<>();
  127. for (FilterConfig listener : ruleInfo.getListeners()) {
  128. if (StringUtils.isBlank(listener.getConfig())) {
  129. continue;
  130. }
  131. listeners.add(parseListener(listener.getType(), listener.getConfig()));
  132. }
  133. List<Filter<?>> filters = new ArrayList<>();
  134. for (FilterConfig filter : ruleInfo.getFilters()) {
  135. if (StringUtils.isBlank(filter.getConfig())) {
  136. continue;
  137. }
  138. filters.add(parseFilter(filter.getType(), filter.getConfig()));
  139. }
  140. List<Action<?>> actions = new ArrayList<>();
  141. for (RuleAction action : ruleInfo.getActions()) {
  142. if (StringUtils.isBlank(action.getConfig())) {
  143. continue;
  144. }
  145. actions.add(parseAction(ruleInfo.getId(), action.getType(), action.getConfig()));
  146. }
  147. return new Rule(ruleInfo.getId(), ruleInfo.getName(), listeners, filters, actions);
  148. }
  149. private Listener<?> parseListener(String type, String config) {
  150. if (DeviceListener.TYPE.equals(type)) {
  151. return parse(config, DeviceListener.class);
  152. }
  153. return null;
  154. }
  155. private Filter<?> parseFilter(String type, String config) {
  156. if (DeviceFilter.TYPE.equals(type)) {
  157. DeviceFilter filter = parse(config, DeviceFilter.class);
  158. filter.setDeviceInfoData(deviceInfoData);
  159. return filter;
  160. }
  161. return null;
  162. }
  163. private Action<?> parseAction(String ruleId, String type, String config) {
  164. if (DeviceAction.TYPE.equals(type)) {
  165. DeviceAction action = parse(config, DeviceAction.class);
  166. action.setDeviceActionService(deviceActionService);
  167. return action;
  168. } else if (HttpAction.TYPE.equals(type)) {
  169. HttpAction httpAction = parse(config, HttpAction.class);
  170. for (HttpService service : httpAction.getServices()) {
  171. service.setDeviceInfoData(deviceInfoData);
  172. }
  173. return httpAction;
  174. } else if (MqttAction.TYPE.equals(type)) {
  175. MqttAction mqttAction = parse(config, MqttAction.class);
  176. for (MqttService service : mqttAction.getServices()) {
  177. service.setDeviceInfoData(deviceInfoData);
  178. service.initLink(ruleId);
  179. }
  180. return mqttAction;
  181. } else if (KafkaAction.TYPE.equals(type)) {
  182. KafkaAction kafkaAction = parse(config, KafkaAction.class);
  183. for (KafkaService service : kafkaAction.getServices()) {
  184. service.setDeviceInfoData(deviceInfoData);
  185. service.initLink(ruleId);
  186. }
  187. return kafkaAction;
  188. } else if (TcpAction.TYPE.equals(type)) {
  189. TcpAction tcpAction = parse(config, TcpAction.class);
  190. for (TcpService service : tcpAction.getServices()) {
  191. service.setDeviceInfoData(deviceInfoData);
  192. service.initLink(ruleId);
  193. }
  194. return tcpAction;
  195. } else if (AlertAction.TYPE.equals(type)) {
  196. List<AlertConfig> alertConfigs = alertConfigData.findAllByCondition(AlertConfig.builder()
  197. .ruleInfoId(ruleId)
  198. .build());
  199. AlertAction alertAction = parse(config, AlertAction.class);
  200. String script = alertAction.getServices().get(0).getScript();
  201. List<AlertService> alertServices = new ArrayList<>();
  202. for (AlertConfig alertConfig : alertConfigs) {
  203. if (alertConfig.getEnable() != null && !alertConfig.getEnable()) {
  204. continue;
  205. }
  206. AlertService service = new AlertService();
  207. service.setScript(script);
  208. service.setDeviceInfoData(deviceInfoData);
  209. service.setMessageService(messageService);
  210. ChannelTemplate channelTemplate = channelTemplateData.findById(Long.parseLong(alertConfig.getMessageTemplateId()));
  211. Long channelConfigId = channelTemplate.getChannelConfigId();
  212. Message message = Message.builder()
  213. .content(channelTemplate.getContent())
  214. .alertConfigId(alertConfig.getId())
  215. .build();
  216. if (channelConfigId != null) {
  217. ChannelConfig channelConfig = channelConfigData.findById(channelTemplate.getChannelConfigId());
  218. Channel channel = channelData.findById(channelConfig.getChannelId());
  219. message.setChannel(channel.getCode());
  220. message.setChannelId(channel.getId());
  221. message.setChannelConfig(channelConfig.getParam());
  222. }
  223. service.setMessage(message);
  224. alertServices.add(service);
  225. }
  226. alertAction.setServices(alertServices);
  227. return alertAction;
  228. }
  229. return null;
  230. }
  231. private <T> T parse(String config, Class<T> cls) {
  232. return JsonUtils.parseObject(config, cls);
  233. }
  234. }