diff --git a/go.mod b/go.mod index d1ff9f5..00459c0 100644 --- a/go.mod +++ b/go.mod @@ -1,14 +1,36 @@ module github.com/blueimp/aws-smtp-relay -go 1.14 +go 1.19 require ( - github.com/aws/aws-sdk-go v1.38.61 - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/go-sql-driver/mysql v1.5.0 // indirect - github.com/kr/pretty v0.2.0 // indirect - github.com/mhale/smtpd v0.0.0-20210322105601-438c8edb069c - github.com/stretchr/testify v1.5.1 // indirect - golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a - gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect + github.com/aws/aws-sdk-go-v2/service/pinpointemail v1.12.4 + github.com/aws/aws-sdk-go-v2/service/ses v1.15.3 + github.com/mhale/smtpd v0.8.0 + golang.org/x/crypto v0.6.0 +) + +require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.13.15 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.23 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.30 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.21 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.24 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.23 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.23 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.12.4 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.18.5 // indirect +) + +require ( + github.com/aws/aws-sdk-go-v2 v1.17.5 // indirect + github.com/aws/aws-sdk-go-v2/config v1.18.15 + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.23 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.30.5 + github.com/aws/aws-sdk-go-v2/service/sqs v1.20.4 + github.com/aws/smithy-go v1.13.5 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect ) diff --git a/go.sum b/go.sum index 29e0690..e3af574 100644 --- a/go.sum +++ b/go.sum @@ -1,56 +1,60 @@ -github.com/aws/aws-sdk-go v1.30.3 h1:tmaR+qpBSig6RfhP9IoxALJEE1m0vfLy5tlnEIXu6WI= -github.com/aws/aws-sdk-go v1.30.3/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= -github.com/aws/aws-sdk-go v1.38.61 h1:wizuqQZe0K4iYJ+Slrs0aSQ4P94FAwqBUHwk46Iz5UA= -github.com/aws/aws-sdk-go v1.38.61/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= +github.com/aws/aws-sdk-go-v2 v1.17.5 h1:TzCUW1Nq4H8Xscph5M/skINUitxM5UBAyvm2s7XBzL4= +github.com/aws/aws-sdk-go-v2 v1.17.5/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 h1:dK82zF6kkPeCo8J1e+tGx4JdvDIQzj7ygIoLg8WMuGs= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10/go.mod h1:VeTZetY5KRJLuD/7fkQXMU6Mw7H5m/KP2J5Iy9osMno= +github.com/aws/aws-sdk-go-v2/config v1.18.15 h1:509yMO0pJUGUugBP2H9FOFyV+7Mz7sRR+snfDN5W4NY= +github.com/aws/aws-sdk-go-v2/config v1.18.15/go.mod h1:vS0tddZqpE8cD9CyW0/kITHF5Bq2QasW9Y1DFHD//O0= +github.com/aws/aws-sdk-go-v2/credentials v1.13.15 h1:0rZQIi6deJFjOEgHI9HI2eZcLPPEGQPictX66oRFLL8= +github.com/aws/aws-sdk-go-v2/credentials v1.13.15/go.mod h1:vRMLMD3/rXU+o6j2MW5YefrGMBmdTvkLLGqFwMLBHQc= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.23 h1:Kbiv9PGnQfG/imNI4L/heyUXvzKmcWSBeDvkrQz5pFc= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.23/go.mod h1:mOtmAg65GT1HIL/HT/PynwPbS+UG0BgCZ6vhkPqnxWo= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.29 h1:9/aKwwus0TQxppPXFmf010DFrE+ssSbzroLVYINA+xE= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.29/go.mod h1:Dip3sIGv485+xerzVv24emnjX5Sg88utCL8fwGmCeWg= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.23 h1:b/Vn141DBuLVgXbhRWIrl9g+ww7G+ScV5SzniWR13jQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.23/go.mod h1:mr6c4cHC+S/MMkrjtSlG4QA36kOznDep+0fga5L/fGQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.30 h1:IVx9L7YFhpPq0tTnGo8u8TpluFu7nAn9X3sUDMb11c0= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.30/go.mod h1:vsbq62AOBwQ1LJ/GWKFxX8beUEYeRp/Agitrxee2/qM= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.21 h1:QdxdY43AiwsqG/VAqHA7bIVSm3rKr8/p9i05ydA0/RM= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.21/go.mod h1:QtIEat7ksHH8nFItljyvMI0dGj8lipK2XZ4PhNihTEU= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 h1:y2+VQzC6Zh2ojtV2LoC0MNwHWc6qXv/j2vrQtlftkdA= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11/go.mod h1:iV4q2hsqtNECrfmlXyord9u4zyuFEJX9eLgLpSPzWA8= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.24 h1:Qmm8klpAdkuN3/rPrIMa/hZQ1z93WMBPjOzdAsbSnlo= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.24/go.mod h1:QelGeWBVRh9PbbXsfXKTFlU9FjT6W2yP+dW5jMQzOkg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.23 h1:QoOybhwRfciWUBbZ0gp9S7XaDnCuSTeK/fySB99V1ls= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.23/go.mod h1:9uPh+Hrz2Vn6oMnQYiUi/zbh3ovbnQk19YKINkQny44= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.23 h1:qc+RW0WWZ2KApMnsu/EVCPqLTyIH55uc7YQq7mq4XqE= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.23/go.mod h1:FJhZWVWBCcgAF8jbep7pxQ1QUsjzTwa9tvEXGw2TDRo= +github.com/aws/aws-sdk-go-v2/service/pinpointemail v1.12.4 h1:NcxMBuHmNENvStkgvIJgAqvTmjBGv/drQSmxGuLRjVg= +github.com/aws/aws-sdk-go-v2/service/pinpointemail v1.12.4/go.mod h1:slY3ISjHIVPkU8yFb9qDa12tBDaITlqdHLSbF+wIy7c= +github.com/aws/aws-sdk-go-v2/service/s3 v1.30.5 h1:kFfb+NMap4R7nDvBYyABa/nw7KFMtAfygD1Hyoxh4uE= +github.com/aws/aws-sdk-go-v2/service/s3 v1.30.5/go.mod h1:Dze3kNt4T+Dgb8YCfuIFSBLmE6hadKNxqfdF0Xmqz1I= +github.com/aws/aws-sdk-go-v2/service/ses v1.15.3 h1:O7gl6f1Zi80GLQkyWFJi2PYzF8c7qd/iUBmLIVVp8og= +github.com/aws/aws-sdk-go-v2/service/ses v1.15.3/go.mod h1:xIWGz5r8k9h6T0f0jhkFdLMZJ10abjOfxmUWoWevw14= +github.com/aws/aws-sdk-go-v2/service/sqs v1.20.4 h1:8bI15PB1gxNcW53Dx3DVPZltmRiNQbtEsCtHXabNYMc= +github.com/aws/aws-sdk-go-v2/service/sqs v1.20.4/go.mod h1:DVY5QFIndM5hWkrO+9lK4EN7mJUrttvpbbYcu2/id1Y= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.4 h1:qJdM48OOLl1FBSzI7ZrA1ZfLwOyCYqkXV5lko1hYDBw= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.4/go.mod h1:jtLIhd+V+lft6ktxpItycqHqiVXrPIRjWIsFIlzMriw= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.4 h1:YRkWXQveFb0tFC0TLktmmhGsOcCgLwvq88MC2al47AA= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.4/go.mod h1:zVwRrfdSmbRZWkUkWjOItY7SOalnFnq/Yg2LVPqDjwc= +github.com/aws/aws-sdk-go-v2/service/sts v1.18.5 h1:L1600eLr0YvTT7gNh3Ni24yGI7NSHkq9Gp62vijPRCs= +github.com/aws/aws-sdk-go-v2/service/sts v1.18.5/go.mod h1:1mKZHLLpDMHTNSYPJ7qrcnCQdHCWsNQaT0xRvq2u80s= +github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= +github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= -github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= -github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= -github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= -github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/mhale/smtpd v0.0.0-20181125220505-3c4c908952b8 h1:DuLRJOD3tr0rbrwDXXw5mw8YRPl70y8RbFpUtCjzOkU= -github.com/mhale/smtpd v0.0.0-20181125220505-3c4c908952b8/go.mod h1:qqKwvL5sfYgFxcMy96Kjx3TCorMfDaQBvmEL2nvdidc= -github.com/mhale/smtpd v0.0.0-20210322105601-438c8edb069c h1:AjouS6Z9yUjUzX7Y2iC9z9XSOIZNx1pJuhpHoX526wg= -github.com/mhale/smtpd v0.0.0-20210322105601-438c8edb069c/go.mod h1:MQl+y2hwIEQCXtNhe5+55n0GZOjSmeqORDIXbqUL3x4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/mhale/smtpd v0.8.0 h1:5JvdsehCg33PQrZBvFyDMMUDQmvbzVpZgKob7eYBJc0= +github.com/mhale/smtpd v0.8.0/go.mod h1:MQl+y2hwIEQCXtNhe5+55n0GZOjSmeqORDIXbqUL3x4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM= -golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a h1:kr2P4QFmQr29mSLA43kwrOcgcReGTfbE9N577tCTuBc= -golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/receiver/aws_ses/cli.go b/internal/receiver/aws_ses/cli.go new file mode 100644 index 0000000..86518f4 --- /dev/null +++ b/internal/receiver/aws_ses/cli.go @@ -0,0 +1,39 @@ +package receiver + +import "flag" + +type CliArgs struct { + enableObserver *bool + queueName *string + queueTimeout *int + queueMaxMessage *int + queueS3Bucket *string + queueSmtpHost *string + queueSmtpPort *int + queueSmtpConnectionTLS *bool + queueSmtpForceSTARTTLS *bool + queueSmtpInsecureTLS *bool + queueSmtpIdentity *string + queueSmtpUser *string + queueSmtpPass *string + queueSmtpMyName *string + queueS3Prefix *string +} + +var FlagCliArgs = CliArgs{ + enableObserver: flag.Bool("o", false, "Enable AWS SES Observer"), + queueName: flag.String("q", "", "Observer AWS SQS Queue Name"), + queueTimeout: flag.Int("T", 10, "Observer AWS SQS Queue Timeout"), + queueMaxMessage: flag.Int("m", 10, "Observer AWS SQS Queue MaxMessages"), + queueS3Bucket: flag.String("b", "", "Observer AWS SQS Queue S3 Bucket"), + queueSmtpHost: flag.String("H", "", "Observer Relay SMTP Host"), + queueSmtpPort: flag.Int("P", 25, "Observer Relay SMTP Port"), + queueSmtpConnectionTLS: flag.Bool("S", false, "Observer Relay SMTP Connection TLS"), + queueSmtpForceSTARTTLS: flag.Bool("F", true, "Observer Relay SMTP Force STARTTLS"), + queueSmtpInsecureTLS: flag.Bool("I", true, "Observer Relay SMTP Insecure TLS(Certs)"), + queueSmtpIdentity: flag.String("Y", "", "Observer Relay SMTP Identity"), + queueSmtpUser: flag.String("U", "", "Observer Relay SMTP User"), + queueSmtpPass: flag.String("A", "", "Observer Relay SMTP User[QUEUE_SMTP_PASS]"), + queueSmtpMyName: flag.String("E", "", "Observer Relay SMTP Hello Name"), + queueS3Prefix: flag.String("X", "", "Observer AWS S3 Key Prefix"), +} diff --git a/internal/receiver/aws_ses/config.go b/internal/receiver/aws_ses/config.go new file mode 100644 index 0000000..c2649c8 --- /dev/null +++ b/internal/receiver/aws_ses/config.go @@ -0,0 +1,87 @@ +package receiver + +import ( + "context" + "errors" + "os" +) + +type AwsSesConfig struct { + QueueName string + Context context.Context + Timeout int32 // seconds + MaxMessages int32 + Bucket string + KeyPrefix string + SMTP struct { + Host string + Port int + ConnectionTLS bool // 465 + ForceSTARTTLS bool // force STARTTLS + InsecureTLS bool // skip TLS verification + User *string + Pass *string + Identity *string // AUTH IDENTITY + MyName *string // EHLO name + } +} + +func ConfigureObserver(clis ...CliArgs) (*AwsSesConfig, error) { + cli := FlagCliArgs + if len(clis) != 0 { + cli = clis[0] + } + if !*cli.enableObserver { + return nil, nil + } + if *cli.queueName == "" { + return nil, errors.New("QueueName is required") + } + timeout := *cli.queueTimeout + maxMessage := *cli.queueMaxMessage + if *cli.queueS3Bucket == "" { + return nil, errors.New("QueueS3Bucket is required") + } + if *cli.queueSmtpHost == "" { + return nil, errors.New("SMTP Host is required") + } + pass, is := os.LookupEnv("QUEUE_SMTP_PASS") + if is { + cli.queueSmtpPass = &pass + } + if *cli.queueSmtpMyName == "" { + defMyName := "AWS-SMTP-Relay-Observer" + cli.queueSmtpMyName = &defMyName + } + + observeCfg := &AwsSesConfig{ + QueueName: *cli.queueName, + Context: context.Background(), + Timeout: int32(timeout), + MaxMessages: int32(maxMessage), + Bucket: *cli.queueS3Bucket, + KeyPrefix: *cli.queueS3Prefix, + SMTP: struct { + Host string + Port int + ConnectionTLS bool + ForceSTARTTLS bool + InsecureTLS bool + User *string + Pass *string + Identity *string + MyName *string + }{ + Host: *cli.queueSmtpHost, + Port: *cli.queueSmtpPort, + ConnectionTLS: *cli.queueSmtpConnectionTLS, + ForceSTARTTLS: *cli.queueSmtpForceSTARTTLS, + InsecureTLS: *cli.queueSmtpInsecureTLS, + User: cli.queueSmtpUser, + Pass: cli.queueSmtpPass, + Identity: cli.queueSmtpIdentity, + MyName: cli.queueSmtpMyName, + }, + } + return observeCfg, nil +} diff --git a/internal/receiver/aws_ses/infra.go b/internal/receiver/aws_ses/infra.go new file mode 100644 index 0000000..d3fd5d7 --- /dev/null +++ b/internal/receiver/aws_ses/infra.go @@ -0,0 +1,59 @@ +package receiver + +import ( + "context" + "crypto/tls" + "io" + "net/smtp" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sqs" +) + +type SQSClient interface { + GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) + DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) + ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) +} + +type S3Client interface { + GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) +} + +type SMTP interface { + Dial(addr string) (SMTPClient, error) + DialTLS(addr string, tls *tls.Config) (SMTPClient, error) +} + +type awsSesSmtp struct{} + +func (s *awsSesSmtp) Dial(addr string) (SMTPClient, error) { + clt, err := smtp.Dial(addr) + // clt.DebugWriter = os.Stdout + return clt, err +} + +func (s *awsSesSmtp) DialTLS(addr string, ctls *tls.Config) (SMTPClient, error) { + conn, err := tls.Dial("tcp", addr, ctls) + if err != nil { + return nil, err + } + clt, err := smtp.NewClient(conn, addr) + if err != nil { + return nil, err + } + // clt.DebugWriter = os.Stdout + return clt, err +} + +type SMTPClient interface { + Close() error + Hello(host string) error + StartTLS(config *tls.Config) error + Auth(a smtp.Auth) error + Mail(from string) error + Rcpt(to string) error + Data() (io.WriteCloser, error) + Quit() error +} diff --git a/internal/receiver/aws_ses/log.go b/internal/receiver/aws_ses/log.go new file mode 100644 index 0000000..5e2dfce --- /dev/null +++ b/internal/receiver/aws_ses/log.go @@ -0,0 +1,38 @@ +package receiver + +import ( + "encoding/json" + "fmt" + "time" +) + +type AwsLog struct { + Time time.Time + Component string + Msg *string `json:"Msg,omitempty"` + Error *string `json:"Error,omitempty"` +} + +func LogError(component string, format string, args ...any) error { + err := fmt.Errorf(format, args...) + errStr := err.Error() + entry := AwsLog{ + Time: time.Now().UTC(), + Component: component, + Error: &errStr, + } + b, _ := json.Marshal(entry) + fmt.Println(string(b)) + return err +} + +func Log(component string, format string, args ...any) { + msg := fmt.Sprintf(format, args...) + entry := AwsLog{ + Time: time.Now().UTC(), + Component: component, + Msg: &msg, + } + b, _ := json.Marshal(entry) + fmt.Println(string(b)) +} diff --git a/internal/receiver/aws_ses/observer.go b/internal/receiver/aws_ses/observer.go new file mode 100644 index 0000000..a3ea5c8 --- /dev/null +++ b/internal/receiver/aws_ses/observer.go @@ -0,0 +1,281 @@ +package receiver + +import ( + "context" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "io" + "net/smtp" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sqs" + sqsTypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" +) + +type AwsSesObserver struct { + SQS struct { + Client SQSClient + MsgInputParams sqs.ReceiveMessageInput + SQSQueueURL *string + } + S3Client S3Client + Smtp SMTP + Config AwsSesConfig +} + +func (aso *AwsSesObserver) getSqsClient(reset ...bool) (SQSClient, error) { + if (len(reset) == 0 || !reset[0]) && aso.SQS.Client != nil { + return aso.SQS.Client, nil + } + cfg, err := config.LoadDefaultConfig(context.TODO()) + if err != nil { + return nil, LogError("AwsSesObserver.getSqsClient", "error loading aws config, %s", err.Error()) + } + aso.SQS.Client = sqs.NewFromConfig(cfg) + return aso.SQS.Client, nil +} + +func (aso *AwsSesObserver) getS3Client(reset ...bool) (S3Client, error) { + if (len(reset) == 0 || !reset[0]) && aso.S3Client != nil { + return aso.S3Client, nil + } + cfg, err := config.LoadDefaultConfig(context.TODO()) + if err != nil { + return nil, LogError("AwsSesObserver.getSqsClient", "error loading aws config, %s", err.Error()) + } + aso.S3Client = s3.NewFromConfig(cfg) + return aso.S3Client, nil +} + +func (aso *AwsSesObserver) InitSQS() error { + // Get URL of queue + sqsClient, err := aso.getSqsClient() + if err != nil { + return err + } + + urlResult, err := sqsClient.GetQueueUrl(aso.Config.Context, &sqs.GetQueueUrlInput{ + QueueName: &aso.Config.QueueName, + }) + if err != nil { + return fmt.Errorf("error getting queue url, " + err.Error()) + } + aso.SQS.SQSQueueURL = urlResult.QueueUrl + + aso.SQS.MsgInputParams = sqs.ReceiveMessageInput{ + MessageAttributeNames: []string{ + string(sqsTypes.QueueAttributeNameAll), + }, + QueueUrl: aso.SQS.SQSQueueURL, + MaxNumberOfMessages: aso.Config.MaxMessages, + VisibilityTimeout: aso.Config.Timeout, + } + return nil +} + +func NewAWSSESObserver(cfg *AwsSesConfig) (*AwsSesObserver, error) { + if cfg.Context == nil { + cfg.Context = context.TODO() + } + return &AwsSesObserver{ + Config: *cfg, + Smtp: &awsSesSmtp{}, + }, nil +} + +func (aso *AwsSesObserver) getS3Key(asn *AwsSesNotification) *string { + my := strings.Join([]string{aso.Config.KeyPrefix, asn.Mail.MessageId}, "") + return &my +} + +func (aso *AwsSesObserver) fetchMessage(asn *AwsSesNotification) (*s3.GetObjectOutput, error) { + var err error + var out *s3.GetObjectOutput + for i := 0; i < 2; i++ { + var s3Client S3Client + s3Client, err = aso.getS3Client(i > 0) + if err != nil { + return nil, err + } + out, err = s3Client.GetObject(aso.Config.Context, &s3.GetObjectInput{ + Bucket: &aso.Config.Bucket, + Key: aso.getS3Key(asn), + }) + if err != nil { + // retry in case of error + continue + } + break + } + return out, err +} + +func (aso *AwsSesObserver) sendMail(asn *AwsSesNotification, out *s3.GetObjectOutput) (error, error) { + var err error + var c SMTPClient + if !aso.Config.SMTP.ConnectionTLS { + c, err = aso.Smtp.Dial(fmt.Sprintf("%s:%d", aso.Config.SMTP.Host, aso.Config.SMTP.Port)) + } else { + c, err = aso.Smtp.DialTLS(fmt.Sprintf("%s:%d", aso.Config.SMTP.Host, aso.Config.SMTP.Port), &tls.Config{InsecureSkipVerify: aso.Config.SMTP.InsecureTLS}) + } + if err != nil { + return nil, err + } + defer c.Close() + myName := aso.Config.SMTP.MyName + err = c.Hello(*myName) + if err != nil { + return nil, err + } + if aso.Config.SMTP.ForceSTARTTLS { + err = c.StartTLS(&tls.Config{InsecureSkipVerify: aso.Config.SMTP.InsecureTLS}) + if err != nil { + return nil, err + } + } + if *aso.Config.SMTP.User != "" && *aso.Config.SMTP.Pass != "" { + // auth := sasl.NewLoginClient(*aso.Config.SMTP.User, *aso.Config.SMTP.Pass) + // auth := smtp.CRAMMD5Auth(*aso.Config.SMTP.User, *aso.Config.SMTP.Pass) + auth := smtp.CRAMMD5Auth(*aso.Config.SMTP.User, *aso.Config.SMTP.Pass) + err = c.Auth(auth) + if err != nil { + return nil, err + } + } + + if err = c.Mail(asn.Mail.CommonHeaders.From[0]); err != nil { + return err, nil + } + rcptCnt := 0 + for _, addr := range asn.Receipt.Recipients { + if err = c.Rcpt(addr); err == nil { + rcptCnt++ + } + } + if rcptCnt == 0 { + return errors.New("no valid recipients"), nil + } + w, err := c.Data() + if err != nil { + return nil, err + } + + _, err = io.Copy(w, out.Body) + if err != nil { + return nil, err + } + + err = w.Close() + if err != nil { + return nil, err + } + return c.Quit(), nil +} + +func (aso *AwsSesObserver) deleteMessage(asn *AwsSesNotification, msg *sqsTypes.Message) error { + var err error + for i := 0; i < 2; i++ { + client, err := aso.getSqsClient(i > 0) + if err != nil { + return err + } + _, err = client.DeleteMessage(aso.Config.Context, &sqs.DeleteMessageInput{ + QueueUrl: aso.SQS.SQSQueueURL, + ReceiptHandle: msg.ReceiptHandle, + }) + if err != nil { + // retry in case of error + continue + } + break + } + + for i := 0; i < 2; i++ { + client, err := aso.getS3Client(i > 0) + if err != nil { + return err + } + _, err = client.DeleteObject(aso.Config.Context, &s3.DeleteObjectInput{ + Bucket: &aso.Config.Bucket, + Key: aso.getS3Key(asn), + }) + if err != nil { + // retry in case of error + continue + } + break + } + return err +} + +func (aso *AwsSesObserver) Observe(cnts ...int) error { + cnt := -1 + if len(cnts) > 0 { + cnt = cnts[0] + } + var err error + Log("sqs/observe", "start observing %d messages", cnt) + for i := 0; cnt < 0 || i < cnt; i++ { + var sqs SQSClient + sqs, err = aso.getSqsClient(false) + if err != nil { + err = LogError("sqs/getSqsClient", err.Error()) + time.Sleep(1000 * time.Millisecond) + aso.getSqsClient(true) + continue + } + msgResult, err := sqs.ReceiveMessage(aso.Config.Context, &aso.SQS.MsgInputParams) + if err != nil { + err = LogError("sqs/receive", "error receiving messages, %v", err.Error()) + time.Sleep(1000 * time.Millisecond) + aso.getSqsClient(true) + continue + } + + if msgResult.Messages != nil { + for _, msg := range msgResult.Messages { + asm := AwsSesMessage{} + err = json.Unmarshal([]byte(*msg.Body), &asm) + if err != nil { + err = LogError("json/AwsSesMessage", err.Error()) + continue + } + if asm.Type == "Notification" { + asn := AwsSesNotification{} + err = json.Unmarshal([]byte(asm.Message), &asn) + if err != nil { + err = LogError("json/AwsSesNotification", err.Error()) + continue + } + out, err := aso.fetchMessage(&asn) + if err != nil { + err = LogError("aso/fetchMessage", err.Error()) + continue + } + warn, err := aso.sendMail(&asn, out) + if warn != nil { + LogError("aso/sendMail", "warn=%v msg=%s to=%v", warn.Error(), asn.Mail.MessageId, asn.Mail.CommonHeaders.To) + } + if err != nil { + err = LogError("aso/sendMail", "err=%v msg=%s to=%v", err.Error(), asn.Mail.MessageId, asn.Mail.CommonHeaders.To) + } else { + Log("smtp/sendMail", "sent msg=%s to=%v", asn.Mail.MessageId, asn.Mail.CommonHeaders.To) + } + err = aso.deleteMessage(&asn, &msg) + if err != nil { + err = LogError("sqs/deleteMessage", err.Error()) + continue + } + } else { + err = LogError("AwsSesMessage", "unknown message type, %s", asm.Type) + } + } + } + } + return err +} diff --git a/internal/receiver/aws_ses/observer_test.go b/internal/receiver/aws_ses/observer_test.go new file mode 100644 index 0000000..98a046d --- /dev/null +++ b/internal/receiver/aws_ses/observer_test.go @@ -0,0 +1,364 @@ +package receiver + +import ( + "bytes" + "context" + "crypto/tls" + "fmt" + "io" + "net/smtp" + "os" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sqs" + sqsTypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" +) + +func boolPtr(v bool) *bool { + return &v +} + +func strPtr(v string) *string { + return &v +} + +func intPtr(v int) *int { + return &v +} + +type mockSQSClient struct { +} + +func (m *mockSQSClient) GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) { + if *params.QueueName == "testQ" { + return &sqs.GetQueueUrlOutput{ + QueueUrl: strPtr("q://testQ"), + }, nil + } + return nil, fmt.Errorf("queue not found") +} +func (m *mockSQSClient) DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) { + if *params.QueueUrl == "q://testQ" && *params.ReceiptHandle == "SomeHandle" { + return nil, nil + } + return nil, fmt.Errorf("queue not found") +} + +type mockS3Client struct { +} + +func (m *mockS3Client) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + if *params.Bucket == "bucket" && *params.Key == "prefix/nrk5vlqu9usuh476ffj0j3is23okmot9h029da01" { + return &s3.GetObjectOutput{ + Body: io.NopCloser(strings.NewReader("testBody")), + }, nil + } + return nil, fmt.Errorf("object not found") +} + +func (m *mockS3Client) DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + if *params.Bucket == "bucket" && *params.Key == "prefix/nrk5vlqu9usuh476ffj0j3is23okmot9h029da01" { + return nil, nil + } + return nil, fmt.Errorf("object not found") +} + +type mockSMTPClient struct { + buf bytes.Buffer +} + +func (sc *mockSMTPClient) Close() error { + return nil +} +func (sc *mockSMTPClient) Hello(host string) error { + if host == "AWS-SMTP-Relay-Observer" { + return nil + } + return fmt.Errorf("hello error") +} + +func (sc *mockSMTPClient) StartTLS(config *tls.Config) error { + if config.InsecureSkipVerify == true { + return nil + } + return fmt.Errorf("starttls error") +} + +func (sc *mockSMTPClient) Auth(a smtp.Auth) error { + mech, _, err := a.Start(&smtp.ServerInfo{}) + if err != nil { + return err + } + user, err := a.Next([]byte{}, true) + if err != nil { + return err + } + if err == nil && mech == "CRAM-MD5" && string(user) == "user 6f6f78664432e7632bb899845c4782ba" { + return nil + } + return fmt.Errorf("auth error") +} + +func (sc *mockSMTPClient) Mail(from string) error { + if from == "Meno Abels " { + return nil + } + return fmt.Errorf("mail error") +} + +func (sc *mockSMTPClient) Rcpt(to string) error { + if to == "to@smtp.world" { + return nil + } + return fmt.Errorf("mail error") +} + +type bufWriteCloser struct { + buf *bytes.Buffer +} + +func (b *bufWriteCloser) Write(p []byte) (n int, err error) { + return b.buf.Write(p) +} + +func (b *bufWriteCloser) Close() error { + return nil +} + +func (sc *mockSMTPClient) Data() (io.WriteCloser, error) { + return &bufWriteCloser{buf: &sc.buf}, nil +} + +func (sc *mockSMTPClient) Quit() error { + if sc.buf.String() == "testBody" { + return nil + } + return fmt.Errorf("quit error") +} + +func (sc *mockSMTPClient) SendMail(from string, to []string, msg io.Reader) error { + var err error + if from == "Meno Abels " && to[0] == "to@smtp.world" { + buf := make([]byte, 1024) + len, err := msg.Read(buf) + if string(buf[:len]) == "testBody" && err == nil { + return nil + } + } + return fmt.Errorf("sendmail error: %v", err) +} + +type mockSMTP struct { +} + +func (s *mockSMTP) Dial(addr string) (SMTPClient, error) { + if addr == "host:25" { + return &mockSMTPClient{}, nil + // return &mockSMPClient + } + return nil, fmt.Errorf("dial error") +} + +func (s *mockSMTP) DialTLS(addr string, tls *tls.Config) (SMTPClient, error) { + if addr == "host:25" { + return &mockSMTPClient{}, nil + // return &mockSMPClient + } + return nil, fmt.Errorf("dial error") +} + +func (m *mockSQSClient) ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) { + if *params.QueueUrl == "q://testQ" && + params.MaxNumberOfMessages == 10 && + params.VisibilityTimeout == 10 { + return &sqs.ReceiveMessageOutput{ + Messages: []sqsTypes.Message{ + { + Body: strPtr(` + { + "Type" : "Notification", + "MessageId" : "302f5414-c911-5774-acb0-086f045e295c", + "TopicArn" : "arn:aws:sns:us-east-1:973800055156:smtp-q", + "Subject" : "Amazon SES Email Receipt Notification", + "Message" : "{\"notificationType\":\"Received\",\"mail\":{\"timestamp\":\"2023-03-03T10:25:18.793Z\",\"source\":\"test@test.ipv6\",\"messageId\":\"nrk5vlqu9usuh476ffj0j3is23okmot9h029da01\",\"destination\":[\"dest@lurks.com\"],\"headersTruncated\":false,\"headers\":[{\"name\":\"Return-Path\",\"value\":\"\"},{\"name\":\"Received\",\"value\":\"from mail-ua1-f52.google.com (mail-ua1-f52.google.com [209.85.222.52]) by inbound-smtp.us-east-1.amazonaws.com with SMTP id nrk5vlqu9usuh476ffj0j3is23okmot9h029da01 for jhuhdvh@sdkfkjfkdd.dodo; Fri, 03 Mar 2023 10:25:18 +0000 (UTC)\"},{\"name\":\"X-SES-Spam-Verdict\",\"value\":\"PASS\"},{\"name\":\"X-SES-Virus-Verdict\",\"value\":\"PASS\"},{\"name\":\"Received-SPF\",\"value\":\"pass (spfCheck: domain of _spf.google.com designates 209.85.222.52 as permitted sender) client-ip=209.85.222.52; envelope-from=from@smtp.world; helo=mail-ua1-f52.google.com;\"},{\"name\":\"Authentication-Results\",\"value\":\"amazonses.com; spf=pass (spfCheck: domain of _spf.google.com designates 209.85.222.52 as permitted sender) client-ip=209.85.222.52; envelope-from=from@smtp.world; helo=mail-ua1-f52.google.com; dkim=pass header.i=@gmail.com; dmarc=pass header.from=gmail.com;\"},{\"name\":\"X-SES-RECEIPT\",\"value\":\"AEFBQUFBQUFBQUFGcnE2YU0rN2FyVGhUUHh4Q2pyS3pNRmtXV1hVN1RvSFltcE9ZakdoM3ozRExaWFdhM05MNzBZWG9xSVRUbmg4RmpYOFZvWnNnTys3NFpFSlZJL1ZOaW9KOWFwd3dyZDhwdFM4WTJWOEpsc2VsbUZ2NFlsTHZnYWRRRERheVZGZnd0aEkxTW4zTUI4Q29jVDFoYjRnK2hmYlkySC9xWm8wUVo4MjBEdWdXN0dEazdndnBKa0xUb2VvaHNOa3ZoblM5MU1HRnhyZitpT0oxYlZITWlLcFlVNUMzNkxkc0RUbDg1bXQ3My93cWZzNitWaTRBM2ZJMGcrTDVwSkRKei90eUdybW9hN1VOQjg5R1JzQUFSUXFhMnJkSGFCWXJJYWI4VWRpRXBLV0lCY0E9PQ==\"},{\"name\":\"X-SES-DKIM-SIGNATURE\",\"value\":\"a=rsa-sha256; q=dns/txt; b=CXFYhALt4tfdyrB7s8fEBNtD1htsVN9R25Szm4LCI9is4apzx5Gdu9iiExL1MYBcamzMMk0mamLCxNosLC7HCgOmp5IPjTG2hFNf9UAkbg+3jS3mlAY6fSWw96s/dujH8gZoXvinkfUDlf0HYvYuETYOSVYRzNmXtLiLHqbdoqQ=; c=relaxed/simple; s=6gbrjpgwjskckoa6a5zn6fwqkn67xbtw; d=amazonses.com; t=1677839119; v=1; bh=BCp5hxcYf0BCkCwMBUE/WxEPF1FnOMQIUcxnNYyPm2I=; h=From:To:Cc:Bcc:Subject:Date:Message-ID:MIME-Version:Content-Type:X-SES-RECEIPT;\"},{\"name\":\"Received\",\"value\":\"by mail-ua1-f52.google.com with SMTP id n4so1266357ual.13 for ; Fri, 03 Mar 2023 02:25:18 -0800 (PST)\"},{\"name\":\"DKIM-Signature\",\"value\":\"v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20210112; t=1677839118; h=to:subject:message-id:date:from:mime-version:from:to:cc:subject:date:message-id:reply-to; bh=BCp5hxcYf0BCkCwMBUE/WxEPF1FnOMQIUcxnNYyPm2I=; b=omWvzm0ZX8KPQd1JJKSvZoHm1MES89nEFjzIUJly22fqcfusPuJvOl7t5lNUlfxuiRewN7ZLjfvhKNmx6twlqp2OxI8GZaPFDoshptLEVYRcmzRv8S01bUrRdhGTlvQ/ayaghAADZq/VDJVeWw8cj0woJ1GwTEIPyRP3L2wmqm1G2NReXts0Yq6BrikRBNT3MVRFUlpdsHs0GWgRCLPPZAlyui29ig3BcWowYCVATFkO8i0vlmX8FkdFMSapo8RtrMD43x0zFZ8FgmMBascqx0BruBqcOyqFU0zj+56sKQPkcGrdgRyvh2Sxy//QSJsWry7XJeQUXyoh0ZQmI0X7Zw==\"},{\"name\":\"X-Google-DKIM-Signature\",\"value\":\"v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20210112; t=1677839118; h=to:subject:message-id:date:from:mime-version:x-gm-message-state :from:to:cc:subject:date:message-id:reply-to; bh=BCp5hxcYf0BCkCwMBUE/WxEPF1FnOMQIUcxnNYyPm2I=; b=kJFq/D3qY/eCttS85FW+ktATEk+C4Fen3Jrn9sloZ0peqxVWA3S3X7t2rFduxsab1h syK1i9gzLluwCx3ExDr3o2OaQrzHOZwzf4PS6x7ON/NJ4GgQv4HK6rNY80xEoGErxJDQ PMw4A8k/UUCbTEej+3yL4Dticl/hIY08W6Y0yp3gXe1o426BsA/WRR5UXR02MagidOSL siz892AcvurH02GPRJezj/LNx6Mqeqtzv0fpiBpy4r0TRO7JeLBPCSbInndV4he3uZC3 qwuLZURFkQ7e7lugfNGvIuYi+473JvOWWVlqkpfV4vJxaRarioslX2O9jrM4pt83Uesm nHWQ==\"},{\"name\":\"X-Gm-Message-State\",\"value\":\"AO0yUKUSN5Ddt3VxfzBRaVlzT9BMAJqc2+iYXZYmbrEr6FqLd+vNWdQh LPuCi9VffmKPiYdU+aa1ziGOPrByk+VqZ8XPwhRQZfqA\"},{\"name\":\"X-Google-Smtp-Source\",\"value\":\"AK7set8pXO/RqbyoqOESJPr8IWGbCTnfvowIa5MsDmDCCZwED6lgsH8iru4WddjFLwV6XbGC0vn6RWJAEHPcDinJsko=\"},{\"name\":\"X-Received\",\"value\":\"by 2002:a1f:cec4:0:b0:40e:fee9:667a with SMTP id e187-20020a1fcec4000000b0040efee9667amr966128vkg.3.1677839118135; Fri, 03 Mar 2023 02:25:18 -0800 (PST)\"},{\"name\":\"MIME-Version\",\"value\":\"1.0\"},{\"name\":\"From\",\"value\":\"Meno Abels \"},{\"name\":\"Date\",\"value\":\"Fri, 3 Mar 2023 11:25:07 +0100\"},{\"name\":\"Message-ID\",\"value\":\"\"},{\"name\":\"Subject\",\"value\":\"hallo\"},{\"name\":\"To\",\"value\":\"to@smtp.world\"},{\"name\":\"Content-Type\",\"value\":\"multipart/alternative; boundary=\\\"000000000000a0746805f5fc5c6c\\\"\"}],\"commonHeaders\":{\"returnPath\":\"from@smtp.world\",\"from\":[\"Meno Abels \"],\"date\":\"Fri, 3 Mar 2023 11:25:07 +0100\",\"to\":[\"to@smtp.world\"],\"messageId\":\"\",\"subject\":\"hallo\"}},\"receipt\":{\"timestamp\":\"2023-03-03T10:25:18.793Z\",\"processingTimeMillis\":751,\"recipients\":[\"to@smtp.world\"],\"spamVerdict\":{\"status\":\"PASS\"},\"virusVerdict\":{\"status\":\"PASS\"},\"spfVerdict\":{\"status\":\"PASS\"},\"dkimVerdict\":{\"status\":\"PASS\"},\"dmarcVerdict\":{\"status\":\"PASS\"},\"action\":{\"type\":\"S3\",\"topicArn\":\"arn:aws:sns:us-east-1:973800055156:smtp-q\",\"bucketName\":\"adviser-smtp-q\",\"objectKey\":\"nrk5vlqu9usuh476ffj0j3is23okmot9h029da01\"}}}", + "Timestamp" : "2023-03-03T10:25:19.559Z", + "SignatureVersion" : "1", + "Signature" : "iWyTC5N4JaIIwBZgKQJ+Whzk4aOd+Iu0O+ubwVRWJsstlkrWE/v2n+vjcapLMGa4n98JmnCyMGMwoa3LiR17MOD5r+ScW4zaskShQzSpV3454xggPNy24DcwQz2UlUjSoawUxkkfgBrvjcScEx229W5k2Cm36S9WWRnxQ8ZXkVO0MJNwHK02/mnsXokGVMkIml8b4uKvO+9KaPmtYgLBx3SnUzS2SSyOdS+cYjPBwWu4eEeol29hGEkGJ5IjX44ANlG4mTcz5ZPaolD82qjXXCo7YHvFiOiKrvyDU8BS2tsy8pvxxyCWfYKNkDvmd7auQfVBzk7FwXMyIchmXRw3ew==", + "SigningCertURL" : "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-56e67fcb41f6fec09b0196692625d385.pem", + "UnsubscribeURL" : "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:973800055156:smtp-q:f88c920b-6a88-450d-9319-471bcffb5c2d" + } + `), + ReceiptHandle: strPtr("SomeHandle"), + }, + }, + }, nil + } + return nil, fmt.Errorf("queue not found") +} + +func TestObserver(t *testing.T) { + cli := FlagCliArgs + cli.enableObserver = boolPtr(true) + cli.queueName = strPtr("testQ") + cli.queueSmtpHost = strPtr("host") + cli.queueS3Bucket = strPtr("bucket") + cli.queueS3Prefix = strPtr("prefix/") + cli.queueSmtpIdentity = strPtr("identity") + cli.queueSmtpUser = strPtr("user") + cli.queueSmtpPass = strPtr("pass") + cfg, err := ConfigureObserver(cli) + if err != nil { + t.Errorf("Unexpected error: %s", err) + } + + obs, err := NewAWSSESObserver(cfg) + if err != nil { + t.Error(err) + } + obs.SQS.Client = &mockSQSClient{} + err = obs.InitSQS() + if err != nil { + t.Error(err) + } + if *obs.SQS.SQSQueueURL != "q://testQ" { + t.Errorf("expected q://testQ, got %s", *obs.SQS.SQSQueueURL) + } + obs.S3Client = &mockS3Client{} + obs.Smtp = &mockSMTP{} + err = obs.Observe(1) + if err != nil { + t.Error(err) + } + +} + +func TestNotEnabledObserver(t *testing.T) { + cli := FlagCliArgs + _, err := ConfigureObserver(cli) + if err != nil { + t.Errorf("Unexpected error: %s", err) + } +} + +func TestEnabledObserver(t *testing.T) { + cli := FlagCliArgs + cli.enableObserver = boolPtr(true) + _, err := ConfigureObserver(cli) + if err == nil { + t.Errorf("Unexpected error: %v", err) + } +} + +func TestConfiguredDefault(t *testing.T) { + cli := FlagCliArgs + cli.enableObserver = boolPtr(true) + cli.queueName = strPtr("queueName") + cli.queueS3Bucket = strPtr("bucket") + cli.queueSmtpHost = strPtr("host") + cli.queueSmtpPass = strPtr("pass") + obs, err := ConfigureObserver(cli) + if err != nil { + t.Errorf("Unexpected error: %s", err) + } + if obs.QueueName != "queueName" { + t.Errorf("Unexpected queue name: %s", obs.QueueName) + } + if obs.Timeout != 10 { + t.Errorf("Unexpected timeout: %d", obs.Timeout) + } + if obs.MaxMessages != 10 { + t.Errorf("Unexpected max messages: %d", obs.MaxMessages) + } + if obs.Bucket != "bucket" { + t.Errorf("Unexpected bucket: %s", obs.Bucket) + } + if obs.KeyPrefix != "" { + t.Errorf("Unexpected KeyPrefix: %s", obs.KeyPrefix) + } + if obs.SMTP.Host != "host" { + t.Errorf("Unexpected SMTP host: %s", obs.SMTP.Host) + } + if obs.SMTP.Port != 25 { + t.Errorf("Unexpected SMTP port: %d", obs.SMTP.Port) + } + if *obs.SMTP.Pass != "pass" { + t.Errorf("Unexpected SMTP pass: %s", *obs.SMTP.Pass) + } + if obs.SMTP.ConnectionTLS != false { + t.Errorf("Unexpected SMTP connection TLS: %t", obs.SMTP.ConnectionTLS) + } + if obs.SMTP.ForceSTARTTLS != true { + t.Errorf("Unexpected SMTP force STARTTLS: %t", obs.SMTP.ForceSTARTTLS) + } + if obs.SMTP.InsecureTLS != true { + t.Errorf("Unexpected SMTP insecure TLS: %t", obs.SMTP.InsecureTLS) + } + if *obs.SMTP.Identity != "" { + t.Errorf("Unexpected SMTP identity: %s", *obs.SMTP.Identity) + } + if *obs.SMTP.MyName != "AWS-SMTP-Relay-Observer" { + t.Errorf("Unexpected SMTP my name: %s", *obs.SMTP.MyName) + } +} + +func TestConfiguredSet(t *testing.T) { + cli := FlagCliArgs + cli.enableObserver = boolPtr(true) + cli.queueName = strPtr("queueName") + cli.queueS3Bucket = strPtr("bucket") + cli.queueS3Prefix = strPtr("prefix") + cli.queueSmtpHost = strPtr("host") + cli.queueSmtpPort = intPtr(27) + cli.queueSmtpConnectionTLS = boolPtr(true) + cli.queueSmtpForceSTARTTLS = boolPtr(false) + cli.queueSmtpInsecureTLS = boolPtr(false) + cli.queueSmtpMyName = strPtr("myName") + cli.queueSmtpIdentity = strPtr("identity") + os.Setenv("QUEUE_SMTP_PASS", "pass") + obs, err := ConfigureObserver(cli) + os.Unsetenv("QUEUE_SMTP_PASS") + if err != nil { + t.Errorf("Unexpected error: %s", err) + } + if obs.QueueName != "queueName" { + t.Errorf("Unexpected queue name: %s", obs.QueueName) + } + if obs.Timeout != 10 { + t.Errorf("Unexpected timeout: %d", obs.Timeout) + } + if obs.MaxMessages != 10 { + t.Errorf("Unexpected max messages: %d", obs.MaxMessages) + } + if obs.Bucket != "bucket" { + t.Errorf("Unexpected bucket: %s", obs.Bucket) + } + if obs.KeyPrefix != "prefix" { + t.Errorf("Unexpected KeyPrefix: %s", obs.KeyPrefix) + } + if obs.SMTP.Host != "host" { + t.Errorf("Unexpected SMTP host: %s", obs.SMTP.Host) + } + if obs.SMTP.Port != 27 { + t.Errorf("Unexpected SMTP port: %d", obs.SMTP.Port) + } + if *obs.SMTP.Pass != "pass" { + t.Errorf("Unexpected SMTP pass: %s", *obs.SMTP.Pass) + } + if obs.SMTP.ConnectionTLS != true { + t.Errorf("Unexpected SMTP connection TLS: %t", obs.SMTP.ConnectionTLS) + } + if obs.SMTP.ForceSTARTTLS != false { + t.Errorf("Unexpected SMTP force STARTTLS: %t", obs.SMTP.ForceSTARTTLS) + } + if obs.SMTP.InsecureTLS != false { + t.Errorf("Unexpected SMTP insecure TLS: %t", obs.SMTP.InsecureTLS) + } + if *obs.SMTP.Identity != "identity" { + t.Errorf("Unexpected SMTP identity: %s", *obs.SMTP.Identity) + } + if *obs.SMTP.MyName != "myName" { + t.Errorf("Unexpected SMTP my name: %s", *obs.SMTP.MyName) + } +} diff --git a/internal/receiver/aws_ses/types.go b/internal/receiver/aws_ses/types.go new file mode 100644 index 0000000..e006540 --- /dev/null +++ b/internal/receiver/aws_ses/types.go @@ -0,0 +1,77 @@ +package receiver + +import ( + "fmt" + "strings" + "time" +) + +type AwsSesMessage struct { + Type string + MessageId string + TopicArn string + Subject string + Message string + Timestamp string + SignatureVersion string + Signature string + SigningCertURL string + UnsubscribeURL string +} + +type AwsSESStatus struct { + Status string `json:"status"` +} +type AwsSESHeader struct { + Name string `json:"name"` + Value string `json:"value"` +} + +type AWSJsonTime struct { + time.Time +} + +func (ct *AWSJsonTime) UnmarshalJSON(b []byte) (err error) { + s := strings.Trim(string(b), "\"") + if s == "null" { + ct.Time = time.Time{} + return + } + ct.Time, err = time.Parse(time.RFC3339, s) + return +} + +func (ct *AWSJsonTime) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("\"%s\"", ct.Time.Format(time.RFC3339))), nil +} + +type AwsSesNotification struct { + NotificationType string `json:"notificationType"` + Mail struct { + Timestamp AWSJsonTime `json:"timestamp"` + Source string `json:"source"` + MessageId string `json:"messageId"` + Destination []string `json:"destination"` + HeadersTruncated bool `json:"headersTruncated"` + Headers []AwsSESHeader `json:"headers"` + CommonHeaders struct { + ReturnPath string `json:"returnPath"` + From []string `json:"from"` + Date string `json:"date"` + To []string `json:"to"` + MessageId string `json:"messageId"` + Subject string `json:"subject"` + } `json:"commonHeaders"` + } `json:"mail"` + Receipt struct { + Timestamp AWSJsonTime `json:"timestamp"` + ProcessingTimeMillis uint64 `json:"processingTimeMillis"` + Recipients []string `json:"recipients"` + SpamVerdict AwsSESStatus `json:"spamVerdict"` + VirusVerdict AwsSESStatus `json:"virusVerdict"` + SpfVerdict AwsSESStatus `json:"spfVerdict"` + DkimVerdict AwsSESStatus `json:"dkimVerdict"` + DmarcVerdict AwsSESStatus `json:"dmarcVerdict"` + Action AwsSESStatus `json:"action"` + } `json:"receipt"` +} diff --git a/internal/relay/pinpoint/relay.go b/internal/relay/pinpoint/relay.go index 4fc8740..c3956eb 100644 --- a/internal/relay/pinpoint/relay.go +++ b/internal/relay/pinpoint/relay.go @@ -1,18 +1,22 @@ package relay import ( + "context" "net" "regexp" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/pinpointemail" - "github.com/aws/aws-sdk-go/service/pinpointemail/pinpointemailiface" + "github.com/aws/aws-sdk-go-v2/service/pinpointemail" + pinpointemailtypes "github.com/aws/aws-sdk-go-v2/service/pinpointemail/types" "github.com/blueimp/aws-smtp-relay/internal/relay" ) +type PinpointEmailClient interface { + SendEmail(context.Context, *pinpointemail.SendEmailInput, ...func(*pinpointemail.Options)) (*pinpointemail.SendEmailOutput, error) +} + // Client implements the Relay interface. type Client struct { - pinpointAPI pinpointemailiface.PinpointEmailAPI + pinpointClient PinpointEmailClient setName *string allowFromRegExp *regexp.Regexp denyToRegExp *regexp.Regexp @@ -32,22 +36,19 @@ func (c Client) Send( c.denyToRegExp, ) if err != nil { - relay.Log(origin, &from, deniedRecipients, err) + relay.Log(origin, from, deniedRecipients, err) } if len(allowedRecipients) > 0 { - _, err := c.pinpointAPI.SendEmail(&pinpointemail.SendEmailInput{ - ConfigurationSetName: c.setName, - FromEmailAddress: &from, - Destination: &pinpointemail.Destination{ - ToAddresses: allowedRecipients, - }, - Content: &pinpointemail.EmailContent{ - Raw: &pinpointemail.RawMessage{ - Data: data, - }, - }, + _, err := c.pinpointClient.SendEmail(context.Background(), &pinpointemail.SendEmailInput{ + Content: &pinpointemailtypes.EmailContent{Raw: &pinpointemailtypes.RawMessage{Data: data}}, + Destination: &pinpointemailtypes.Destination{ToAddresses: allowedRecipients}, + ConfigurationSetName: c.setName, + EmailTags: []pinpointemailtypes.MessageTag{}, + FeedbackForwardingEmailAddress: new(string), + FromEmailAddress: &from, + ReplyToAddresses: to, }) - relay.Log(origin, &from, allowedRecipients, err) + relay.Log(origin, from, allowedRecipients, err) if err != nil { return err } @@ -62,7 +63,7 @@ func New( denyToRegExp *regexp.Regexp, ) Client { return Client{ - pinpointAPI: pinpointemail.New(session.Must(session.NewSession())), + pinpointClient: pinpointemail.New(pinpointemail.Options{}), setName: configurationSetName, allowFromRegExp: allowFromRegExp, denyToRegExp: denyToRegExp, diff --git a/internal/relay/pinpoint/relay_test.go b/internal/relay/pinpoint/relay_test.go index df6c9a7..b96f149 100644 --- a/internal/relay/pinpoint/relay_test.go +++ b/internal/relay/pinpoint/relay_test.go @@ -1,6 +1,7 @@ package relay import ( + "context" "errors" "io/ioutil" "net" @@ -8,8 +9,7 @@ import ( "regexp" "testing" - "github.com/aws/aws-sdk-go/service/pinpointemail" - "github.com/aws/aws-sdk-go/service/pinpointemail/pinpointemailiface" + "github.com/aws/aws-sdk-go-v2/service/pinpointemail" "github.com/blueimp/aws-smtp-relay/internal/relay" ) @@ -19,18 +19,12 @@ var testData = struct { }{} type mockPinpointEmailClient struct { - pinpointemailiface.PinpointEmailAPI -} - -func (m *mockPinpointEmailClient) CreateConfigurationSet( - input *pinpointemail.CreateConfigurationSetInput, -) (*pinpointemail.CreateConfigurationSetOutput, error) { - return &pinpointemail.CreateConfigurationSetOutput{}, nil } func (m *mockPinpointEmailClient) SendEmail( + context context.Context, input *pinpointemail.SendEmailInput, -) (*pinpointemail.SendEmailOutput, error) { + fns ...func(*pinpointemail.Options)) (*pinpointemail.SendEmailOutput, error) { testData.input = input return nil, testData.err } @@ -59,7 +53,7 @@ func sendHelper( os.Stderr = errWriter func() { c := Client{ - pinpointAPI: &mockPinpointEmailClient{}, + pinpointClient: &mockPinpointEmailClient{}, setName: configurationSetName, allowFromRegExp: allowFromRegExp, denyToRegExp: denyToRegExp, @@ -95,10 +89,10 @@ func TestSend(t *testing.T) { 1, ) } - if *input.Destination.ToAddresses[0] != to[0] { + if input.Destination.ToAddresses[0] != to[0] { t.Errorf( "Unexpected destination: %s. Expected: %s", - *input.Destination.ToAddresses[0], + input.Destination.ToAddresses[0], to[0], ) } @@ -128,10 +122,10 @@ func TestSendWithMultipleRecipients(t *testing.T) { 2, ) } - if *input.Destination.ToAddresses[0] != to[0] { + if input.Destination.ToAddresses[0] != to[0] { t.Errorf( "Unexpected destination: %s. Expected: %s", - *input.Destination.ToAddresses[0], + input.Destination.ToAddresses[0], to[0], ) } @@ -184,10 +178,10 @@ func TestSendWithDeniedRecipient(t *testing.T) { 1, ) } - if *input.Destination.ToAddresses[0] != to[1] { + if input.Destination.ToAddresses[0] != to[1] { t.Errorf( "Unexpected destination: %s. Expected: %s", - *input.Destination.ToAddresses[0], + input.Destination.ToAddresses[0], to[1], ) } @@ -224,10 +218,10 @@ func TestSendWithApiError(t *testing.T) { 1, ) } - if *input.Destination.ToAddresses[0] != to[0] { + if input.Destination.ToAddresses[0] != to[0] { t.Errorf( "Unexpected destination: %s. Expected: %s", - *input.Destination.ToAddresses[0], + input.Destination.ToAddresses[0], to[0], ) } diff --git a/internal/relay/relay.go b/internal/relay/relay.go index 2246933..19c5d33 100644 --- a/internal/relay/relay.go +++ b/internal/relay/relay.go @@ -34,18 +34,18 @@ type Client interface { type logEntry struct { Time time.Time - IP *string - From *string - To []*string + IP string + From string + To []string Error *string } // Log creates a log entry and prints it as JSON to STDOUT. -func Log(origin net.Addr, from *string, to []*string, err error) { +func Log(origin net.Addr, from string, to []string, err error) { ip := origin.(*net.TCPAddr).IP.String() entry := &logEntry{ Time: time.Now().UTC(), - IP: &ip, + IP: ip, From: from, To: to, } @@ -67,9 +67,9 @@ func FilterAddresses( to []string, allowFromRegExp *regexp.Regexp, denyToRegExp *regexp.Regexp, -) (allowedRecipients []*string, deniedRecipients []*string, err error) { - allowedRecipients = []*string{} - deniedRecipients = []*string{} +) (allowedRecipients []string, deniedRecipients []string, err error) { + allowedRecipients = []string{} + deniedRecipients = []string{} if allowFromRegExp != nil && !allowFromRegExp.MatchString(from) { err = ErrDeniedSender } @@ -78,9 +78,9 @@ func FilterAddresses( // Deny all recipients if the sender address is not allowed if err != nil || (denyToRegExp != nil && denyToRegExp.MatchString(*recipient)) { - deniedRecipients = append(deniedRecipients, recipient) + deniedRecipients = append(deniedRecipients, *recipient) } else { - allowedRecipients = append(allowedRecipients, recipient) + allowedRecipients = append(allowedRecipients, *recipient) } } if err == nil && len(deniedRecipients) > 0 { diff --git a/internal/relay/relay_test.go b/internal/relay/relay_test.go index 49a0ddd..d23fb93 100644 --- a/internal/relay/relay_test.go +++ b/internal/relay/relay_test.go @@ -11,17 +11,7 @@ import ( "time" ) -func pointersToValues(pointers []*string) []string { - values := []string{} - for k := range pointers { - if pointers[k] != nil { - values = append(values, *(pointers)[k]) - } - } - return values -} - -func logHelper(addr net.Addr, from *string, to []*string, err error) ( +func logHelper(addr net.Addr, from string, to []string, err error) ( []byte, []byte, ) { @@ -52,8 +42,8 @@ func TestLog(t *testing.T) { "bob@example.org", "charlie@example.org", } - from := &emails[0] - to := []*string{&emails[1], &emails[2]} + from := emails[0] + to := []string{emails[1], emails[2]} timeBefore := time.Now() out, err := logHelper(&origin, from, to, nil) timeAfter := time.Now() @@ -65,18 +55,14 @@ func TestLog(t *testing.T) { if entry.Time.After(timeAfter) { t.Errorf("Unexpected 'Time' log: %s", entry.Time) } - if entry.IP == nil { - t.Errorf("Unexpected 'IP' log: %v. Expected: %s", nil, "127.0.0.1") - } else if *entry.IP != "127.0.0.1" { - t.Errorf("Unexpected 'IP' log: %s. Expected: %s", *entry.IP, "127.0.0.1") + if entry.IP != "127.0.0.1" { + t.Errorf("Unexpected 'IP' log: %s. Expected: %s", entry.IP, "127.0.0.1") } - if entry.From == nil { - t.Errorf("Unexpected 'From' log: %v. Expected: %s", nil, *from) - } else if *entry.From != *from { - t.Errorf("Unexpected 'From' log: %s. Expected: %s", *entry.From, *from) + if entry.From != from { + t.Errorf("Unexpected 'From' log: %s. Expected: %s", entry.From, from) } - toVals := pointersToValues(entry.To) - expectedToVals := pointersToValues(to) + toVals := entry.To + expectedToVals := to if len(toVals) != len(expectedToVals) || toVals[0] != expectedToVals[0] || toVals[1] != expectedToVals[1] { t.Errorf("Unexpected 'To' log: %s. Expected: %s", toVals, expectedToVals) @@ -98,15 +84,15 @@ func TestLogWithOriginIPv6(t *testing.T) { "bob@example.org", "charlie@example.org", } - from := &emails[0] - to := []*string{&emails[1], &emails[2]} + from := emails[0] + to := []string{emails[1], emails[2]} out, err := logHelper(&origin, from, to, nil) var entry logEntry json.Unmarshal(out, &entry) - if *entry.IP != "2001:4860:0:2001::68" { + if entry.IP != "2001:4860:0:2001::68" { t.Errorf( "Unexpected 'IP' log: %s. Expected: %s", - *entry.IP, + entry.IP, "2001:4860:0:2001::68", ) } @@ -125,8 +111,8 @@ func TestLogWithError(t *testing.T) { "bob@example.org", "charlie@example.org", } - from := &emails[0] - to := []*string{&emails[1], &emails[2]} + from := emails[0] + to := []string{emails[1], emails[2]} out, err := logHelper(&origin, from, to, errors.New("ERROR")) var entry logEntry json.Unmarshal(out, &entry) @@ -155,7 +141,7 @@ func TestFilterAddresses(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %s. Expected: %v", err, nil) } - allowedRecipientsValues := pointersToValues(allowedRecipients) + allowedRecipientsValues := allowedRecipients if len(allowedRecipients) != 2 || allowedRecipientsValues[0] != to[0] || allowedRecipientsValues[1] != to[1] { t.Errorf( @@ -167,7 +153,7 @@ func TestFilterAddresses(t *testing.T) { if len(deniedRecipients) != 0 { t.Errorf( "Unexpected denied recipients: %s. Expected: %s", - pointersToValues(deniedRecipients), + deniedRecipients, []string{}, ) } @@ -200,11 +186,11 @@ func TestFilterAddressesWithDeniedSender(t *testing.T) { if len(allowedRecipients) != 0 { t.Errorf( "Unexpected allowed recipients: %s. Expected: %s", - pointersToValues(allowedRecipients), + allowedRecipients, []string{}, ) } - deniedRecipientsValues := pointersToValues(deniedRecipients) + deniedRecipientsValues := deniedRecipients if len(deniedRecipients) != 2 || deniedRecipientsValues[0] != to[0] || deniedRecipientsValues[1] != to[1] { t.Errorf( @@ -239,7 +225,7 @@ func TestFilterAddressesWithDeniedRecipients(t *testing.T) { expectedError.Error(), ) } - allowedRecipientsValues := pointersToValues(allowedRecipients) + allowedRecipientsValues := allowedRecipients if len(allowedRecipients) != 1 || allowedRecipientsValues[0] != to[1] { t.Errorf( "Unexpected allowed recipients: %s. Expected: %s", @@ -247,7 +233,7 @@ func TestFilterAddressesWithDeniedRecipients(t *testing.T) { []string{"charlie@example.org"}, ) } - deniedRecipientsValues := pointersToValues(deniedRecipients) + deniedRecipientsValues := deniedRecipients if len(deniedRecipients) != 1 || deniedRecipientsValues[0] != to[0] { t.Errorf( "Unexpected denied recipients: %s. Expected: %s", diff --git a/internal/relay/ses/relay.go b/internal/relay/ses/relay.go index cada4f6..1460dfb 100644 --- a/internal/relay/ses/relay.go +++ b/internal/relay/ses/relay.go @@ -1,18 +1,22 @@ package relay import ( + "context" "net" "regexp" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/ses" - "github.com/aws/aws-sdk-go/service/ses/sesiface" + "github.com/aws/aws-sdk-go-v2/service/ses" + sestypes "github.com/aws/aws-sdk-go-v2/service/ses/types" "github.com/blueimp/aws-smtp-relay/internal/relay" ) +type SESEmailClient interface { + SendRawEmail(context.Context, *ses.SendRawEmailInput, ...func(*ses.Options)) (*ses.SendRawEmailOutput, error) +} + // Client implements the Relay interface. type Client struct { - sesAPI sesiface.SESAPI + sesClient SESEmailClient setName *string allowFromRegExp *regexp.Regexp denyToRegExp *regexp.Regexp @@ -32,16 +36,20 @@ func (c Client) Send( c.denyToRegExp, ) if err != nil { - relay.Log(origin, &from, deniedRecipients, err) + relay.Log(origin, from, deniedRecipients, err) } if len(allowedRecipients) > 0 { - _, err := c.sesAPI.SendRawEmail(&ses.SendRawEmailInput{ + _, err := c.sesClient.SendRawEmail(context.Background(), &ses.SendRawEmailInput{ + RawMessage: &sestypes.RawMessage{Data: data}, ConfigurationSetName: c.setName, - Source: &from, Destinations: allowedRecipients, - RawMessage: &ses.RawMessage{Data: data}, + FromArn: new(string), + ReturnPathArn: new(string), + Source: &from, + SourceArn: new(string), + Tags: []sestypes.MessageTag{}, }) - relay.Log(origin, &from, allowedRecipients, err) + relay.Log(origin, from, allowedRecipients, err) if err != nil { return err } @@ -56,7 +64,7 @@ func New( denyToRegExp *regexp.Regexp, ) Client { return Client{ - sesAPI: ses.New(session.Must(session.NewSession())), + sesClient: ses.New(ses.Options{}), setName: configurationSetName, allowFromRegExp: allowFromRegExp, denyToRegExp: denyToRegExp, diff --git a/internal/relay/ses/relay_test.go b/internal/relay/ses/relay_test.go index 5210cbc..5a85170 100644 --- a/internal/relay/ses/relay_test.go +++ b/internal/relay/ses/relay_test.go @@ -1,6 +1,7 @@ package relay import ( + "context" "errors" "io/ioutil" "net" @@ -8,8 +9,7 @@ import ( "regexp" "testing" - "github.com/aws/aws-sdk-go/service/ses" - "github.com/aws/aws-sdk-go/service/ses/sesiface" + "github.com/aws/aws-sdk-go-v2/service/ses" "github.com/blueimp/aws-smtp-relay/internal/relay" ) @@ -18,11 +18,10 @@ var testData = struct { err error }{} -type mockSESAPI struct { - sesiface.SESAPI +type mockSESClient struct { } -func (m *mockSESAPI) SendRawEmail(input *ses.SendRawEmailInput) ( +func (m mockSESClient) SendRawEmail(ctx context.Context, input *ses.SendRawEmailInput, optFns ...func(*ses.Options)) ( *ses.SendRawEmailOutput, error, ) { @@ -54,7 +53,7 @@ func sendHelper( os.Stderr = errWriter func() { c := Client{ - sesAPI: &mockSESAPI{}, + sesClient: &mockSESClient{}, setName: configurationSetName, allowFromRegExp: allowFromRegExp, denyToRegExp: denyToRegExp, @@ -90,10 +89,10 @@ func TestSend(t *testing.T) { 1, ) } - if *input.Destinations[0] != to[0] { + if input.Destinations[0] != to[0] { t.Errorf( "Unexpected destination: %s. Expected: %s", - *input.Destinations[0], + input.Destinations[0], to[0], ) } @@ -123,10 +122,10 @@ func TestSendWithMultipleRecipients(t *testing.T) { 2, ) } - if *input.Destinations[0] != to[0] { + if input.Destinations[0] != to[0] { t.Errorf( "Unexpected destination: %s. Expected: %s", - *input.Destinations[0], + input.Destinations[0], to[0], ) } @@ -179,10 +178,10 @@ func TestSendWithDeniedRecipient(t *testing.T) { 1, ) } - if *input.Destinations[0] != to[1] { + if input.Destinations[0] != to[1] { t.Errorf( "Unexpected destination: %s. Expected: %s", - *input.Destinations[0], + input.Destinations[0], to[1], ) } @@ -219,10 +218,10 @@ func TestSendWithApiError(t *testing.T) { 1, ) } - if *input.Destinations[0] != to[0] { + if input.Destinations[0] != to[0] { t.Errorf( "Unexpected destination: %s. Expected: %s", - *input.Destinations[0], + input.Destinations[0], to[0], ) } diff --git a/main.go b/main.go index 10797a6..48ac6ec 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/blueimp/aws-smtp-relay/internal/auth" + receiver "github.com/blueimp/aws-smtp-relay/internal/receiver/aws_ses" "github.com/blueimp/aws-smtp-relay/internal/relay" pinpointrelay "github.com/blueimp/aws-smtp-relay/internal/relay/pinpoint" sesrelay "github.com/blueimp/aws-smtp-relay/internal/relay/ses" @@ -95,18 +96,41 @@ func configure() error { } bcryptHash = []byte(os.Getenv("BCRYPT_HASH")) password = []byte(os.Getenv("PASSWORD")) + return nil } func main() { flag.Parse() - var srv *smtpd.Server err := configure() + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + observeCfg, err := receiver.ConfigureObserver() + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + if observeCfg != nil { + go func() { + obs, err := receiver.NewAWSSESObserver(observeCfg) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + err = obs.InitSQS() + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + obs.Observe() + }() + } + srv, err := server() if err == nil { - srv, err = server() - if err == nil { - err = srv.ListenAndServe() - } + err = srv.ListenAndServe() } if err != nil { fmt.Fprintln(os.Stderr, err)