Class BaseExtractor<TConfig>
Base class for extractors writing timeseries, events or datapoints.
Namespace: Cognite.Extractor.Utils
Assembly: ExtractorUtils.dll
Syntax
public abstract class BaseExtractor<TConfig> : IDisposable, IAsyncDisposable where TConfig : VersionedConfig
Type Parameters
| Name | Description |
|---|---|
| TConfig | Type of the configuration object. Must derive from VersionedConfig. |
Constructors
BaseExtractor(TConfig, IServiceProvider, CogniteDestination?, ExtractionRun?, RemoteConfigManager<TConfig>?)
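Constructor. Only config and provider are required; the destination, extraction run and remote config manager default to null.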
Declaration
public BaseExtractor(TConfig config, IServiceProvider provider, CogniteDestination? destination = null, ExtractionRun? run = null, RemoteConfigManager<TConfig>? configManager = null)
Parameters
| Type | Name | Description |
|---|---|---|
| TConfig | config | Configuration object |
| IServiceProvider | provider | Service provider the extractor is built from |
| CogniteDestination | destination | Optional Cognite destination |
| ExtractionRun | run | Optional extraction run |
| RemoteConfigManager<TConfig> | configManager | Optional remote config manager |
Properties
Config
Configuration object
Declaration
protected TConfig Config { get; }
Property Value
| Type | Description |
|---|---|
| TConfig |
ConfigManager
Config manager for fetching remote configs. It contains the newest config version and its revision.
Declaration
protected RemoteConfigManager<TConfig>? ConfigManager { get; }
Property Value
| Type | Description |
|---|---|
| RemoteConfigManager<TConfig> |
Destination
CDF destination
Declaration
protected CogniteDestination? Destination { get; }
Property Value
| Type | Description |
|---|---|
| CogniteDestination |
EventUploadQueue
Event upload queue. Null until CreateEventQueue is called.
Declaration
protected EventUploadQueue? EventUploadQueue { get; }
Property Value
| Type | Description |
|---|---|
| EventUploadQueue |
Provider
Access to the service provider this extractor was built from
Declaration
protected IServiceProvider Provider { get; }
Property Value
| Type | Description |
|---|---|
| IServiceProvider |
RawUploadQueues
Raw upload queues, keyed by "dbName-tableName" and column type.
Declaration
protected Dictionary<(string name, Type type), IUploadQueue> RawUploadQueues { get; }
Property Value
| Type | Description |
|---|---|
| Dictionary<(string name, Type type), IUploadQueue> |
Run
Extraction run for reporting to an extraction pipeline in CDF.
Declaration
protected ExtractionRun? Run { get; }
Property Value
| Type | Description |
|---|---|
| ExtractionRun |
Scheduler
Scheduler for running various periodic tasks
Declaration
protected PeriodicScheduler Scheduler { get; set; }
Property Value
| Type | Description |
|---|---|
| PeriodicScheduler |
SchedulerExitTimeoutMs
Timeout in milliseconds for the scheduler to shut down when closing. Defaults to 0, which waits forever.
Declaration
protected int SchedulerExitTimeoutMs { get; set; }
Property Value
| Type | Description |
|---|---|
| int |
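SchedulerExitTimeoutMs uses 0 to mean "wait forever", unlike the usual .NET convention where a zero timeout returns immediately and `Timeout.Infinite` (-1) waits forever. A minimal C# sketch of a wait helper that honors the documented convention; `WaitForExit` is a hypothetical helper, not the library's shutdown code.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: wait for `task`, giving up after `timeoutMs` milliseconds.
// Follows the documented convention that 0 means "wait forever".
// Returns true if the task finished in time, false on timeout.
async Task<bool> WaitForExit(Task task, int timeoutMs)
{
    if (timeoutMs == 0)
    {
        // Task.Delay(0) completes immediately, so 0 must be special-cased
        // instead of being passed through as a zero-length timeout.
        await task;
        return true;
    }
    Task first = await Task.WhenAny(task, Task.Delay(timeoutMs));
    return first == task;
}

bool finished = await WaitForExit(Task.Delay(20), 1000);
bool timedOut = !(await WaitForExit(new TaskCompletionSource<bool>().Task, 50));
Console.WriteLine($"{finished} {timedOut}"); // True True
```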
Source
Cancellation token source for the extractor, initialized in Init.
Declaration
protected CancellationTokenSource Source { get; set; }
Property Value
| Type | Description |
|---|---|
| CancellationTokenSource |
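Init is documented to initialize Source. A common way to do that, assumed here rather than confirmed for this library, is to link the new source to the caller's token with `CancellationTokenSource.CreateLinkedTokenSource`, so cancellation flows from the caller to the extractor but not back. `CreateExtractorSource` is a hypothetical name.

```csharp
using System;
using System.Threading;

// Caller-owned token, e.g. cancelled on Ctrl+C or service shutdown.
using var caller = new CancellationTokenSource();

// Hypothetical stand-in for what Init(token) might do for Source.
CancellationTokenSource CreateExtractorSource(CancellationToken token) =>
    CancellationTokenSource.CreateLinkedTokenSource(token);

// Cancelling the extractor's own source does not cancel the caller.
using var first = CreateExtractorSource(caller.Token);
first.Cancel();
bool callerUnaffected = !caller.IsCancellationRequested;

// Cancelling the caller propagates to every linked extractor source.
using var second = CreateExtractorSource(caller.Token);
caller.Cancel();
bool propagated = second.IsCancellationRequested;

Console.WriteLine($"caller unaffected: {callerUnaffected}, propagated: {propagated}"); // caller unaffected: True, propagated: True
```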
TSUploadQueue
Timeseries upload queue. Null until CreateTimeseriesQueue is called.
Declaration
protected TimeSeriesUploadQueue? TSUploadQueue { get; }
Property Value
| Type | Description |
|---|---|
| TimeSeriesUploadQueue |
Methods
CreateEventQueue(int, TimeSpan, Func<QueueUploadResult<EventCreate>, Task>?, string?)
Set the EventUploadQueue value to a new event queue.
Declaration
protected void CreateEventQueue(int maxSize, TimeSpan uploadInterval, Func<QueueUploadResult<EventCreate>, Task>? callback, string? bufferPath = null)
Parameters
| Type | Name | Description |
|---|---|---|
| int | maxSize | Maximum size of queue before pushing to CDF, 0 for no limit |
| TimeSpan | uploadInterval | Interval between each push to CDF |
| Func<QueueUploadResult<EventCreate>, Task> | callback | Callback after each push to CDF |
| string | bufferPath | Optional path to buffer file |
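The `maxSize` and `uploadInterval` parameters combine a size trigger with a timer. A simplified C# model of the size trigger, where `SimulateSizeTriggeredFlushes` is hypothetical and the interval flush is represented only by the final partial batch:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Items are flushed as soon as the queue holds maxSize items.
// maxSize = 0 disables the size trigger, so only the uploadInterval timer
// would flush; here the leftover items stand in for that interval flush.
List<List<T>> SimulateSizeTriggeredFlushes<T>(IEnumerable<T> items, int maxSize)
{
    var flushed = new List<List<T>>();
    var pending = new List<T>();
    foreach (var item in items)
    {
        pending.Add(item);
        if (maxSize > 0 && pending.Count >= maxSize)
        {
            flushed.Add(pending);
            pending = new List<T>();
        }
    }
    if (pending.Count > 0) flushed.Add(pending); // next interval flush
    return flushed;
}

var sized = SimulateSizeTriggeredFlushes(Enumerable.Range(1, 7), maxSize: 3);
var unlimited = SimulateSizeTriggeredFlushes(Enumerable.Range(1, 7), maxSize: 0);
Console.WriteLine(string.Join(" | ", sized.Select(b => string.Join(",", b)))); // 1,2,3 | 4,5,6 | 7
```

With `maxSize: 0`, all seven items end up in a single batch, matching "0 for no limit".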
CreateRawQueue<T>(string, string, int, TimeSpan, Func<QueueUploadResult<(string key, T columns)>, Task>)
Create a raw queue for the given column type and database/table, and add it to RawUploadQueues.
Declaration
protected void CreateRawQueue<T>(string dbName, string tableName, int maxSize, TimeSpan uploadInterval, Func<QueueUploadResult<(string key, T columns)>, Task> callback)
Parameters
| Type | Name | Description |
|---|---|---|
| string | dbName | Name of database in Raw |
| string | tableName | Name of table in Raw |
| int | maxSize | Max size of queue before triggering push, 0 for no limit |
| TimeSpan | uploadInterval | Interval between each push to CDF |
| Func<QueueUploadResult<(string key, T columns)>, Task> | callback | Callback after each push |
Type Parameters
| Name | Description |
|---|---|
| T | Type of columns in raw queue |
CreateTimeseriesQueue(int, TimeSpan, Func<QueueUploadResult<(Identity id, Datapoint dp)>, Task>?, string?)
Set the TSUploadQueue value to a new timeseries queue.
Declaration
protected void CreateTimeseriesQueue(int maxSize, TimeSpan uploadInterval, Func<QueueUploadResult<(Identity id, Datapoint dp)>, Task>? callback, string? bufferPath = null)
Parameters
| Type | Name | Description |
|---|---|---|
| int | maxSize | Maximum size of queue before pushing to CDF, 0 for no limit |
| TimeSpan | uploadInterval | Interval between each push to CDF |
| Func<QueueUploadResult<(Identity id, Datapoint dp)>, Task> | callback | Callback after each push to CDF |
| string | bufferPath | Optional path to buffer file |
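The `bufferPath` parameter is only described as an optional buffer file. A sketch of one plausible behavior, assuming failed uploads are written to the file and re-sent before new data on the next successful push. The text format, `TryUpload` and `Flush` are hypothetical, and the library's actual buffer format and retry logic may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;

// Hypothetical buffer file: one "externalId;ticks;value" line per datapoint.
string bufferPath = Path.Combine(Path.GetTempPath(), $"dp-buffer-{Guid.NewGuid()}.txt");
var uploaded = new List<(string Id, DateTime Ts, double Value)>();

// Stand-in for a push to CDF; `online` simulates connectivity.
bool TryUpload(List<(string Id, DateTime Ts, double Value)> batch, bool online)
{
    if (!online) return false;
    uploaded.AddRange(batch);
    return true;
}

void Flush(List<(string Id, DateTime Ts, double Value)> batch, bool online)
{
    // Re-send anything buffered by earlier failed flushes first.
    var toSend = new List<(string Id, DateTime Ts, double Value)>();
    if (File.Exists(bufferPath))
    {
        toSend.AddRange(File.ReadAllLines(bufferPath).Select(line =>
        {
            var parts = line.Split(';');
            return (parts[0],
                new DateTime(long.Parse(parts[1], CultureInfo.InvariantCulture), DateTimeKind.Utc),
                double.Parse(parts[2], CultureInfo.InvariantCulture));
        }));
    }
    toSend.AddRange(batch);

    if (TryUpload(toSend, online))
    {
        File.Delete(bufferPath); // buffer drained; no-op if the file does not exist
    }
    else
    {
        // Persist everything still pending so it survives until the next flush.
        File.WriteAllLines(bufferPath, toSend.Select(dp => string.Join(";",
            dp.Id,
            dp.Ts.Ticks.ToString(CultureInfo.InvariantCulture),
            dp.Value.ToString("R", CultureInfo.InvariantCulture))));
    }
}

var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
Flush(new() { ("sensor-1", t0, 1.5) }, online: false);              // fails, buffered to file
Flush(new() { ("sensor-1", t0.AddSeconds(1), 2.5) }, online: true); // sends buffered + new
Console.WriteLine($"uploaded: {uploaded.Count}, buffer left: {File.Exists(bufferPath)}"); // uploaded: 2, buffer left: False
```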
Dispose()
Dispose extractor. Use DisposeAsync instead if possible.
Declaration
public void Dispose()
Dispose(bool)
Dispose of extractor, waiting for running tasks and pushing everything pending to CDF. Use DisposeAsync instead if possible.
Declaration
protected virtual void Dispose(bool disposing)
Parameters
| Type | Name | Description |
|---|---|---|
| bool | disposing | True when called from Dispose() to release managed resources |
DisposeAsync()
Dispose extractor asynchronously. Preferred over synchronous dispose.
Declaration
public ValueTask DisposeAsync()
Returns
| Type | Description |
|---|---|
| ValueTask |
DisposeAsyncCore()
Core asynchronous dispose logic. Override in subclasses to release additional resources asynchronously.
Declaration
protected virtual ValueTask DisposeAsyncCore()
Returns
| Type | Description |
|---|---|
| ValueTask |
Init(CancellationToken)
Called before Start() and TestConfig(). By default it only initializes Source and Scheduler.
Declaration
protected virtual void Init(CancellationToken token)
Parameters
| Type | Name | Description |
|---|---|---|
| CancellationToken | token | Cancellation token |
OnStop()
Called when the extractor is stopping.
Declaration
protected virtual Task OnStop()
Returns
| Type | Description |
|---|---|
| Task |
RunWithHighAvailabilityAndWait(HighAvailabilityConfig, TimeSpan?, TimeSpan?)
Enable high availability for the extractor.
Declaration
public Task RunWithHighAvailabilityAndWait(HighAvailabilityConfig config, TimeSpan? interval = null, TimeSpan? inactivityThreshold = null)
Parameters
| Type | Name | Description |
|---|---|---|
| HighAvailabilityConfig | config | Configuration object |
| TimeSpan? | interval | Optional interval between state updates. |
| TimeSpan? | inactivityThreshold | Optional threshold after which an extractor is considered inactive. |
Returns
| Type | Description |
|---|---|
| Task |
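The `interval` and `inactivityThreshold` parameters suggest that each instance periodically reports its state and that an instance that stops reporting is eventually treated as inactive. A minimal model of that threshold check only; `IsInactive` is hypothetical, and how the library stores state and chooses the active instance is not described on this page.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// An instance whose last state update is older than the threshold is inactive.
bool IsInactive(DateTime lastUpdate, DateTime now, TimeSpan inactivityThreshold) =>
    now - lastUpdate > inactivityThreshold;

var now = new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc);
var threshold = TimeSpan.FromSeconds(15);

// Last state update per instance index (hypothetical data).
var lastUpdates = new Dictionary<int, DateTime>
{
    [0] = now.AddSeconds(-40), // stopped reporting
    [1] = now.AddSeconds(-3),  // healthy
};

var inactive = lastUpdates.Where(kv => IsInactive(kv.Value, now, threshold))
                          .Select(kv => kv.Key)
                          .ToList();
Console.WriteLine($"inactive instances: {string.Join(",", inactive)}"); // inactive instances: 0
```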
ScheduleDatapointsRun(string, TimeSpan, Func<CancellationToken, Task<IEnumerable<(Identity id, Datapoint dp)>>>)
Schedule a periodic run retrieving datapoints from source systems.
Declaration
protected void ScheduleDatapointsRun(string scheduleName, TimeSpan readInterval, Func<CancellationToken, Task<IEnumerable<(Identity id, Datapoint dp)>>> readDatapoints)
Parameters
| Type | Name | Description |
|---|---|---|
| string | scheduleName | Name of task in Scheduler, must be unique |
| TimeSpan | readInterval | Interval between each read |
| Func<CancellationToken, Task<IEnumerable<(Identity id, Datapoint dp)>>> | readDatapoints | Function reading datapoints from the source system |
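A minimal model of the loop such a schedule runs, assuming the datapoints returned by each read are handed to the timeseries upload queue. `RunPeriodic` and `ReadDatapoints` are hypothetical, this is not the library's PeriodicScheduler, and a `(string, DateTime, double)` tuple stands in for `(Identity, Datapoint)`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Call `read` every `interval` until the token is cancelled,
// passing each result to `enqueue` (standing in for TSUploadQueue).
async Task RunPeriodic<T>(
    Func<CancellationToken, Task<IEnumerable<T>>> read,
    Action<IEnumerable<T>> enqueue,
    TimeSpan interval,
    CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        enqueue(await read(token));
        try
        {
            await Task.Delay(interval, token);
        }
        catch (OperationCanceledException)
        {
            break; // shutting down
        }
    }
}

using var cts = new CancellationTokenSource();
var queued = new List<(string Id, DateTime Ts, double Value)>();
int reads = 0;

// Reader with the same shape as readDatapoints; stops the demo after three reads.
Task<IEnumerable<(string Id, DateTime Ts, double Value)>> ReadDatapoints(CancellationToken token)
{
    reads++;
    if (reads == 3) cts.Cancel();
    IEnumerable<(string Id, DateTime Ts, double Value)> batch =
        new[] { ("sensor-1", DateTime.UtcNow, (double)reads) };
    return Task.FromResult(batch);
}

await RunPeriodic(ReadDatapoints, queued.AddRange, TimeSpan.FromMilliseconds(10), cts.Token);
Console.WriteLine($"reads: {reads}, queued: {queued.Count}"); // reads: 3, queued: 3
```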
ScheduleEventsRun(string, TimeSpan, Func<CancellationToken, Task<IEnumerable<EventCreate>>>)
Schedule a periodic run retrieving events from source systems.
Declaration
protected void ScheduleEventsRun(string scheduleName, TimeSpan readInterval, Func<CancellationToken, Task<IEnumerable<EventCreate>>> readEvents)
Parameters
| Type | Name | Description |
|---|---|---|
| string | scheduleName | Name of task in Scheduler, must be unique |
| TimeSpan | readInterval | Interval between each read |
| Func<CancellationToken, Task<IEnumerable<EventCreate>>> | readEvents | Function reading events from the source system |
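Because readEvents is called on every interval, it usually has to avoid returning the same events twice. One common pattern, shown here as a hypothetical sketch rather than anything this API requires, is a reader closure that keeps a time cursor. The source list, `CreateReader` and the `(Time, Description)` tuples are illustrative; a real reader would map results to EventCreate objects.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical source system holding timestamped events.
var source = new List<(DateTime Time, string Description)>();

// Returns a readEvents-style function that only yields events newer than the last run.
Func<CancellationToken, Task<IEnumerable<(DateTime Time, string Description)>>> CreateReader()
{
    DateTime cursor = DateTime.MinValue;
    return token =>
    {
        token.ThrowIfCancellationRequested();
        var fresh = source.Where(e => e.Time > cursor).OrderBy(e => e.Time).ToList();
        if (fresh.Count > 0) cursor = fresh[^1].Time;
        return Task.FromResult<IEnumerable<(DateTime Time, string Description)>>(fresh);
    };
}

var readEvents = CreateReader();
var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);

source.Add((t0, "pump started"));
var firstRun = (await readEvents(CancellationToken.None)).ToList();

source.Add((t0.AddMinutes(5), "pump stopped"));
var secondRun = (await readEvents(CancellationToken.None)).ToList();

Console.WriteLine($"{firstRun.Count} then {secondRun.Count}"); // 1 then 1
```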
ScheduleRawRun<T>(string, string, string, TimeSpan, Func<CancellationToken, Task<IEnumerable<(string key, T columns)>>>)
Schedule a periodic run with the given type and name. Requires a raw queue with matching type and db/table to be created first.
Declaration
protected void ScheduleRawRun<T>(string scheduleName, string dbName, string tableName, TimeSpan readInterval, Func<CancellationToken, Task<IEnumerable<(string key, T columns)>>> readRawRows)
Parameters
| Type | Name | Description |
|---|---|---|
| string | scheduleName | Name of schedule in Scheduler, must be unique |
| string | dbName | Database in Raw |
| string | tableName | Table in Raw |
| TimeSpan | readInterval | Interval between each read |
| Func<CancellationToken, Task<IEnumerable<(string key, T columns)>>> | readRawRows | Function asynchronously returning a new batch of raw rows from the source system |
Type Parameters
| Name | Description |
|---|---|
| T | Type of columns |
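The requirement that a raw queue with matching type and db/table exists follows from how RawUploadQueues is keyed: by "dbName-tableName" and column type. A minimal model of that lookup, assuming the key string is formed exactly as `$"{dbName}-{tableName}"` and that a missing queue is reported with an exception. `CreateQueueEntry` and `FindQueueForRun` are hypothetical, not library methods.

```csharp
using System;
using System.Collections.Generic;

// Modelled on RawUploadQueues: key is (name, column type).
var rawQueues = new Dictionary<(string name, Type type), string>();

void CreateQueueEntry<T>(string dbName, string tableName) =>
    rawQueues[($"{dbName}-{tableName}", typeof(T))] = $"queue for {dbName}.{tableName}";

string FindQueueForRun<T>(string dbName, string tableName) =>
    rawQueues.TryGetValue(($"{dbName}-{tableName}", typeof(T)), out var queue)
        ? queue
        : throw new InvalidOperationException(
            $"No raw queue for {dbName}-{tableName} with column type {typeof(T).Name}; create it first.");

CreateQueueEntry<Dictionary<string, string>>("mydb", "mytable");

// Same db/table and same column type: the queue is found.
Console.WriteLine(FindQueueForRun<Dictionary<string, string>>("mydb", "mytable"));

// Same db/table but a different column type is a different key.
bool mismatchFound = rawQueues.ContainsKey(("mydb-mytable", typeof(string)));
```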
Start()
Override to start the extractor. This method should create timeseries, set up source systems, and call the protected Schedule and Create methods. It should not perform the extraction itself: return once the extractor has started successfully, and run ongoing work as tasks in the PeriodicScheduler.
Declaration
protected abstract Task Start()
Returns
| Type | Description |
|---|---|
| Task |
Start(CancellationToken)
Public entry point for starting the extractor.
Declaration
public virtual Task Start(CancellationToken token)
Parameters
| Type | Name | Description |
|---|---|---|
| CancellationToken | token | Cancellation token |
Returns
| Type | Description |
|---|---|
| Task |
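Taken together, the members above describe a lifecycle: Init runs before the protected Start(), Start() returns once the extractor is running while periodic work continues in the scheduler, and OnStop is called when the extractor stops. A hypothetical model of that order, where `StartExtractor` stands in for Start(CancellationToken); the exact sequencing inside the library is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var log = new List<string>();

async Task StartExtractor(CancellationToken token)
{
    // "Init": create the extractor's own cancellation source.
    using var source = CancellationTokenSource.CreateLinkedTokenSource(token);
    log.Add("Init");

    // "Start()": create queues and schedule runs, then return quickly.
    log.Add("Start");
    var scheduled = Task.Run(async () =>
    {
        // Stand-in for scheduled tasks, which run until cancellation.
        try { await Task.Delay(Timeout.Infinite, source.Token); }
        catch (OperationCanceledException) { }
    });

    // Wait for scheduled work to end, then run the stop hook.
    await scheduled;
    log.Add("OnStop");
}

using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50));
await StartExtractor(cts.Token);
Console.WriteLine(string.Join(" -> ", log)); // Init -> Start -> OnStop
```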
TestConfig()
Verify that the extractor is configured correctly.
Declaration
protected virtual Task TestConfig()
Returns
| Type | Description |
|---|---|
| Task | |
Events
OnConfigUpdate
Raised when an updated config is found. If a RemoteConfigManager is provided, it periodically checks for config updates.
Declaration
protected event BaseExtractor<TConfig>.OnConfigUpdateHandler? OnConfigUpdate
Event Type
| Type | Description |
|---|---|
| BaseExtractor<TConfig>.OnConfigUpdateHandler |
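A minimal model of remote config polling feeding an update event, assuming subscribers are notified only when the polled revision changes. The handler is a plain `Action<string, int>` because the OnConfigUpdateHandler signature is not shown on this page; `Poll` and the config strings are hypothetical.

```csharp
using System;
using System.Collections.Generic;

Action<string, int>? onConfigUpdate = null;
int currentRevision = 1;

// Each poll yields the latest (config, revision); only a new revision raises the event.
void Poll((string Config, int Revision) latest)
{
    if (latest.Revision == currentRevision) return; // nothing new
    currentRevision = latest.Revision;
    onConfigUpdate?.Invoke(latest.Config, latest.Revision);
}

var received = new List<int>();
onConfigUpdate += (config, revision) => received.Add(revision);

Poll(("interval: 10s", 1)); // same revision, no event
Poll(("interval: 5s", 2));  // new revision, event raised
Poll(("interval: 5s", 2));  // unchanged, no event
Console.WriteLine(string.Join(",", received)); // 2
```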