Interrogation des données
- Définition de la source de données (bucket - database) :
from
from(bucket: "netdatatsdb/autogen")
- Plage de temps, absolue ou relative :
range
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h, stop: -10m)
from(bucket: "netdatatsdb/autogen")
|> range(start: 2021-01-25T00:00:00Z, stop: 2021-01-29T23:59:00Z)
- Filtrer par mesure :
filter
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
- Filtrer par tag key :
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
|> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
- Filtrer par champ et valeur de champ :
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
|> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
|> filter(fn: (r) => r["_field"] == "pcpu")
|> filter(fn: (r) => r["_value"] > 80)
- Les filtres peuvent être combinés en une seule clause
filteravec les opérateursand/or:
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "vpsmetrics" and
r["host"] == "vpsfrsqlpac1" and
r["_field"] == "pcpu" and r["_value"] > 80
)
- Selon les préférences, la notation point est autorisée :
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu" and r._value > 80
)
- Affichage des données :
yield
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu" and r._value > 80 )
|> yield()
Les clauses from et range sont supprimées par souci de concision.
- Utiliser les expressions régulières :
|> filter(fn: (r) => r.host =~ /^vpsfrsqlpac[1-8]$/ )
|> filter(fn: (r) => r.host !~ /^vpsensqlpac[1-8]$/ )
npremiers enregistrements :limit
|> limit(n:10)
nderniers enregistrements :tail
|> tail(n:10)
- Trier les données :
sort
|> sort(columns: ["_value"], desc: true)
- Renommer une colonne :
rename
|> rename(columns: {_value: "average", _time: "when"})
- Supprimer des colonnes en sortie :
drop
|> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
- Sélection des colonnes en sortie :
keep
|> keep(columns: ["_value", "_time"])
- Une simple requête Flux avant d’aller plus loin :
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu" and
r._value > 80)
|> sort(columns: ["_value"], desc: true)
|> rename(columns: {_value: "average"})
|> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
|> limit(n:10)
|> yield()
Fenêtrage des données (Windowing)
aggregateWindow
|> aggregateWindow(
every: 1m,
fn: mean,
column: "_value",
timeSrc: "_stop",
timeDst: "_time",
createEmpty: true
)
|> aggregateWindow(every: 10m, fn: mean)
Pour agréger sur une colonne différente de la colonne par défaut _value :
|> aggregateWindow(every: 10m, fn: mean, column: "colname")
Pour supprimer les valeurs NULL, définir createEmpty à False :
|> aggregateWindow(every: 10m, fn: mean, createEmpty: false)
window
aggregateWindow est en fait une fonction de raccourci utilisant la fonction window.
|> window(every: 10m)
|> mean()
createEmpty est défini à true pour afficher les valeurs nulles.
|> window(every: 10m, createEmpty: true)
|> mean()
La colonne _time n’est alors plus aggrégée dans la table en sortie,
pour ré-ajouter la colonne _time pour un traitement ultérieur,
la fonction duplicate est appelée pour dupliquer la colonne _start ou _stop
en tant que nouvelle colonne _time :
|> window(every: 10m, createEmpty: true)
|> mean()
|> duplicate(column: "_stop", as: "_time")
Pour retrouver le format régulier, les données sont finalement "unwindowed" :
|> window(every: 10m, createEmpty: true)
|> mean()
|> duplicate(column: "_stop", as: "_time")
|> window(every: inf)
fill
Optionnellement, utiliser la fonction fill pour gérer les valeurs vides lorsque createEmpty
est défini à true lors du fenêtrage des données.
|> fill(column: "_value", value: 0.0)
|> fill(column: "_value", usePrevious: true)
- Fenêtrage par mois et années
Flux supporte le fenêtrage des données par mois et années calendaires : 1mo, 1yr. Cette fonctionnalité
n’était pas possible avec InfluxQL.
|> aggregateWindow(every: 1mo, fn: mean)
Sélection de plages horaires
hourSelection
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu"
)
|> hourSelection(start: 8, stop: 18)
Jointures (join)
join
datapcpu = from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vps_pcpu" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu" )
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
datapmem = from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vps_pmem" and
r.host == "vpsfrsqlpac1" and
r._field == "pmem" )
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
join(
tables: {pcpu:datapcpu, pmem:datapmem},
on: ["_time", "host"],
method: "inner"
)
Seule la méthode inner est actuellement autorisée (v 2.0.4).
Dans des versions futures, d’autres méthodes de jointures seront progressivement implémentées : cross, left, right, full.
La réalisation des jointures sur les colonnes _time contenant secondes, microsecondes, nanosecondes…
peut être résolue en normalisant les horodatages irréguliers à l’aide de la fonction truncateTimeColumn qui
tronque les valeurs _time à une unité spécifiée :
|> truncateTimeColumn(unit: 1m)
Pivot
pivot
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" )
|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value"
)
Comme les jointures, utiliser la fonction truncateTimeColumn pour normaliser les timestamps.
Utiliser la fonction de raccourci schema.fieldsAsCols() pour pivoter les données uniquement sur time/_field :
import "influxdata/influxdb/schema"
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" )
|> schema.fieldsAsCols()
Histogrammes
histogram
from(bucket:"netdatatsdb/autogen")
|> range(start: -3h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu"
)
|> histogram(
bins:[0.0,10.0,20.0,30.0,40.0,50.0,60.0,70.0,80.0,90.0,100.0]
)
Utiliser linearBins pour faciliter la création de la liste des valeurs float linéairement séparés.
|> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )
Les valeurs float séparées logarithmiquement sont définies avec logarithmicBins
|> histogram(
bins: logarithmicBins(start: 1.0, factor: 2.0, count: 10,
infinity: true)
)
Par défaut, les valeurs sont cumulatives dans la colonne _value, appliquer la fonction difference
pour remplacer le calcul cumulatif.
|> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )
|> difference()
histogram histogram + difference
le _value le _value
----- ------ ----- ------
50 292 50 269
60 536 60 241
70 784 70 247
Colonnes calculées - Computed columns (map)
map
|> map(fn: (r) => ({ _value: r._value * 2.0 }))
L’utilisation de base de la fonction map supprime les colonnes
qui ne font pas partie de la clé de groupe (_time, _start, _stop…)
et qui ne sont pas explicitement "mappées",
utiliser l’opérateur with pour empêcher leur suppression :
|> map(fn: (r) => ({ r with _value: r._value * 2.0 }))
L’opérateur with met à jour la colonne si elle existe déjà, créé une nouvelle colonne si elle n’existe pas,
et retourne toutes les colonnes existantes dans la table en sortie.
Fonctions d’agrégat personnalisées (reduce)
reduce
|> reduce(fn: (r, accumulator) => ({
count: accumulator.count + 1,
total: accumulator.total + r._value,
avg: (accumulator.total + r._value) / float(v: accumulator.count)
}),
identity: {count: 1, total: 0.0, avg: 0.0}
)
identity définit les valeurs initiales ainsi que le type de données.
Une ligne unique avec les agrégats est retournée sans colonne _time.
Écrire des données
to
|> to(bucket:"history", org:"sqlpac")
Pour écrire dans le même bucket, utiliser auparavant la fonction set pour définir le nom de la mesure :
|> set(key: "_measurement", value: "history_vpsmetrics")
|> to(bucket:"netdatatsdb/autogen", org:"sqlpac")
Sources de données SQL
- Informations métadonnées (username, password…)
Utiliser influx secret ou curl pour stocker les métadonnées :
$ influx secret update --key PG_HOST --value vpsfrsqlpac
$ influx secret update --key PG_PORT --value 5432
$ influx secret update --key PG_USER --value influxdb
$ influx secret update --key PG_PASS --value "***********"
Extraire les "secrets" dans un script Flux à l’aide du package secret (vérifier l’autorisation read:secrets) :
import "influxdata/influxdb/secrets"
PG_HOST = secrets.get(key: "PG_HOST")
PG_USER = secrets.get(key: "PG_USER")
PG_PASS = secrets.get(key: "PG_PASS")
PG_PORT = secrets.get(key: "PG_PORT")
sql.from: extraire les données
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
datavps = sql.from(
driverName: "postgres",
dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
query: "SELECT name, totalmemory FROM vps"
)
Drivers disponibles avec InfluxDB v2.0.4 (plus à venir dans de futures releases) :
- awsathena
- bigquery
- mysql
- postgres
- snowflake
- sqlserver, mssql
- Joindre les données
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
datavps = sql.from(
driverName: "postgres",
dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
query: "SELECT name, totalmemory FROM vps"
)
|> rename(columns : {name: "host"})
datamem = from(bucket: "netdatatsdb/autogen")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "vpsmetrics"
and r._field == "mem"
and r.host == "vpsfrsqlpac1")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
join(
tables: {vps:datavps, mem:datamem},
on: ["host"],
method: "inner"
)
|> map(fn: (r) => ({ r with _value: (r._value / r.totalmemory) * 100.0 }))
sql.to: writing data
Vérifier la structure de la table recevant les données : types de données, noms des colonnes, null/not null.
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
from(bucket: "netdatatsdb/autogen")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "vps_pmem"
and r._field == "pmem"
and r.host == "vpsfrsqlpac2")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> rename(columns: {_value: "pmem", _time: "dth"})
|> keep(columns: ["host", "dth", "pmem"])
|> sql.to(
driverName: "postgres",
dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
table: "vpspmem"
)