InfluxDB - Passer du langage InfluxQL au langage Flux

Logo

Introduction

La version 2 de la base de données Time series InfluxDB est sortie en novembre 2020. Un article a été publié sur la procédure de migration d’InfluxDB v1 vers InfluxDB v2 - InfluxDB : Migration vers la version 2, procédure.

InfluxQL (Influx Query Language), le langage "SQL Like" d’InfluxDB v 1.x, est toujours supporté dans InfluxDB v2 pour compatibilité descendante, mais seulement pour les users 1.x avec authentification migrés vers InfluxDB v2.

Flux est maintenant le langage natif dans InfluxDB v2. Le langage Flux unifie les requêtes et les traitements ETL (InfluxQL, TickScripts).

InfluxQL to Flux

Quand on est habitués à SQL, passer d’InfluxQL à Flux est un petit peu déroutant au tout premier abord, mais on acquiert vite la mécanique. Garder à l’esprit que le langage Flux apporte des fonctionnalités qu’InfluxQL ne pouvait pas couvrir (jointures, pivots, sources de données externes…).

Dans cet article, un tour d’horizon sur la migration des requêtes InfluxQL existantes vers Flux.

Un guide pratique et aide mémoire du langage Flux est aussi disponible sur SQLPAC :

Rappels rapides

Mesures et séries

Dans une base de données Time series InfluxDB, le format d’un point est le suivant :

measurement[,tag=value[,tag=value]] field=value[,field=value] [<timestamp>]
vpsmetrics,location=france,host=vpsfrsqlpac1 pcpu=49,mem=877 1580918550000000000
vpsmetrics,location=france,host=vpsfrsqlpac2 pcpu=22,mem=455 1580918550000000000

Les séries sont les combinaisons mesure/tag keys possibles:

measurement, tag key1=value1, tag key2=value2 [,...]
vpsmetrics,location=france,host=vpsfrsqlpac1
vpsmetrics,location=france,host=vpsfrsqlpac2
  • Les données sont écrasées si le point existe déjà.
  • Le timestamp du serveur est utilisé si il est omis.
  • Les types de données peuvent être forcées lors de l’écriture du premier point : value=25i pour un type entier, status=t|f pour un type booléen…

Bases de données et politiques de rétention v1, buckets v2

Une base de données/politique de rétention en version 1 est un bucket dans la version 2. Une organisation est obligatoire et initialisée à l’upgrade ou à la création.

v1 DBRP vs v2 Buckets

Une première requête Flux

Une requête Flux commence toujours avec la clause from, clause qui sélectionne le bucket où est stocké la ou les mesures à interroger.

FluxInfluxQL
from(bucket: "netdatatsdb/autogen")
SELECT …
FROM "netdatatsdb"."autogen".…

Intervalle de temps, range

L’intervalle de temps (start/stop) est ensuite appliqué avec la clause range. L’intervalle de temps peut être absolu ou relatif, le format de timestamps RFC3339 est utilisé pour les temps absolus. L’opérateur |> ajoute les clauses/fonctions.

FluxInfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d, stop: -5m)


from(bucket: "netdatatsdb/autogen")
 |> range(start: 2021-02-05T00:00:00Z,
          stop: 2021-02-05T23:59:00Z)
SELECT …
FROM "netdatatsdb"."autogen".…
WHERE (time > now() -1d)
                
SELECT …
FROM "netdatatsdb"."autogen".…
WHERE (time > now() -1d and time < now() -5m)
                
SELECT …
FROM "netdatatsdb"."autogen".…
WHERE (time > '2021-02-05T00:00:00Z'
         and time < '2021-02-05T23:59:00Z')

Filtres, filter

Les filtres sur les mesures, tag keys, field keys… sont réalisés avec filter.

Pour filtrer sur la mesure vpsmetrics :

FluxInfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
SELECT …
FROM "netdatatsdb"."autogen"."vpsmetrics"
WHERE (time > now() -1d)

La notation dot est possible :

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")

La notation dot est utilisée dans les prochaines sections. Ce choix va dépendre des préférences de codage.

Pour ajouter des filtres sur les tag keys (host…) :

FluxInfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
SELECT …
FROM "netdatatsdb"."autogen"."vpsmetrics"
WHERE (time > now() -1d
       and host = 'vpsfrsqlpac1')

