Class BaseExtractor<TConfig>
Base class for extractors writing timeseries, events or datapoints.
Namespace: Cognite.Extractor.Utils
Assembly: ExtractorUtils.dll
Syntax
public abstract class BaseExtractor<TConfig> : IDisposable, IAsyncDisposable where TConfig : VersionedConfig
Type Parameters
Name | Description |
---|---|
TConfig | Type of configuration object, must derive from VersionedConfig |
Constructors
BaseExtractor(TConfig, IServiceProvider, CogniteDestination?, ExtractionRun?, RemoteConfigManager<TConfig>?)
Constructor
Declaration
public BaseExtractor(TConfig config, IServiceProvider provider, CogniteDestination? destination = null, ExtractionRun? run = null, RemoteConfigManager<TConfig>? configManager = null)
Parameters
Type | Name | Description |
---|---|---|
TConfig | config | Configuration object |
IServiceProvider | provider | Service provider |
CogniteDestination | destination | Optional Cognite destination |
ExtractionRun | run | Optional extraction run |
RemoteConfigManager<TConfig> | configManager | Optional remote config manager |
Properties
Config
Configuration object
Declaration
protected TConfig Config { get; }
Property Value
Type | Description |
---|---|
TConfig |
ConfigManager
Config manager for fetching remote configs. Contains the newest config version and its revision.
Declaration
protected RemoteConfigManager<TConfig>? ConfigManager { get; }
Property Value
Type | Description |
---|---|
RemoteConfigManager<TConfig> |
Destination
CDF destination
Declaration
protected CogniteDestination? Destination { get; }
Property Value
Type | Description |
---|---|
CogniteDestination |
EventUploadQueue
Event upload queue
Declaration
protected EventUploadQueue? EventUploadQueue { get; }
Property Value
Type | Description |
---|---|
EventUploadQueue |
Provider
Access to the service provider this extractor was built from
Declaration
protected IServiceProvider Provider { get; }
Property Value
Type | Description |
---|---|
IServiceProvider |
RawUploadQueues
Raw upload queues, keyed by dbName-tableName and type.
Declaration
protected Dictionary<(string name, Type type), IUploadQueue> RawUploadQueues { get; }
Property Value
Type | Description |
---|---|
Dictionary<(string name, Type type), IUploadQueue> |
Run
Extraction run for reporting to an extraction pipeline in CDF.
Declaration
protected ExtractionRun? Run { get; }
Property Value
Type | Description |
---|---|
ExtractionRun |
Scheduler
Scheduler for running various periodic tasks
Declaration
protected PeriodicScheduler Scheduler { get; set; }
Property Value
Type | Description |
---|---|
PeriodicScheduler |
Source
Cancellation token source
Declaration
protected CancellationTokenSource Source { get; set; }
Property Value
Type | Description |
---|---|
CancellationTokenSource |
TSUploadQueue
Timeseries upload queue
Declaration
protected TimeSeriesUploadQueue? TSUploadQueue { get; }
Property Value
Type | Description |
---|---|
TimeSeriesUploadQueue |
Methods
CreateEventQueue(int, TimeSpan, Func<QueueUploadResult<EventCreate>, Task>?, string?)
Set the EventUploadQueue value to a new event queue.
Declaration
protected void CreateEventQueue(int maxSize, TimeSpan uploadInterval, Func<QueueUploadResult<EventCreate>, Task>? callback, string? bufferPath = null)
Parameters
Type | Name | Description |
---|---|---|
int | maxSize | Maximum size of queue before pushing to CDF, 0 for no limit |
TimeSpan | uploadInterval | Interval between each push to CDF |
Func<QueueUploadResult<EventCreate>, Task> | callback | Callback after each push to CDF |
string | bufferPath | Optional path to buffer file |
CreateRawQueue<T>(string, string, int, TimeSpan, Func<QueueUploadResult<(string key, T columns)>, Task>)
Create a raw queue with the given type and name
Declaration
protected void CreateRawQueue<T>(string dbName, string tableName, int maxSize, TimeSpan uploadInterval, Func<QueueUploadResult<(string key, T columns)>, Task> callback)
Parameters
Type | Name | Description |
---|---|---|
string | dbName | Name of database in Raw |
string | tableName | Name of table in Raw |
int | maxSize | Max size of queue before triggering push, 0 for no limit |
TimeSpan | uploadInterval | Interval between each push to CDF |
Func<QueueUploadResult<(string key, T columns)>, Task> | callback | Callback after each push |
Type Parameters
Name | Description |
---|---|
T | Type of columns in raw queue |
CreateTimeseriesQueue(int, TimeSpan, Func<QueueUploadResult<(Identity id, Datapoint dp)>, Task>?, string?)
Set the TSUploadQueue value to a new timeseries queue.
Declaration
protected void CreateTimeseriesQueue(int maxSize, TimeSpan uploadInterval, Func<QueueUploadResult<(Identity id, Datapoint dp)>, Task>? callback, string? bufferPath = null)
Parameters
Type | Name | Description |
---|---|---|
int | maxSize | Maximum size of queue before pushing to CDF, 0 for no limit |
TimeSpan | uploadInterval | Interval between each push to CDF |
Func<QueueUploadResult<(Identity id, Datapoint dp)>, Task> | callback | Callback after each push to CDF |
string | bufferPath | Optional path to buffer file |
Dispose()
Dispose extractor. Use DisposeAsync instead if possible.
Declaration
public void Dispose()
Dispose(bool)
Dispose of extractor, waiting for running tasks and pushing everything pending to CDF. Use DisposeAsync instead if possible.
Declaration
protected virtual void Dispose(bool disposing)
Parameters
Type | Name | Description |
---|---|---|
bool | disposing | True if disposing |
DisposeAsync()
Dispose extractor asynchronously. Preferred over synchronous dispose.
Declaration
public ValueTask DisposeAsync()
Returns
Type | Description |
---|---|
ValueTask |
DisposeAsyncCore()
Internal method to dispose asynchronously.
Declaration
protected virtual ValueTask DisposeAsyncCore()
Returns
Type | Description |
---|---|
ValueTask |
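The four dispose members above match the names used by the general .NET pattern for types that implement both IDisposable and IAsyncDisposable. The sketch below illustrates that pattern on a hypothetical stand-in class (`PendingWorkResource` and `DisposeDemo` are not part of ExtractorUtils), and it is an assumption that BaseExtractor follows it in this exact form. It shows why `DisposeAsync` is preferred: pending work can be awaited instead of blocked on.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a type with pending work to flush; not part of ExtractorUtils.
public class PendingWorkResource : IDisposable, IAsyncDisposable
{
    public bool Disposed { get; private set; }
    public bool FlushedAsync { get; private set; }

    // Synchronous entry point. A real implementation would block on pending work here.
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (Disposed) return;
        if (disposing)
        {
            // Synchronous cleanup of managed resources would go here.
        }
        Disposed = true;
    }

    // Asynchronous entry point: run the async core first, then mark the object disposed.
    public async ValueTask DisposeAsync()
    {
        await DisposeAsyncCore().ConfigureAwait(false);
        Dispose(false);
        GC.SuppressFinalize(this);
    }

    protected virtual async ValueTask DisposeAsyncCore()
    {
        if (Disposed) return;
        await Task.Yield(); // stands in for awaiting pending uploads
        FlushedAsync = true;
    }
}

public static class DisposeDemo
{
    // Disposes a resource with "await using" and reports what happened.
    public static async Task<(bool disposed, bool flushedAsync)> RunAsync()
    {
        var resource = new PendingWorkResource();
        await using (resource)
        {
            // Work with the resource here.
        }
        return (resource.Disposed, resource.FlushedAsync);
    }
}
```

A subclass that holds its own resources would typically override `Dispose(bool)` and `DisposeAsyncCore()` and call the base implementations, rather than reimplementing `Dispose()` or `DisposeAsync()`.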
Init(CancellationToken)
Called before Start() and TestConfig(). By default, it only initializes Source and Scheduler.
Declaration
protected virtual void Init(CancellationToken token)
Parameters
Type | Name | Description |
---|---|---|
CancellationToken | token | Cancellation token |
OnStop()
Called when the extractor is stopping.
Declaration
protected virtual Task OnStop()
Returns
Type | Description |
---|---|
Task |
RunWithHighAvailabilityAndWait(HighAvailabilityConfig, TimeSpan?, TimeSpan?)
Method called to add high availability to an extractor.
Declaration
public Task RunWithHighAvailabilityAndWait(HighAvailabilityConfig config, TimeSpan? interval = null, TimeSpan? inactivityThreshold = null)
Parameters
Type | Name | Description |
---|---|---|
HighAvailabilityConfig | config | Configuration object |
TimeSpan? | interval | Optional update state interval. |
TimeSpan? | inactivityThreshold | Optional threshold for extractor being inactive. |
Returns
Type | Description |
---|---|
Task |
ScheduleDatapointsRun(string, TimeSpan, Func<CancellationToken, Task<IEnumerable<(Identity id, Datapoint dp)>>>)
Schedule a periodic run retrieving datapoints from source systems.
Declaration
protected void ScheduleDatapointsRun(string scheduleName, TimeSpan readInterval, Func<CancellationToken, Task<IEnumerable<(Identity id, Datapoint dp)>>> readDatapoints)
Parameters
Type | Name | Description |
---|---|---|
string | scheduleName | Name of task in Scheduler, must be unique |
TimeSpan | readInterval | Interval between each read |
Func<CancellationToken, Task<IEnumerable<(Identity id, Datapoint dp)>>> | readDatapoints | Function reading datapoints from the source system |
ScheduleEventsRun(string, TimeSpan, Func<CancellationToken, Task<IEnumerable<EventCreate>>>)
Schedule a periodic run retrieving events from source systems.
Declaration
protected void ScheduleEventsRun(string scheduleName, TimeSpan readInterval, Func<CancellationToken, Task<IEnumerable<EventCreate>>> readEvents)
Parameters
Type | Name | Description |
---|---|---|
string | scheduleName | Name of task in Scheduler, must be unique |
TimeSpan | readInterval | Interval between each read |
Func<CancellationToken, Task<IEnumerable<EventCreate>>> | readEvents | Function reading events from the source system |
ScheduleRawRun<T>(string, string, string, TimeSpan, Func<CancellationToken, Task<IEnumerable<(string key, T columns)>>>)
Schedule a periodic run with the given type and name. Requires a raw queue with matching type and db/table to be created first.
Declaration
protected void ScheduleRawRun<T>(string scheduleName, string dbName, string tableName, TimeSpan readInterval, Func<CancellationToken, Task<IEnumerable<(string key, T columns)>>> readRawRows)
Parameters
Type | Name | Description |
---|---|---|
string | scheduleName | Name of schedule in Scheduler, must be unique |
string | dbName | Database in Raw |
string | tableName | Table in Raw |
TimeSpan | readInterval | Interval between each execution of readRawRows |
Func<CancellationToken, Task<IEnumerable<(string key, T columns)>>> | readRawRows | Function asynchronously returning a new batch of raw rows from the source system |
Type Parameters
Name | Description |
---|---|
T | Type of columns |
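Because ScheduleRawRun requires a matching raw queue, the two calls usually appear together. The following is a minimal sketch using only the signatures documented on this page. `RawExtractor`, `SensorRow`, the database, table and schedule names are hypothetical, and using `BaseConfig` as the config type is an assumption about the library. It also assumes a destination was supplied to the constructor and that the ExtractorUtils package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Cognite.Extractor.Utils; // BaseExtractor and related types; BaseConfig is assumed to live here

// Hypothetical column type for the raw rows.
public class SensorRow
{
    public string Name { get; set; } = "";
    public double Value { get; set; }
}

// Hypothetical extractor writing rows to a single Raw table.
public class RawExtractor : BaseExtractor<BaseConfig>
{
    public RawExtractor(BaseConfig config, IServiceProvider provider,
        CogniteDestination destination, ExtractionRun run)
        : base(config, provider, destination, run)
    {
    }

    protected override Task Start()
    {
        // The queue must exist before the run is scheduled, with the same type and db/table.
        CreateRawQueue<SensorRow>("my-db", "my-table", 500, TimeSpan.FromSeconds(10),
            result => Task.CompletedTask); // callback does nothing in this sketch

        ScheduleRawRun<SensorRow>("raw-sensor-rows", "my-db", "my-table",
            TimeSpan.FromSeconds(30), token =>
            {
                // A real extractor would read a batch from the source system here.
                var row = ("sensor-1", new SensorRow { Name = "sensor-1", Value = 42.0 });
                return Task.FromResult<IEnumerable<(string key, SensorRow columns)>>(new[] { row });
            });

        return Task.CompletedTask;
    }
}
```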
Start()
Internal method starting the extractor. It should handle any creation of timeseries, setup of source systems, and calls to the various Schedule and Create protected methods, but it should not perform the actual extraction. Start should return once the extractor has successfully started; other tasks can be scheduled in the PeriodicScheduler.
Declaration
protected abstract Task Start()
Returns
Type | Description |
---|---|
Task |
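As an illustration of the division of labour described above, the sketch below overrides Start() to create a timeseries queue and schedule a periodic read, then returns immediately. It relies on the signatures documented on this page. `SineExtractor`, the schedule name and the external id are hypothetical. The namespaces of `Datapoint` and `Identity`, the `Identity.Create` and `Datapoint(DateTime, double)` calls, and the use of `BaseConfig` are assumptions about the library. It also assumes the time series `sine-wave` already exists in CDF.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Cognite.Extensions;      // Datapoint (assumed namespace)
using Cognite.Extractor.Utils; // BaseExtractor, CogniteDestination, ExtractionRun
using CogniteSdk;              // Identity (assumed namespace)

// Hypothetical extractor pushing one generated datapoint per second.
public class SineExtractor : BaseExtractor<BaseConfig>
{
    public SineExtractor(BaseConfig config, IServiceProvider provider,
        CogniteDestination destination, ExtractionRun run)
        : base(config, provider, destination, run)
    {
    }

    protected override Task Start()
    {
        // Push when 1000 datapoints are queued, or every five seconds; no callback.
        CreateTimeseriesQueue(1000, TimeSpan.FromSeconds(5), null);

        // The actual extraction happens in the scheduled function, not in Start itself.
        ScheduleDatapointsRun("sine-datapoints", TimeSpan.FromSeconds(1), token =>
        {
            var now = DateTime.UtcNow;
            var point = (Identity.Create("sine-wave"), new Datapoint(now, Math.Sin(now.Second)));
            return Task.FromResult<IEnumerable<(Identity id, Datapoint dp)>>(new[] { point });
        });

        // Return once setup is done; the scheduler keeps running the read function.
        return Task.CompletedTask;
    }
}
```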
Start(CancellationToken)
Method called to start the extractor.
Declaration
public virtual Task Start(CancellationToken token)
Parameters
Type | Name | Description |
---|---|---|
CancellationToken | token | Cancellation token |
Returns
Type | Description |
---|---|
Task |
TestConfig()
Verify that the extractor is configured correctly.
Declaration
protected virtual Task TestConfig()
Returns
Type | Description |
---|---|
Task | Task |
Events
OnConfigUpdate
If a RemoteConfigManager is provided, it will periodically look for config updates.
Declaration
protected event BaseExtractor<TConfig>.OnConfigUpdateHandler? OnConfigUpdate
Event Type
Type | Description |
---|---|
BaseExtractor<TConfig>.OnConfigUpdateHandler |