功能介绍

应用鉴权是第三方应用程序/服务 调用云鲸AIoT开发者平台服务的一种签名鉴权机制。本文主要介绍如何进行应用鉴权。

前提条件

您已完成创建应用 的步骤。

逻辑图

鉴权加密算法

  1. 对请求参数进行排序并加密

// 使用 TreeMap 对请求参数按 Key 自然排序,并转为 JSON 字符串,然后计算其 SHA256 哈希值并转为小写 16 进制字符串。

// 伪代码↓

HexEncode(Hash.SHA256(payloadJson))

目前支持的参数方式:

  • HTTP GET 请求:将请求参数以 Key 、Value 形式放入Map中。value 需为字符串类型。
  • HTTP POST 请求,请求类型为 application/json:支持 JSON 对象传递,将 JSON 对象中第一层级的 Key、Value 放入 Map 中;如 Value 仍为 JSON 对象,也同样需要对其 Key 进行自然排序。其他类型的 Value 需为字符串类型。
  • HTTP POST 请求,请求类型为 multipart/form-data : 将普通参数以 Key 、Value 形式放入Map中,文件参数不参与校验过程。如Value 仍为 JSON对象,也同样需要进行自然排序。其他类型value 需为字符串类型。
  2. 拼接待签名字符串

    // 按照如下格式拼接待签名字符串:
    StringToSign = Algorithm + '\n' + date + '\n' + HashedRequestPayload

字段说明:

字段名称 解释
1 Algorithm 签名算法,固定为 HMAC-SHA256
2 date UTC 标准时间,格式为 yyyy-MM-dd HH:mm:ss(例如 2021-06-23 01:11:12),取值需要和请求参数中 timestamp 换算的 UTC 标准时间一致
3 HashedRequestPayload 第一步所得规范请求串的哈希值
  根据以上规则,得到的待签名字符串示例如下(三行之间仅以单个换行符 \n 分隔,不含空行):

HMAC-SHA256

2021-06-23 01:11:12

3b6099ce6240a6cfa8a4f77a2d3e5cd723b6fa9d2d43031b340b9362eb3ed434

  3. 计算签名

计算签名的伪代码如下:

String signature = HexEncode(HMAC_SHA256(AccessKeySecret, StringToSign))

signature = signature.toLowerCase()

  4. 拼接 Authorization

按如下格式拼接 Authorization:

Authorization = Algorithm + ' ' + 'Signature=' + signature + ' ' + 'AccessKey=' + AK + ' ' + 'Timestamp=' + timestamp

鉴权加密算法代码示例

JAVA版本

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.TreeMap;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;


/**
 * Walk-through of the request-signing steps: hash the sorted JSON payload,
 * build the string to sign, HMAC it with the access key secret and assemble
 * the Authorization header value.
 */
public class ApiAuthorizationDemo {

    private static final Charset UTF8 = StandardCharsets.UTF_8;

    /**
     * Computes HMAC-SHA256 over the UTF-8 bytes of {@code msg}.
     *
     * @param key raw key bytes
     * @param msg message to authenticate
     * @return the raw (not hex-encoded) MAC bytes
     * @throws Exception if the HmacSHA256 algorithm is unavailable or the key is rejected
     */
    public static byte[] hmac256(byte[] key, String msg) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        SecretKeySpec secretKeySpec = new SecretKeySpec(key, mac.getAlgorithm());
        mac.init(secretKeySpec);
        return mac.doFinal(msg.getBytes(UTF8));
    }

    /**
     * Returns the lower-case hex SHA-256 digest of the UTF-8 bytes of {@code s}.
     *
     * @param s text to hash
     * @return 64-character lower-case hex string
     * @throws Exception if the SHA-256 algorithm is unavailable
     */
    public static String sha256Hex(String s) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        byte[] d = md.digest(s.getBytes(UTF8));
        return DatatypeConverter.printHexBinary(d).toLowerCase();
    }

    /**
     * Builds the Authorization value for a sample request.
     *
     * @param args unused
     * @throws Exception if hashing or signing fails
     */
    public static void main(String[] args) throws Exception {
        String algorithm = "HMAC-SHA256";
        String accessKey = "915d957d05ff44f3aeb12413e2247e5cdb5e";
        String accessKeySecret = "c7531c2357e2200bb7eee3fe190f138a0febcbec791241417aceac0418f8c662657b97e5fa87b5b4b9bafb";

        // Assemble the request parameters; device registration is used as the example.
        // A plain java.util.TreeMap keeps the keys in natural order and needs no
        // extra library (the original used Maps.newTreeMap(), which is not imported).
        final TreeMap<String, Object> treeMap = new TreeMap<>();
        treeMap.put("productId", "pJabWNSCCU");

        final long timestamp = System.currentTimeMillis();

        // Serialize the parameters to JSON and hash the result.
        // NOTE(review): JSONUtil / JSONObject / JSONConfig are not imported in this
        // sample; they look like Hutool (cn.hutool.json) classes - confirm and add
        // the matching imports. setOrder(true) presumably preserves the TreeMap order.
        final JSONObject jsonObject = JSONUtil.parseObj(treeMap, JSONConfig.create().setOrder(true));
        final String jsonStr = jsonObject.toString();
        final String hashedRequestPayload = sha256Hex(jsonStr);

        // Build the string to sign. The date is derived from the same millisecond
        // timestamp that is sent in the header, formatted in UTC.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        String date = sdf.format(new Date(timestamp));
        String stringToSign = algorithm + "\n" + date + "\n" + hashedRequestPayload;

        // Compute the signature as lower-case hex.
        final String signature = DatatypeConverter.printHexBinary(hmac256(accessKeySecret.getBytes(UTF8), stringToSign)).toLowerCase();

        // Assemble the Authorization header value.
        String authorization = algorithm + " " + "Signature=" + signature + " " + "AccessKey=" + accessKey + " " + "Timestamp=" + timestamp;
    }

}

