InfluxDB v2 : langage Flux, aide-mémoire

Logo

Interrogation des données

  • Définition de la source de données (bucket - database) : from
from(bucket: "netdatatsdb/autogen")
  • Plage de temps, absolue ou relative : range
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h, stop: -10m)
              
from(bucket: "netdatatsdb/autogen")
  |> range(start: 2021-01-25T00:00:00Z, stop: 2021-01-29T23:59:00Z)
  • Filtrer par mesure : filter
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
  • Filtrer par tag key :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
  |> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
  • Filtrer par champ et valeur de champ :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
  |> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
  |> filter(fn: (r) => r["_field"] == "pcpu")
  |> filter(fn: (r) => r["_value"] > 80)
  • Les filtres peuvent être combinés en une seule clause filter avec les opérateurs and / or :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics" and
                       r["host"] == "vpsfrsqlpac1" and
                       r["_field"] == "pcpu" and r["_value"] > 80
  )
  • Selon les préférences, la notation point est autorisée :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" and r._value > 80
  )
  • Affichage des données : yield
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" and r._value > 80 )
  |> yield()

Les clauses from et range sont supprimées par souci de concision.

  • Utiliser les expressions régulières :
  |> filter(fn: (r) => r.host =~ /^vpsfrsqlpac[1-8]$/ )
             
  |> filter(fn: (r) => r.host !~ /^vpsensqlpac[1-8]$/ )
  • n premiers enregistrements : limit
  |> limit(n:10)
  • n derniers enregistrements : tail
  |> tail(n:10)
  • Trier les données : sort
  |> sort(columns: ["_value"], desc: true)
  • Renommer une colonne : rename
  |> rename(columns: {_value: "average", _time: "when"})
  • Supprimer des colonnes en sortie : drop
  |> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
  • Sélection des colonnes en sortie : keep
  |> keep(columns: ["_value", "_time"])
  • Une simple requête Flux avant d’aller plus loin :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" and
                       r._value > 80)
  |> sort(columns: ["_value"], desc: true)
  |> rename(columns: {_value: "average"})
  |> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
  |> limit(n:10)
  |> yield()

Fenêtrage des données (Windowing)

  • aggregateWindow
  |> aggregateWindow(
      every: 1m,
      fn: mean,
      column: "_value",
      timeSrc: "_stop",
      timeDst: "_time",
      createEmpty: true
)
  |> aggregateWindow(every: 10m, fn: mean)

Pour agréger sur une colonne différente de la colonne par défaut _value :

  |> aggregateWindow(every: 10m, fn: mean, column: "colname")

Pour supprimer les valeurs NULL, définir createEmpty à False :

  |> aggregateWindow(every: 10m, fn: mean, createEmpty: false)
  • window

aggregateWindow est en fait une fonction de raccourci utilisant la fonction window.

  |> window(every: 10m)
  |> mean()

createEmpty est défini à true pour afficher les valeurs nulles.

  |> window(every: 10m, createEmpty: true)
  |> mean()

La colonne _time n’est alors plus aggrégée dans la table en sortie, pour ré-ajouter la colonne _time pour un traitement ultérieur, la fonction duplicate est appelée pour dupliquer la colonne _start ou _stop en tant que nouvelle colonne _time :

  |> window(every: 10m, createEmpty: true)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")

Pour retrouver le format régulier, les données sont finalement "unwindowed" :

  |> window(every: 10m, createEmpty: true)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)
  • fill

Optionnellement, utiliser la fonction fill pour gérer les valeurs vides lorsque createEmpty est défini à true lors du fenêtrage des données.

  |> fill(column: "_value", value: 0.0) 
  |> fill(column: "_value", usePrevious: true) 
  • Fenêtrage par mois et années

Flux supporte le fenêtrage des données par mois et années calendaires : 1mo, 1yr. Cette fonctionnalité n’était pas possible avec InfluxQL.

  |> aggregateWindow(every: 1mo, fn: mean) 

Sélection de plages horaires

  • hourSelection
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu"
  )
  |> hourSelection(start: 8, stop: 18)

Jointures (join)

  • join
datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" )
  |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)

datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pmem" )
  |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)

join(
    tables: {pcpu:datapcpu, pmem:datapmem},
    on: ["_time", "host"],
    method: "inner"
  )

Seule la méthode inner est actuellement autorisée (v 2.0.4). Dans des versions futures, d’autres méthodes de jointures seront progressivement implémentées : cross, left, right, full.

La réalisation des jointures sur les colonnes _time contenant secondes, microsecondes, nanosecondes… peut être résolue en normalisant les horodatages irréguliers à l’aide de la fonction truncateTimeColumn qui tronque les valeurs _time à une unité spécifiée :

 |> truncateTimeColumn(unit: 1m)

Pivot

  • pivot
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" )
  |> pivot(
       rowKey:["_time"],
       columnKey: ["_field"],
       valueColumn: "_value"
    )

