InfluxDB - Flux language, advanced features

Logo

Introduction

The new Flux language introduced in InfluxDB v2 addresses many InfluxQL language limitations.

Let’s see some of the features now possible using Flux language on time series data : joins, pivots, histograms… In most topics, real use cases are discussed : joining data when timestamps differ by a few seconds, simulating outer joins waiting for the new join methods to come (left, right…), building histograms without cumulative data but the difference…

Flux Advanced features - thumbnail

Using SQL data sources in Flux language is not addressed here, this topic deserves a dedicated article : InfluxDB v2, Flux language and SQL databases

InfluxDB v2, Quick reminders

Output format

It’s important to understand the InfluxDB v2 raw data output format, different than the v1 output format. In the output below, filter is set only on the measurement name, filters on tag keys and on field keys are not applied.

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> yield()
table _start               _stop                  _time                           _value  _field  _measurement  host          location
----- -------------------- ---------------------  ------------------------------  ------  ------  ------------  ------------  --------
    0 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:06.446501067Z    1182     mem    vpsmetrics  vpsfrsqlpac1  france
    0 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:16.604175869Z     817     mem    vpsmetrics  vpsfrsqlpac1  france
…
    1 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:06.446501067Z      62    pcpu    vpsmetrics  vpsfrsqlpac1  france
    1 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:16.604175869Z      66    pcpu    vpsmetrics  vpsfrsqlpac1  france
…
    2 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:07.420674651Z     429     mem    vpsmetrics  vpsfrsqlpac2  france
    2 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:17.176860469Z     464     mem    vpsmetrics  vpsfrsqlpac2  france
…
    3 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:07.420674651Z      29    pcpu    vpsmetrics  vpsfrsqlpac2  france
    3 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:17.176860469Z      32    pcpu    vpsmetrics  vpsfrsqlpac2  france
  • A table identifier is applied on each results set.
  • Range time is fully described by the columns _start and _stop.
  • The measurement name is in the column _measurement.
  • The field key and its value are respectively in the columns _field and _value.
  • Tag keys columns are displayed at the end.

Columns can be removed using the drop or keep functions, we may not want all the columns in the raw data ouput format.

 |> keep(columns: ["_value", "_time"])
 |> drop(fn: (column) => column =~ /^_(start|stop)/)

Columns are renamed using rename function :


 |> keep(columns: ["_value", "_time"])
 |> rename(columns: {_value: "pcpu", _time: "when"})

Querying data

In this article, Flux queries are executed with one of the 2 following methods :

  • influx client command line.
  • Integrated InfluxDB v2 GUI https://<host-influxdb>:8086 (formerly Chronograf in InfluxDB v1 Tick stack suite).

influx client

$ influx query --file query.flux

To be able to use influx client, config (url, token, org…) is defined before.

$ export INFLUX_CONFIGS_PATH=/sqlpac/influxdb/srvifx2sqlpac/configs
$ export INFLUX_ACTIVE_NAME=default
/sqlpac/influxdb/srvifx2sqlpac/configs :
[default]
  url = "https://vpsfrsqlpac:8086"
  token = "K2YXbGhIJIjVhL…"
  org = "sqlpac"
  active = true

InfluxDB v2 GUI

In the InfluxDB GUI : use "View Raw Data" toggle button to display queries raw results sets.

InfluxDB GUI Query

Joins

In the use case, cpu and memory used percentages per host are stored respectively in vps_cpu and vps_pmem measurements :

vps_pcpu
_measurement   _time                             _field    _value    host
------------   ------------------------------    ------    ------    ------------
vps_pcpu       2021-03-01T17:22:10.009239717Z    pcpu      63        vpsfrsqlpac1
vps_pcpu       2021-03-01T17:23:10.459116034Z    pcpu      73        vpsfrsqlpac1
vps_pcpu       2021-03-01T17:24:10.938936764Z    pcpu      57        vpsfrsqlpac1
vps_pmem
_measurement   _time                             _field    _value    host
------------   ------------------------------    ------    ------    ------------
vps_pmem       2021-03-01T17:22:10.095918231Z    pmem      26.8      vpsfrsqlpac1
vps_pmem       2021-03-01T17:23:10.575832095Z    pmem      29.725    vpsfrsqlpac1
vps_pmem       2021-03-01T17:24:11.011812310Z    pmem      22.325    vpsfrsqlpac1