Node.js 版本(TypeScript)

/* eslint-disable no-restricted-syntax */
import * as crypto from 'crypto';

// Pull the two hashing primitives used below out of the crypto module.
const {
  createHash,
  createHmac,
} = crypto;
// Algorithm label; used in both the string to sign and the Authorization value.
const algorithm = 'HMAC-SHA256';

// Input accepted by getAuthorization():
//   payload      - request parameters; sorted, JSON-serialized and hashed
//   accessKey    - emitted as `AccessKey=...` in the Authorization value
//   accessSecret - key for the HMAC over the string to sign
type GetAuthorizationOptions = {
  payload: {},
  accessKey: string,
  accessSecret: string,
};

// True for object literals / Object.create(null) values only, so that arrays,
// Dates and class instances are left to JSON.stringify untouched.
function isPlainObject(value) {
  if (value === null || typeof value !== 'object') {
    return false;
  }
  const proto = Object.getPrototypeOf(value);
  return proto === Object.prototype || proto === null;
}

// Natural (ascending string) ordering of an object's keys.
// Returns a new object; `obj` is not modified. Values that are themselves plain
// objects are sorted recursively, because the signing rules require nested JSON
// objects to be key-sorted as well. Arrays are passed through unchanged.
// A null/undefined `obj` yields an empty object.
// NOTE: JavaScript always enumerates integer-like keys (e.g. "2", "10") first
// and in numeric order, so such keys cannot be forced into string order here.
function sortObj(obj) {
  // Object.keys(null | undefined) throws, so treat a missing payload as empty.
  const source = obj || {};
  const sorted = {};
  for (const k of Object.keys(source).sort()) {
    const value = source[k];
    sorted[k] = isPlainObject(value) ? sortObj(value) : value;
  }
  return sorted;
}

// Hex-encoded SHA-256 digest of `content`.
function hashSha256(content: string): string {
  const hasher = createHash('sha256');
  hasher.update(content);
  return hasher.digest('hex');
}

// Hex-encoded HMAC-SHA256 of `content`, keyed with `secret`.
function hmacSha256(content: string, secret: string): string {
  const signer = createHmac('sha256', secret);
  signer.update(content);
  return signer.digest('hex');
}

// Left-pad a date/time component with a zero when it is below 10.
// Always returns a string (the original returned a number for values >= 10),
// so callers get one consistent type.
function getFull(v): string {
  return v < 10 ? `0${v}` : `${v}`;
}

// Format `date` as `YYYY-MM-DD HH:mm:ss` using its UTC fields.
function getUTC(date: Date): string {
  const datePart = [
    date.getUTCFullYear(),
    getFull(date.getUTCMonth() + 1), // getUTCMonth() is zero-based
    getFull(date.getUTCDate()),
  ].join('-');
  const timePart = [
    getFull(date.getUTCHours()),
    getFull(date.getUTCMinutes()),
    getFull(date.getUTCSeconds()),
  ].join(':');

  return `${datePart} ${timePart}`;
}

