InfluxDB.Client.Linq 4.8.0-dev.9324

.NET Standard 2.0
This is a prerelease version of InfluxDB.Client.Linq.

InfluxDB.Client.Linq

The library lets you query InfluxDB with LINQ expressions.

Documentation

This section contains links to the client library documentation.

Usage

How to start

First, add the library as a dependency for your project:

# For the current version, see: https://www.nuget.org/packages/InfluxDB.Client.Linq/

dotnet add package InfluxDB.Client.Linq --version 1.17.0-dev.linq.17

Next, add the following using statement to your program:

using InfluxDB.Client.Linq;

The LINQ query depends on QueryApiSync. You can create an instance of QueryApiSync as follows:

var client = new InfluxDBClient("http://localhost:8086", "my-token");
var queryApi = client.GetQueryApiSync();

In the following examples we assume that the Sensor entity is defined as:

class Sensor
{
    [Column("sensor_id", IsTag = true)] 
    public string SensorId { get; set; }

    /// <summary>
    /// "production" or "testing"
    /// </summary>
    [Column("deployment", IsTag = true)]
    public string Deployment { get; set; }

    /// <summary>
    /// Value measured by sensor
    /// </summary>
    [Column("data")]
    public float Value { get; set; }

    [Column(IsTimestamp = true)] 
    public DateTime Timestamp { get; set; }
}

Time Series

InfluxDB uses the concept of a time series: a collection of data that shares a measurement, tag set, and bucket. When you query data with Flux, you always operate on individual time series.

Imagine that you have the following data:

sensor,deployment=production,sensor_id=id-1 data=15
sensor,deployment=testing,sensor_id=id-1 data=28
sensor,deployment=testing,sensor_id=id-1 data=12
sensor,deployment=production,sensor_id=id-1 data=89

The corresponding time series are:

  • sensor,deployment=production,sensor_id=id-1
  • sensor,deployment=testing,sensor_id=id-1

Suppose you query your data with the following Flux:

from(bucket: "my-bucket")
  |> range(start: 0)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start", "_stop", "_measurement"])
  |> limit(n:1)

The result contains one item for each time series:

sensor,deployment=production,sensor_id=id-1 data=15
sensor,deployment=testing,sensor_id=id-1 data=28

This is also how the LINQ driver works.

By default, the driver assumes that you are querying a single time series.

You can change this behavior with the following setting:

Enable querying multiple time-series

var settings = new QueryableOptimizerSettings{QueryMultipleTimeSeries = true};
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi, settings)
    select s;

The group() function makes it possible to query multiple time series and get correct results.

The following query works correctly:

from(bucket: "my-bucket")
  |> range(start: 0)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start", "_stop", "_measurement"])
  |> group()
  |> limit(n:1)

It returns the following result:

sensor,deployment=production,sensor_id=id-1 data=15
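
The difference between the two Flux queries above can be imitated in memory with LINQ to Objects. The following is only a sketch under stated assumptions: the Row and SeriesDemo types are hypothetical names that are not part of InfluxDB.Client.Linq, and the code merely mimics how limit(n: 1) behaves when applied per time series and when applied after group().

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory row; not a type from InfluxDB.Client.Linq.
public sealed class Row
{
    public string Deployment { get; set; }
    public string SensorId { get; set; }
    public double Data { get; set; }
}

public static class SeriesDemo
{
    // The same four points as in the line protocol sample above.
    public static readonly List<Row> Rows = new List<Row>
    {
        new Row { Deployment = "production", SensorId = "id-1", Data = 15 },
        new Row { Deployment = "testing", SensorId = "id-1", Data = 28 },
        new Row { Deployment = "testing", SensorId = "id-1", Data = 12 },
        new Row { Deployment = "production", SensorId = "id-1", Data = 89 },
    };

    // Imitates limit(n) applied to each time series (one table per tag set).
    public static List<Row> LimitPerSeries(IEnumerable<Row> rows, int n) =>
        rows.GroupBy(r => (r.Deployment, r.SensorId))
            .SelectMany(series => series.Take(n))
            .ToList();

    // Imitates group() |> limit(n): all rows are merged into one table first.
    public static List<Row> LimitAfterGroup(IEnumerable<Row> rows, int n) =>
        rows.Take(n).ToList();
}
```

SeriesDemo.LimitPerSeries(SeriesDemo.Rows, 1) yields one row per series, while SeriesDemo.LimitAfterGroup(SeriesDemo.Rows, 1) yields a single row. Unlike Flux group(), the in-memory list keeps its insertion order, so the sketch does not reproduce the ordering caveat.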

Do not use this functionality unless it is required, because it incurs a performance cost caused by sorting:

Group does not guarantee sort order

The group() function does not guarantee the sort order of output records. To ensure data is sorted correctly, use an orderby expression.

Client Side Evaluation

The library attempts to evaluate as much of a query as possible on the server. Client-side evaluation is required for aggregation functions if there is more than one time series.

Suppose you want to count your data with the following Flux:

from(bucket: "my-bucket")
  |> range(start: 0)
  |> drop(columns: ["_start", "_stop", "_measurement"])
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> stateCount(fn: (r) => true, column: "linq_result_column") 
  |> last(column: "linq_result_column") 
  |> keep(columns: ["linq_result_column"])

The result contains one count for each time series:

#group,false,false,false
#datatype,string,long,long
#default,_result,,
,result,table,linq_result_column
,,0,1
,,0,1

The client then has to aggregate these multiple results into one scalar value.

Operators that could cause client side evaluation:

  • Count
  • LongCount
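
The client-side step described above, combining the per-series counts into one scalar, amounts to a sum. The following is a minimal sketch, assuming each element is the linq_result_column value returned for one time series; ClientSideCount is a hypothetical helper and not the library's internal implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ClientSideCount
{
    // Each element stands for the count returned for one time series.
    // The total count over all series is their sum.
    public static long Combine(IEnumerable<long> perSeriesCounts) =>
        perSeriesCounts.Sum();
}
```

For the CSV response shown above, which contains the values 1 and 1, ClientSideCount.Combine(new long[] { 1, 1 }) returns 2.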

TL;DR

Perform Query

The LINQ query requires a bucket and an organization as the source of data. Each of them can be given as a name or an ID.

var query = (from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.SensorId == "id-1"
    where s.Value > 12
    where s.Timestamp > new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc)
    where s.Timestamp < new DateTime(2021, 01, 10, 5, 10, 0, DateTimeKind.Utc)
    orderby s.Timestamp
    select s)
    .Take(2)
    .Skip(2);

var sensors = query.ToList();

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 2019-11-16T08:20:15Z, stop: 2021-01-10T05:10:00Z) 
    |> filter(fn: (r) => (r["sensor_id"] == "id-1")) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> filter(fn: (r) => (r["data"] > 12)) 
    |> limit(n: 2, offset: 2)

Filtering

The range() and filter() functions are pushdown functions: they push their data manipulation down to the underlying data source rather than storing and manipulating data in memory. Using pushdown functions at the beginning of a query greatly reduces the amount of server memory necessary to run it.

The LINQ provider needs to align fields within each input table that have the same timestamp into a column-wise format:

From

_time                           _value  _measurement  _field
1970-01-01T00:00:00.000000001Z  1.0     "m1"          "f1"
1970-01-01T00:00:00.000000001Z  2.0     "m1"          "f2"
1970-01-01T00:00:00.000000002Z  3.0     "m1"          "f1"
1970-01-01T00:00:00.000000002Z  4.0     "m1"          "f2"

To

_time                           _measurement  f1   f2
1970-01-01T00:00:00.000000001Z  "m1"          1.0  2.0
1970-01-01T00:00:00.000000002Z  "m1"          3.0  4.0

For that reason we need to use the pivot() function. The pivot() function is heavy and should be used at the end of the Flux query.

It is also possible to disable appending pivot():

var optimizerSettings =
    new QueryableOptimizerSettings
    {
        AlignFieldsWithPivot = false
    };
    
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi, optimizerSettings)
    select s;

Mapping LINQ filters

For the best performance on both sides (the server and the LINQ provider), LINQ expressions are mapped to the Flux query in the following way:

Filter by Timestamp

Mapped to range().

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Timestamp >= new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc)
    select s;

var sensors = query.ToList();

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 2019-11-16T08:20:15Z) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_measurement"])

Filter by Tag

Mapped to filter() before pivot().

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.SensorId == "id-1"
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0)
    |> filter(fn: (r) => (r["sensor_id"] == "id-1"))  
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_measurement"])

Filter by Field

The filter by field has to come after pivot() because we want to select all fields from the pivoted table.

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Value < 28
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0)
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")  
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> filter(fn: (r) => (r["data"] < 28))

If we move the filter() for fields before the pivot(), we get wrong results:

Data

m1 f1=1,f2=2 1
m1 f1=3,f2=4 2

Without filter

from(bucket: "my-bucket") 
    |> range(start: 0)
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])

Results:

_time                           f1   f2
1970-01-01T00:00:00.000000001Z  1.0  2.0
1970-01-01T00:00:00.000000002Z  3.0  4.0

Filter before pivot()

filter: f1 > 0

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> filter(fn: (r) => (r["_field"] == "f1" and r["_value"] > 0))
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])

Results:

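_time                           f1
1970-01-01T00:00:00.000000001Z  1.0
1970-01-01T00:00:00.000000002Z  3.0

The f2 column is missing because the f2 records were removed by the filter before pivot() could turn them into a column.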
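
The effect of the filter position can be reproduced in memory. The following is a sketch only: PivotDemo and Record are hypothetical names, and the Pivot method merely imitates pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") with LINQ to Objects on the sample data above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PivotDemo
{
    // One unpivoted record: (_time, _field, _value).
    public sealed class Record
    {
        public long Time { get; set; }
        public string Field { get; set; }
        public double Value { get; set; }
    }

    // The sample data: two timestamps, fields f1 and f2 at each.
    public static readonly List<Record> Data = new List<Record>
    {
        new Record { Time = 1, Field = "f1", Value = 1.0 },
        new Record { Time = 1, Field = "f2", Value = 2.0 },
        new Record { Time = 2, Field = "f1", Value = 3.0 },
        new Record { Time = 2, Field = "f2", Value = 4.0 },
    };

    // Builds one row per timestamp with one column per field name.
    public static List<Dictionary<string, double>> Pivot(IEnumerable<Record> records) =>
        records.GroupBy(r => r.Time)
               .OrderBy(g => g.Key)
               .Select(g => g.ToDictionary(r => r.Field, r => r.Value))
               .ToList();

    // Filter first, then pivot: only the f1 column survives.
    public static List<Dictionary<string, double>> FilterThenPivot() =>
        Pivot(Data.Where(r => r.Field == "f1" && r.Value > 0));

    // Pivot first, then filter: rows keep both f1 and f2.
    public static List<Dictionary<string, double>> PivotThenFilter() =>
        Pivot(Data).Where(row => row["f1"] > 0).ToList();
}
```

Both methods return two rows, but only the rows from PivotThenFilter() still contain the f2 column, which is why the provider places field filters after pivot().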

Time Range Filtering

The time filtering expressions are mapped to Flux range() function. This function has start and stop parameters with following behaviour: start <= _time < stop:

Results include records with _time values greater than or equal to the specified start time and less than the specified stop time.

This means that we have to add one nanosecond to start if we want timestamps strictly greater than a value, and add one nanosecond to stop if we want timestamps less than or equal to a value.
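
The one-nanosecond shift can be sketched with plain DateTime arithmetic. This is an illustration only: RangeBounds is a hypothetical helper, not part of InfluxDB.Client.Linq, and it assumes that the input value is already in UTC.

```csharp
using System;

public static class RangeBounds
{
    private static readonly DateTime UnixEpoch =
        new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    // Nanoseconds since the Unix epoch; one DateTime tick is 100 ns.
    // Assumption: the value passed in is a UTC timestamp.
    public static long ToEpochNanoseconds(DateTime utc) =>
        (utc - UnixEpoch).Ticks * 100L;

    // "timestamp > value" becomes start = value + 1 ns, because start is inclusive.
    public static long ExclusiveStart(DateTime utc) =>
        ToEpochNanoseconds(utc) + 1;

    // "timestamp <= value" becomes stop = value + 1 ns, because stop is exclusive.
    public static long InclusiveStop(DateTime utc) =>
        ToEpochNanoseconds(utc) + 1;
}
```

Both helpers add the same single nanosecond; they differ only in which range() parameter the result is meant for. The operators >= and < need no shift, because they already match the start <= _time < stop behaviour.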

Example 1:
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Timestamp > new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc)
    where s.Timestamp < new DateTime(2021, 01, 10, 5, 10, 0, DateTimeKind.Utc)
    select s;

var sensors = query.ToList();

Flux Query:

start_shifted = int(v: time(v: "2019-11-16T08:20:15Z")) + 1

from(bucket: "my-bucket") 
    |> range(start: time(v: start_shifted), stop: 2021-01-10T05:10:00Z)
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])

Example 2:
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Timestamp >= new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc)
    where s.Timestamp <= new DateTime(2021, 01, 10, 5, 10, 0, DateTimeKind.Utc)
    select s;

var sensors = query.ToList();

Flux Query:

stop_shifted = int(v: time(v: "2021-01-10T05:10:00Z")) + 1

from(bucket: "my-bucket") 
    |> range(start: 2019-11-16T08:20:15Z, stop: time(v: stop_shifted)) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_measurement"])

Example 3:
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Timestamp >= new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc)
    select s;

var sensors = query.ToList();

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 2019-11-16T08:20:15Z) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_measurement"])

Example 4:
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Timestamp <= new DateTime(2021, 01, 10, 5, 10, 0, DateTimeKind.Utc)
    select s;

var sensors = query.ToList();

Flux Query:

stop_shifted = int(v: time(v: "2021-01-10T05:10:00Z")) + 1

from(bucket: "my-bucket") 
    |> range(start: 0, stop: time(v: stop_shifted))
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])

Example 5:
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Timestamp == new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc)
    select s;

var sensors = query.ToList();

Flux Query:

stop_shifted = int(v: time(v: "2019-11-16T08:20:15Z")) + 1

from(bucket: "my-bucket") 
    |> range(start: 2019-11-16T08:20:15Z, stop: time(v: stop_shifted)) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_measurement"])

You can also specify default values for the start and stop parameters. This is useful when you need to include data with future timestamps when no time bounds are explicitly set.

var settings = new QueryableOptimizerSettings
{
    RangeStartValue = DateTime.UtcNow.AddHours(-24),
    RangeStopValue = DateTime.UtcNow.AddHours(1)
};
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi, settings)
    select s;

TL;DR

Supported LINQ operators

Equal

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.SensorId == "id-1"
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0)
    |> filter(fn: (r) => (r["sensor_id"] == "id-1"))  
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_measurement"])

Not Equal

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.SensorId != "id-1"
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0)
    |> filter(fn: (r) => (r["sensor_id"] != "id-1")) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_measurement"])

Less Than

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Value < 28
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> filter(fn: (r) => (r["data"] < 28))

Less Than Or Equal

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Value <= 28
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> filter(fn: (r) => (r["data"] <= 28))

Greater Than

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Value > 28
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> filter(fn: (r) => (r["data"] > 28))

Greater Than Or Equal

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Value >= 28
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> filter(fn: (r) => (r["data"] >= 28))

And

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Value >= 28 && s.SensorId != "id-1"
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> filter(fn: (r) => (r["sensor_id"] != "id-1"))
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> filter(fn: (r) => (r["data"] >= 28))

Or

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Value >= 28 || s.Value <= 5
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> filter(fn: (r) => ((r["data"] >= 28) or (r["data"] <= 5)))

Any

The following code demonstrates how to use the Any operator to determine whether a collection contains any elements. By default, InfluxDB.Client doesn't support storing a subcollection in your DomainObject.

Imagine that you have the following entities:

class SensorCustom
{
    public Guid Id { get; set; }
    
    public float Data { get; set; }
    
    public DateTimeOffset Time { get; set; }
    
    public virtual ICollection<SensorAttribute> Attributes { get; set; }
}

class SensorAttribute
{
    public string Name { get; set; }
    public string Value { get; set; }
}

To be able to store the SensorCustom entity in InfluxDB and retrieve it from the database, you should implement IDomainObjectMapper. The converter tells the client how to map a DomainObject into PointData and how to map a FluxRecord to a DomainObject.

Entity Converter:

private class SensorEntityConverter : IDomainObjectMapper
{
    //
    // Parse incoming FluxRecord to DomainObject
    //
    public T ConvertToEntity<T>(FluxRecord fluxRecord)
    {
        if (typeof(T) != typeof(SensorCustom))
        {
            throw new NotSupportedException($"This converter doesn't support: {typeof(T)}");
        }

        //
        // Create SensorCustom entity and parse `Id`, `Data` and `Time`
        //
        var customEntity = new SensorCustom
        {
            Id = Guid.Parse(Convert.ToString(fluxRecord.GetValueByKey("series_id"))!),
            // `Data` is declared as float, so convert to single precision
            Data = Convert.ToSingle(fluxRecord.GetValueByKey("data")),
            Time = fluxRecord.GetTime().GetValueOrDefault().ToDateTimeUtc(),
            Attributes = new List<SensorAttribute>()
        };
        
        foreach (var (key, value) in fluxRecord.Values)
        {
            //
            // Parse SubCollection values. The `attribute_` prefix has to match
            // the prefix returned by IMemberNameResolver.GetNamedFieldName
            //
            if (key.StartsWith("attribute_"))
            {
                var attribute = new SensorAttribute
                {
                    Name = key.Replace("attribute_", string.Empty), Value = Convert.ToString(value)
                };
                
                customEntity.Attributes.Add(attribute);
            }
        }

        return (T) Convert.ChangeType(customEntity, typeof(T));
    }

    //
    // Convert DomainObject into PointData
    //
    public PointData ConvertToPointData<T>(T entity, WritePrecision precision)
    {
        if (!(entity is SensorCustom ce))
        {
            throw new NotSupportedException($"This converter doesn't support: {typeof(T)}");
        }

        //
        // Map `Id`, `Data` and `Time` to Tag, Field and Timestamp
        //
        var point = PointData
            .Measurement("custom_measurement")
            .Tag("series_id", ce.Id.ToString())
            .Field("data", ce.Data)
            .Timestamp(ce.Time, precision);

        //
        // Map subattributes to Fields
        //
        foreach (var attribute in ce.Attributes ?? new List<SensorAttribute>())
        {
            point = point.Field($"attribute_{attribute.Name}", attribute.Value);
        }

        return point;
    }
}

The converter can be passed to QueryApiSync, QueryApi or WriteApi as follows:

// Create Converter
var converter = new SensorEntityConverter();

// Get Query and Write API
var queryApi = client.GetQueryApiSync(converter);
var writeApi = client.GetWriteApi(converter);

The LINQ provider needs to know how properties of DomainObject are stored in InfluxDB - their name and type (tag, field, timestamp).

If you use an IDomainObjectMapper instead of InfluxDB attributes, you should implement IMemberNameResolver:

private class SensorMemberResolver: IMemberNameResolver
{
    //
    // Tell the LINQ provider how a property of the DomainObject is mapped - Tag, Field, Timestamp, ... ?
    //
    public MemberType ResolveMemberType(MemberInfo memberInfo)
    {
        //
        // Mapping of subcollection
        //
        if (memberInfo.DeclaringType == typeof(SensorAttribute))
        {
            return memberInfo.Name switch
            {
                "Name" => MemberType.NamedField,
                "Value" => MemberType.NamedFieldValue,
                _ => MemberType.Field
            };
        }

        //
        // Mapping of "root" domain
        //
        return memberInfo.Name switch
        {
            "Time" => MemberType.Timestamp,
            "Id" => MemberType.Tag,
            _ => MemberType.Field
        };
    }

    //
    // Tell the LINQ provider how a property of the DomainObject is named
    //
    public string GetColumnName(MemberInfo memberInfo)
    {
        return memberInfo.Name switch
        {
            "Id" => "series_id",
            "Data" => "data",
            _ => memberInfo.Name
        };
    }

    //
    // Tell the LINQ provider how a flattened property is named
    //
    public string GetNamedFieldName(MemberInfo memberInfo, object value)
    {
        return "attribute_" + Convert.ToString(value);
    }
}

Now we are able to provide the required information to the LINQ provider through the memberResolver parameter:

var memberResolver = new SensorMemberResolver();

var query = from s in InfluxDBQueryable<SensorCustom>.Queryable("my-bucket", "my-org", queryApi, memberResolver)
    where s.Attributes.Any(a => a.Name == "quality" && a.Value == "good")
    select s;

Flux Query:

from(bucket: "my-bucket")
    |> range(start: 0)
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> filter(fn: (r) => (r["attribute_quality"] == "good"))

For more info see CustomDomainMappingAndLinq example.

Take

var query = (from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    select s)
    .Take(10);

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> limit(n: 10)

Note: the limit() function can be placed before the pivot() function by:

var optimizerSettings =
    new QueryableOptimizerSettings
    {
        AlignLimitFunctionAfterPivot = false
    };

Performance: The pivot() is a “heavy” function. Using limit() before pivot() is much faster but works only if you have consistent data series. See #318 for more details.

TakeLast

var query = (from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    select s)
    .TakeLast(10);

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> tail(n: 10)

Note: the tail() function can be placed before the pivot() function by:

var optimizerSettings =
    new QueryableOptimizerSettings
    {
        AlignLimitFunctionAfterPivot = false
    };

Performance: The pivot() is a “heavy” function. Using tail() before pivot() is much faster but works only if you have consistent data series. See #318 for more details.

Skip

var query = (from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    select s)
    .Take(10)
    .Skip(50);

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> limit(n: 10, offset: 50)
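
As the generated Flux shows, Take and Skip are combined into a single limit() call, where offset is the number of records skipped before at most n records are returned. The following sketch imitates that behaviour in memory; LimitDemo is a hypothetical helper and not part of InfluxDB.Client.Linq.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LimitDemo
{
    // Imitates Flux limit(n, offset): skip `offset` records, then keep at most `n`.
    public static List<T> Limit<T>(IEnumerable<T> records, int n, int offset) =>
        records.Skip(offset).Take(n).ToList();
}
```

With 100 records numbered from 0, LimitDemo.Limit(Enumerable.Range(0, 100), 10, 50) returns the records 50 through 59. Note that this differs from LINQ to Objects, where chaining Take(10) and then Skip(50) on an in-memory sequence would return nothing.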

OrderBy

Example 1:
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    orderby s.Deployment
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> sort(columns: ["deployment"], desc: false)

Example 2:
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    orderby s.Timestamp descending 
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> sort(columns: ["_time"], desc: true)

Count

Possibility of partial client side evaluation

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    select s;

var sensors = query.Count();

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> stateCount(fn: (r) => true, column: "linq_result_column") 
    |> last(column: "linq_result_column") 
    |> keep(columns: ["linq_result_column"])

LongCount

Possibility of partial client side evaluation

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    select s;

var sensors = query.LongCount();

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0)
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> stateCount(fn: (r) => true, column: "linq_result_column") 
    |> last(column: "linq_result_column") 
    |> keep(columns: ["linq_result_column"])

Contains

// Sensor.Value is a float, so the array has to use the same element type
float[] values = {15, 28};

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where values.Contains(s.Value)
    select s;

var sensors = query.Count();

Flux Query:

from(bucket: "my-bucket")
    |> range(start: 0)
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> filter(fn: (r) => contains(value: r["data"], set: [15, 28]))

Custom LINQ operators

AggregateWindow

AggregateWindow applies an aggregate function to fixed windows of time. It can be used only on a property that is defined as the timestamp: [Column(IsTimestamp = true)]. For more information about the aggregateWindow() function, see the Flux documentation: https://docs.influxdata.com/flux/v0.x/stdlib/universe/aggregatewindow/.

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(40), "mean")
    select s;

Flux Query:

from(bucket: "my-bucket") 
    |> range(start: 0) 
    |> aggregateWindow(every: 20s, period: 40s, fn: mean) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_measurement"])

Domain Converter

It is also possible to use a custom domain converter to transform data from/to your DomainObject.

Instead of the following Influx attributes:

[Measurement("temperature")]
private class Temperature
{
    [Column("location", IsTag = true)] public string Location { get; set; }

    [Column("value")] public double Value { get; set; }

    [Column(IsTimestamp = true)] public DateTime Time { get; set; }
}

you can create your own implementation of IDomainObjectMapper and use it with QueryApiSync, QueryApi and WriteApi.

var converter = new DomainEntityConverter();
var queryApi = client.GetQueryApiSync(converter);

To satisfy the LINQ query provider, you have to implement IMemberNameResolver:

var resolver = new MemberNameResolver();

var query = from s in InfluxDBQueryable<SensorCustom>.Queryable("my-bucket", "my-org", queryApi, resolver)
    where s.Attributes.Any(a => a.Name == "quality" && a.Value == "good")
    select s;

For more details see the Any operator, and for a full example see CustomDomainMappingAndLinq.

How to debug output Flux Query

var query = (from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
        where s.SensorId == "id-1"
        where s.Value > 12
        where s.Timestamp > new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc)
        where s.Timestamp < new DateTime(2021, 01, 10, 5, 10, 0, DateTimeKind.Utc)
        orderby s.Timestamp
        select s)
    .Take(2)
    .Skip(2);
    
Console.WriteLine("==== Debug LINQ Queryable Flux output ====");
var influxQuery = ((InfluxDBQueryable<Sensor>) query).ToDebugQuery();
foreach (var statement in influxQuery.Extern.Body)
{
    var os = statement as OptionStatement;
    var va = os?.Assignment as VariableAssignment;
    var name = va?.Id.Name;
    var value = va?.Init.GetType().GetProperty("Value")?.GetValue(va.Init, null);

    Console.WriteLine($"{name}={value}");
}
Console.WriteLine();
Console.WriteLine(influxQuery._Query);

How to filter by Measurement

By default, as an optimization step, Flux queries generated by LINQ will automatically drop the Start, Stop and Measurement columns:

from(bucket: "my-bucket")
  |> range(start: 0)
  |> drop(columns: ["_start", "_stop", "_measurement"])
  ...

This is because typical POCO classes do not include them:

[Measurement("temperature")]
private class Temperature
{
    [Column("location", IsTag = true)] public string Location { get; set; }
    [Column("value")] public double Value { get; set; }
    [Column(IsTimestamp = true)] public DateTime Time { get; set; }
}

It is, however, possible to utilize the Measurement column in LINQ queries by enabling it in the query optimization settings:

var optimizerSettings =
    new QueryableOptimizerSettings
    {
        DropMeasurementColumn = false,
        
        // Note we can also enable the start and stop columns
        //DropStartColumn = false,
        //DropStopColumn = false
    };

var queryable =
    new InfluxDBQueryable<InfluxPoint>("my-bucket", "my-org", queryApi, new DefaultMemberNameResolver(), optimizerSettings);

var latest =
    await queryable.Where(p => p.Measurement == "temperature")
                   .OrderByDescending(p => p.Time)
                   .ToInfluxQueryable()
                   .GetAsyncEnumerator()
                   .FirstOrDefaultAsync();

private class InfluxPoint
{
    [Column(IsMeasurement = true)] public string Measurement { get; set; }
    [Column("location", IsTag = true)] public string Location { get; set; }
    [Column("value")] public double Value { get; set; }
    [Column(IsTimestamp = true)] public DateTime Time { get; set; }
}

Asynchronous Queries

The LINQ driver also supports asynchronous querying. For asynchronous queries, initialize InfluxDBQueryable with the asynchronous QueryApi and transform IQueryable<T> to IAsyncEnumerable<T>:

var client = new InfluxDBClient("http://localhost:8086", "my-token");
var queryApi = client.GetQueryApi();

var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", queryApi)
    select s;

IAsyncEnumerable<Sensor> enumerable = query
    .ToInfluxQueryable()
    .GetAsyncEnumerator();

Product Versions
.NET: net5.0, net5.0-windows, net6.0, net6.0-android, net6.0-ios, net6.0-maccatalyst, net6.0-macos, net6.0-tvos, net6.0-windows, net7.0, net7.0-android, net7.0-ios, net7.0-maccatalyst, net7.0-macos, net7.0-tvos, net7.0-windows
.NET Core: netcoreapp2.0, netcoreapp2.1, netcoreapp2.2, netcoreapp3.0, netcoreapp3.1
.NET Standard: netstandard2.0, netstandard2.1
.NET Framework: net461, net462, net463, net47, net471, net472, net48, net481
MonoAndroid: monoandroid
MonoMac: monomac
MonoTouch: monotouch
Tizen: tizen40, tizen60
Xamarin.iOS: xamarinios
Xamarin.Mac: xamarinmac
Xamarin.TVOS: xamarintvos
Xamarin.WatchOS: xamarinwatchos

NuGet packages (2)

Showing the top 2 NuGet packages that depend on InfluxDB.Client.Linq:

SpmisNet.Data
DeerNet.InfluxDb2

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
4.11.0-dev.10059 40 1/26/2023
4.10.0 374 1/26/2023
4.10.0-dev.10033 44 1/25/2023
4.10.0-dev.10032 41 1/25/2023
4.10.0-dev.10031 43 1/25/2023
4.10.0-dev.9936 273 12/26/2022
4.10.0-dev.9935 43 12/26/2022
4.10.0-dev.9881 46 12/21/2022
4.10.0-dev.9880 43 12/21/2022
4.10.0-dev.9818 46 12/16/2022
4.10.0-dev.9773 45 12/12/2022
4.10.0-dev.9756 44 12/12/2022
4.10.0-dev.9693 46 12/6/2022
4.9.0 2,194 12/6/2022
4.9.0-dev.9684 44 12/6/2022
4.9.0-dev.9666 45 12/6/2022
4.9.0-dev.9617 46 12/6/2022
4.9.0-dev.9478 46 12/5/2022
4.9.0-dev.9469 44 12/5/2022
4.9.0-dev.9444 43 12/5/2022
4.9.0-dev.9411 44 12/5/2022
4.9.0-dev.9350 51 12/1/2022
4.8.0 980 12/1/2022
4.8.0-dev.9324 49 11/30/2022
4.8.0-dev.9232 47 11/28/2022
4.8.0-dev.9223 46 11/28/2022
4.8.0-dev.9222 46 11/28/2022
4.8.0-dev.9117 62 11/21/2022
4.8.0-dev.9108 46 11/21/2022
4.8.0-dev.9099 46 11/21/2022
4.8.0-dev.9029 47 11/16/2022
4.8.0-dev.8971 46 11/15/2022
4.8.0-dev.8961 50 11/14/2022
4.8.0-dev.8928 48 11/14/2022
4.8.0-dev.8899 47 11/14/2022
4.8.0-dev.8898 46 11/14/2022
4.8.0-dev.8839 51 11/14/2022
4.8.0-dev.8740 46 11/7/2022
4.8.0-dev.8725 44 11/7/2022
4.8.0-dev.8648 46 11/3/2022
4.7.0 4,607 11/3/2022
4.7.0-dev.8625 49 11/2/2022
4.7.0-dev.8594 46 10/31/2022
4.7.0-dev.8579 45 10/31/2022
4.7.0-dev.8557 46 10/31/2022
4.7.0-dev.8540 44 10/31/2022
4.7.0-dev.8518 47 10/31/2022
4.7.0-dev.8517 48 10/31/2022
4.7.0-dev.8509 48 10/31/2022
4.7.0-dev.8377 51 10/26/2022
4.7.0-dev.8360 51 10/25/2022
4.7.0-dev.8350 51 10/24/2022
4.7.0-dev.8335 53 10/24/2022
4.7.0-dev.8334 51 10/24/2022
4.7.0-dev.8223 81 10/19/2022
4.7.0-dev.8178 45 10/17/2022
4.7.0-dev.8170 46 10/17/2022
4.7.0-dev.8148 46 10/17/2022
4.7.0-dev.8133 44 10/17/2022
4.7.0-dev.8097 47 10/17/2022
4.7.0-dev.8034 59 10/11/2022
4.7.0-dev.8025 51 10/11/2022
4.7.0-dev.8009 51 10/10/2022
4.7.0-dev.8001 54 10/10/2022
4.7.0-dev.7959 48 10/4/2022
4.7.0-dev.7905 48 9/30/2022
4.7.0-dev.7875 48 9/29/2022
4.6.0 2,211 9/29/2022
4.6.0-dev.7832 51 9/29/2022
4.6.0-dev.7817 49 9/29/2022
4.6.0-dev.7779 53 9/27/2022
4.6.0-dev.7778 55 9/27/2022
4.6.0-dev.7734 50 9/26/2022
4.6.0-dev.7733 49 9/26/2022
4.6.0-dev.7677 56 9/20/2022
4.6.0-dev.7650 60 9/16/2022
4.6.0-dev.7626 98 9/14/2022
4.6.0-dev.7618 98 9/14/2022
4.6.0-dev.7574 60 9/13/2022
4.6.0-dev.7572 56 9/13/2022
4.6.0-dev.7528 55 9/12/2022
4.6.0-dev.7502 57 9/9/2022
4.6.0-dev.7479 73 9/8/2022
4.6.0-dev.7471 68 9/8/2022
4.6.0-dev.7447 52 9/7/2022
4.6.0-dev.7425 53 9/7/2022
4.6.0-dev.7395 52 9/6/2022
4.6.0-dev.7344 59 8/31/2022
4.6.0-dev.7329 47 8/31/2022
4.6.0-dev.7292 50 8/30/2022
4.6.0-dev.7240 54 8/29/2022
4.5.0 1,978 8/29/2022
4.5.0-dev.7216 59 8/27/2022
4.5.0-dev.7147 61 8/22/2022
4.5.0-dev.7134 56 8/17/2022
4.5.0-dev.7096 64 8/15/2022
4.5.0-dev.7070 71 8/11/2022
4.5.0-dev.7040 85 8/10/2022
4.5.0-dev.7011 68 8/3/2022
4.5.0-dev.6987 68 8/1/2022
4.5.0-dev.6962 71 7/29/2022
4.4.0 2,959 7/29/2022
4.4.0-dev.6901 68 7/25/2022
4.4.0-dev.6843 67 7/19/2022
4.4.0-dev.6804 67 7/19/2022
4.4.0-dev.6789 65 7/19/2022
4.4.0-dev.6760 66 7/19/2022
4.4.0-dev.6705 72 7/14/2022
4.4.0-dev.6663 100 6/24/2022
4.4.0-dev.6655 67 6/24/2022
4.3.0 2,790 6/24/2022
4.3.0-dev.multiple.buckets3 88 6/21/2022
4.3.0-dev.multiple.buckets2 61 6/17/2022
4.3.0-dev.multiple.buckets1 64 6/17/2022
4.3.0-dev.6631 64 6/22/2022
4.3.0-dev.6623 63 6/22/2022
4.3.0-dev.6374 72 6/13/2022
4.3.0-dev.6286 70 5/20/2022
4.2.0 2,042 5/20/2022
4.2.0-dev.6257 82 5/13/2022
4.2.0-dev.6248 72 5/12/2022
4.2.0-dev.6233 76 5/12/2022
4.2.0-dev.6194 73 5/10/2022
4.2.0-dev.6193 69 5/10/2022
4.2.0-dev.6158 2,522 5/6/2022
4.2.0-dev.6135 76 5/6/2022
4.2.0-dev.6091 80 4/28/2022
4.2.0-dev.6048 80 4/28/2022
4.2.0-dev.6047 78 4/28/2022
4.2.0-dev.5966 78 4/25/2022
4.2.0-dev.5938 77 4/19/2022
4.1.0 3,086 4/19/2022
4.1.0-dev.5910 264 4/13/2022
4.1.0-dev.5888 73 4/13/2022
4.1.0-dev.5887 78 4/13/2022
4.1.0-dev.5794 79 4/6/2022
4.1.0-dev.5725 85 3/18/2022
4.0.0 1,845 3/18/2022
4.0.0-rc3 251 3/4/2022
4.0.0-rc2 393 2/25/2022
4.0.0-rc1 82 2/18/2022
4.0.0-dev.5709 77 3/18/2022
4.0.0-dev.5684 80 3/15/2022
4.0.0-dev.5630 81 3/4/2022
4.0.0-dev.5607 83 3/3/2022
4.0.0-dev.5579 79 2/25/2022
4.0.0-dev.5556 81 2/24/2022
4.0.0-dev.5555 80 2/24/2022
4.0.0-dev.5497 73 2/23/2022
4.0.0-dev.5489 81 2/23/2022
4.0.0-dev.5460 77 2/23/2022
4.0.0-dev.5444 76 2/22/2022
4.0.0-dev.5333 80 2/17/2022
4.0.0-dev.5303 78 2/16/2022
4.0.0-dev.5280 83 2/16/2022
4.0.0-dev.5279 81 2/16/2022
4.0.0-dev.5241 178 2/15/2022
4.0.0-dev.5225 78 2/15/2022
4.0.0-dev.5217 78 2/15/2022
4.0.0-dev.5209 76 2/15/2022
4.0.0-dev.5200 75 2/14/2022
4.0.0-dev.5188 78 2/10/2022
4.0.0-dev.5180 80 2/10/2022
4.0.0-dev.5172 79 2/10/2022
4.0.0-dev.5130 76 2/10/2022
4.0.0-dev.5122 79 2/9/2022
4.0.0-dev.5103 84 2/9/2022
4.0.0-dev.5097 83 2/9/2022
4.0.0-dev.5091 83 2/9/2022
4.0.0-dev.5084 82 2/8/2022
3.4.0-dev.5263 80 2/15/2022
3.4.0-dev.4986 86 2/7/2022
3.4.0-dev.4968 98 2/4/2022
3.3.0 5,481 2/4/2022
3.3.0-dev.4889 87 2/3/2022
3.3.0-dev.4865 92 2/1/2022
3.3.0-dev.4823 96 1/19/2022
3.3.0-dev.4691 98 1/7/2022
3.3.0-dev.4557 1,313 11/26/2021
3.2.0 5,363 11/26/2021
3.2.0-dev.4533 4,806 11/24/2021
3.2.0-dev.4484 165 11/11/2021
3.2.0-dev.4475 136 11/10/2021
3.2.0-dev.4387 115 10/26/2021
3.2.0-dev.4363 124 10/22/2021
3.2.0-dev.4356 125 10/22/2021
3.1.0 1,576 10/22/2021
3.1.0-dev.4303 127 10/18/2021
3.1.0-dev.4293 123 10/15/2021
3.1.0-dev.4286 104 10/15/2021
3.1.0-dev.4240 139 10/12/2021
3.1.0-dev.4202 106 10/11/2021
3.1.0-dev.4183 144 10/11/2021
3.1.0-dev.4131 112 10/8/2021
3.1.0-dev.3999 118 10/5/2021
3.1.0-dev.3841 204 9/29/2021
3.1.0-dev.3798 120 9/17/2021
3.0.0 925 9/17/2021
3.0.0-dev.3726 459 8/31/2021
3.0.0-dev.3719 104 8/31/2021
3.0.0-dev.3671 128 8/20/2021
2.2.0-dev.3652 111 8/20/2021
2.1.0 1,345 8/20/2021
2.1.0-dev.3605 119 8/17/2021
2.1.0-dev.3584 125 8/16/2021
2.1.0-dev.3558 112 8/16/2021
2.1.0-dev.3527 155 7/29/2021
2.1.0-dev.3519 165 7/29/2021
2.1.0-dev.3490 109 7/20/2021
2.1.0-dev.3445 133 7/12/2021
2.1.0-dev.3434 164 7/9/2021
2.0.0 8,430 7/9/2021
2.0.0-dev.3401 150 6/25/2021
2.0.0-dev.3368 136 6/23/2021
2.0.0-dev.3361 147 6/23/2021
2.0.0-dev.3330 137 6/17/2021
2.0.0-dev.3291 143 6/16/2021
1.20.0-dev.3218 163 6/4/2021
1.19.0 704 6/4/2021
1.19.0-dev.3204 137 6/3/2021
1.19.0-dev.3160 120 6/2/2021
1.19.0-dev.3159 117 6/2/2021
1.19.0-dev.3084 778 5/7/2021
1.19.0-dev.3051 138 5/5/2021
1.19.0-dev.3044 137 5/5/2021
1.19.0-dev.3008 139 4/30/2021
1.18.0 1,062 4/30/2021
1.18.0-dev.2973 148 4/27/2021
1.18.0-dev.2930 130 4/16/2021
1.18.0-dev.2919 125 4/13/2021
1.18.0-dev.2893 114 4/12/2021
1.18.0-dev.2880 131 4/12/2021
1.18.0-dev.2856 123 4/7/2021
1.18.0-dev.2830 208 4/1/2021
1.18.0-dev.2816 135 4/1/2021
1.17.0 564 4/1/2021
1.17.0-dev.linq.17 634 3/18/2021
1.17.0-dev.linq.16 116 3/16/2021
1.17.0-dev.linq.15 148 3/15/2021
1.17.0-dev.linq.14 154 3/12/2021
1.17.0-dev.linq.13 182 3/11/2021
1.17.0-dev.linq.12 134 3/10/2021
1.17.0-dev.linq.11 128 3/8/2021
1.17.0-dev.2776 155 3/26/2021
1.17.0-dev.2713 170 3/25/2021
1.16.0-dev.linq.10 1,180 2/4/2021
1.15.0-dev.linq.9 152 2/4/2021
1.15.0-dev.linq.8 132 1/28/2021
1.15.0-dev.linq.7 148 1/27/2021
1.15.0-dev.linq.6 170 1/20/2021
1.15.0-dev.linq.5 187 1/19/2021
1.15.0-dev.linq.4 149 1/15/2021
1.15.0-dev.linq.3 129 1/14/2021
1.15.0-dev.linq.2 144 1/13/2021
1.15.0-dev.linq.1 157 1/12/2021