Reading the documentation, one may conclude : it is easy to join data with the join function !

datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])

datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])

join(
    tables: {pcpu:datapcpu, pmem:datapmem},
    on: ["_time","host"],
    method: "inner"
)
  |> keep(columns: ["_value_pmem", "_value_pcpu", "_time"])

Executing the query : no results ! Why ?

Normalizing irregular timestamps

In real systems, joining on _time columns is always problematic. The column _time usually stores seconds, milliseconds, microseconds… which are very slightly different to join 2 points.

First solution : use truncateTimeColumn function to normalize irregular timestamps. This function truncates all _time values to a specified unit. In the above example, _time values are truncated to the minute : join works.

datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> truncateTimeColumn(unit:1m)

datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> truncateTimeColumn(unit:1m)

join(
    tables: {pcpu:datapcpu, pmem:datapmem},
    on: ["_time","host"],
    method: "inner"
)
  |> keep(columns: ["_value_pmem", "_value_pcpu", "_time"])
                    _time:time             _value_pcpu:float             _value_pmem:float
------------------------------  ----------------------------  ----------------------------
2021-03-01T17:22:00.000000000Z                            63                          26.8
2021-03-01T17:23:00.000000000Z                            73                        29.725
2021-03-01T17:24:00.000000000Z                            57                        22.325

If technically and functionnally acceptable, the aggregateWindow function is another alternative solution.

datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> aggregateWindow(every: 1m, fn: mean)

datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> aggregateWindow(every: 1m, fn: mean)

join(
    tables: {pcpu:datapcpu, pmem:datapmem},
    on: ["_time","host"],
    method: "inner"
)
  |> keep(columns: ["_value_pmem", "_value_pcpu", "_time"])
                    _time:time             _value_pcpu:float             _value_pmem:float
------------------------------  ----------------------------  ----------------------------
2021-03-01T17:23:00.000000000Z                            63                          26.8
2021-03-01T17:24:00.000000000Z                            73                        29.725
2021-03-01T17:25:00.000000000Z                            57                        22.325

Joins methods and limitations

In the previous use case, join is performed using the inner method.

join(
    tables: {pcpu:datapcpu, pmem:datapmem},
    on: ["_time","host"],
    method: "inner"
)

Only the inner method is currently allowed (v 2.0.4). In future releases, other joins methods will be gradually implemented : cross, left, right, full.

Another limitation : joins currently must only have two parents. This limit should be also extended in future releases.

How to manage outer joins waiting the other methods to be ready ?

The combination of the aggregateWindow and fill functions is a potential workaround to manage outer joins for the moment. The fill function replaces the NULL values by a default value, the default value applied will depend on the technical context and desired rendering (charts…).

Previous valueFixed value
  |> aggregateWindow(every: 1m, fn: mean)
  |> fill(column: "_value", usePrevious: true)
  |> aggregateWindow(every: 1m, fn: mean)
  |> fill(column: "_value", value: 0.0)
datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> aggregateWindow(every: 1m, fn: mean)
  |> fill(column: "_value", value: 0.0)

datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> aggregateWindow(every: 1m, fn: mean)
  |> fill(column: "_value", value: 0.0)

join(
    tables: {pcpu:datapcpu, pmem:datapmem},
    on: ["_time","host"]
)
  |> keep(columns: ["_value_pmem", "_value_pcpu", "_time"])
                    _time:time             _value_pcpu:float             _value_pmem:float