// Build the Authorization value for one request.
// The same Date instance supplies both the UTC date inside the signed string
// and the millisecond Timestamp in the result, so the two always agree.
function getAuthorization(options: GetAuthorizationOptions): string {
  const { payload, accessKey, accessSecret } = options;
  const requestTime = new Date();

  // Step 1: sort the request parameters and hash their JSON form.
  const payloadDigest = hashSha256(JSON.stringify(sortObj(payload)));

  // Step 2: string to sign = algorithm, UTC date and payload hash, one per line.
  const stringToSign = [algorithm, getUTC(requestTime), payloadDigest].join('\n');

  // Step 3: sign it with the access secret.
  const signature = hmacSha256(stringToSign, accessSecret);

  // Step 4: assemble the space-separated Authorization value.
  return [
    algorithm,
    `Signature=${signature}`,
    `AccessKey=${accessKey}`,
    `Timestamp=${requestTime.valueOf()}`,
  ].join(' ');
}

// 调用方式
// getAuthorization({
//   payload: {},
//   accessKey: 'xxx',
//   accessSecret: 'xxx',
// });

export default getAuthorization;

Python版本

#! /usr/bin/env python3.9
import hashlib
import hmac
import json
import os
import time
import traceback

import requests

from common import sqlalchemy_mysql_conn
from models import *
# Kept after the star import so these names cannot be shadowed by `models`.
from datetime import datetime, timedelta