Comme les jointures, utiliser la fonction truncateTimeColumn pour normaliser les timestamps.

Utiliser la fonction de raccourci schema.fieldsAsCols() pour pivoter les données uniquement sur time/_field :

import "influxdata/influxdb/schema"

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" )
  |> schema.fieldsAsCols()

Histogrammes

  • histogram
from(bucket:"netdatatsdb/autogen")
  |> range(start: -3h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu"
  )
  |> histogram(
       bins:[0.0,10.0,20.0,30.0,40.0,50.0,60.0,70.0,80.0,90.0,100.0]
   )

Utiliser linearBins pour faciliter la création de la liste des valeurs float linéairement séparés.

  |> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )

Les valeurs float séparées logarithmiquement sont définies avec logarithmicBins

  |> histogram(
       bins: logarithmicBins(start: 1.0, factor: 2.0, count: 10,
                             infinity: true)
   )

Par défaut, les valeurs sont cumulatives dans la colonne _value, appliquer la fonction difference pour remplacer le calcul cumulatif.

  |> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )
  |> difference()
histogram             histogram + difference
le     _value        le     _value
-----  ------        -----  ------
50     292           50     269
60     536           60     241
70     784           70     247

Colonnes calculées - Computed columns (map)

  • map
  |> map(fn: (r) => ({ _value: r._value * 2.0 }))

L’utilisation de base de la fonction map supprime les colonnes qui ne font pas partie de la clé de groupe (_time, _start, _stop…) et qui ne sont pas explicitement "mappées", utiliser l’opérateur with pour empêcher leur suppression :

  |> map(fn: (r) => ({ r with _value: r._value * 2.0 }))

L’opérateur with met à jour la colonne si elle existe déjà, créé une nouvelle colonne si elle n’existe pas, et retourne toutes les colonnes existantes dans la table en sortie.

Fonctions d’agrégat personnalisées (reduce)

  • reduce
      |> reduce(fn: (r, accumulator) => ({
      count: accumulator.count + 1,
      total: accumulator.total + r._value,
      avg: (accumulator.total + r._value) / float(v: accumulator.count)
    }),
    identity: {count: 1, total: 0.0, avg: 0.0}
  )

identity définit les valeurs initiales ainsi que le type de données.

Une ligne unique avec les agrégats est retournée sans colonne _time.

Écrire des données

  • to
  |> to(bucket:"history", org:"sqlpac")

Pour écrire dans le même bucket, utiliser auparavant la fonction set pour définir le nom de la mesure :

  |> set(key: "_measurement", value: "history_vpsmetrics")
  |> to(bucket:"netdatatsdb/autogen", org:"sqlpac")

Sources de données SQL

  • Informations métadonnées (username, password…)

Utiliser influx secret ou curl pour stocker les métadonnées :

$ influx secret update --key PG_HOST --value vpsfrsqlpac
$ influx secret update --key PG_PORT --value 5432
$ influx secret update --key PG_USER --value influxdb
$ influx secret update --key PG_PASS --value "***********"

Extraire les "secrets" dans un script Flux à l’aide du package secret (vérifier l’autorisation read:secrets) :

import "influxdata/influxdb/secrets"

PG_HOST = secrets.get(key: "PG_HOST")
PG_USER = secrets.get(key: "PG_USER")
PG_PASS = secrets.get(key: "PG_PASS")
PG_PORT = secrets.get(key: "PG_PORT") 
  • sql.from : extraire les données
import "sql"

import "influxdata/influxdb/secrets"
// Get secrets…

datavps = sql.from(
  driverName: "postgres",
  dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
  query: "SELECT name, totalmemory FROM vps"
)

Drivers disponibles avec InfluxDB v2.0.4 (plus à venir dans de futures releases) :

  • awsathena
  • bigquery
  • mysql
  • postgres
  • snowflake
  • sqlserver, mssql
  • Joindre les données
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…

datavps = sql.from(
  driverName: "postgres",
  dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
  query: "SELECT name, totalmemory FROM vps"
)
  |> rename(columns : {name: "host"})

datamem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "vpsmetrics"
                       and r._field == "mem"
                       and r.host == "vpsfrsqlpac1")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)

join(
    tables: {vps:datavps, mem:datamem},
    on: ["host"],
    method: "inner"
  )
  |> map(fn: (r) => ({ r with _value: (r._value / r.totalmemory) * 100.0 }))
  • sql.to : writing data

Vérifier la structure de la table recevant les données : types de données, noms des colonnes, null/not null.

import "sql"

import "influxdata/influxdb/secrets"
// Get secrets…

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "vps_pmem"
                       and r._field == "pmem"
                       and r.host == "vpsfrsqlpac2")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> rename(columns: {_value: "pmem", _time: "dth"})
  |> keep(columns: ["host", "dth", "pmem"])
  |> sql.to(
      driverName: "postgres",
      dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
      table: "vpspmem"
    )