diff --git a/python/destinations/influxdb_1/README.md b/python/destinations/influxdb_1/README.md new file mode 100644 index 00000000..ed81907a --- /dev/null +++ b/python/destinations/influxdb_1/README.md @@ -0,0 +1,59 @@ +# InfluxDB v1 + +[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/influxdb_1) demonstrates how to +consume data from a Kafka topic in Quix and persist the data to an InfluxDB v1 database using the InfluxDB v1 write API. + +To learn more about how it functions, [check out the underlying +Quix Streams `InfluxDB1Sink`](https://quix.io/docs/quix-streams/connectors/sinks/influxdb1-sink.html). + +## How to run + +Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector. + +Clicking `Set up connector` allows you to enter your connection details and runtime parameters. + +Then either: +* click `Test connection & deploy` to deploy the pre-built and configured container into Quix. + +* or click `Customise connector` to inspect or alter the code before deployment. + +## Environment Variables + +The connector uses the following environment variables: + +### Required +- **input**: Quix input topic +- **INFLUXDB_HOST**: Host address for the InfluxDB instance. +- **INFLUXDB_PORT**: Port for the InfluxDB instance. +- **INFLUXDB_USERNAME**: Username for the InfluxDB instance. +- **INFLUXDB_PASSWORD**: Password for the InfluxDB instance. + +### Optional +- **INFLUXDB_DATABASE**: Database name in InfluxDB where data should be stored. + Default: `quix` +- **INFLUXDB_TAG_KEYS**: A comma-separated list of column names (based on message value) to be used as tags when writing data to InfluxDB. + Can optionally replace with a callable in the template directly. +- **INFLUXDB_FIELD_KEYS**: A comma-separated list of column names (based on message value) to be used as fields when writing data to InfluxDB. 
+ Can optionally replace with a callable in the template directly. +- **INFLUXDB_MEASUREMENT_NAME**: The InfluxDB measurement to write data to. + Can optionally replace with a callable in the template directly. + Default: `default` +- **TIMESTAMP_COLUMN**: This is the column in your data that represents the timestamp in nanoseconds. + Defaults to the message timestamp received from the broker if not supplied. + Can optionally replace with a callable in the template directly. +- **BUFFER_SIZE**: Number of records to buffer before writing to InfluxDB. + Default: `1000` +- **BUFFER_TIMEOUT**: Maximum time (in seconds) to buffer records before writing to InfluxDB. + Default: `1` + +## Requirements / Prerequisites + +You will need to have an InfluxDB v1 instance available, along with a username and password for it. + +## Contribute + +Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. + +## Open Source + +This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation. diff --git a/python/destinations/influxdb_1/dockerfile b/python/destinations/influxdb_1/dockerfile new file mode 100644 index 00000000..752b6e83 --- /dev/null +++ b/python/destinations/influxdb_1/dockerfile @@ -0,0 +1,28 @@ +FROM python:3.12.5-slim-bookworm + +# Set environment variables for non-interactive setup and unbuffered output +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 \ + PYTHONPATH="/app" + +# Build argument for setting the main app path +ARG MAINAPPPATH=. 
+ +# Set working directory inside the container +WORKDIR /app + +# Copy requirements to leverage Docker cache +COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt" + +# Install dependencies without caching +RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt" + +# Copy entire application into container +COPY . . + +# Set working directory to main app path +WORKDIR "/app/${MAINAPPPATH}" + +# Define the container's startup command +ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/python/destinations/influxdb_1/icon.png b/python/destinations/influxdb_1/icon.png new file mode 100644 index 00000000..1b950bf3 Binary files /dev/null and b/python/destinations/influxdb_1/icon.png differ diff --git a/python/destinations/influxdb_1/library.json b/python/destinations/influxdb_1/library.json new file mode 100644 index 00000000..c72f9ea5 --- /dev/null +++ b/python/destinations/influxdb_1/library.json @@ -0,0 +1,122 @@ +{ + "libraryItemId": "influxdb-1-destination", + "name": "InfluxDB 1.0 Sink", + "language": "Python", + "tags": { + "Pipeline Stage": ["Destination"], + "Type": ["Connectors"], + "Category": ["Time series DB"] + }, + "shortDescription": "Consume data from a Kafka topic in Quix and persist the data to an InfluxDB 1.0 database.", + "DefaultFile": "main.py", + "EntryPoint": "dockerfile", + "RunEntryPoint": "main.py", + "IconFile": "icon.png", + "Variables": [ + { + "Name": "input", + "Type": "EnvironmentVariable", + "InputType": "InputTopic", + "Description": "This is the input topic", + "DefaultValue": "input-data", + "Required": true + }, + { + "Name": "INFLUXDB_HOST", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Host address for the InfluxDB instance (formatted as https://).", + "Required": true + }, + { + "Name": "INFLUXDB_PORT", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Port for the InfluxDB instance.", + "Required": true + }, + { 
"Name": "INFLUXDB_USERNAME", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Username for the InfluxDB instance.", + "Required": true + }, + { + "Name": "INFLUXDB_PASSWORD", + "Type": "EnvironmentVariable", + "InputType": "Secret", + "Description": "Password for the InfluxDB instance.", + "Required": true + }, + { + "Name": "INFLUXDB_MEASUREMENT_NAME", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The InfluxDB measurement to write data to. Can optionally replace with a callable in the template directly.", + "DefaultValue": "default", + "Required": false + }, + { + "Name": "INFLUXDB_DATABASE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Database name in InfluxDB where data should be stored.", + "DefaultValue": "quix", + "Required": true + }, + { + "Name": "INFLUXDB_TAG_KEYS", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The tags to include when writing the measurement data. Example: Tag1,Tag2. Can optionally replace with a callable in the template directly.", + "Required": false + }, + { + "Name": "INFLUXDB_FIELD_KEYS", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The fields to include when writing the measurement data. Example: `Field1,Field2`. Can optionally replace with a callable in the template directly. ", + "Required": false + }, + { + "Name": "CONSUMER_GROUP_NAME", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The name of the consumer group to use when consuming from Kafka", + "DefaultValue": "influxdb1-sink", + "Required": true + }, + { + "Name": "TIMESTAMP_COLUMN", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The column containing the timestamp column. NOTE: Must be nanoseconds. 
Can optionally replace with a callable in the template directly.", + "Required": false + }, + { + "Name": "BUFFER_SIZE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The number of records that sink holds before flush data to the InfluxDb", + "DefaultValue": "1000", + "Required": false + }, + { + "Name": "BUFFER_TIMEOUT", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The number of seconds that sink holds before flush data to the InfluxDb", + "DefaultValue": "1", + "Required": false + } + ], + "DeploySettings": { + "DeploymentType": "Service", + "CpuMillicores": 200, + "MemoryInMb": 500, + "Replicas": 1, + "PublicAccess": false, + "ValidateConnection": true + } +} diff --git a/python/destinations/influxdb_1/main.py b/python/destinations/influxdb_1/main.py new file mode 100644 index 00000000..ab418c6d --- /dev/null +++ b/python/destinations/influxdb_1/main.py @@ -0,0 +1,61 @@ +# import Utility modules +import os + +from typing import Optional + +# import vendor-specific modules +from quixstreams import Application +from quixstreams.sinks.core.influxdb1 import ( + InfluxDB1Sink, + FieldsSetter, + MeasurementSetter, + TagsSetter, + TimeSetter, +) + +# for local dev, load env vars from a .env file +from dotenv import load_dotenv +load_dotenv() + + +def _as_iterable(env_var) -> list[str]: + """Split the comma-separated value of `env_var` into trimmed, non-empty keys; return [] when it is unset or blank.""" + return [key for part in os.environ.get(env_var, "").split(",") if (key := part.strip())] + + +# Potential Callables - can manually edit these to instead use your own callables. 
+# --Required-- +measurement_name: MeasurementSetter = os.getenv("INFLUXDB_MEASUREMENT_NAME", "default") +# --Optional-- +tag_keys: TagsSetter = _as_iterable("INFLUXDB_TAG_KEYS") +field_keys: FieldsSetter = _as_iterable("INFLUXDB_FIELD_KEYS") +time_setter: Optional[TimeSetter] = col if (col := os.environ.get("TIMESTAMP_COLUMN")) else None + + +influxdb_v1_sink = InfluxDB1Sink( + host=os.environ["INFLUXDB_HOST"], + port=int(os.environ["INFLUXDB_PORT"]), + username=os.environ["INFLUXDB_USERNAME"], + password=os.environ["INFLUXDB_PASSWORD"], + tags_keys=tag_keys, + fields_keys=field_keys, + time_setter=time_setter, + database=os.getenv("INFLUXDB_DATABASE", "quix"), + measurement=measurement_name, +) + + +app = Application( + consumer_group=os.environ.get("CONSUMER_GROUP_NAME", "influxdb-data-writer"), + auto_offset_reset="earliest", + commit_every=int(os.environ.get("BUFFER_SIZE", "1000")), # number of records buffered before a commit + commit_interval=float(os.environ.get("BUFFER_TIMEOUT", "1")), # seconds; BUFFER_TIMEOUT is the variable declared in library.json and the README +) +input_topic = app.topic(os.environ["input"]) + +sdf = app.dataframe(input_topic) +sdf.sink(influxdb_v1_sink) + + +if __name__ == "__main__": + app.run() diff --git a/python/destinations/influxdb_1/requirements.txt b/python/destinations/influxdb_1/requirements.txt new file mode 100644 index 00000000..c8f56dd7 --- /dev/null +++ b/python/destinations/influxdb_1/requirements.txt @@ -0,0 +1,2 @@ +quixstreams[influxdb1]==3.19.0 +python-dotenv \ No newline at end of file