class sysMonitorMain:
    """Poll every known device on a fixed interval through the signed open API.

    ``run`` loads the device list once, then loops forever: for each device it
    calls ``get_iot_info`` for the "core" and "log" categories over a time
    window, and sleeps between passes. ``gen_authorization`` builds the signed
    request headers.
    """

    def __init__(self):
        # SECURITY NOTE(review): access keys and secrets are hard-coded here.
        # If these are real credentials they should be rotated and loaded from
        # the environment or a secret store instead of living in source/docs.
        self.conf_info = {
            "test_cn": {
                "host": "https://testcn-openapi.narwaltech.com",
                "obs_base_url": "https://narwal-test.obs.cn-south-1.myhuaweicloud.com/",
                "access_key": "k3NtuLpLGgRzoEFP1Qen",
                "access_key_secret": "fc8c28139c96485d90c816f5c1c2b2d5"
            },
            "prod_cn": {
                "host": "https://cn-openapi.narwaltech.com",
                "obs_base_url": "https://narwal-base-robot-logs.obs.cn-south-1.myhuaweicloud.com",
                "access_key": "ALzJii8TBcnkTKQuWrEz",
                "access_key_secret": "24bd4f4ac1224176b3d1af19545b8884"
            }
        }
        self.api_url = '/api/v1/obs/file'
        # Not read by any method visible in this class; presumably alert
        # thresholds (percent?) for disk/cpu/memory - TODO confirm.
        self.line = {
            "disk": 80,
            "cpu": 55,
            "mem": 60
        }
        self.MYSQL_OBJ = sqlalchemy_mysql_conn.MySQL()

    def get_all_device_list(self):
        """Return ``{device_name: {...}}`` built from all ``sys_device_list`` rows.

        Each row is converted with ``test_schema()``. When several rows share a
        device name, the first one wins. Every value holds the row's
        ``product_id``, ``service_type`` and ``contry_code``.
        """
        device_list = {}
        for row in self.MYSQL_OBJ.query_all(sys_device_list):
            schema = row.test_schema()  # convert once instead of once per field
            name = schema['device_name']
            if name not in device_list:
                device_list[name] = {
                    "product_id": schema['product_id'],
                    "service_type": schema['service_type'],
                    "contry_code": schema['contry_code'],  # key spelling comes from the schema
                }
        return device_list

    def gen_authorization(self, param, access_key, accessKeySecret, timestamp_ms=None):
        """Build the signed request headers for ``param``.

        Args:
            param: JSON-serializable request parameters.
            access_key: value emitted as ``AccessKey=`` in the header.
            accessKeySecret: secret used as the HMAC-SHA256 key.
            timestamp_ms: request time in epoch milliseconds; defaults to the
                current time. The same value feeds both the signed UTC date
                and the ``Timestamp=`` field.

        Returns:
            dict with ``Content-Type`` and ``Authorization`` entries.
        """
        algorithm = "HMAC-SHA256"
        if timestamp_ms is None:
            timestamp_ms = int(time.time() * 1000)
        else:
            timestamp_ms = int(timestamp_ms)

        # Compact separators drop the spaces json.dumps adds by default.
        # sort_keys gives the natural key order (nested objects included) the
        # signing rules require; ensure_ascii=False keeps non-ASCII text as raw
        # UTF-8 so the hashed bytes match what JSON.stringify-style serializers
        # produce instead of \uXXXX escapes.
        param_json = json.dumps(param, separators=(',', ':'), sort_keys=True, ensure_ascii=False)
        payload_hash = hashlib.sha256(param_json.encode('utf-8')).hexdigest()

        # gmtime yields UTC on any host; the original subtracted 8 hours from
        # local time, which is only correct when the machine runs in UTC+8.
        date = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(timestamp_ms // 1000))

        string_to_sign = '\n'.join((algorithm, date, payload_hash))
        # hexdigest() is already lower-case.
        signature = hmac.new(accessKeySecret.encode('utf-8'), string_to_sign.encode('utf-8'),
                             digestmod="sha256").hexdigest()

        authorization = (algorithm + ' ' + 'Signature=' + signature + ' ' + 'AccessKey=' + access_key
                         + ' ' + 'Timestamp=' + str(timestamp_ms))
        return {
            'Content-Type': 'application/json',
            'Authorization': authorization
        }

    def run(self):
        """Poll all devices in an endless loop until interrupted.

        The first pass covers the last 7 days; later passes cover the last
        ``sleep_secs`` seconds. Ctrl-C (KeyboardInterrupt) ends the loop.

        NOTE(review): ``get_iot_info`` is called here but is not defined in the
        visible part of this class - confirm it exists.
        """
        device_list = self.get_all_device_list()
        first_signal = True
        sleep_secs = 3600
        time_format = "%Y-%m-%d %H:%M:%S"
        while True:
            try:
                # One clock reading so both ends of the window are consistent.
                now = datetime.now()
                end_time = now.strftime(time_format)
                lookback = timedelta(days=7) if first_signal else timedelta(seconds=sleep_secs)
                start_time = (now - lookback).strftime(time_format)

                for device_name, device in device_list.items():
                    service_type = device['service_type']
                    # NOTE(review): the original chose 'prod_cn' in every branch
                    # (test or prod service, with or without a country code), so
                    # 'test_cn' is never selected. Kept unchanged - confirm intent.
                    if service_type is None or 'test' in service_type or 'prod' in service_type:
                        # Copy so per-device keys do not leak into the shared config.
                        conf = dict(self.conf_info['prod_cn'])
                    else:
                        conf = {}
                    conf['device_name'] = device_name
                    conf['product_id'] = device['product_id']
                    self.get_iot_info(conf, "core", start_time, end_time)
                    self.get_iot_info(conf, "log", start_time, end_time)
                    os.system('rm -rf ../tmp/*')
                first_signal = False
                time.sleep(sleep_secs)
            except KeyboardInterrupt:
                # Swallowing the interrupt inside `while True` made the loop
                # impossible to stop with Ctrl-C; leave the loop instead.
                break

if __name__ == '__main__':
    # Script entry point: build the monitor and start its polling loop.
    sysMonitorMain().run()

统一请求路径

站点 请求路径
中国 测试环境:https://testcn-openapi.narwaltech.com
生产环境:https://cn-openapi.narwaltech.com
美国 测试环境:https://testus-openapi.narwaltech.com
生产环境:https://us-openapi.narwaltech.com
日本 测试环境:https://testjp-openapi.narwaltech.com
生产环境:https://jp-openapi.narwaltech.com
加拿大 测试环境:https://testca-openapi.narwaltech.com/
生产环境:https://ca-openapi.narwaltech.com/
韩国 测试环境:https://testkr-openapi.narwaltech.com/
生产环境:https://kr-openapi.narwaltech.com/
澳洲 测试环境:https://testaus-openapi.narwaltech.com/
生产环境:https://aus-openapi.narwaltech.com/
德国 测试环境:https://testde-openapi.narwaltech.com/
生产环境:https://de-openapi.narwaltech.com/
新加坡 测试环境:https://testsg-openapi.narwaltech.com/
生产环境:https://sg-openapi.narwaltech.com/
马来西亚 测试环境:https://testmy-openapi.narwaltech.com/
生产环境:https://my-openapi.narwaltech.com/
以色列 测试环境:https://testil-openapi.narwaltech.com/
生产环境:https://il-openapi.narwaltech.com/
法国 测试环境:https://testfr-openapi.narwaltech.com/
生产环境:https://fr-openapi.narwaltech.com/
意大利 测试环境:https://testit-openapi.narwaltech.com/
生产环境:https://it-openapi.narwaltech.com/
西班牙 测试环境:https://testes-openapi.narwaltech.com/
生产环境:https://es-openapi.narwaltech.com/
英国 测试环境:https://testgb-openapi.narwaltech.com/
生产环境:https://gb-openapi.narwaltech.com/
瑞士 测试环境:https://testch-openapi.narwaltech.com/
生产环境:https://ch-openapi.narwaltech.com/
荷兰 测试环境:https://testnl-openapi.narwaltech.com/
生产环境:https://nl-openapi.narwaltech.com/
印度 测试环境:https://testin-openapi.narwaltech.com/
生产环境:https://in-openapi.narwaltech.com/
 创建时间:2023-03-28 21:40
最后编辑:admin  更新时间:2024-10-18 10:58