 
					Interrogation des données
- Définition de la source de données (bucket - database) : from
from(bucket: "netdatatsdb/autogen")- Plage de temps, absolue ou relative : range
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h, stop: -10m)
              
from(bucket: "netdatatsdb/autogen")
  |> range(start: 2021-01-25T00:00:00Z, stop: 2021-01-29T23:59:00Z)- Filtrer par mesure : filter
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")- Filtrer par tag key :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
  |> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")- Filtrer par champ et valeur de champ :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
  |> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
  |> filter(fn: (r) => r["_field"] == "pcpu")
  |> filter(fn: (r) => r["_value"] > 80)- Les filtres peuvent être combinés en une seule clause filteravec les opérateursand/or:
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics" and
                       r["host"] == "vpsfrsqlpac1" and
                       r["_field"] == "pcpu" and r["_value"] > 80
  )- Selon les préférences, la notation point est autorisée :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" and r._value > 80
  )- Affichage des données : yield
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" and r._value > 80 )
  |> yield()Les clauses from et range sont supprimées par souci de concision.
- Utiliser les expressions régulières :
  |> filter(fn: (r) => r.host =~ /^vpsfrsqlpac[1-8]$/ )
             
  |> filter(fn: (r) => r.host !~ /^vpsensqlpac[1-8]$/ )- npremiers enregistrements :- limit
  |> limit(n:10)- nderniers enregistrements :- tail
  |> tail(n:10)- Trier les données : sort
  |> sort(columns: ["_value"], desc: true)- Renommer une colonne : rename
  |> rename(columns: {_value: "average", _time: "when"})- Supprimer des colonnes en sortie : drop
  |> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)- Sélection des colonnes en sortie : keep
  |> keep(columns: ["_value", "_time"])- Une simple requête Flux avant d’aller plus loin :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" and
                       r._value > 80)
  |> sort(columns: ["_value"], desc: true)
  |> rename(columns: {_value: "average"})
  |> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
  |> limit(n:10)
  |> yield()Fenêtrage des données (Windowing)
- aggregateWindow
  |> aggregateWindow(
      every: 1m,
      fn: mean,
      column: "_value",
      timeSrc: "_stop",
      timeDst: "_time",
      createEmpty: true
)  |> aggregateWindow(every: 10m, fn: mean)Pour agréger sur une colonne différente de la colonne par défaut _value :
  |> aggregateWindow(every: 10m, fn: mean, column: "colname")Pour supprimer les valeurs NULL, définir createEmpty à False :
  |> aggregateWindow(every: 10m, fn: mean, createEmpty: false)- window
aggregateWindow est en fait une fonction de raccourci utilisant la fonction window.
  |> window(every: 10m)
  |> mean()createEmpty est défini à true pour afficher les valeurs nulles.
  |> window(every: 10m, createEmpty: true)
  |> mean()La colonne _time n’est alors plus aggrégée dans la table en sortie,
              pour ré-ajouter la colonne _time pour un traitement ultérieur, 
              la fonction duplicate est appelée pour dupliquer la colonne _start ou _stop
              en tant que nouvelle colonne  _time :
  |> window(every: 10m, createEmpty: true)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")Pour retrouver le format régulier, les données sont finalement "unwindowed" :
  |> window(every: 10m, createEmpty: true)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)- fill
Optionnellement, utiliser la fonction fill pour gérer les valeurs vides lorsque createEmpty
            est défini à true lors du fenêtrage des données.
  |> fill(column: "_value", value: 0.0)   |> fill(column: "_value", usePrevious: true) - Fenêtrage par mois et années
Flux supporte le fenêtrage des données par mois et années calendaires : 1mo, 1yr. Cette fonctionnalité
            n’était pas possible avec InfluxQL.
  |> aggregateWindow(every: 1mo, fn: mean) Sélection de plages horaires
- hourSelection
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu"
  )
  |> hourSelection(start: 8, stop: 18)Jointures (join)
- join
datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" )
  |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pmem" )
  |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
join(
    tables: {pcpu:datapcpu, pmem:datapmem},
    on: ["_time", "host"],
    method: "inner"
  )
Seule la méthode inner est actuellement autorisée (v 2.0.4).
              Dans des versions futures, d’autres méthodes de jointures seront progressivement implémentées : cross, left, right, full.
La réalisation des jointures sur les colonnes _time contenant secondes, microsecondes, nanosecondes… 
              peut être résolue en normalisant les horodatages irréguliers à l’aide de la fonction truncateTimeColumn qui 
              tronque les valeurs _time à une unité spécifiée :
              
 |> truncateTimeColumn(unit: 1m)Pivot
