From 81522ca26f848100a0c9117fbae7d123b6c8ff92 Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Mon, 14 Jul 2025 20:22:55 -0400 Subject: [PATCH] add influxdb1 template --- python/destinations/influxdb_1/README.md | 59 +++++++++ python/destinations/influxdb_1/dockerfile | 28 ++++ python/destinations/influxdb_1/icon.png | Bin 0 -> 12013 bytes python/destinations/influxdb_1/library.json | 122 ++++++++++++++++++ python/destinations/influxdb_1/main.py | 60 +++++++++ .../destinations/influxdb_1/requirements.txt | 2 + 6 files changed, 271 insertions(+) create mode 100644 python/destinations/influxdb_1/README.md create mode 100644 python/destinations/influxdb_1/dockerfile create mode 100644 python/destinations/influxdb_1/icon.png create mode 100644 python/destinations/influxdb_1/library.json create mode 100644 python/destinations/influxdb_1/main.py create mode 100644 python/destinations/influxdb_1/requirements.txt diff --git a/python/destinations/influxdb_1/README.md b/python/destinations/influxdb_1/README.md new file mode 100644 index 00000000..ed81907a --- /dev/null +++ b/python/destinations/influxdb_1/README.md @@ -0,0 +1,59 @@ +# InfluxDB v1 + +[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/influxdb_1) demonstrates how to +consume data from a Kafka topic in Quix and persist the data to an InfluxDB v3 database using the InfluxDB v1 write API. + +To learn more about how it functions, [check out the underlying +Quix Streams `InfluxDB1Source`](https://quix.io/docs/quix-streams/connectors/sinks/influxdb1-sink.html). + +## How to run + +Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log-in and visit the `Connectors` tab to use this connector. + +Clicking `Set up connector` allows you to enter your connection details and runtime parameters. + +Then either: +* click `Test connection & deploy` to deploy the pre-built and configured container into Quix. + +* or click `Customise connector` to inspect or alter the code before deployment. + +## Environment Variables + +The connector uses the following environment variables: + +### Required +- **input**: Quix input topic +- **INFLUXDB_HOST**: Host address for the InfluxDB instance. +- **INFLUXDB_PORT**: Port for the InfluxDB instance. +- **INFLUXDB_USERNAME**: Username for the InfluxDB instance. +- **INFLUXDB_PASSWORD**: Password for the InfluxDB instance. + +### Optional +- **INFLUXDB_DATABASE**: Database name in InfluxDB where data should be stored. + Default: `quix` +- **INFLUXDB_TAG_KEYS**: A comma-separated list of column names (based on message value) to be used as tags when writing data to InfluxDB. + Can optionally replace with a callable in the template directly. +- **INFLUXDB_FIELD_KEYS**: A comma-separated list of column names (based on message value) to be used as fields when writing data to InfluxDB. + Can optionally replace with a callable in the template directly. +- **INFLUXDB_MEASUREMENT_NAME**: The InfluxDB measurement to write data to. + Can optionally replace with a callable in the template directly. + Default: `default` +- **TIMESTAMP_COLUMN**: This is the column in your data that represents the timestamp in nanoseconds. + Defaults to use the message timestamp received from the broker if not supplied. + Can optionally replace with a callable in the template directly. +- **BUFFER_SIZE**: Number of records to buffer before writing to TDengine. + Default: `50` +- **BUFFER_TIMEOUT**: Maximum time (in seconds) to buffer records before writing to TDengine. + Default: `1` + +## Requirements / Prerequisites + +You will need to have an InfluxDB 3.0 instance available and an API authentication token. + +## Contribute + +Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. + +## Open Source + +This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation. diff --git a/python/destinations/influxdb_1/dockerfile b/python/destinations/influxdb_1/dockerfile new file mode 100644 index 00000000..752b6e83 --- /dev/null +++ b/python/destinations/influxdb_1/dockerfile @@ -0,0 +1,28 @@ +FROM python:3.12.5-slim-bookworm + +# Set environment variables for non-interactive setup and unbuffered output +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 \ + PYTHONPATH="/app" + +# Build argument for setting the main app path +ARG MAINAPPPATH=. + +# Set working directory inside the container +WORKDIR /app + +# Copy requirements to leverage Docker cache +COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt" + +# Install dependencies without caching +RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt" + +# Copy entire application into container +COPY . . + +# Set working directory to main app path +WORKDIR "/app/${MAINAPPPATH}" + +# Define the container's startup command +ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/python/destinations/influxdb_1/icon.png b/python/destinations/influxdb_1/icon.png new file mode 100644 index 0000000000000000000000000000000000000000..1b950bf3cb3e80979fa49b922b6ebe344fcfc197 GIT binary patch literal 12013 zcmdUVWmjBX)FkfKNPwUX39iB2HF&T9frbQk4UL509xS*w?(VLE2lwFa?hbSF&a5?m zVdev0dNrr--DgYfstQ$8k;BG#ivb4*hpiwltpNuIPx9{%4F&il_}6{}92{G%g0zID zd)iStx+bCd!YP&I8ws}*8Es^c`Ztg)426hR=PN!%%@=37a5-}q7j6RcYW~4n zB&82KpT3GJ4=d0mbMmz`5#-RK{4PX| zAO4s;j~A{gGJEC6Ub!tiYC2*g^1WY8LFzOe7>$;J10%bl2s_Np8(ydVf!KhGH1CyH zViyh^PC&ig#Ir_OoVXy3bdL^vZ+&{Up;Ob~+24m&@+-eOTH7^U<6#MLY5Q!l`AT3= z8h%LgcX;Uv1=L$LF?bePyr1Y%Z?Wd`coB^Ibv2O_#Rh$gRh@LlE|Wnhn71|vQ6b`^ z))~4Tm~WjG?O!_V4UmuuMuNt#+{fu<&rSIJ;4mUui^Q}L1fn{Sd%O{~)5REjaX*N< zr#LrFf0;rss_^LAa9u|5?i4e9d)KGzduGbmcqMs+fGnF&QXK|7tT%@szs=MbR2D@& z#_{^rM5KbT0ik;K)DQIQ&{GCgO5BP2+NL{!Zax2l2|;tf>htJBpAZekTtvFVk4$1lEH_aAn5U-=zET{WA!|*v}ac5 zK*Y_3-oVNe-4&Tozjx&9WXYNk7lLpBN>zqjD?X|mss&>qQse0H0KUYBHwHl;gRA$0 zoWCn`@vFGx4>2KCA+g1L>%CtNUVmDtaBoWRuOS#-s6^P{T^ZcB)ua-|G|%ubU%E{p z+(6C#7}qJriE)J<(`fI%goG?hN7#PD_?zM`==M;y^o}&Q?c93wl8sy#W!LaNDbX_} zM+1IvsxUF&k#_+ZS=K~0SZ?hq&R%LH<`lU_?x@)`y7gIN^z!a1YZy;83{Cm0n?FL) z5|5B1lySwIMA7|4L!V~Tzkr&<6=fGHNA>qSEJYnCmrGS3;Q1(k-%Ax?)z z79}X_+p1XBQD;nQ(W~TS+IP?s0j=dTKH1CQgj(hoEUD!=q^g{Tb|S0x7s_H$F>G(; z<1FXNiRfnk9F^!t@T4DQ1QM3pbyowKoBzH+-rj9SgEIvy2;v`NG)nM_9Yi1lo6l1X z35dROM^jdX%lzvwaT1P(i4ieyMWS?e%>G?M$2IQVZDyA9PZ@hV-|_e8!1culG(mB> zw|)K)ebap}5}e)Es$%ttf#5o$Qt(h~l$?|% z%!myt%iYpQy`sDnBxF3>Po$CRrwfk}fmXYv(_QrER-&Wmqt&M$;#Vm$(GQb3X}fia zbCYhMZmTiq&-8yzD;yf%JrBDZd$3tXgJ6;_$2;If$*UOcASGk<&+m;*Y?%`V5?ZdfVF3de8&|NE|oFJfLtFgsI;>XNvkueEx+t4ZCD3!u%UvS=2>% zDuQnjCj&piPlO&?WT988-yM&tMI)7%G#}3Ob-IVIhxn&%u|M}YI9(dbK&_ON#sz+g za>lAyq8Vi%WPso`QgboXAnrefC&q-G)yBgtszRH^uy@*86E0has51S0p8q_;{=TCF zu8QvgpItROwU5ACIG0x;Cad%BAVi1CHj8!ctW1Ojx{Hox$e()IYCm@h^CY)2n++0j z>>c74_pL(QX!>TqfEc+NYE(gsGONNE?fDBUGbzWByYvuiQ;mXGGk~jorY? zRsK8R&Ne`l;=8PlTKok6vXDKJ!xA}@)R^NoPtKx5t@i`}(+~1!_9-!{g(43}u*+P1 z=Uj-}k^oQU%7iz7a`{ z&Ne`#!Fl*dsKjz_Q*e?9urt)-iW)Wu$bycAkKvH}F}4grth_*E8qJOZ2&|2QfBom7 zepHQIy#+RbsfHkBCa-gsBtD6Sp?73;R&q=)s_r7kQ}EL~0`~=U6sgBYr@+e40djAL z;9O5_X{FYJ;GeH)7HIOxC>b$v`!Av6oAz?Sau)3A?fEzkH!gWG{AH{)Y7pIl=S`#R zi<~%(RIt9^)COd*%QkJ7XV)+n3N-knw^El{G}^;Nbz)ecnXucaDNsv;in+sugMZ)S zkFPU#By~X6bq4y&tuHZW%G(AVB|mP^ay)1ErR@^E+<#*5Onn`UH(NsRE==4I;6Z!( z1OrnG=n<-ul9Kq%#+JCWf7jpF5+rV|C`zvq9qQ^sH=!E(*{J;85(Z$K2;|=e|2rsf zlrsH&f700|fUDV~D-{JR{vJe>RuU{B>q;_0XWeDWE-4Oud38@11BY*dpF8c5t3Ih@ zbv~i-XiA4i*4a`*SQM&`IzfdmPqkWQlp#rX{mSsGe>s}JO9sf4a?K>{UlVshm(s(iR*@CeV3z+9A zLYrH>AOnA|JrtH}C45?$k7l&p<}8h1x~+~cJhELap#E|rl)*6vxy|2LW*5KS2c<8y z>Qp)w=|LLp;V=we(3YgGYKO##%U=vRoS&VNnhXgqStdWvpv6;}O3mz=Lke3|pB{~3 zjqWn+x!F0S+MZpMn8Gl{yj~|)zs*zFeS@GW$!JYW?H8677@dqB&?%&v*2T{$y4hEa z*LKz>&Pkqx`^-sCz;v&88m5-Adu>bJ*BHCFBPq$uXeu=qlkKl4Snq4iyRIDaxlZII zr^szuX+!?Iq4MpX6N_8G($dKM!jjU7#|fqk<$Mh?m;41>U6?|Ijet%lLJltn7As3z zL$A8ZRBhbAJ1c~`+U2x6fE89=Q)2%pkO&9-u3d-_rOu-I&g zum4>c(|d!!SN&scMZwCuI&?o6qp#&)ML=A~{Kv<5>h3N%W(6s2&S}aK;&-poDz)*m z^0t0i7Lt&E1JmUd#G*$S`k72S;?dmt*G^AIhNmi*y2wT4VW|~T1%oSZg+^N^*7c3l z%}(36SG|GNCvJ(Sg!s)9^~K-%@T<0q>@5~i<0NBWWPR0{fQe7iC-ZKjp&TZ)*#EKb zF*${R*pK{i`z){+d8fN8=dYav65SrT#eXd#J=_0E>6?7Gi@^urfgpw25Dr3C96@vX zq}K&@un9Em%iMms<-~CNP!uPT)|9toHW@_0_n<$)MVqgC0@W?6G32LRY1o3HBUs5n zb6IY}D(Dd<>%iGHU#0Nii|v19KkZMR#kgxj(ZWks2olJ>5d6w4CLq`+U=O{RGA*W; z6hXw3!qEfSg{>BF!4<0UZfwH~ zJa&O1i4PmKK#GoMysYzzOp)>$g$$+=Ft;3Ta8EH-pjhzDUY}piozvnwSnuOq>>Bd) zz%;x~V?N?ilL-Kny)di52Lh*Hi*U-|78+n|;W#cJH1YJNob7Hl;Va(yexoLCJ6Dg- zMau1!n!~wUurH47Y*=zwanW6|+lKc)q5NlwVJx-fXM8GOM9hA%6QDZ8))CvO3QYW|F!CI{QSS@MyFq5{rES+!!}PRkJnd+ zunN_mXkvpCK;Rli9>0GDZkx0|LdxFrL=>It-9rBM>Vz_M`*(#|oF{cSx4CXqn?I!< z<@oc_z0>O!(Nw)Otqogvt2Sr|sie^p-VwfpTb?%k3RLZd;R9LP>Q^JAif{=CTFmWe z!{LHrkh_}*%609;@3PKlv5WIyqdxd68V+D5X?QiIr3mOpxj)NXXvSspzAIb4wHfRf zzSFx(>6@l2GhxzJ6^&q7mC*w|ui?xb7Y>1{$rV;#^^=@#4Q4Mx8vhM$06w-Z%?(t~ ztr77gCtzE|Cm@)eYrU;r+&Z?l@+XLDQwi1pcjLRs;SYP7QkauGq*M_&)gK4Hp+xS; zsm`!l;o6=hzk>a<7eizlcZC7QueNE_6Ej6H^^}@gLR!)CBUSenvJC6oRychXjb5 zOLYY8c_=o)QM*$xi=0qN5?@z*Kq*qtld*oCUO78{uk*c;&+6!BkTTyq6_lviS|6z< zPaqLGxSdIow;yS^Dd2k4GjKrh4#6Ky?(gI~UAH~lQlg2R9$K};&+>fpfoy*u;R$%B z|FMt0z}qRxu%dsjUhsFkaX}m_k{EZDX^VTx{ppT?`SQeELL;*(a?d+W!Hm~eUVcC) z_{I%Z5!^R-jUJO?E1s7yi(P#I5#O}7&qhlVLX@1`6d1DxYF;+xw(}t+iSLTu@hU@| zYwHw-5@qOyK+xezw3lYOt;?7KU~hS`Gk)_HWGU^ccOsFIyGe-%z5WwSTi%Tit{Ky2 zeY1*SpFBO?EDia$e-+tSwUv9N58b6CaCxBHZo2{xLtOoaMDl5~1TtH3 z@_PYmKF8V359*_bTEklo5wUl14;YPR5~9w$_U`liLrRa0^iGvvnNq-*r4g7RZeUMe z@H-J#F{RJY)M>MU1R!X4&{<|Y*HwhsO8~f8n0FqqkA|3~@?51h@8!aO#KY|$?9p;b z-SUG@{?t$p=!9F=(M9dDtjXn-a!Vp}c2-8>{zlY8yss)HbpxHSsX@+n;91_AITIar zScS{EV}kEP!@AE}ZaG?;2(HYSOVgJKeG<0cl9&Ypq`_CY32Y2eBAG ze55&BWrmcr*`Uy(g=F%q{KK<%^!fT!8W#-H=#lW3*oAo?Rv|5=w)H+yc*VqBCrwFb|R(woAiv&cF&>-znp8q7HH^Y zQIq;~P6kvU!!l50b|0i;as^rGZyH8kn(u~kO?Mu)(~K>$z8d}-WW%y${lY?VQkp!I zYbHpzkg4m2uqkjYyF=%Q{(65_qUICU<}EJoA^c9P1%-UsK)PDZz-3hBYKW*KUMQ#O zMH8JOBwur3$ePrM>lR~@NfBFXNCkqR@si#oFug1nex9P+Tq?wWK<#FW!m`w+;A%@5 zV!T?v4eS4*M1{@1lQ8Ze+Q;tSBuGd9hpkArp{Y7I-XSGO{XF zhId{j0Bc3jtR^6-o~!$=*}<%j^sd;$ zn{hAP%w~(rEJ`t<8T!slJ@1r%a^Ge&0iCfahm~{?r{CxF^5V8Tep)N<8S6P3`CgO3 z_kjcHUI=HRBX|Ll%Du*IN#ib8Lr+|sywi%&huxo;*I8-16GML!T31nHhtjRPA`P$` z2)guG5|?I@{iv$GQGxF_%DA_+$6>w}2@aLA2EVtlKVHy6L#{>~ft2Zc7CJL(h-RQ_vSM#uzM5l*(aRvLP1k4ZTF znI`g>yw0ETM725oQru0YC;Xn92Wx>f>lryh-)45ec2mm^bL+(Xp6S1j#U#z8Q+0Px zi$M7(nl8DeL61EExy4{L`O^luf1!>STAcYbV#u&?EpP}0uzRJ$7~L5f?%H7Yc^t4S z$S>8WT((}e;7Y9@W>(Ga6%y5V|A8YFVaOF%*mN;v=BSs&R9t;{t9bM5GwU2Ouynq1 z`ri8T-^_M4@(SR>5)l-JEpXJnBtN6Z(iO}bnPr}|Oy?_cmLA{iA6w<+7QDsjG769K7((?&b7lPUDL^q9s&)`huA z?J+_lR9b^g!xnyHoBg@pCYoLI{G>W6m(5FZ{bb46cC0!7qVUsER7aLAWk7i+ol!s^DrEtzUkjApW{Laz=vna!{>mxP zvZAX?z2LsOG zLw5-6-4v{P!M1q%CGaX0UWwg5=X8jYrOs>mZfQ^tf@Tm|mP|rH?l+~{Hj;cy22?Bs zK0A_S%MaTs!KXi4BCozX-Q%;JyosNOvaPc2IT8}1oTP3LHcjOm^*f3%G2DI(Lk*Ym zTcE74RKg9pM`V3{5bHIZN0Vd%yb>;&KrWd)I$Ml;x1rA6z6!8Sr2t?S4J zfr_`O#Qrd{rO!mxelQrywqPrdiul0Wa+5(>y#FReF`1j%)7RK)8G<=m4;{EWvus%q z#`D#?Ij>3kYM_6?TqL6SpLsM!eL#x^v*e4F3}YP1H+GJ#q5tGe(pYUN^IuIuN9J%< z!gIVW{&kGJ2WHrP_rM1in+K_Io-c{H0smReV`7cN#7`nMtoMMWZ_wS~oxRU^KJ;UI zm2quWY>9l>gd5&ta_3tdS0TkDyMdK@U9+#Ra|NaT!yC ztSc}sn9RU^m%)CHmTF<)EAkrU!B~pB|8sPPG;R5dCB^U-at-9>AN3;JRdRF}s*>?}MGL~QcT>QspYX!YzWwX{8DRZxiSzmx3kfoABM z9QMZ6dTg^6D{ zzr6vdkB>;B>ELD&C&^L&b0S&}l{mUdSt-d6)61yDIFvPyg5b@+c(Q*1YQ@9=qcQlv z@)LVmxA~dBJs~ZJ_^olk*h1-$@SnTf+Jw6@X0VNA?Al0att{Elk`?P|58iqFVJ^H3 zdIX8}arTL|=C>nXVRho;MTFbk7h;x$Te6hn!*@MRN{T(2y|E4j>p0)3Hx&@WJbJ@; zcx%@+8Ksv!TbAd*L(KrKy~imiENQ+lZB8ULQiWhKSoRZytz@|s_a>oR?)A4t;a!Ov zSsvf+mw-Y~VtLzsRkh!pcOw~!a|^Wo-8Wr$Cd!9vg*ox-Qcj@5-wi&G4Hz!@Xi05`!q3Rr-mm-x< zal4Sd#4+oYk%pifjG=oDuexhBCY!H@xzg$gQX!4j<YbrX=zVu8b1`To?GE;fqsI{sG zLh!{L*jwUBg=^fPKK1)h|BweX}XZknNpmXEmV^y^kL$F1_WVgREl!0 z@RAhTh{5%|yC6hUhzbrBrF2`AX&3LU;grMlz@jS@N$~j^*I1}`HexR@Zhrvl;EVhD zXr&AJ+kK~zE$>TuTKdk~*xJNpUyV0y)%gx}!vy_8rf^JD6&KZtf!=g2sZ#11@fVNeyQ7a1>sYL%-;}B}JAUx{UQW)GtaLn)#xx^qY}Gc$DLT({Q3I z!hU@33{8{rzxt-*?Y%WT+huW_)kZY5&uMVk`{&$8qx8Vofh-_ejz@n|ytGZZWShiI zU$9_p!T>NjYx@r9gdL~~Sk5FIpw{H^ZqkJjk^n+kGZF~KCAN4G;$?h15@X3+mMu59 zQejabDn`SPW@Sz?@$GZ*%PE$W9Ff?Dh)JV|c|2cjTq@YC@s#UM>U18-lvv zsmtflmLtW(%;6Pzw~pC|5rIc>k}pyla$eU}E~>6i@CyfL!fZVc_190Gt#5BbX$&I) z%4Fq4LioK?u5e3GP{(0S*#1i-{=>YB7MK0ig}{!i0#a_jg9;H5Ssw=0>L!zc*An=q z3aft(5D{Wa4}+U4Ci1WZ!dCBM^LtQ%ZKT7;L64mMk+&hJW5io$m`8&ZIlfxSz?!j9 z>C&I2i-*`C@*j?V5-W>ah{(*+1#lb7h3O`y(4tN0!j9n$89MA0! zIe@nG&Vp<2KIZR?ST-w^8qD zPj^d-$u{NSXyhz}+YxwNyJeJr8lD&I=b!Tn!+z?s@$GQ0?Cz0tP2b?@PA0xGB88Ja z{xgXTW`w;B$u#-k=C+>A_6vXl!bt7~1^HdLn2>9AY_W!yDiI{zOU3!mYVfzjicD@FMzlLUQzDAxF*n?H$g%lh`ZPw|8dVE6%?_$EieQ2pX9(WVMtzrqg4)5VvDjg6%79#nZ@?WnaS>rCI(BQmYEPbVouhobOJlmh*Wi)M`tnddx4%x{Au6BWQexOGC@>4E;V z*T!;;Y)R3Fi3R<7oXm~a&JuWDhm!nmPGMwkZG6N`Po~00_7-B1Q~7 z_4-FF_RRrg`5M3o6UMyK@bqH7TTzBsVR^$YtL&YbcOu=sO1$UZHq71p2-Lap!GBOd z=NS8ZD!1|{F7;MiN+ntAmS*lfY)eGs8%q(ZV(b9UeR@kVS$5qO)3W(Rcz9OmFGZVj&<6Sd@ zLpCG+e9(P_N5=;BG!Hwg)xVNiRNQ5k{lZwDQV;AzVvne48jO~6hqJb9iODMhbq}k3 zr`i&I1^=eJ3zAi{Ffz$V$%-O2SLM^a809T|c;fap3VuW>ERal?gJ+zWK8%Hpyn}{J z^!CEYu$Nk_^+nuB-_oQ0E0KSTn$H+%DH7jl# z<+JR>nXOgCo{@!)4|~XBqvW*b3dZM)q_7eTY%OzBkrC`_UOD>hd7g-D_of^SUog^e zXuSeNGJ)fJIoZ*{SKFbGzc^6I)IImOdt-cX_4+}Kutd;m(}QQRq=C3H!seT;udZ=mv0vcJe3~6I z*^tWUJn1iHU=cvdFO-wY*zt;TuUg1h2kD(k3;zau8-m~3>#m9++qAu@9~av`6Kx<^ z;gjg6U{zr&oN^}pz^9SU?OH-=yNhJE|K*N8)X15l6C%w=Lq`#R4H>S;eSI%yJ*YL2 zFcc^G^iO&$X7baF?CjKaBungc2fCsi(=6o>ap#9#!yg>|WF!cEBh@;)kgHX7S~yYW zHO1lE)gJds%IDgSt$S>ZM;AnuhuUHH3VQ`l=cD-*BlQcVc;=$c>9v;yKnsa!*xx!S z?w^TRb9o*9%Jh%xfPQjlvGGL<*Xs-XXKwP6oZV()i8~&c@+-qM1n5X2o}XCog(6nV zvGo97wPmx&-Y8O!m-a?3(Zfns3jvY*{eul@uKyQrwbtFrp9x?|h_d9j&3(*&^8Dv* zUO*7{lX?t^Vh>c_?ZyrDX#|`RJ?Z zXO8QPMed1p6<35!k#vD|yNjpfhzdqmyw=(Ex4dpwT@>6gs%*nPrXq*_Dz=4op<7%% zPJvf|{v&uEgPsC=RI4Ntgj>Pj6Qip=Ec(6{%bp+;}bOF zZG%QN&=?BR@&(3(;m)HfVi&L`)YCih%$^HbGI1!sl8hrKJ;HPc5dF&}G><0`)Q_vA zfs1K8y)v?^N0@nAOh|H(-NIuq5^tcxKGigZCRD?_G)O{Z!`g(g#(^J!LkPu$% zIKR#&&i~;=MS^Sn4lW?W0r~W5WAD#HXG(WxOfv{Z=mNxO61YxF>IgLzH{$#4KNk(k zgUE`BNH#LqgsxC$odK||8|(qT=%p2g1LSE^qlS9XQ1)pZj=Yk3JuRpTS2K zfMPc2`k!>>fG9{Z0ycABQ7Y$Hg|CdEWIafvRQ`#Huoe(u*b27LfB?ntKQGhk3|UaK9c^LI zJtf}YVO@D`RGz|&CIfU{K-jIi4VOe$R)u(#z33bBeh)3Q%>G`kW?_dbi&NteecqJ- zp_f*PE1_hD9m_aV4kM-SaBC4on_^5&OMzn#y-a%6%wO*@OGD}jM$jY7rKy)$dKqG! zsBr_KLUWw$b^}a1JWY0hxdgmF_y}#uNSIjKOlxcb4JAD`(p*Sok_-ygG{yWWJpwXW z-N0Si89^$k=2=1ziAo*CCJ)~0`Ulu>a*3F#%MDXFN5L0-cmOz_;BWE7ij*Zvf7)hf znY+@m4=pDJpZ?s(Wvzb{9KfAeqKqJ|C>*TZIKy;-b;ehW`c)0ly-I|(G$3yB6tA&qi<1T*bP3Ijc@0_0mS+(!7N&zKOL_d z03>oY_$&ObSCpHTqd~Hlj^CD7qrzUQ#fmBgJxWM5TKL9@>#hk0940l-gT2`knIw;lh zwm@QP9&_BW$oGZ<)WVZ9n?$(rha`jY;5c}}k=urTF0I2^#9BuNc2P7wcy`sI zPVPzUHEOEnXzVSYNu5dVv@_S&3j=v(Dek)tI^73fQLl^)bmCpxyj6*pJg)M(va<=e z`C)9qCi7A+Jy3?s49i1AmZcsDh5}9ON`B>q+CcC(Huh@#PjC&|TJ!YmU}n}d>DpRh z&0PxXpevvP1M;Y(8bsOzV8`|+zH)TU&_Z2sv&(8x;vPxPK0;2&&5xJ%2RwJB4#J~g zjq+%K=M7*-z;-zRy4eIe=xFI@@1?pzah?9;IjeeQD$|@D*;wVk%7eQmV8pZ z(J%R+qKDE;j`0=RI%w>_cK%U-h4b&s9{~aC_0Sm?Dy$evLK4z-fXRv~oWZmpu>RzD z9hs*IrKsKdGTrN>2&@q$Danw-XUPP~!Ljl#m<*{e0_`V2pXX~M>*o1NpSJ2r!XzeG zbf9c9R!BX&!rt#U^yW1^WT-kS6A`egz?Vtc03Jjz5akK%q0b$9@FS~6NS};}lrIjg zHFToZv!F+T!TB(NlO_Wt0vRpg7y||jsQ$vsgAxhqNC1djuHby5IcXUWkOCU17Q{A# zu7I>xt26z)whFxSN>(QW`D-r&5EKMGXG7#^N>(_jS?C_0tD3Ret)#E=@6h2#WnlK}7GP0dz$42d-t`AfA@(Pz?`*X<4@-WY zk*|6`*Y(#0p4bRi9_og@d?mXz79fX>LWC*;Vgrba1*YHb{6R;Tu~0t1*H_7OsT#XP z2Ew_5aQ_lu;@K{rPF+!+N_MhfZKJWGy*e`;ge{?h06X#)U4!<|o}}+_x810G9u;Uu zy9aTwfHrs<^hRk({wA}dTmc|2F0$EbLT*h7LG_O+ugJUNC4jfHmkwBQzuiy&*6;cl zIo+CmxVJ|l?VZ(0?ZyeXvdFUMOy5j`XxdN~VH%vc?7o@L=fr;V!Sa|GSl!It%C-Q9 zfx;tqnO)bI;yOG8)QiP7x4shv%m#2oOB4YZmazSW^oq=$Q9lU|5KG@vI0P4x5t2wd z04Uyq=!)kPpiMOU>A0XNVO+#7ut+0uC`F?E8tp?z_+&0`i$VjBqe1qQI>bZb{|}q> l|Mh_fY&^Uo!NI-0YN==Q?-1{00ycIy1sN6Ta!H8a{{lk?#w7p% literal 0 HcmV?d00001 diff --git a/python/destinations/influxdb_1/library.json b/python/destinations/influxdb_1/library.json new file mode 100644 index 00000000..c72f9ea5 --- /dev/null +++ b/python/destinations/influxdb_1/library.json @@ -0,0 +1,122 @@ +{ + "libraryItemId": "influxdb-1-destination", + "name": "InfluxDB 1.0 Sink", + "language": "Python", + "tags": { + "Pipeline Stage": ["Destination"], + "Type": ["Connectors"], + "Category": ["Time series DB"] + }, + "shortDescription": "Consume data from a Kafka topic in Quix and persist the data to an InfluxDB 3.0 database.", + "DefaultFile": "main.py", + "EntryPoint": "dockerfile", + "RunEntryPoint": "main.py", + "IconFile": "icon.png", + "Variables": [ + { + "Name": "input", + "Type": "EnvironmentVariable", + "InputType": "InputTopic", + "Description": "This is the input topic", + "DefaultValue": "input-data", + "Required": true + }, + { + "Name": "INFLUXDB_HOST", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Host address for the InfluxDB instance (formatted as https://).", + "Required": true + }, + { + "Name": "INFLUXDB_PORT", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Port for the InfluxDB instance.", + "Required": true + }, + { + "Name": "INFLUXDB_USERNAME", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Username for the InfluxDB instance.", + "Required": true + }, + { + "Name": "INFLUXDB_PASSWORD", + "Type": "EnvironmentVariable", + "InputType": "Secret", + "Description": "Password for the InfluxDB instance.", + "Required": true + }, + { + "Name": "INFLUXDB_MEASUREMENT_NAME", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The InfluxDB measurement to write data to. Can optionally replace with a callable in the template directly.", + "DefaultValue": "default", + "Required": false + }, + { + "Name": "INFLUXDB_DATABASE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Database name in InfluxDB where data should be stored.", + "DefaultValue": "quix", + "Required": true + }, + { + "Name": "INFLUXDB_TAG_KEYS", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The tags to include when writing the measurement data. Example: Tag1,Tag2. Can optionally replace with a callable in the template directly.", + "Required": false + }, + { + "Name": "INFLUXDB_FIELD_KEYS", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The fields to include when writing the measurement data. Example: `Field1,Field2`. Can optionally replace with a callable in the template directly. ", + "Required": false + }, + { + "Name": "CONSUMER_GROUP_NAME", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The name of the consumer group to use when consuming from Kafka", + "DefaultValue": "influxdb1-sink", + "Required": true + }, + { + "Name": "TIMESTAMP_COLUMN", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The column containing the timestamp column. NOTE: Must be nanoseconds. Can optionally replace with a callable in the template directly.", + "Required": false + }, + { + "Name": "BUFFER_SIZE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The number of records that sink holds before flush data to the InfluxDb", + "DefaultValue": "1000", + "Required": false + }, + { + "Name": "BUFFER_TIMEOUT", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The number of seconds that sink holds before flush data to the InfluxDb", + "DefaultValue": "1", + "Required": false + } + ], + "DeploySettings": { + "DeploymentType": "Service", + "CpuMillicores": 200, + "MemoryInMb": 500, + "Replicas": 1, + "PublicAccess": false, + "ValidateConnection": true + } +} diff --git a/python/destinations/influxdb_1/main.py b/python/destinations/influxdb_1/main.py new file mode 100644 index 00000000..ab418c6d --- /dev/null +++ b/python/destinations/influxdb_1/main.py @@ -0,0 +1,60 @@ +# import Utility modules +import os + +from typing import Optional + +# import vendor-specific modules +from quixstreams import Application +from quixstreams.sinks.core.influxdb1 import ( + InfluxDB1Sink, + FieldsSetter, + MeasurementSetter, + TagsSetter, + TimeSetter, +) + +# for local dev, load env vars from a .env file +from dotenv import load_dotenv +load_dotenv() + + +def _as_iterable(env_var) -> list[str]: + return keys.split(",") if (keys := os.environ.get(env_var)) else [] + + +# Potential Callables - can manually edit these to instead use your own callables. +# --Required-- +measurement_name: MeasurementSetter = os.getenv("INFLUXDB_MEASUREMENT_NAME", "default") +# --Optional-- +tag_keys: TagsSetter = _as_iterable("INFLUXDB_TAG_KEYS") +field_keys: FieldsSetter = _as_iterable("INFLUXDB_FIELD_KEYS") +time_setter: Optional[TimeSetter] = col if (col := os.environ.get("TIMESTAMP_COLUMN")) else None + + +influxdb_v1_sink = InfluxDB1Sink( + host=os.environ["INFLUXDB_HOST"], + port=int(os.environ["INFLUXDB_PORT"]), + username=os.environ["INFLUXDB_USERNAME"], + password=os.environ["INFLUXDB_PASSWORD"], + tags_keys=tag_keys, + fields_keys=field_keys, + time_setter=time_setter, + database=os.getenv("INFLUXDB_DATABASE", "quix"), + measurement=measurement_name, +) + + +app = Application( + consumer_group=os.environ.get("CONSUMER_GROUP_NAME", "influxdb-data-writer"), + auto_offset_reset="earliest", + commit_every=int(os.environ.get("BUFFER_SIZE", "1000")), + commit_interval=float(os.environ.get("BUFFER_DELAY", "1")), +) +input_topic = app.topic(os.environ["input"]) + +sdf = app.dataframe(input_topic) +sdf.sink(influxdb_v1_sink) + + +if __name__ == "__main__": + app.run() diff --git a/python/destinations/influxdb_1/requirements.txt b/python/destinations/influxdb_1/requirements.txt new file mode 100644 index 00000000..c8f56dd7 --- /dev/null +++ b/python/destinations/influxdb_1/requirements.txt @@ -0,0 +1,2 @@ +quixstreams[influxdb3]==3.19.0 +python-dotenv \ No newline at end of file