新聞中心
01 問題
正文本篇我們要解決 ??No?6,?No7提到的網(wǎng)友問題,如下:

?簡單說就是如何處理兩條時(shí)間線的數(shù)值計(jì)算?上面例子是一個(gè) “+” 加法。
02 數(shù)據(jù)準(zhǔn)備
我們首先利用InfluxDB解決上述問題,首先進(jìn)行數(shù)據(jù)準(zhǔn)備,建立一個(gè)測(cè)試的bucket,建立之前先檢查一下現(xiàn)有的bucket。
啟動(dòng)InfluxDB實(shí)例,如下:
啟動(dòng)之后,我們查看一下現(xiàn)有的bucket,如下:
influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx bucket list
ID Name Retention Shard group duration Organization ID
98e86f05543f5866 _monitoring 168h0m0s 24h0m0s 56b35f89025991c8
b9b9609ae3e08b97 _tasks 72h0m0s 24h0m0s 56b35f89025991c8
創(chuàng)建名為iot的bucket,如下命令:
bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx setup \
--username iot \
--password 2021iotdb \
--org org \
--bucket 2021iotdb \
--retention 1h \
--token iot_test_token \
--host http://localhost:8086 \
--force
執(zhí)行成功之后會(huì)顯示如下:
influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx setup \
--username iot \
--password 2021iotdb \
--org org \
--bucket 2021iotdb \
--retention 1h \
--token iot_test_token \
--host http://localhost:8086 \
--force
Config default has been stored in /Users/jincheng/.influxdbv2/configs.
User Organization Bucket
iot org 2021iotdb
我們用命令查看一下:
influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx bucket list
ID Name Retention Shard group duration Organization ID
c05283f56bf9cead 2021iotdb 1h0m0s 1h0m0s 0b1ad4c0cd4db9ca
e70f5bb2fdaa5dd2 _monitoring 168h0m0s 24h0m0s 0b1ad4c0cd4db9ca
56241b01789c1a1b _tasks 72h0m0s 24h0m0s 0b1ad4c0cd4db9ca
插入兩條時(shí)間線數(shù)據(jù),如下:
influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx write --bucket 2021iotdb --precision s "m1 vm=3333 $(date +%s)"
influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx write --bucket 2021iotdb --precision s "m2 vn=4444 $(date +%s)"
我們插入兩條時(shí)間線數(shù)據(jù),m1的vm=3333,m2的vn=4444,我們的需求是vm + vn。
03 JOIN查詢
我們看一下JOIN的功能定義:
The join() function merges two or more input streams, whose values are equal on a set of common columns, into a single output stream. Flux allows you to join on any columns common between two data streams and opens the door for operations such as cross-measurement joins and math across measurements.
語法:join(tables: {key1: table1, key2: table2}, on: ["_time", "_field"], method: "inner")
這個(gè)和我們標(biāo)準(zhǔn)數(shù)據(jù)庫的JOIN語義基本一致,我們先查看一下用于測(cè)試的數(shù)據(jù),我們既可以用influxCLI,如下:
我們發(fā)現(xiàn)數(shù)據(jù)已經(jīng)插入成功。也可以用fluxCLI,InlfuxDB社區(qū)更推進(jìn)用flux,我們打開一個(gè)flux repl。細(xì)節(jié)可以查閱 前面一篇No6。我用IDE打開如下:
> from(bucket:"2021iotdb") |> range(start:-1h)
Result: _result
Error: unauthorized access
如圖,我們?cè)贗DE里面執(zhí)行查詢時(shí)候,提示我們需要token,那么influx query為啥不需要呢,IDE沒有默認(rèn)去讀取配置文件,我們可以配置環(huán)境變量也可以直接添加token,查詢語句如下:
> from(bucket:"2021iotdb", org:"org", token:"iot_test_token") |> range(start:-1h)
Result: _result
Table: keys: [_start, _stop, _field, _measurement]
_start:time _stop:time _field:string _measurement:string _time:time _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ------------------------------ ----------------------------
2021-04-06T05:36:50.079542000Z 2021-04-06T06:36:50.079542000Z vm m1 2021-04-06T06:23:16.000000000Z 3333
Table: keys: [_start, _stop, _field, _measurement]
_start:time _stop:time _field:string _measurement:string _time:time _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ------------------------------ ----------------------------
2021-04-06T05:36:50.079542000Z 2021-04-06T06:36:50.079542000Z
好的,一切都還算順利,我們看看如果計(jì)算 vm + vn呢?如果我們把 m1和m2兩個(gè)時(shí)間序列看成是兩個(gè)流(表),那么我們要進(jìn)行兩個(gè)表的操作,第一想到的應(yīng)該是兩個(gè)表進(jìn)行JOIN將兩個(gè)表的數(shù)據(jù)合并成一個(gè)寬表,然后在進(jìn)行列求值,如下:
tab1 = from(bucket:"2021iotdb", org:"org", token:"iot_test_token")
|> range(start:-1h)
|> filter(fn:(r) => r._measurement == "m1")
tab2 = from(bucket:"2021iotdb", org:"org", token:"iot_test_token")
|> range(start:-1h)
|> filter(fn:(r) => r._measurement == "m2")
得到兩個(gè)表之后我們?cè)谶M(jìn)行JOIN操作,查詢語句如下:
join(tables: {m1:tab1, m2:tab2},
on: ["_time"]
) |> map(fn:(r) => ({_time: r._time,
_value: r._value_m1 + r._value_m2
}))上面的on表示JOIN的條件,但是我們發(fā)現(xiàn),tab1和tab2中時(shí)間字段并不相同,如下:
所以我們需要再快速的插入兩條數(shù)據(jù),使得時(shí)間字段相同,我們才能拿到結(jié)果,插入之后數(shù)據(jù)如下:
這樣我們?cè)龠M(jìn)行查詢:
join(tables: {m1:tab1, m2:tab2},
on: ["_time"]
) |> map(fn:(r) => ({_time: r._time,
_value: r._value_m1 + r._value_m2
}))如上我們完成了查詢需求。哈哈,那是不是在InfluxDB里面進(jìn)行這類查詢都是用JOIN的方式嗎?是否有更簡單的方式?看下面部分:)
03 PIOVT查詢
我們看一下PIVOT的功能定義:
The pivot() function collects values stored vertically (column-wise) in a table and aligns them horizontally (row-wise) into logical sets.
語法:pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
其實(shí)在標(biāo)準(zhǔn)數(shù)據(jù)庫里面也有PIVOT,在InfluxDB里面pivot可以將行轉(zhuǎn)換為列,進(jìn)而將兩個(gè)時(shí)序數(shù)據(jù)值變成一個(gè)Table中的兩個(gè)列,這個(gè)內(nèi)置也可以為用戶進(jìn)行內(nèi)部優(yōu)化處理。我們看看如何操作:
> from(bucket:"2021iotdb", org:"org", token:"iot_test_token")
|> range(start:-1h)
|> pivot(rowKey:["_time"], columnKey: ["_measurement","_field"],valueColumn: "_value")
如上語句執(zhí)行結(jié)果如下:
我們發(fā)現(xiàn)m1的vm和m2的vn都變成一個(gè)表的某一列了,這樣pivot就完美的將兩個(gè)時(shí)序數(shù)據(jù)合并成寬表的列了。我們?cè)偌由暇唧w的過濾條件,如下:
接下來我們?cè)龠M(jìn)行計(jì)算,如下:
from(bucket:"2021iotdb", org:"org", token:"iot_test_token")
|> range(start:-1h)
|> filter(fn:(r) => r._measurement == "m1" or r._measurement == "m2")
|> pivot(rowKey:["_time"], columnKey: ["_measurement","_field"],valueColumn: "_value")
|> map(fn:(r) => ({_time: r._time, _value:r.m1_vm + r.m2_vn}))
OK, 大家是不是趕緊PIVOT非常方便?:)
04 問題
最后,留個(gè)問題給大家,大家知道標(biāo)準(zhǔn)數(shù)據(jù)庫里面PIVOT和UNPIVOT的使用場(chǎng)景嗎?或者Flink&Spark如何支持PIVOT?或者知道Apache IoTDB里面如何處理多條時(shí)序數(shù)據(jù)分析梳理嗎?我們下一篇見。
作者介紹
孫金城,社區(qū)編輯,Apache Flink PMC 成員,Apache Beam Committer,Apache IoTDB PMC 成員,ALC Beijing 成員,Apache ShenYu 導(dǎo)師,Apache 軟件基金會(huì)成員。關(guān)注技術(shù)領(lǐng)域流計(jì)算和時(shí)序數(shù)據(jù)存儲(chǔ)。
當(dāng)前文章:No.8-時(shí)序數(shù)據(jù)庫隨筆-InfluxDB多條時(shí)序數(shù)據(jù)聯(lián)合分析
網(wǎng)頁地址:http://www.fisionsoft.com.cn/article/cdhodjd.html


咨詢
建站咨詢