------------------------------  ----------------------------  ----------------------------
2021-03-01T18:23:00.000000000Z                            76                            26
2021-03-01T18:24:00.000000000Z                            63                         24.65
2021-03-01T18:25:00.000000000Z                            54                           0.0
2021-03-01T18:26:00.000000000Z                            69                           0.0
2021-03-01T18:27:00.000000000Z                            61                           0.0
2021-03-01T18:28:00.000000000Z                            79                          23.4
2021-03-01T18:29:00.000000000Z                            65                          21.8

Pivot

Default output format is really not easy for some specific further processings (writing to another measurement, to a SQL database…).

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpumem" and
                       r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
          Table: keys: [_field, _measurement, host]
     _field:string     _measurement:string             host:string                      _time:time         _value:float
------------------  ----------------------  ----------------------  ------------------------------  -------------------
               mem             vps_pcpumem            vpsfrsqlpac1  2021-03-01T18:50:46.919579473Z                 1055
               mem             vps_pcpumem            vpsfrsqlpac1  2021-03-01T18:51:47.359821389Z                 1069
                 …                       …                       …                               …                    …
Table: keys: [_field, _measurement, host]
     _field:string     _measurement:string             host:string                      _time:time         _value:float
------------------  ----------------------  ----------------------  ------------------------------  -------------------
              pcpu             vps_pcpumem            vpsfrsqlpac1  2021-03-01T18:50:46.919579473Z                   68
              pcpu             vps_pcpumem            vpsfrsqlpac1  2021-03-01T18:51:47.359821389Z                   57
                 …                       …                       …                               …                    …

Pivoting data to format output was a major missing feature in InfluxQL. Feature ready and easy to use in Flux language :

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpumem" and
                       r.host == "vpsfrsqlpac1")
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> pivot(
       rowKey:["_time"],
       columnKey: ["_field"],
       valueColumn: "_value"
    )
  Table: keys: [_measurement, host]
   _measurement:string      host:string                      _time:time            mem:float             pcpu:float
----------------------  ---------------  ------------------------------  -------------------  ---------------------
           vps_pcpumem     vpsfrsqlpac1  2021-03-01T18:50:46.919579473Z                 1055                     68
           vps_pcpumem     vpsfrsqlpac1  2021-03-01T18:51:47.359821389Z                 1069                     57
                     …                …                               …                    …                      …

Like joins, if needed use the truncateTimeColumn function to normalize timestamps.

Use schema.fieldsAsCols() shortcut function when pivoting data only on time/_field :

import "influxdata/influxdb/schema"

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpumem" and
                       r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> schema.fieldsAsCols()

Histograms

Producing histograms by a query is made simple in the Flux language with histogram and linearBins functions. linearBins function eases the creation of the list of linearly separated floats ([0.0,5.0,10.0,15.0,…]).

from(bucket:"netdatatsdb/autogen")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "vps_pcpumem" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu"
  )
  |> histogram(
       bins: linearBins(start: 0.0, width: 5.0, count: 21, infinity: false)
   )
  |> keep(columns: ["le","_value"])
                         le:float                  _value:float
----------------------------  ----------------------------
                           0                             0
                           5                             0
                          10                             0
                          15                             0
                          20                             0
                          25                             0
                          30                             0
                          35                             0
                          40                            42
                          45                           206
                          50                           389
                          55                           547
                          60                           716
                          65                           910
                          70                          1085
                          75                          1260
                          80                          1427
                          85                          1427
                          90                          1427
                          95                          1427
                         100                          1427

A column named le (less or equals) is automatically created, in the _value column data are cumulative. Apply the difference function to cancel the cumulative mode and thus build a typical histogram

  |> histogram(
       bins: linearBins(start: 0.0, width: 5.0, count: 21, infinity: false)
   )
  |> difference()
  |> keep(columns: ["le","_value"])
                         le:float                  _value:float
----------------------------  ----------------------------
                           5                             0
                          10                             0
                          15                             0
                          20                             0
                          25                             0
                          30                             0
                          35                             0
                          40                            42
                          45                           161
                          50                           183
                          55                           156
                          60                           172
                          65                           195
                          70                           175
                          75                           175
                          80                           169
                          85                             0
                          90                             0
                          95                             0
                         100                             0

