
Commit c083ecc

feat: upgrade to otel collector v0.111.0 (#140)
* feat: upgrade to otel collector v0.110.0
* use otel collector metrics instead of opencensus ones
* Upgrade to otel collector v0.111.0
* test metrics and fix metrics labels issue
* shorten long line to fix lint
1 parent d6e6668 commit c083ecc

File tree: 79 files changed (+3298, −1955 lines)


cmd/collector/main.go

Lines changed: 23 additions & 17 deletions

@@ -15,14 +15,16 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver"
-	"go.opencensus.io/stats/view"
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/confmap"
+	"go.opentelemetry.io/collector/confmap/provider/envprovider"
+	"go.opentelemetry.io/collector/confmap/provider/fileprovider"
+	"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
 	"go.opentelemetry.io/collector/exporter"
-	"go.opentelemetry.io/collector/exporter/loggingexporter"
+	"go.opentelemetry.io/collector/exporter/debugexporter"
 	"go.opentelemetry.io/collector/exporter/otlpexporter"
 	"go.opentelemetry.io/collector/exporter/otlphttpexporter"
 	"go.opentelemetry.io/collector/extension"
-	"go.opentelemetry.io/collector/extension/ballastextension"
 	"go.opentelemetry.io/collector/extension/zpagesextension"
 	"go.opentelemetry.io/collector/otelcol"
 	"go.opentelemetry.io/collector/processor"
@@ -40,17 +42,29 @@ import (
 )

 func main() {
-	if err := registerMetricViews(); err != nil {
-		log.Fatal(err)
-	}
-
 	info := component.BuildInfo{
 		Command:     "collector",
 		Description: "Hypertrace Collector",
 		Version:     BuildVersion,
 	}

-	if err := run(otelcol.CollectorSettings{BuildInfo: info, Factories: components}); err != nil {
+	cfgProviderSettings := otelcol.ConfigProviderSettings{
+		ResolverSettings: confmap.ResolverSettings{
+			ProviderFactories: []confmap.ProviderFactory{
+				fileprovider.NewFactory(),
+				envprovider.NewFactory(),
+				yamlprovider.NewFactory(),
+			},
+		},
+	}
+
+	collectorSettings := otelcol.CollectorSettings{
+		BuildInfo:              info,
+		Factories:              components,
+		ConfigProviderSettings: cfgProviderSettings,
+	}
+
+	if err := run(collectorSettings); err != nil {
 		log.Fatal(err)
 	}
 }
@@ -121,21 +135,13 @@ func run(settings otelcol.CollectorSettings) error {
 	return nil
 }

-func registerMetricViews() error {
-	views := tenantidprocessor.MetricViews()
-	views = append(views, spancounter.MetricViews()...)
-	views = append(views, ratelimiter.MetricViews()...)
-	return view.Register(views...)
-}
-
 // defaultComponents() is defined here since service/defaultcomponents pkg was
 // removed in the otel collector repo.
 func defaultComponents() (otelcol.Factories, error) {
 	var errs error

 	extensions, err := extension.MakeFactoryMap(
 		zpagesextension.NewFactory(),
-		ballastextension.NewFactory(),
 	)
 	errs = multierr.Append(errs, err)

@@ -145,7 +151,7 @@ func defaultComponents() (otelcol.Factories, error) {
 	errs = multierr.Append(errs, err)

 	exporters, err := exporter.MakeFactoryMap(
-		loggingexporter.NewFactory(),
+		debugexporter.NewFactory(),
 		otlpexporter.NewFactory(),
 		otlphttpexporter.NewFactory(),
 	)
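
The removal of `registerMetricViews` above goes with the commit's switch from OpenCensus views to the collector's own OpenTelemetry metrics. The snippet below is a minimal sketch, not code from this commit, of how a processor such as tenantidprocessor can create its instruments through an OpenTelemetry `MeterProvider` instead of calling `view.Register`; the scope, metric, and attribute names are illustrative assumptions.

```go
package tenantmetrics

// Minimal sketch: reporting per-tenant span counts through the OpenTelemetry
// metrics API rather than OpenCensus views. All names here are illustrative.

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

type tenantMetrics struct {
	spanCount metric.Int64Counter
}

// newTenantMetrics builds instruments from whatever MeterProvider the
// collector hands the component (for example via its telemetry settings).
func newTenantMetrics(mp metric.MeterProvider) (*tenantMetrics, error) {
	meter := mp.Meter("hypertrace/tenantidprocessor") // scope name is an assumption
	spanCount, err := meter.Int64Counter(
		"tenant_id.span.count", // illustrative metric name
		metric.WithDescription("Number of spans received per tenant"),
	)
	if err != nil {
		return nil, err
	}
	return &tenantMetrics{spanCount: spanCount}, nil
}

// recordSpans adds n spans for a tenant, carrying the tenant as a metric
// attribute instead of an OpenCensus tag.
func (m *tenantMetrics) recordSpans(ctx context.Context, tenantID string, n int64) {
	m.spanCount.Add(ctx, n, metric.WithAttributes(attribute.String("tenant-id", tenantID)))
}
```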

exporter/kafkaexporter/README.md

Lines changed: 11 additions & 3 deletions

@@ -1,4 +1,4 @@
-**IMPORTANT:** This component is copied from https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.102.0/exporter/kafkaexporter and
+**IMPORTANT:** This component is copied from https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.111.0/exporter/kafkaexporter and
 adapted to accept compression settings and also do span curing on large spans.
 # Kafka Exporter

@@ -26,8 +26,8 @@ The following settings can be optionally configured:
 - `brokers` (default = localhost:9092): The list of kafka brokers.
 - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
 - `client_id` (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
-- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to.
-- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. This option, when set, will take precedence over the default topic. If `topic_from_attribute` is not set, the message's topic will be set to the value of the configuration option `topic` instead.
+- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the default kafka topic to export to. See [Destination Topic](#destination-topic) below for more details.
+- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. See [Destination Topic](#destination-topic) below for more details.
 - `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
   - `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
   - `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
@@ -40,6 +40,7 @@ The following settings can be optionally configured:
   - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
 - `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
 - `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
+- `partition_logs_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to kafka.
 - `auth`
   - `plain_text`
     - `username`: The username to use.
@@ -70,6 +71,7 @@ The following settings can be optionally configured:
     - `password`: The Kerberos password used for authenticate with KDC
     - `config_file`: Path to Kerberos configuration. i.e /etc/krb5.conf
     - `keytab_file`: Path to keytab file. i.e /etc/security/kafka.keytab
+    - `disable_fast_negotiation`: Disable PA-FX-FAST negotiation (Pre-Authentication Framework - Fast). Some common Kerberos implementations do not support PA-FX-FAST negotiation. This is set to `false` by default.
 - `metadata`
   - `full` (default = true): Whether to maintain a full set of metadata. When
     disabled, the client does not make the initial request to broker at the
@@ -105,3 +107,9 @@
       - localhost:9092
     protocol_version: 2.0.0
 ```
+
+## Destination Topic
+The destination topic can be defined in a few different ways and takes priority in the following order:
+1. When `topic_from_attribute` is configured, and the corresponding attribute is found on the ingested data, the value of this attribute is used.
+2. If a prior component in the collector pipeline sets the topic on the context via the `topic.WithTopic` function (from the `github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic` package), the value set in the context is used.
+3. Finally, the `topic` configuration is used as a default/fallback destination.
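
For rule 2 of the new Destination Topic section, the snippet below is a rough sketch, not part of this commit, of how an upstream pipeline component could pin the destination topic on the context with `topic.WithTopic` before data reaches this exporter. The `tenant-id` resource attribute, the topic naming scheme, and the surrounding consumer wiring are assumptions for illustration only.

```go
package example

// Hypothetical traces consumer that routes each batch to a per-tenant Kafka
// topic by setting it on the context; per rule 2 above, the kafka exporter
// uses this value in preference to its static `topic` setting.

import (
	"context"
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

type topicRouter struct {
	next consumer.Traces // e.g. the kafka exporter further down the pipeline
}

func (r *topicRouter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	tenant := "unknown"
	if td.ResourceSpans().Len() > 0 {
		// "tenant-id" is an assumed resource-attribute convention.
		if v, ok := td.ResourceSpans().At(0).Resource().Attributes().Get("tenant-id"); ok {
			tenant = v.Str()
		}
	}
	// Set the destination topic on the context and hand the batch onward.
	ctx = topic.WithTopic(ctx, fmt.Sprintf("spans-%s", tenant))
	return r.next.ConsumeTraces(ctx, td)
}
```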

exporter/kafkaexporter/config.go_original

Lines changed: 5 additions & 3 deletions

@@ -17,9 +17,9 @@ import (

 // Config defines configuration for Kafka exporter.
 type Config struct {
-	exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
-	exporterhelper.QueueSettings   `mapstructure:"sending_queue"`
-	configretry.BackOffConfig      `mapstructure:"retry_on_failure"`
+	TimeoutSettings           exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
+	QueueSettings             exporterhelper.QueueConfig   `mapstructure:"sending_queue"`
+	configretry.BackOffConfig `mapstructure:"retry_on_failure"`

 	// The list of kafka brokers (default localhost:9092)
 	Brokers []string `mapstructure:"brokers"`
@@ -53,6 +53,8 @@ type Config struct {

 	PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`

+	PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`
+
 	// Metadata is the namespace for metadata management properties used by the
 	// Client, and shared by the Producer/Consumer.
 	Metadata Metadata `mapstructure:"metadata"`

exporter/kafkaexporter/config.modified.go

Lines changed: 5 additions & 3 deletions

@@ -17,9 +17,9 @@ import (

 // Config defines configuration for Kafka exporter.
 type Config struct {
-	exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
-	exporterhelper.QueueSettings   `mapstructure:"sending_queue"`
-	configretry.BackOffConfig      `mapstructure:"retry_on_failure"`
+	TimeoutSettings           exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
+	QueueSettings             exporterhelper.QueueConfig   `mapstructure:"sending_queue"`
+	configretry.BackOffConfig `mapstructure:"retry_on_failure"`

 	// The list of kafka brokers (default localhost:9092)
 	Brokers []string `mapstructure:"brokers"`
@@ -53,6 +53,8 @@ type Config struct {

 	PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`

+	PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`
+
 	// Metadata is the namespace for metadata management properties used by the
 	// Client, and shared by the Producer/Consumer.
 	Metadata Metadata `mapstructure:"metadata"`

exporter/kafkaexporter/config_modified_test.go

Lines changed: 12 additions & 9 deletions

@@ -39,7 +39,7 @@ func TestLoadConfig(t *testing.T) {
 				// intentionally left blank so we use default config
 			},
 			expected: &Config{
-				TimeoutSettings: exporterhelper.TimeoutSettings{
+				TimeoutSettings: exporterhelper.TimeoutConfig{
 					Timeout: 10 * time.Second,
 				},
 				BackOffConfig: configretry.BackOffConfig{
@@ -50,7 +50,7 @@ func TestLoadConfig(t *testing.T) {
 					RandomizationFactor: backoff.DefaultRandomizationFactor,
 					Multiplier: backoff.DefaultMultiplier,
 				},
-				QueueSettings: exporterhelper.QueueSettings{
+				QueueSettings: exporterhelper.QueueConfig{
 					Enabled: true,
 					NumConsumers: 2,
 					QueueSize: 10,
@@ -59,6 +59,7 @@ func TestLoadConfig(t *testing.T) {
 				Encoding: "otlp_proto",
 				PartitionTracesByID: true,
 				PartitionMetricsByResourceAttributes: true,
+				PartitionLogsByResourceAttributes: true,
 				Brokers: []string{"foo:123", "bar:456"},
 				ClientID: "test_client_id",
 				Authentication: kafka.Authentication{
@@ -104,7 +105,7 @@ func TestLoadConfig(t *testing.T) {
 				}
 			},
 			expected: &Config{
-				TimeoutSettings: exporterhelper.TimeoutSettings{
+				TimeoutSettings: exporterhelper.TimeoutConfig{
 					Timeout: 10 * time.Second,
 				},
 				BackOffConfig: configretry.BackOffConfig{
@@ -115,7 +116,7 @@ func TestLoadConfig(t *testing.T) {
 					RandomizationFactor: backoff.DefaultRandomizationFactor,
 					Multiplier: backoff.DefaultMultiplier,
 				},
-				QueueSettings: exporterhelper.QueueSettings{
+				QueueSettings: exporterhelper.QueueConfig{
 					Enabled: true,
 					NumConsumers: 2,
 					QueueSize: 10,
@@ -124,6 +125,7 @@ func TestLoadConfig(t *testing.T) {
 				Encoding: "otlp_proto",
 				PartitionTracesByID: true,
 				PartitionMetricsByResourceAttributes: true,
+				PartitionLogsByResourceAttributes: true,
 				Brokers: []string{"foo:123", "bar:456"},
 				ClientID: "test_client_id",
 				Authentication: kafka.Authentication{
@@ -168,7 +170,7 @@ func TestLoadConfig(t *testing.T) {
 				conf.ResolveCanonicalBootstrapServersOnly = true
 			},
 			expected: &Config{
-				TimeoutSettings: exporterhelper.TimeoutSettings{
+				TimeoutSettings: exporterhelper.TimeoutConfig{
 					Timeout: 10 * time.Second,
 				},
 				BackOffConfig: configretry.BackOffConfig{
@@ -179,7 +181,7 @@ func TestLoadConfig(t *testing.T) {
 					RandomizationFactor: backoff.DefaultRandomizationFactor,
 					Multiplier: backoff.DefaultMultiplier,
 				},
-				QueueSettings: exporterhelper.QueueSettings{
+				QueueSettings: exporterhelper.QueueConfig{
 					Enabled: true,
 					NumConsumers: 2,
 					QueueSize: 10,
@@ -188,6 +190,7 @@ func TestLoadConfig(t *testing.T) {
 				Encoding: "otlp_proto",
 				PartitionTracesByID: true,
 				PartitionMetricsByResourceAttributes: true,
+				PartitionLogsByResourceAttributes: true,
 				Brokers: []string{"foo:123", "bar:456"},
 				ClientID: "test_client_id",
 				ResolveCanonicalBootstrapServersOnly: true,
@@ -229,7 +232,7 @@ func TestLoadConfig(t *testing.T) {

 			sub, err := cm.Sub(tt.id.String())
 			require.NoError(t, err)
-			require.NoError(t, component.UnmarshalConfig(sub, cfg))
+			require.NoError(t, sub.Unmarshal(cfg))

 			assert.NoError(t, component.ValidateConfig(cfg))
 			assert.Equal(t, tt.expected, cfg)
@@ -362,8 +365,8 @@ func Test_saramaProducerCompressionCodec(t *testing.T) {
 	for name, test := range tests {
 		t.Run(name, func(t *testing.T) {
 			c, err := saramaProducerCompressionCodec(test.compression)
-			assert.Equal(t, c, test.expectedCompression)
-			assert.Equal(t, err, test.expectedError)
+			assert.Equal(t, test.expectedCompression, c)
+			assert.Equal(t, test.expectedError, err)
 		})
 	}
 }
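
The test change from `component.UnmarshalConfig(sub, cfg)` to `sub.Unmarshal(cfg)` mirrors the upstream removal of that helper: a `confmap.Conf` now decodes directly into the config struct. Below is a small, self-contained sketch of the new pattern; the `sampleConfig` struct and its values are stand-ins, not the exporter's real `Config` type.

```go
package main

// Stand-alone illustration of decoding a confmap.Conf with Unmarshal, the
// pattern the updated tests above rely on.

import (
	"fmt"
	"log"

	"go.opentelemetry.io/collector/confmap"
)

type sampleConfig struct {
	Brokers  []string `mapstructure:"brokers"`
	Topic    string   `mapstructure:"topic"`
	Encoding string   `mapstructure:"encoding"`
}

func main() {
	// Build a Conf in memory; in the tests it comes from a testdata YAML file.
	conf := confmap.NewFromStringMap(map[string]any{
		"brokers":  []any{"foo:123", "bar:456"},
		"topic":    "otlp_spans",
		"encoding": "otlp_proto",
	})

	cfg := &sampleConfig{}
	// Previously component.UnmarshalConfig(conf, cfg); the Conf now decodes itself.
	if err := conf.Unmarshal(cfg); err != nil {
		log.Fatal(err)
	}
	fmt.Println(cfg.Brokers, cfg.Topic, cfg.Encoding)
}
```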

exporter/kafkaexporter/config_test.go_original

Lines changed: 12 additions & 9 deletions

@@ -39,7 +39,7 @@ func TestLoadConfig(t *testing.T) {
 				// intentionally left blank so we use default config
 			},
 			expected: &Config{
-				TimeoutSettings: exporterhelper.TimeoutSettings{
+				TimeoutSettings: exporterhelper.TimeoutConfig{
 					Timeout: 10 * time.Second,
 				},
 				BackOffConfig: configretry.BackOffConfig{
@@ -50,7 +50,7 @@ func TestLoadConfig(t *testing.T) {
 					RandomizationFactor: backoff.DefaultRandomizationFactor,
 					Multiplier: backoff.DefaultMultiplier,
 				},
-				QueueSettings: exporterhelper.QueueSettings{
+				QueueSettings: exporterhelper.QueueConfig{
 					Enabled: true,
 					NumConsumers: 2,
 					QueueSize: 10,
@@ -59,6 +59,7 @@ func TestLoadConfig(t *testing.T) {
 				Encoding: "otlp_proto",
 				PartitionTracesByID: true,
 				PartitionMetricsByResourceAttributes: true,
+				PartitionLogsByResourceAttributes: true,
 				Brokers: []string{"foo:123", "bar:456"},
 				ClientID: "test_client_id",
 				Authentication: kafka.Authentication{
@@ -94,7 +95,7 @@ func TestLoadConfig(t *testing.T) {
 				}
 			},
 			expected: &Config{
-				TimeoutSettings: exporterhelper.TimeoutSettings{
+				TimeoutSettings: exporterhelper.TimeoutConfig{
 					Timeout: 10 * time.Second,
 				},
 				BackOffConfig: configretry.BackOffConfig{
@@ -105,7 +106,7 @@ func TestLoadConfig(t *testing.T) {
 					RandomizationFactor: backoff.DefaultRandomizationFactor,
 					Multiplier: backoff.DefaultMultiplier,
 				},
-				QueueSettings: exporterhelper.QueueSettings{
+				QueueSettings: exporterhelper.QueueConfig{
 					Enabled: true,
 					NumConsumers: 2,
 					QueueSize: 10,
@@ -114,6 +115,7 @@ func TestLoadConfig(t *testing.T) {
 				Encoding: "otlp_proto",
 				PartitionTracesByID: true,
 				PartitionMetricsByResourceAttributes: true,
+				PartitionLogsByResourceAttributes: true,
 				Brokers: []string{"foo:123", "bar:456"},
 				ClientID: "test_client_id",
 				Authentication: kafka.Authentication{
@@ -148,7 +150,7 @@ func TestLoadConfig(t *testing.T) {
 				conf.ResolveCanonicalBootstrapServersOnly = true
 			},
 			expected: &Config{
-				TimeoutSettings: exporterhelper.TimeoutSettings{
+				TimeoutSettings: exporterhelper.TimeoutConfig{
 					Timeout: 10 * time.Second,
 				},
 				BackOffConfig: configretry.BackOffConfig{
@@ -159,7 +161,7 @@ func TestLoadConfig(t *testing.T) {
 					RandomizationFactor: backoff.DefaultRandomizationFactor,
 					Multiplier: backoff.DefaultMultiplier,
 				},
-				QueueSettings: exporterhelper.QueueSettings{
+				QueueSettings: exporterhelper.QueueConfig{
 					Enabled: true,
 					NumConsumers: 2,
 					QueueSize: 10,
@@ -168,6 +170,7 @@ func TestLoadConfig(t *testing.T) {
 				Encoding: "otlp_proto",
 				PartitionTracesByID: true,
 				PartitionMetricsByResourceAttributes: true,
+				PartitionLogsByResourceAttributes: true,
 				Brokers: []string{"foo:123", "bar:456"},
 				ClientID: "test_client_id",
 				ResolveCanonicalBootstrapServersOnly: true,
@@ -199,7 +202,7 @@ func TestLoadConfig(t *testing.T) {

 			sub, err := cm.Sub(tt.id.String())
 			require.NoError(t, err)
-			require.NoError(t, component.UnmarshalConfig(sub, cfg))
+			require.NoError(t, sub.Unmarshal(cfg))

 			assert.NoError(t, component.ValidateConfig(cfg))
 			assert.Equal(t, tt.expected, cfg)
@@ -332,8 +335,8 @@ func Test_saramaProducerCompressionCodec(t *testing.T) {
 	for name, test := range tests {
 		t.Run(name, func(t *testing.T) {
 			c, err := saramaProducerCompressionCodec(test.compression)
-			assert.Equal(t, c, test.expectedCompression)
-			assert.Equal(t, err, test.expectedError)
+			assert.Equal(t, test.expectedCompression, c)
+			assert.Equal(t, test.expectedError, err)
 		})
 	}
 }
