Hi!请登陆

阿里云物联网平台数据转发到函数计算示例

2020-12-1 32 12/1

简介:本文主要演示通过规则引擎将设备上行消息流转到函数计算,并通过函数计算发送消息到钉钉机器人。作者:俏巴

概述

使用物联网平台规则引擎的数据流转功能,可将Topic中的数据消息转发至其他Topic或其他阿里云产品进行存储或处理。本文主要演示通过规则引擎将设备上行消息流转到函数计算,并通过函数计算发送消息到钉钉机器人。

Step By Step

产品及设备准备

1、创建产品

2、定义物模型

3、添加设备

4、使用SDK 上行消息,参考链接:基于开源JAVA MQTT Client连接阿里云IoT

import com.alibaba.taro.AliyunIoTSignUtil;import com.google.common.util.concurrent.ThreadFactoryBuilder;import org.eclipse.paho.client.mqttv3.*;import
org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ScheduledExecutorService;import
java.util.concurrent.ScheduledThreadPoolExecutor;import java.util.concurrent.TimeUnit;

publicclass IoTDemoPubSubDemo {

// 设备三元组信息

public

static

String

productKey =

"a16MX********"

;public

static

String

deviceName =

"device1"

;public

static

String

deviceSecret =

"YGLHxUr40E1JaWhk3IVAm0uk********"

;public

static

String

regionId =

"cn-shanghai"

;

// 物模型-属性上报topic

private

static

String

pubTopic =

"/sys/"

+ productKey +

"/"

+ deviceName +

"/thing/event/property/post"

;

// 自定义topic,在产品Topic列表位置定义

private

static

String

subTopic =

"/sys/"

+ productKey +

"/"

+ deviceName +

"/thing/event/property/post_reply"

;private

static

MqttClient mqttClient;public

static

void

main(

String

[] args){initAliyunIoTClient();ScheduledExecutorService scheduledThreadPool =

new

ScheduledThreadPoolExecutor(

1

,

new

ThreadFactoryBuilder().setNameFormat(

"thread-runner-%d"

).build());scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(),

10

,

5

, TimeUnit.SECONDS);

try

{mqttClient.subscribe(subTopic);

// 订阅Topic

}

catch

(MqttException e) {System.out.println(

"error:"

+ e.getMessage());e.printStackTrace();}

// 设置订阅监听

mqttClient.setCallback(

new

MqttCallback() {@Overridepublic

void

connectionLost(Throwable throwable) {System.out.println(

"connection Lost"

);}@Overridepublic

void

messageArrived(

String

s, MqttMessage mqttMessage) throws Exception {System.out.println(

"Sub message"

);System.out.println(

"Topic : "

+ s);System.out.println(

new

String

(mqttMessage.getPayload()));

//打印输出消息payLoad

}@Overridepublic

void

deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}});}

/** * 初始化 Client 对象 */

private

static

void

initAliyunIoTClient() {

try

{

// 构造连接需要的参数

String

clientId =

"java"

+ System.currentTimeMillis();

Map

<

String

,

String

> params =

new

HashMap<>(

16

);params.put(

"productKey"

, productKey);params.put(

"deviceName"

, deviceName);params.put(

"clientId"

, clientId);

String

timestamp =

String

.valueOf(System.currentTimeMillis());params.put(

"timestamp"

, timestamp);

// cn-shanghai

String

targetServer =

"tcp://"

+ productKey +

".iot-as-mqtt."

+regionId+

".aliyuncs.com:1883"

;

String

mqttclientId = clientId +

"|securemode=3,signmethod=hmacsha1,timestamp="

+ timestamp +

"|"

;

String

mqttUsername = deviceName +

"&"

+ productKey;

String

mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret,

"hmacsha1"

);connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);}

catch

(Exception e) {System.out.println(

"initAliyunIoTClient error "

+ e.getMessage());}}public

static

void

connectMqtt(

String

url,

String

clientId,

String

mqttUsername,

String

mqttPassword) throws Exception {MemoryPersistence persistence =

new

MemoryPersistence();mqttClient =

new

MqttClient(url, clientId, persistence);MqttConnectOptions connOpts =

new

MqttConnectOptions();

// MQTT 3.1.1

connOpts.setMqttVersion(

4

);connOpts.setAutomaticReconnect(

false

);

// connOpts.setCleanSession(true);connOpts.setCleanSession(

false

);connOpts.setUserName(mqttUsername);connOpts.setPassword(mqttPassword.toCharArray());connOpts.setKeepAliveInterval(

60

);mqttClient.connect(connOpts);}

/** * 汇报属性 */

private

static

void

postDeviceProperties() {

try

{

//上报数据

//高级版 物模型-属性上报payload

System.out.println(

"上报属性值"

);

String

payloadJson =

"{\"params\":{\"CurrentTemperature\":13,\"Humidity\":10}}"

;MqttMessage message =

new

MqttMessage(payloadJson.getBytes(

"utf-8"

));message.setQos(

1

);mqttClient.publish(pubTopic, message);}

catch

(Exception e) {System.out.println(e.getMessage());}}

}

5、运行状态查看

函数计算创建与配置

1、创建应用

2、应用下面添加函数

3、编辑脚本

const https = require('https');

const accessToken = '填写accessToken,即钉钉机器人webhook的accessToken';

module.exports.handler = function(event, context, callback) {

var eventJson = JSON.parse(event.toString());

console.log(event.toString());

//钉钉消息格式

const postData = JSON.stringify({

"msgtype": "markdown",

"markdown": {

"title": "设备温湿度传感器",

"text": "#### 温湿度传感器上报n" +

"> 设备名称:" + eventJson.deviceName+ "nn" +

"> 实时温度:" + eventJson.Temperature + "℃nn" +

"> 相对湿度:" + eventJson.Humidity + "%nn" +

"> ###### " + eventJson.time + " 发布 by 物联网平台 n"

},

"at": {

"isAtAll": false

}

});

const options = {

hostname: 'oapi.dingtalk.com',

port: 443,

path: '/robot/send?access_token=' + accessToken,

method: 'POST',

headers: {

'Content-Type': 'application/json',

'Content-Length': Buffer.byteLength(postData)

}

};

const req = https.request(options, (res) => {

res.setEncoding('utf8');

res.on('data', (chunk) => {});

res.on('end', () => {

callback(null, 'success');

});

});

// 异常返回

req.on('error', (e) => {

callback(e);

});

// 写入数据

req.write(postData);

req.end();

};

钉钉机器人webhook的accessToken获取参考链接:阿里云IoT Studio服务开发定时关灯功能示例Demo: 2.3 钉钉机器人Webhook获取 部分。

4、快速测试

规则引擎配置

1、创建规则引擎

2、配置处理数据

SQL字段

deviceName() as deviceName, items.Humidity.value as Humidity, items.CurrentTemperature.value as Temperature, timestamp('yyyy-MM-dd HH:mm:ss') as time

3、配置转发数据

4、启动设备端SDK,周期性上行消息,钉钉群查看通知

5、上行日志查看

原文:https://developer.aliyun.com/article/753291?spm=5176.8068049.0.0.64b16d19Nleszq&groupCode=iot

相关推荐