Querying data
- Defining the data source (bucket - database) : from
from(bucket: "netdatatsdb/autogen")
- Time range, absolute or relative : range
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h, stop: -10m)
              
from(bucket: "netdatatsdb/autogen")
  |> range(start: 2021-01-25T00:00:00Z, stop: 2021-01-29T23:59:00Z)
- Filtering by measurement : filter
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
- Filtering by tag key :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
  |> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
- Filtering by field and field value :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
  |> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
  |> filter(fn: (r) => r["_field"] == "pcpu")
  |> filter(fn: (r) => r["_value"] > 80)
- Filters can be combined in one filter clause with the and/or operators :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics" and
                       r["host"] == "vpsfrsqlpac1" and
                       r["_field"] == "pcpu" and r["_value"] > 80
  )  
- Depending on preferences, dot notation is allowed :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" and r._value > 80
  )  
- Displaying data : yield
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" and r._value > 80 )
  |> yield()   
In the following examples, the from and range clauses are omitted for brevity.
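When a script produces several result sets, the yield function accepts a name parameter to label each one; a minimal sketch (the name above80 is illustrative) :
  |> yield(name: "above80")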
- Using regular expressions, matching (=~) or not matching (!~) :
  |> filter(fn: (r) => r.host =~ /^vpsfrsqlpac[1-8]$/ )
  |> filter(fn: (r) => r.host !~ /^vpsensqlpac[1-8]$/ )
             
- Top n records : limit
  |> limit(n:10)
              
- Last n records : tail
  |> tail(n:10)
- Sorting data : sort
  |> sort(columns: ["_value"], desc: true)
- Renaming a column : rename
  |> rename(columns: {_value: "average", _time: "when"})
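rename also accepts a function computing the new column names; a short sketch (the _new suffix is illustrative) :
  |> rename(fn: (column) => "${column}_new")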
    
- Removing output columns : drop
  |> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
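drop alternatively accepts an explicit list of column names :
  |> drop(columns: ["_start", "_stop", "_measurement"])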
             
- Selecting output columns : keep
  |> keep(columns: ["_value", "_time"])
- A simple first Flux query before going further :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" and
                       r._value > 80)
  |> sort(columns: ["_value"], desc: true)
  |> rename(columns: {_value: "average"})
  |> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
  |> limit(n:10)
  |> yield()
            Windowing data
aggregateWindow
  |> aggregateWindow(
      every: 1m,
      fn: mean,
      column: "_value",
      timeSrc: "_stop",
      timeDst: "_time",
      createEmpty: true
)
The shorter syntax relying on the default values :
  |> aggregateWindow(every: 10m, fn: mean)
              To aggregate on a different column than the default _value column :
  |> aggregateWindow(every: 10m, fn: mean, column: "colname")
To remove the null values, set createEmpty to false :
  |> aggregateWindow(every: 10m, fn: mean, createEmpty: false)
              
window
aggregateWindow is in fact a shortcut function built on the window function.
  |> window(every: 10m)
  |> mean()
Set createEmpty to true to display the null values :
  |> window(every: 10m, createEmpty: true)
  |> mean()
The _time column is dropped by the aggregation in the output tables; to re-add a _time column for
further processing, the duplicate function is called to duplicate the _start or _stop column as
the new _time column :
  |> window(every: 10m, createEmpty: true)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")
To recover the regular format, the data are finally "unwindowed" :
  |> window(every: 10m, createEmpty: true)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)
fill
Optionally, use the fill function to handle empty values when createEmpty
is set to true when windowing data.
  |> fill(column: "_value", value: 0.0) 
              |> fill(column: "_value", usePrevious: true) 
            
- Window by calendar months and years
Flux supports windowing data by calendar months and years : 1mo, 1y. This feature was not possible with InfluxQL.
  |> aggregateWindow(every: 1mo, fn: mean) 
            Hour Selection
hourSelection
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu"
  )
  |> hourSelection(start: 8, stop: 18)
            Joins
join
datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" )
  |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pmem" )
  |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
join(
    tables: {pcpu:datapcpu, pmem:datapmem},
    on: ["_time", "host"],
    method: "inner"
  )
Only the inner method is currently allowed (v 2.0.4). In future releases, the other join methods
will be gradually implemented : cross, left, right, full.
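In the join output, columns sharing the same name in both tables and not listed in the on parameter are suffixed with the table keys (_value_pcpu and _value_pmem here); a sketch computing a derived ratio from the joined values (the ratio column name is illustrative) :
join(
    tables: {pcpu:datapcpu, pmem:datapmem},
    on: ["_time", "host"],
    method: "inner"
  )
  |> map(fn: (r) => ({ r with ratio: r._value_pcpu / r._value_pmem }))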
Joins on _time columns may not match rows when timestamps differ by seconds, microseconds, nanoseconds…
This is solved by normalizing the irregular timestamps with the truncateTimeColumn function, which
truncates all _time values to a specified unit :
 |> truncateTimeColumn(unit: 1m)
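A sketch applying the normalization to both streams before the join, reusing the datapcpu/datapmem definitions above (the pcpunorm/pmemnorm names are illustrative) :
pcpunorm = datapcpu |> truncateTimeColumn(unit: 1m)
pmemnorm = datapmem |> truncateTimeColumn(unit: 1m)

join(
    tables: {pcpu:pcpunorm, pmem:pmemnorm},
    on: ["_time", "host"],
    method: "inner"
  )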
            Pivot
pivot
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" )
  |> pivot(
       rowKey:["_time"],
       columnKey: ["_field"],
       valueColumn: "_value"
    )
As with joins, use the truncateTimeColumn function to normalize timestamps when needed.
Use the schema.fieldsAsCols() shortcut function when pivoting data only on _time/_field :
import "influxdata/influxdb/schema"
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" )
  |> schema.fieldsAsCols()
            Histograms
histogram
from(bucket:"netdatatsdb/autogen")
  |> range(start: -3h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu"
  )
  |> histogram(
       bins:[0.0,10.0,20.0,30.0,40.0,50.0,60.0,70.0,80.0,90.0,100.0]
   )
Use linearBins to ease the creation of the list of linearly separated floats; by default an
infinite upper bound is appended (infinity: true).
  |> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )
Logarithmic bins are defined with logarithmicBins :
  |> histogram(
       bins: logarithmicBins(start: 1.0, factor: 2.0, count: 10,
                             infinity: true)
   )
By default, the counts are cumulative in the _value column; apply the difference function
to override the cumulative computation :
  |> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )
  |> difference()
              histogram             histogram + difference
le     _value        le     _value
-----  ------        -----  ------
50     292           50     269
60     536           60     241
70     784           70     247
            Computed columns (map)
map
  |> map(fn: (r) => ({ _value: r._value * 2.0 }))
The basic usage of the map function removes the columns that are not part of
the input table's group key and not explicitly mapped (the _time column for example);
use the with operator to avoid their removal :
  |> map(fn: (r) => ({ r with _value: r._value * 2.0 }))
              The with operator updates a column if it already exists, creates a new column if it doesn’t exist,
              and includes all existing columns in the output table.
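Since with also creates columns, a derived column can be added in one pass; a small sketch (the status name and the 80.0 threshold are illustrative) :
  |> map(fn: (r) => ({ r with status: if r._value > 80.0 then "high" else "normal" }))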
Custom aggregate functions (reduce)
reduce
  |> reduce(fn: (r, accumulator) => ({
       count: accumulator.count + 1,
       total: accumulator.total + r._value,
       avg: (accumulator.total + r._value) / float(v: accumulator.count)
     }),
     identity: {count: 1, total: 0.0, avg: 0.0}
  )
              identity defines the initial values as well as the data types.
A single row storing the aggregates is returned with no _time column.
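If a timestamp is needed for further processing (writing the row back, for example), a time column can be re-added with map; a sketch using the script execution time :
  |> map(fn: (r) => ({ r with _time: now() }))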
Writing data
to
  |> to(bucket:"history", org:"sqlpac")
To write to the same bucket, first use the set function to define the new measurement name :
  |> set(key: "_measurement", value: "history_vpsmetrics")
  |> to(bucket:"netdatatsdb/autogen", org:"sqlpac")
            SQL Data sources
- Metadata information (username, password…)
Use influx secret or curl to store metadata :
$ influx secret update --key PG_HOST --value vpsfrsqlpac
$ influx secret update --key PG_PORT --value 5432
$ influx secret update --key PG_USER --value influxdb
$ influx secret update --key PG_PASS --value "***********"
Retrieving secrets in a Flux script using the secrets package (check the authorization read:secrets) :
import "influxdata/influxdb/secrets"
PG_HOST = secrets.get(key: "PG_HOST")
PG_USER = secrets.get(key: "PG_USER")
PG_PASS = secrets.get(key: "PG_PASS")
PG_PORT = secrets.get(key: "PG_PORT") 
              
              sql.from: retrieving data
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
datavps = sql.from(
  driverName: "postgres",
  dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
  query: "SELECT name, totalmemory FROM vps"
)
           Available drivers in InfluxDB v2.0.4 (more to come in future releases) :
- awsathena
- bigquery
- mysql
- postgres
- snowflake
- sqlserver, mssql
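For example, with the mysql driver, the data source name follows the Go driver syntax; a sketch assuming a hypothetical MySQL database vpsdb on the same host :
import "sql"

datavps = sql.from(
  driverName: "mysql",
  dataSourceName: "influxdb:***********@tcp(vpsfrsqlpac:3306)/vpsdb",
  query: "SELECT name, totalmemory FROM vps"
)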
 
- Joining data
 
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
datavps = sql.from(
  driverName: "postgres",
  dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
  query: "SELECT name, totalmemory FROM vps"
)
  |> rename(columns : {name: "host"})
datamem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "vpsmetrics"
                       and r._field == "mem"
                       and r.host == "vpsfrsqlpac1")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
join(
    tables: {vps:datavps, mem:datamem},
    on: ["host"],
    method: "inner"
  )
  |> map(fn: (r) => ({ r with _value: (r._value / r.totalmemory) * 100.0 }))
              
              sql.to: writing data
Check the structure of the table receiving the data : data types, column names, null/not null constraints.
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "vps_pmem"
                       and r._field == "pmem"
                       and r.host == "vpsfrsqlpac2")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> rename(columns: {_value: "pmem", _time: "dth"})
  |> keep(columns: ["host", "dth", "pmem"])
  |> sql.to(
      driverName: "postgres",
      dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
      table: "vpspmem"
    )