- pivot
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" )
  |> pivot(
       rowKey:["_time"],
       columnKey: ["_field"],
       valueColumn: "_value"
    )Comme les jointures, utiliser la fonction truncateTimeColumn pour normaliser les timestamps.
Utiliser la fonction de raccourci schema.fieldsAsCols() pour pivoter les données uniquement sur time/_field : 
import "influxdata/influxdb/schema"
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" )
  |> schema.fieldsAsCols()Histogrammes
- histogram
from(bucket:"netdatatsdb/autogen")
  |> range(start: -3h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu"
  )
  |> histogram(
       bins:[0.0,10.0,20.0,30.0,40.0,50.0,60.0,70.0,80.0,90.0,100.0]
   )Utiliser linearBins pour faciliter la création de la liste des valeurs float linéairement séparés.
  |> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )Les valeurs float séparées logarithmiquement sont définies avec logarithmicBins
  |> histogram(
       bins: logarithmicBins(start: 1.0, factor: 2.0, count: 10,
                             infinity: true)
   )Par défaut, les valeurs sont cumulatives dans la colonne _value, appliquer la fonction difference
              pour remplacer le calcul cumulatif.
  |> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )
  |> difference()histogram             histogram + difference
le     _value        le     _value
-----  ------        -----  ------
50     292           50     269
60     536           60     241
70     784           70     247Colonnes calculées - Computed columns (map)
- map
  |> map(fn: (r) => ({ _value: r._value * 2.0 }))L’utilisation de base de la fonction map supprime les colonnes 
              qui ne font pas partie de la clé de groupe (_time, _start, _stop…)
              et qui ne sont pas explicitement "mappées", 
              utiliser l’opérateur with pour empêcher leur suppression  : 
  |> map(fn: (r) => ({ r with _value: r._value * 2.0 }))L’opérateur with met à jour la colonne si elle existe déjà, créé une nouvelle colonne si elle n’existe pas,
              et retourne toutes les colonnes existantes dans la table en sortie.
Fonctions d’agrégat personnalisées (reduce)
- reduce
      |> reduce(fn: (r, accumulator) => ({
      count: accumulator.count + 1,
      total: accumulator.total + r._value,
      avg: (accumulator.total + r._value) / float(v: accumulator.count)
    }),
    identity: {count: 1, total: 0.0, avg: 0.0}
  )identity définit les valeurs initiales ainsi que le type de données.
Une ligne unique avec les agrégats est retournée sans colonne _time.
Écrire des données
- to
  |> to(bucket:"history", org:"sqlpac")Pour écrire dans le même bucket, utiliser auparavant la fonction set pour définir le nom de la mesure : 
  |> set(key: "_measurement", value: "history_vpsmetrics")
  |> to(bucket:"netdatatsdb/autogen", org:"sqlpac")Sources de données SQL
- Informations métadonnées (username, password…)
Utiliser influx secret ou curl pour stocker les métadonnées :
$ influx secret update --key PG_HOST --value vpsfrsqlpac
$ influx secret update --key PG_PORT --value 5432
$ influx secret update --key PG_USER --value influxdb
$ influx secret update --key PG_PASS --value "***********"Extraire les "secrets" dans un script Flux à l’aide du package secret (vérifier l’autorisation read:secrets) :
import "influxdata/influxdb/secrets"
PG_HOST = secrets.get(key: "PG_HOST")
PG_USER = secrets.get(key: "PG_USER")
PG_PASS = secrets.get(key: "PG_PASS")
PG_PORT = secrets.get(key: "PG_PORT") - sql.from: extraire les données
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
datavps = sql.from(
  driverName: "postgres",
  dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
  query: "SELECT name, totalmemory FROM vps"
)Drivers disponibles avec InfluxDB v2.0.4 (plus à venir dans de futures releases) :
- awsathena
- bigquery
- mysql
- postgres
- snowflake
- sqlserver, mssql
- Joindre les données
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
datavps = sql.from(
  driverName: "postgres",
  dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
  query: "SELECT name, totalmemory FROM vps"
)
  |> rename(columns : {name: "host"})
datamem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "vpsmetrics"
                       and r._field == "mem"
                       and r.host == "vpsfrsqlpac1")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
join(
    tables: {vps:datavps, mem:datamem},
    on: ["host"],
    method: "inner"
  )
  |> map(fn: (r) => ({ r with _value: (r._value / r.totalmemory) * 100.0 }))- sql.to: writing data
Vérifier la structure de la table recevant les données : types de données, noms des colonnes, null/not null.
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "vps_pmem"
                       and r._field == "pmem"
                       and r.host == "vpsfrsqlpac2")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> rename(columns: {_value: "pmem", _time: "dth"})
  |> keep(columns: ["host", "dth", "pmem"])
  |> sql.to(
      driverName: "postgres",
      dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
      table: "vpspmem"
    )