简介:本文主要演示通过规则引擎将设备上行消息流转到函数计算,并通过函数计算发送消息到钉钉机器人。作者:俏巴
概述
使用物联网平台规则引擎的数据流转功能,可将Topic中的数据消息转发至其他Topic或其他阿里云产品进行存储或处理。本文主要演示通过规则引擎将设备上行消息流转到函数计算,并通过函数计算发送消息到钉钉机器人。
Step By Step
产品及设备准备
1、创建产品
2、定义物模型
3、添加设备
4、使用SDK 上行消息,参考链接:基于开源JAVA MQTT Client连接阿里云IoT
import com.alibaba.taro.AliyunIoTSignUtil;import com.google.common.util.concurrent.ThreadFactoryBuilder;import org.eclipse.paho.client.mqttv3.*;import
org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ScheduledExecutorService;import
java.util.concurrent.ScheduledThreadPoolExecutor;import java.util.concurrent.TimeUnit;
publicclass IoTDemoPubSubDemo {
// 设备三元组信息
public
static
String
productKey =
"a16MX********"
;public
static
String
deviceName =
"device1"
;public
static
String
deviceSecret =
"YGLHxUr40E1JaWhk3IVAm0uk********"
;public
static
String
regionId =
"cn-shanghai"
;
// 物模型-属性上报topic
private
static
String
pubTopic =
"/sys/"
+ productKey +
"/"
+ deviceName +
"/thing/event/property/post"
;
// 自定义topic,在产品Topic列表位置定义
private
static
String
subTopic =
"/sys/"
+ productKey +
"/"
+ deviceName +
"/thing/event/property/post_reply"
;private
static
MqttClient mqttClient;public
static
void
main(
String
[] args){initAliyunIoTClient();ScheduledExecutorService scheduledThreadPool =
new
ScheduledThreadPoolExecutor(
1
,
new
ThreadFactoryBuilder().setNameFormat(
"thread-runner-%d"
).build());scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(),
10
,
5
, TimeUnit.SECONDS);
try
{mqttClient.subscribe(subTopic);
// 订阅Topic
}
catch
(MqttException e) {System.out.println(
"error:"
+ e.getMessage());e.printStackTrace();}
// 设置订阅监听
mqttClient.setCallback(
new
MqttCallback() {@Overridepublic
void
connectionLost(Throwable throwable) {System.out.println(
"connection Lost"
);}@Overridepublic
void
messageArrived(
String
s, MqttMessage mqttMessage) throws Exception {System.out.println(
"Sub message"
);System.out.println(
"Topic : "
+ s);System.out.println(
new
String
(mqttMessage.getPayload()));
//打印输出消息payLoad
}@Overridepublic
void
deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}});}
/** * 初始化 Client 对象 */
private
static
void
initAliyunIoTClient() {
try
{
// 构造连接需要的参数
String
clientId =
"java"
+ System.currentTimeMillis();
Map
<
String
,
String
> params =
new
HashMap<>(
16
);params.put(
"productKey"
, productKey);params.put(
"deviceName"
, deviceName);params.put(
"clientId"
, clientId);
String
timestamp =
String
.valueOf(System.currentTimeMillis());params.put(
"timestamp"
, timestamp);
// cn-shanghai
String
targetServer =
"tcp://"
+ productKey +
".iot-as-mqtt."
+regionId+
".aliyuncs.com:1883"
;
String
mqttclientId = clientId +
"|securemode=3,signmethod=hmacsha1,timestamp="
+ timestamp +
"|"
;
String
mqttUsername = deviceName +
"&"
+ productKey;
String
mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret,
"hmacsha1"
);connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);}
catch
(Exception e) {System.out.println(
"initAliyunIoTClient error "
+ e.getMessage());}}public
static
void
connectMqtt(
String
url,
String
clientId,
String
mqttUsername,
String
mqttPassword) throws Exception {MemoryPersistence persistence =
new
MemoryPersistence();mqttClient =
new
MqttClient(url, clientId, persistence);MqttConnectOptions connOpts =
new
MqttConnectOptions();
// MQTT 3.1.1
connOpts.setMqttVersion(
4
);connOpts.setAutomaticReconnect(
false
);
// connOpts.setCleanSession(true);connOpts.setCleanSession(
false
);connOpts.setUserName(mqttUsername);connOpts.setPassword(mqttPassword.toCharArray());connOpts.setKeepAliveInterval(
60
);mqttClient.connect(connOpts);}
/** * 汇报属性 */
private
static
void
postDeviceProperties() {
try
{
//上报数据
//高级版 物模型-属性上报payload
System.out.println(
"上报属性值"
);
String
payloadJson =
"{\"params\":{\"CurrentTemperature\":13,\"Humidity\":10}}"
;MqttMessage message =
new
MqttMessage(payloadJson.getBytes(
"utf-8"
));message.setQos(
1
);mqttClient.publish(pubTopic, message);}
catch
(Exception e) {System.out.println(e.getMessage());}}
}
5、运行状态查看
函数计算创建与配置
1、创建应用
2、应用下面添加函数
3、编辑脚本
const https = require('https');
const accessToken = '填写accessToken,即钉钉机器人webhook的accessToken';
module.exports.handler = function(event, context, callback) {
var eventJson = JSON.parse(event.toString());
console.log(event.toString());
//钉钉消息格式
const postData = JSON.stringify({
"msgtype": "markdown",
"markdown": {
"title": "设备温湿度传感器",
"text": "#### 温湿度传感器上报n" +
"> 设备名称:" + eventJson.deviceName+ "nn" +
"> 实时温度:" + eventJson.Temperature + "℃nn" +
"> 相对湿度:" + eventJson.Humidity + "%nn" +
"> ###### " + eventJson.time + " 发布 by 物联网平台 n"
},
"at": {
"isAtAll": false
}
});
const options = {
hostname: 'oapi.dingtalk.com',
port: 443,
path: '/robot/send?access_token=' + accessToken,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(postData)
}
};
const req = https.request(options, (res) => {
res.setEncoding('utf8');
res.on('data', (chunk) => {});
res.on('end', () => {
callback(null, 'success');
});
});
// 异常返回
req.on('error', (e) => {
callback(e);
});
// 写入数据
req.write(postData);
req.end();
};
钉钉机器人webhook的accessToken获取参考链接:阿里云IoT Studio服务开发定时关灯功能示例Demo: 2.3 钉钉机器人Webhook获取 部分。
4、快速测试
规则引擎配置
1、创建规则引擎
2、配置处理数据
SQL字段
deviceName() as deviceName, items.Humidity.value as Humidity, items.CurrentTemperature.value as Temperature, timestamp('yyyy-MM-dd HH:mm:ss') as time
3、配置转发数据
4、启动设备端SDK,周期性上行消息,钉钉群查看通知
5、上行日志查看
原文:https://developer.aliyun.com/article/753291?spm=5176.8068049.0.0.64b16d19Nleszq&groupCode=iot
如若转载,请注明出处:https://www.ozabc.com/keji/154594.html