InfluxDB - Moving from InfluxQL to Flux language


Introduction

The time-series database InfluxDB v2 was released in November 2020. A paper about how to migrate from InfluxDB v1 to InfluxDB v2 has already been published - InfluxDB - Migration to version 2.

InfluxQL (Influx Query Language), the InfluxDB v1.x SQL-like language, is still supported in InfluxDB v2 for backward compatibility, but only for InfluxDB 1.x users whose authentication has been migrated to InfluxDB v2.

Flux is now the native language in InfluxDB v2. The Flux language unifies queries and ETL processing (InfluxQL, TICKscript).

InfluxQL to Flux

When used to SQL, moving from InfluxQL to Flux may seem a little difficult at first, but it is not really. Bear in mind that the Flux language brings features that InfluxQL could not cover (joins, pivots, external data sources…).

This paper gives a brief overview of how to migrate existing InfluxQL queries to Flux.

A quick reference guide is also available on SQLPAC.

Quick reminders

Measurements and series

In the time series database InfluxDB, the format of a point is the following:

measurement[,tag=value[,tag=value]] field=value[,field=value] [<timestamp>]
vpsmetrics,location=france,host=vpsfrsqlpac1 pcpu=49,mem=877 1580918550000000000
vpsmetrics,location=france,host=vpsfrsqlpac2 pcpu=22,mem=455 1580918550000000000

A series is the combination of a measurement and its possible tag key/value pairs:

measurement, tag key1=value1, tag key2=value2 [,...]
vpsmetrics,location=france,host=vpsfrsqlpac1
vpsmetrics,location=france,host=vpsfrsqlpac2
  • Data are overwritten if a point with the same series and timestamp already exists.
  • The server’s timestamp is used if the timestamp is omitted.
  • Data types can be forced when writing the first point: value=25i for an integer datatype, status=t|f for a boolean datatype.
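
For example, writing a first point with forced datatypes (the fields nconn, an integer, and active, a boolean, are hypothetical):

```
vpsmetrics,location=france,host=vpsfrsqlpac1 nconn=12i,active=t 1580918550000000000
```

Without the i suffix, nconn would be stored as a float, the default numeric datatype.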

v1 databases and retention policies, v2 buckets

A v1 database/retention policy pair is a bucket in version 2. An organization is mandatory; it is initialized at upgrade or at creation.
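
As a sketch, a v1 database/retention policy can be mapped to an existing v2 bucket with the influx client DBRP commands (the bucket ID below is hypothetical):

```shell
influx v1 dbrp create \
   --db netdatatsdb \
   --rp autogen \
   --bucket-id 03a2bbf46309a000 \
   --default
```

This mapping is what allows InfluxQL queries written against "netdatatsdb"."autogen" to keep working in v2.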

v1 DBRP vs v2 Buckets

A first Flux query

Every Flux query starts with the from function, selecting the bucket in which the measurement is stored.

Flux / InfluxQL
from(bucket: "netdatatsdb/autogen")
SELECT …
FROM "netdatatsdb"."autogen".…

Time range, range

The time range is then applied using the range function. The time range can be absolute or relative; the RFC3339 timestamp format is used for absolute times. The pipe-forward operator |> chains the functions.

Flux / InfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d, stop: -5m)


from(bucket: "netdatatsdb/autogen")
 |> range(start: 2021-02-05T00:00:00Z,
          stop: 2021-02-05T23:59:00Z)
SELECT …
FROM "netdatatsdb"."autogen".…
WHERE (time > now() -1d)
                
SELECT …
FROM "netdatatsdb"."autogen".…
WHERE (time > now() -1d and time < now() -5m)
                
SELECT …
FROM "netdatatsdb"."autogen".…
WHERE (time > '2021-02-05T00:00:00Z'
         and time < '2021-02-05T23:59:00Z')

Filtering, filter

Filtering on measurements, tag keys, field keys… is performed using the filter function.

To filter on the measurement vpsmetrics:

Flux / InfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
SELECT …
FROM "netdatatsdb"."autogen"."vpsmetrics"
WHERE (time > now() -1d)

Dot notation is possible:

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")

Dot notation is used in the next sections; the choice is a matter of coding preference.

To add filters on tag keys (host…):

Flux / InfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
SELECT …
FROM "netdatatsdb"."autogen"."vpsmetrics"
WHERE (time > now() -1d
       and host = 'vpsfrsqlpac1')
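
The chained filter calls above can also be combined into a single predicate with the and logical operator, which is equivalent:

```flux
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics"
                      and r.host == "vpsfrsqlpac1")
```

Again, separate calls or a combined predicate is a matter of coding preference.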

To add filters on field keys (pcpu…):

Flux / InfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
SELECT pcpu
FROM "netdatatsdb"."autogen"."vpsmetrics"
WHERE (time > now() -1d
       and host = 'vpsfrsqlpac1')

Getting results, yield

Use yield to get the results, optionally with a name.

Flux / InfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
 |> yield(name:"pcpuvps1")
SELECT pcpu
FROM "netdatatsdb"."autogen"."vpsmetrics"
WHERE (time > now() -1d
       and host = 'vpsfrsqlpac1')

Great, we have our first Flux query:

Chronograf first Flux query

Regular expressions

Basic regular expressions are used in Flux queries the same way as in InfluxQL. Nothing new, except the regexp package introduced in InfluxDB v2, not covered here since this paper focuses only on migrating InfluxQL code to Flux code.

Flux / InfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host =~ /^vps/
                      and r.host !~ /(de|uk|us)/)
 |> filter(fn: (r) => r._field == "pcpu")
SELECT pcpu 
FROM "netdatatsdb"."autogen"."vpsmetrics"
WHERE (time > now() -1d
       and host =~ /^vps/
       and host !~ /(de|uk|us)/)

InfluxDB v2 output format

It’s important to understand the InfluxDB v2 raw data output format, which differs from the v1 output format. In the output below, the filter applies only to the measurement name; no filters on tag keys or field keys are applied.

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> yield()
table _start               _stop                  _time                           _value  _field  _measurement  host          location
----- -------------------- ---------------------  ------------------------------  ------  ------  ------------  ------------  --------
    0 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:06.446501067Z    1182     mem    vpsmetrics  vpsfrsqlpac1  france
    0 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:16.604175869Z     817     mem    vpsmetrics  vpsfrsqlpac1  france
…
    1 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:06.446501067Z      62    pcpu    vpsmetrics  vpsfrsqlpac1  france
    1 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:16.604175869Z      66    pcpu    vpsmetrics  vpsfrsqlpac1  france
…
    2 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:07.420674651Z     429     mem    vpsmetrics  vpsfrsqlpac2  france
    2 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:17.176860469Z     464     mem    vpsmetrics  vpsfrsqlpac2  france
…
    3 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:07.420674651Z      29    pcpu    vpsmetrics  vpsfrsqlpac2  france
    3 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z   2021-02-05T03:00:17.176860469Z      32    pcpu    vpsmetrics  vpsfrsqlpac2  france
  • A table identifier is assigned to each result set, one table per series/field combination.
  • The time range is fully described by the columns _start and _stop.
  • The measurement name is in the column _measurement.
  • The field key and its value are respectively in the columns _field and _value.
  • Tag key columns are displayed at the end.
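
As a side note, the per-series tables can be merged into a single table by ungrouping, i.e. calling the group function without arguments:

```flux
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> group()
```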

Columns can be removed using the drop or keep functions; we may not want all the columns of the raw data output format:

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
 |> keep(columns: ["_value", "_time"])
 |> yield(name:"pcpuvps1")
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
 |> drop(fn: (column) => column =~ /^_(start|stop)/)
 |> yield(name:"pcpuvps1")

Columns are renamed using the rename function:

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
 |> keep(columns: ["_value", "_time"])
 |> rename(columns: {_value: "pcpu", _time: "when"})
 |> yield(name:"pcpuvps1")
table when                            pcpu
----- ------------------------------  ----
    0 2021-02-05T01:18:47.116766147Z    72
    0 2021-02-05T01:18:57.360131309Z    51

Windowing data, aggregateWindow

The biggest task when migrating InfluxQL queries to Flux syntax is translating the data windowing queries with aggregates, the very purpose time series databases are designed for.

The translation is quite easy with the aggregateWindow function. The example below computes the mean value with the mean function; the other aggregate functions (count, sum…) are of course available.

Flux / InfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
 |> aggregateWindow(every: 1h, fn: mean )
SELECT mean(pcpu)
FROM "netdatatsdb"."autogen"."vpsmetrics"
WHERE (host ='vpsfrsqlpac1' AND time > now() -1d)
GROUP BY time(1h)
… _time                             _value     _field   _measurement …
… --------------------   -----------------  ---------   ------------
  2021-02-05T06:00:00Z                           pcpu   vpsmetrics
  2021-02-05T07:00:00Z                           pcpu   vpsmetrics
  2021-02-05T08:00:00Z   61.70815450643776       pcpu   vpsmetrics
  2021-02-05T09:00:00Z   60.55806451612903       pcpu   vpsmetrics
  2021-02-05T10:00:00Z   60.01699716713881       pcpu   vpsmetrics
…
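
Any aggregate function can be applied the same way; for example, counting the points per one-hour window (same query as above, only fn changes):

```flux
…
 |> aggregateWindow(every: 1h, fn: count)
```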

To remove the NULL values, the argument createEmpty is set to false in the aggregateWindow function.

Flux / InfluxQL
…
 |> aggregateWindow(every: 1h, fn: mean,
                    createEmpty: false )
SELECT mean(pcpu)
…
GROUP BY time(1h)
FILL(none)

To apply a default value to empty data, the fill function with the argument value is called after windowing the data:

Flux / InfluxQL
…
 |> aggregateWindow(every: 1h, fn: mean,
                    createEmpty: true )
 |> fill(column: "_value", value: 0.0)
SELECT mean(pcpu)
…
GROUP BY time(1h)
FILL(0.0)

To apply the previous non-null value to empty data, the fill function with the argument usePrevious is called after windowing the data:

Flux / InfluxQL
…
 |> aggregateWindow(every: 1h, fn: mean,
                    createEmpty: true )
 |> fill(column: "_value", usePrevious: true)
SELECT mean(pcpu)
…
GROUP BY time(1h)
FILL(previous)

Behind the scenes, aggregateWindow is not really a primitive function: it performs several steps, involving in particular the window function and the aggregate function (here mean):

…
 |> aggregateWindow(every: 1h, fn: mean,
                    createEmpty: true )
…
 |> window(every: 1h, createEmpty: true)
 |> mean()
 |> duplicate(column: "_stop", as: "_time")
 |> window(every: inf)

The column _stop is duplicated to re-add the column _time, which is removed by the mean function call.

From subqueries

Subqueries are frequently used in InfluxQL to compute aggregates on "windowed" data. The translation to the Flux language is quite straightforward for basic subqueries:

Flux / InfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vpsmetrics")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "pcpu")
 |> aggregateWindow(every: 1h, fn: mean )
 |> max()

SELECT max("mean")
FROM (
   SELECT mean(pcpu)
   FROM "netdatatsdb"."autogen"."vpsmetrics"
   WHERE (host ='vpsfrsqlpac1' AND time > now() -1d)
   GROUP BY time(1h)
)

When computed columns are created using InfluxQL subqueries, the pivot and map Flux functions are combined; the syntax is a little more complicated. Pivoting the data is mandatory to align fields sharing the same timestamp within each input table; the map function then creates the computed column:

Flux / InfluxQL
from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vps_space")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "used"
                      or r._field == "available")
 |> pivot(
       rowKey:["_time","host"],
       columnKey: ["_field"],
       valueColumn: "_value"
    )
 |> map(fn: (r) => (
        { r with pctspace:
          (r.used / (r.used + r.available)) * 100.0
        }
    ))
 |> max(column: "pctspace")

SELECT max(pctspace)
FROM (
   SELECT (used / (used + available)) * 100
            AS pctspace
   FROM "netdatatsdb"."autogen"."vps_space"
   WHERE (host ='vpsfrsqlpac1' AND time > now() -1d)
)

Alternatively, the pivot function can be replaced by the schema.fieldsAsCols function, a special application of pivot that pivots on the _field and _time columns to align fields.

Flux / InfluxQL
import "influxdata/influxdb/schema"

from(bucket: "netdatatsdb/autogen")
 |> range(start: -1d)
 |> filter(fn: (r) => r._measurement == "vps_space")
 |> filter(fn: (r) => r.host == "vpsfrsqlpac1")
 |> filter(fn: (r) => r._field == "used"
                      or r._field == "available")
 |> schema.fieldsAsCols()
 |> map(fn: (r) => (
        { r with pctspace:
          (r.used / (r.used + r.available)) * 100.0
        }
    ))
 |> max(column: "pctspace")

SELECT max(pctspace)
FROM (
   SELECT (used / (used + available)) * 100
            AS pctspace
   FROM "netdatatsdb"."autogen"."vps_space"
   WHERE (host ='vpsfrsqlpac1' AND time > now() -1d)
)

Copying data

In InfluxDB v1.x, SELECT INTO statements are used to copy data from one measurement to another.

In InfluxDB v2, use the to() function.

Notice in the example below the set function, used to change the measurement name in order to copy the data into the same bucket.

Flux / InfluxQL
from(bucket: "netdatatsdb/autogen")
  |> range(start: -10d)
  |> filter(fn: (r) => r._measurement == "vpsmetrics")
  |> filter(fn: (r) => r._field == "value")
  |> aggregateWindow(every: 1h, fn: mean,
                     createEmpty: false)
  |> set(key: "_measurement", value: "vpsmetrics1h")
  |> to(org: "sqlpac", bucket: "netdatatsdb/autogen")
SELECT mean(value)
INTO netdatatsdb.autogen.vpsmetrics1h
FROM netdatatsdb.autogen.vpsmetrics
WHERE time >= now() - 10d
GROUP BY time(1h) FILL(none)

Translation tool, influx client transpile option

Whether or not it is really how the v2 engine works internally, InfluxQL queries seem to be translated to Flux queries to support InfluxQL in InfluxDB v2: the influx client transpile option displays such translations, so the mechanism exists. It is a good help and a good starting point when migrating InfluxQL queries to Flux syntax; only some adjustments (time range…) and testing are then needed.

influxdb$ influx transpile \
             'SELECT pcpu FROM netdatatsdb.autogen.vpsmetrics WHERE (time > now() -1d and host =~ /^vps/ and host !~ /(de|uk|us)/)'
package main
from(bucket: "netdatatsdb/autogen")
        |> range(start: 2021-02-04T17:16:31.295656071Z, stop: 2262-04-11T23:47:16.854775806Z)
        |> filter(fn: (r) =>
                (r._measurement == "vpsmetrics" and r._field == "pcpu"))
        |> filter(fn: (r) =>
                (r["host"] =~ /^vps/ and r["host"] !~ /(de|uk|us)/))
        |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
        |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
        |> rename(columns: {_value: "pcpu"})
        |> yield(name: "0")

influxdb$ influx transpile \
             'SELECT mean(pcpu) FROM netdatatsdb.autogen.vpsmetrics WHERE time >= now() - 1w GROUP BY time(15m) FILL(none)'
package main
from(bucket: "netdatatsdb/autogen")
        |> range(start: 2021-01-29T16:25:00.305159561Z, stop: 2021-02-05T16:25:00.305159561Z)
        |> filter(fn: (r) =>
                (r._measurement == "vpsmetrics" and r._field == "pcpu"))
        |> group(columns: ["_measurement", "_start", "_stop", "_field"], mode: "by")
        |> keep(columns: ["_measurement", "_start", "_stop", "_field", "_time", "_value"])
        |> window(every: 15m)
        |> mean()
        |> map(fn: (r) =>
                ({r with _time: r._start}))
        |> window(every: inf)
        |> rename(columns: {_value: "mean"})
        |> yield(name: "0")

Unfortunately, the transpile option does not translate subqueries; an "unimplemented" error is raised:

$ influx transpile \
    'SELECT max(pctspace) FROM ( SELECT (used / (used + available)) * 100 AS pctspace FROM netdatatsdb.autogen.vps_space WHERE (host='vpsfrsqlpac1'  AND time > now() -1d))'
Error: unimplemented: source must be a measurement

Conclusion

One may regret that InfluxQL is replaced by Flux, a NoSQL language that seems syntactically heavier when getting started, but the Flux language brings features that were not possible with InfluxQL (joins, pivots, external SQL data sources…). These advanced features will be covered in upcoming papers.

It just takes some time to get used to coding in Flux when coming from SQL.