Pour ajouter des filtres sur les field keys (pcpu…) :

FluxInfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
SELECT pcpu
FROM "netdatatsdb"."autogen"."vpsmetrics"
WHERE (time > now() -1d
       and host = 'vpsfrsqlpac1')

Obtenir les résultats, yield

Utiliser yield pour rapatrier les résultats, optionnellement avec un nom.

FluxInfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
 |> yield(name:"pcpuvps1")
SELECT pcpu
FROM "netdatatsdb"."autogen"."vpsmetrics"
WHERE (time > now() -1d
       and host = 'vpsfrsqlpac1')

Génial, voici notre première requête Flux faite :

Chronograf first Flux query

Expressions régulières

Comparé à InfluxQL, les expressions régulières de base sont utilisées dans les requêtes Flux de la même manière. Rien de nouveau, sauf le nouveau package regexp dans InfluxDB v2, non abordé ici car cet article ne se concentre que sur la migration du code InfluxQL vers le code Flux.

FluxInfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host =~ /^vps/
                      and r.host !~ /(de|uk|us)/)
 |> filter(fn: (r) => r._field == "pcpu")
SELECT pcpu 
FROM "netdatatsdb"."autogen"."vpsmetrics"
WHERE (time > now() -1d
       and host =~ /^vps/
       and host !~ /(de|uk|us)/)

Format de sortie InfluxDB v2

Il est important de comprendre le format de sortie des données brutes avec InfluxDB v2, différent du format de sortie v1. Dans la sortie ci-dessous, un filtre n’est défini que sur le nom de la mesure, les filtres sur les clés des tags et des champs (tag keys/field keys) ne sont pas appliqués.

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> yield()
table _start               _stop                  _time                           _value  _field  _measurement  host          location
----- -------------------- ---------------------  ------------------------------  ------  ------  ------------  ------------  --------
    0 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:06.446501067Z    1182     mem    vpsmetrics  vpsfrsqlpac1  france
    0 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:16.604175869Z     817     mem    vpsmetrics  vpsfrsqlpac1  france
…
    1 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:06.446501067Z      62    pcpu    vpsmetrics  vpsfrsqlpac1  france
    1 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:16.604175869Z      66    pcpu    vpsmetrics  vpsfrsqlpac1  france
…
    2 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:07.420674651Z     429     mem    vpsmetrics  vpsfrsqlpac2  france
    2 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:17.176860469Z     464     mem    vpsmetrics  vpsfrsqlpac2  france
…
    3 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:07.420674651Z      29    pcpu    vpsmetrics  vpsfrsqlpac2  france
    3 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:17.176860469Z      32    pcpu    vpsmetrics  vpsfrsqlpac2  france
  • Un identifiant de table est appliqué sur chaque jeu de résultats.
  • L’intervalle de temps est clairement décrit avec les colonnes _start et _stop.
  • Le nom de la mesure est dans la colonne _measurement.
  • La clé de champ (field key) et sa valeur sont respectivement dans les colonnes _field et _value.
  • Les clés de tag (tag keys) sont affichées à la fin.

Des colonnes peuvent être retirées avec les fonctions drop ou keep, on peut ne pas vouloir toutes les colonnes dans la sortie :

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
 |> keep(columns: ["_value", "_time"])
 |> yield(name:"pcpuvps1")
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
 |> drop(fn: (column) => column =~ /^_(start|stop)/)
 |> yield(name:"pcpuvps1")

Les colonnes sont renommées avec la fonction rename :

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
 |> keep(columns: ["_value", "_time"])
 |> rename(columns: {_value: "pcpu", _time: "when"})
 |> yield(name:"pcpuvps1")
table when                            pcpu
----- ------------------------------  ----
    0 2021-02-05T01:18:47.116766147Z    72
    0 2021-02-05T01:18:57.360131309Z    51

Fenêtrage des données (Windowing), aggregateWindow

Le travail le plus important lors de la migration des requêtes InfluxQL en syntaxe Flux : les requêtes de fenêtrage des données avec aggrégats (windowing). Les bases de données Time series sont conçues à cet effet.

La traduction est assez simple avec la fonction aggregateWindow. L’exemple ci-dessous calcule la valeur moyenne par heure avec la fonction mean, les autres fonctions d’aggrégat sont bien entendu disponibles (count, sum…)

FluxInfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
 |> aggregateWindow(every: 1h, fn: mean )
SELECT mean(pcpu)
FROM "netdatatsdb"."autogen"."vpsmetrics"
WHERE (host ='vpsfrsqlpac1' AND time > now() -1d)
GROUP BY time(1h)
… _time                             _value     _field   _measurement …
… --------------------   -----------------  ---------   ------------
  2021-02-05T06:00:00Z                           pcpu   vpsmetrics
  2021-02-05T07:00:00Z                           pcpu   vpsmetrics
  2021-02-05T08:00:00Z   61.70815450643776       pcpu   vpsmetrics
  2021-02-05T09:00:00Z   60.55806451612903       pcpu   vpsmetrics
  2021-02-05T10:00:00Z   60.01699716713881       pcpu   vpsmetrics
…

Pour supprimer les valeurs NULL, l’argument createEmpty est défini à FALSE dans la fonction aggregateWindow.

FluxInfluxQL
…
 |> aggregateWindow(every: 1h, fn: mean,
                    createEmpty: false )
SELECT mean(pcpu)
…
GROUP BY time(1h)
FILL(none)

Pour définir une valeur par défaut aux données vides, la fonction fill avec l’argument value est appelée après le fenêtrage des données :

FluxInfluxQL
…
 |> aggregateWindow(every: 1h, fn: mean,
                    createEmpty: true )
 |> fill(column: "_value", value: 0.0)
SELECT mean(pcpu)
…
GROUP BY time(1h)
FILL(0.0)

Pour appliquer la précédente valeur non nulle aux données vides, la fonction fill avec l’argument usePrevious est appelée après le fenêtrage des données :

FluxInfluxQL
…
 |> aggregateWindow(every: 1h, fn: mean,
                    createEmpty: true )
 |> fill(column: "_value", usePrevious: true)
SELECT mean(pcpu)
…
GROUP BY time(1h)
FILL(previous)

Dans les coulisses, la fonction aggregateWindow n’est pas une vraie fonction native, elle réalise des étapes impliquant notamment les fonctions window et mean :

…
 |> aggregateWindow(every: 1h, fn: mean,
                    createEmpty: true )
…
 |> window(every: 1h, createEmpty: true)
 |> mean()
 |> duplicate(column: "_stop", as: "_time")
 |> window(every: inf)

La colonne _stop est dupliquée pour réajouter la colonne _time qui est supprimée par l’appel de la fonction mean.

Sous-requêtes From

Les sous-requêtes sont fréquemment utilisées avec InfluxQL pour calculer les agrégats sur des données "fenêtrées". La traduction en langage Flux est assez simple pour des sous-requêtes basiques :

FluxInfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
 |> aggregateWindow(every: 1h, fn: mean )
 |> max()

SELECT max("mean")
FROM (
   SELECT mean(pcpu)
   FROM "netdatatsdb"."autogen"."vpsmetrics"
   WHERE (host ='vpsfrsqlpac1' AND time > now() -1d)
   GROUP BY time(1h)
)

Quand des colonnes calculées sont créées en utilisant des sous-requêtes InfluxQL, les fonctions Flux pivot et map sont combinées, la syntaxe est un petit peu plus compliquée. Pivoter les données est indispensable pour aligner les champs qui ont le même timestamp, la fonction map crée la colonne calculée :

FluxInfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vps_space")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "used"
                      or r._field == "available")
 |> pivot(
       rowKey:["_time","host"],
       columnKey: ["_field"],
       valueColumn: "_value"
    )
 |> map(fn: (r) => (
        { r with pctspace:
          (r.used / (r.used + r.available)) * 100.0
        }
    ))
 |> max(column: "pctspace")

SELECT max(pctspace)
FROM (
   SELECT (used / (used + available)) * 100
            AS pctspace
   FROM "netdatatsdb"."autogen"."vps_space"
   WHERE (host ='vpsfrsqlpac1' AND time > now() -1d)
)

Alternativement, la fonction pivot peut être remplacée par la fonction schema.fieldsAsCols. schema.fieldsAsCols est une application spéciale de la fonction pivot et elle pivote les colonnes _field et _time pour aligner les champs.

FluxInfluxQL
import "influxdata/influxdb/schema"

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vps_space")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "used"
                      or r._field == "available")
 |> schema.fieldsAsCols()
 |> map(fn: (r) => (
        { r with pctspace:
          (r.used / (r.used + r.available)) * 100.0
        }
    ))
 |> max(column: "pctspace")