Logarithmic mode is performed with logarithmicBins function.

  |> histogram(
       bins: logarithmicBins(start: 1.0, factor: 2.0, count: 10,
                           infinity: false)

histogram function is of interest when we need to build histograms by programming for further processing. InfluxDB GUI or Grafana does not need the histogram function.

In InfluxDB GUI, to visualize an histogram :

  • The query source without histogram function call is used.
    from(bucket:"netdatatsdb/autogen")
      |> range(start: -1d)
      |> filter(fn: (r) => r._measurement == "vps_pcpumem" and
                           r.host == "vpsfrsqlpac1" and
                           r._field == "pcpu"
      )
  • Select "Histogram" graph type and click on the button "Customize".
  • Define the X column (_value), the bins (20) and the X max value (100).
InfluxDB GUI - Histogram

In Grafana, same methodology, the query source without histogram function call is used.

Grafana - Histogram

Computed columns - map

Computed columns are created with map function. map function is usually called after a join or a pivot operation.

In the example below, vps_space measurement stores two fields per point : used space (used) and available space (available). In the final result set, we want to compute the space used percentage in a column named pctspace :

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "vps_space" and
                       r.host == "vpsfrsqlpac1" and
                       (r._field == "used" or r._field == "available")
  )
  |> pivot(
       rowKey:["_time"],
       columnKey: ["_field"],
       valueColumn: "_value" 
  )
  |> map(fn: (r) => ({ r with pctspace: (r.used / (r.used + r.available)) * 100.0 }))
  |> keep(columns: ["_time", "pctspace"])
                    _time:time                pctspace:float
------------------------------  ----------------------------
2021-03-01T17:51:47.437958334Z             73.84712268752916
2021-03-01T17:52:47.900410821Z             73.84713257521788
2021-03-01T17:53:48.389130505Z              73.8471523505953

The with operator is very important when calling map function. In the default basic syntax (without the with operator), columns that are not part of the input table’s group key (_time, _start, _stop…) and not explicitly mapped are removed :

  |> map(fn: (r) => ({ pctspace: (r.used / (r.used + r.available)) * 100.0 }))
Table: keys: []
              pctspace:float
----------------------------
           73.84734021668079
           73.84734021668079

The with operator updates a column if it already exists, creates a new column if it doesn’t exist, and includes all existing columns in the output table.

Cross computed columns

Computed columns referencing computed columns can not be defined in one unique map function call :

  |> map(fn: (r) => ({ r with totalspace: r.used + r.available, pctspace: (r.used / r.totalspace) * 100.0 }))
|> keep(columns: ["_time", "used", "totalspace","pctspace"])
                    _time:time                pctspace:float              totalspace:float                    used:float
------------------------------  ----------------------------  ----------------------------  ----------------------------
2021-03-02T18:33:36.041221773Z                                                    40454348                      29889216
2021-03-02T18:34:36.532166008Z                                                    40454348                      29889220

In this specific context, map function must be called sequentially :

  |> map(fn: (r) => ({ r with totalspace: r.used + r.available }))
  |> map(fn: (r) => ({ r with pctspace: (r.used / r.totalspace) * 100.0  }))
  |> keep(columns: ["_time", "used", "totalspace","pctspace"])
                    _time:time                pctspace:float              totalspace:float                    used:float
------------------------------  ----------------------------  ----------------------------  ----------------------------
2021-03-02T18:33:36.041221773Z             73.88386533877645                      40454348                      29889236
2021-03-02T18:34:36.532166008Z             73.88388511415386                      40454348                      29889244

Conditions, exists

Conditions can be defined in map function and the exists operator is available :