SELECT max(pctspace)
FROM (
   SELECT (used / (used + available)) * 100
            AS pctspace
   FROM "netdatatsdb"."autogen"."vps_space"
   WHERE (host ='vpsfrsqlpac1' AND time > now() -1d)
)

Copier des données

Avec InfluxDB v1.x, les instructions SELECT INTO sont utilisées pour copier des données d’une mesure à une autre.

Avec InfluxDB v2, utiliser la fonction to().

Noter dans l’exemple ci-dessous la fonction set pour changer le nom de la mesure afin de pouvoir copier les données dans le même bucket.

FluxInfluxQL
from(bucket: "netdatatsdb/autogen")
  |> range(start: -10d)
  |> filter(fn: (r) => r._measurement == "vpsmetrics")
  |> filter(fn: (r) => r._field == "value")
  |> aggregateWindow(every: 1h, fn: mean,
                     createEmpty: false)
  |> set(key: "_measurement", value: "vpsmetrics1h")
  |> to(org: "sqlpac", bucket: "netdatatsdb/autogen")
SELECT mean(value)
INTO netdatatsdb.autogen.vpsmetrics1h
FROM netdatatsdb.autogen.vpsmetrics
WHERE time >= now() - 10d
GROUP BY time(1h) FILL(none)

Outil de traduction, option transpile du client influx

Mise à jour (9 mars 2021) : l’option transpile sera obsolète dans les prochaines versions - GitHub Influxdata/Influxdb, fix(cmd/influx): delete unsupported influx transpile command

L’option transpile du client influx traduit les requêtes InfluxQL en syntaxe Flux, pas complètement fiable mais un bon outil de départ pour les débutants dans le langage Flux. Quelques ajustements à appliquer (intervalle de temps…), puis tester :

$ influx transpile \
   'SELECT pcpu FROM netdatatsdb.autogen.vpsmetrics WHERE (time > now() -1d and host =~ /^vps/ and host !~ /(de|uk|us)/)'
package main
from(bucket: "netdatatsdb/autogen")
        |> range(start: 2021-02-04T17:16:31.295656071Z, stop: 2262-04-11T23:47:16.854775806Z)
        |> filter(fn: (r) =>
                (r._measurement == "vpsmetrics" and r._field == "pcpu"))
        |> filter(fn: (r) =>
                (r["host"] =~ /^vps/ and r["host"] !~ /(de|uk|us)/))
        |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
        |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
        |> rename(columns: {_value: "pcpu"})
        |> yield(name: "0")

$ influx transpile \
   'SELECT mean(pcpu) FROM netdatatsdb.autogen.vpsmetrics WHERE time >= now() - 1w GROUP BY time(15m) FILL(none)'
package main
from(bucket: "netdatatsdb/autogen")
        |> range(start: 2021-01-29T16:25:00.305159561Z, stop: 2021-02-05T16:25:00.305159561Z)
        |> filter(fn: (r) =>
                (r._measurement == "vpsmetrics" and r._field == "pcpu"))
        |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
        |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
        |> window(every: 15m)
        |> mean()
        |> map(fn: (r) =>
                ({r with _time: r._start}))
        |> window(every: inf)
        |> rename(columns: {_value: "mean"})
        |> yield(name: "0")

Malheureusement, l’option transpile ne traduit pas les sous-requêtes, une erreur "unimplemented" est levée :

$ influx transpile \
    'SELECT max(pctspace) FROM ( SELECT (used / (used + available)) * 100 AS pctspace FROM netdatatsdb.autogen.vps_space WHERE (host='vpsfrsqlpac1'  AND time > now() -1d))'
Error: unimplemented: source must be a measurement

Conclusion

On peut regretter qu’InfluxQL soit remplacé par Flux, un langage plutôt NoSQL qui semble plus lourd syntaxiquement au tout premier abord, mais le langage Flux apporte des fonctionnalités qui n’étaient pas possibles avec InfluxQL (jointures, pivots, sources de données externes SQL…), ces fonctionnalités avancées seront abordées dans des articles à venir.

Il faut juste un tout petit peu de temps pour s’habituer au codage avec le langage Flux quand on vient du monde SQL.