import "math"
  …
  |> map(fn: (r) => ({ r with totalspace: r.used + r.available }))
  |> map(fn: (r) => ({ r with pctspace: (r.used / r.totalspace) * 100.0  }))
  |> aggregateWindow(every: 1m, fn: mean, column: "pctspace")
  |> map(fn: (r) => ({ r with pctspace: 
                          if exists r.pctspace then "${string(v: math.round(x: r.pctspace) )} %"
                          else "N/A"
                     }))
  |> keep(columns: ["_time", "used","pctspace"])
                    _time:time         pctspace:string
------------------------------  ----------------------
2021-03-02T14:35:00.000000000Z                    74 %
2021-03-02T14:36:00.000000000Z                     N/A

Custom aggregate functions - reduce

mean, max, min, sum, count… Flux functions are good shortcuts to aggregates, but unfortunately only one aggregate can be returned.

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "vps_space" and
                       r.host == "vpsfrsqlpac1" and
                       (r._field == "used" or r._field == "available")
  )
  |> pivot(
       rowKey:["_time"],
       columnKey: ["_field"],
       valueColumn: "_value"
  )
  |> count(column: "used")
                  used:int
--------------------------
                      1428

Multiple custom aggregate results can be extracted using reduce function which returns a unique row storing the aggregates with no _time column.

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "vps_space" and
                       r.host == "vpsfrsqlpac1" and
                       (r._field == "used" or r._field == "available")
  )
  |> pivot(
       rowKey:["_time"],
       columnKey: ["_field"],
       valueColumn: "_value"
     )
  |> map(fn: (r) => ({ r with totalspace: r.used + r.available }))
  |> map(fn: (r) => ({ r with pctspace: (r.used / r.totalspace) * 100.0  }))
  |> reduce(fn: (r, accumulator) => (
     {
       count: accumulator.count + 1,
       max: if r.pctspace > accumulator.max then r.pctspace else accumulator.max
     }),
     identity: {count: 0, max: 0.0}
    )
  |> keep(columns: ["count","max"])
                   count:int                     max:float
----------------------------  ----------------------------
                        1428             73.93648761809237

identity defines the initial values as well as the data types (0 for an integer, 0.0 for a float). Like map function, conditions and exists operator are allowed.

One may comment : what a stupid example ! The reduce function computes aggregates available with native Flux functions, why reinventing the wheel ?

The reduce function indeed should be used for custom aggregates computations for which no native function exists. The above example can be rewritten using native functions count and max, a join is performed :

dataspace = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  …

datacnt = dataspace
  |> count(column:"pctspace")
  |> rename(columns: {"pctspace": "count"})

datamax = dataspace
  |> max(column:"pctspace")
  |> rename(columns: {"pctspace": "max"})

join(
    tables: {cnt: datacnt, max: datamax},
    on: ["_measurement"]
 )
 |> keep(columns: ["count", "max"])
 |> yield()
                 count:int                     max:float
--------------------------  ----------------------------
                      1428             73.93870246036347

The above example would have been the following InfluxQL query with a subquery :

SELECT count(pctspace), max(pctspace)
FROM (SELECT (used / (used + available)) * 100 as pctspace
      FROM "netdatatsdb.autogen.vps_space"
      WHERE (host='vpsfrsqlpac1' and time > now() -1d)
)

Significant calendar functions enhancements

hourSelection

Selecting specified hours of the day was not possible using InfluxQL. Flux language supports DatePart-like queries with the function hourSelection returning only data with time values in a specified hour range.

from(bucket: "netdatatsdb/autogen")
  |> range(start: -72h)
  |> filter(fn: (r) => r._measurement == "vps_pcpumem" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu"
  )
  |> hourSelection(start: 8, stop: 18)
  |> aggregateWindow(every: 1h, fn: mean)

Windowing data by month(s) and year(s)

Flux language supports windowing data by calendar months (mo) and years (yr) : 1mo, 3mo, 1yr… This feature was not either possible with InfluxQL. Useful for producing KPI (Key Performance Indicators) per months, quarters, semesters, years.

 |> aggregateWindow(every: 1mo, fn: mean)