exploration: how many profiles are buffered if the agent endpoint is down? #3524

Draft · wants to merge 1 commit into base: main
6 changes: 3 additions & 3 deletions profiler/options.go
@@ -54,7 +54,7 @@ const (
// DefaultUploadTimeout specifies the default timeout for uploading profiles.
// It can be overwritten using the DD_PROFILING_UPLOAD_TIMEOUT env variable
// or the WithUploadTimeout option.
- DefaultUploadTimeout = 10 * time.Second
+ DefaultUploadTimeout = 10 * time.Second // <--
)

const (
@@ -195,8 +195,8 @@ func defaultConfig() (*config, error) {
cpuDuration: DefaultDuration,
blockRate: DefaultBlockRate,
mutexFraction: DefaultMutexFraction,
- uploadTimeout: DefaultUploadTimeout,
- maxGoroutinesWait: 1000, // arbitrary value, should limit STW to ~30ms
+ uploadTimeout: DefaultUploadTimeout, // <--
+ maxGoroutinesWait: 1000, // arbitrary value, should limit STW to ~30ms
deltaProfiles: internal.BoolEnv("DD_PROFILING_DELTA", true),
logStartup: internal.BoolEnv("DD_TRACE_STARTUP_LOGS", true),
endpointCountEnabled: internal.BoolEnv(traceprof.EndpointCountEnvVar, false),
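As an aside to the annotated default above: the 10 second timeout can be changed without touching this code. Below is a minimal sketch, assuming the v1 module path (gopkg.in/DataDog/dd-trace-go.v1) and the exported WithUploadTimeout option named in the comment; the DD_PROFILING_UPLOAD_TIMEOUT environment variable is the equivalent knob.

```go
package main

import (
	"log"
	"time"

	"gopkg.in/DataDog/dd-trace-go.v1/profiler"
)

func main() {
	// Equivalent via the environment: DD_PROFILING_UPLOAD_TIMEOUT=30s
	if err := profiler.Start(
		profiler.WithUploadTimeout(30 * time.Second), // overrides the 10s DefaultUploadTimeout
	); err != nil {
		log.Fatal(err)
	}
	defer profiler.Stop()

	// ... application code ...
}
```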
21 changes: 18 additions & 3 deletions profiler/profiler.go
@@ -25,7 +25,7 @@ import (
)

// outChannelSize specifies the size of the profile output channel.
- const outChannelSize = 5
+ const outChannelSize = 5 // <--

// customProfileLabelLimit is the maximum number of pprof labels which can
// be used as custom attributes in the profiler UI
@@ -200,7 +200,7 @@ func newProfiler(opts ...Option) (*profiler, error) {
//
// see similar discussion: https://github.com/golang/go/issues/39177
if cfg.uploadTimeout <= 0 {
- return nil, fmt.Errorf("invalid upload timeout, must be > 0: %s", cfg.uploadTimeout)
+ return nil, fmt.Errorf("invalid upload timeout, must be > 0: %s", cfg.uploadTimeout) // <-- mentioned in doRequest
}
for pt := range cfg.types {
if _, ok := profileTypes[pt]; !ok {
@@ -236,7 +236,7 @@ func newProfiler(opts ...Option) (*profiler, error) {

p := profiler{
cfg: cfg,
- out: make(chan batch, outChannelSize),
+ out: make(chan batch, outChannelSize), // <-- 5
exit: make(chan struct{}),
met: newMetrics(),
deltas: make(map[ProfileType]*fastDeltaProfiler),
@@ -444,6 +444,9 @@ func (p *profiler) enabledProfileTypes() []ProfileType {
// enqueueUpload pushes a batch of profiles onto the queue to be uploaded. If there is no room, it will
// evict the oldest profile to make some. Typically a batch would be one of each enabled profile.
func (p *profiler) enqueueUpload(bat batch) {
+ // explanation: p.out is a channel with a capacity of 5. we keep buffering
+ // profile batches into the channel until it's full. After that, we start
+ // evicting the oldest profile batches until there's room in the channel.
for {
select {
case p.out <- bat:
@@ -465,6 +468,18 @@

// send takes profiles from the output queue and uploads them.
func (p *profiler) send() {
+ // explanation: this loop drains the p.out channel and uploads the profiles.
+ // in the worst case our uploadFunc takes a long time to complete [1], so
+ // p.out could hold 5 profiles. together with the profile held by uploadFunc
+ // and the profile currently being collected, this would result in 7
+ // profiles being buffered in memory. profiles are typically ~5 MiB (we have
+ // a hard limit of 30 MiB on the intake queue IIRC), so I'd be very
+ // surprised if this ends up buffering more than 100 MiB of profiles in
+ // practice even if the agent endpoint is down.
+ //
+ // [1] uploadFunc should not take longer than 20 seconds (2 attempts with
+ // 10s timeout) and our profile production rate is one per 60s. So this case
+ // should not happen, but let's entertain the possibility that it does.
for {
select {
case <-p.exit:
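To make the buffering behaviour concrete, here is a small, self-contained sketch of the evict-oldest pattern that enqueueUpload relies on. The names batch, enqueue, and out are illustrative stand-ins rather than the real profiler types; the channel capacity of 5 mirrors outChannelSize. Per the worst case sketched in the send comment, this buffer plus one batch held by the uploader plus one being collected gives 7 batches in memory.

```go
package main

import "fmt"

// batch stands in for the profiler's batch type.
type batch struct{ id int }

// enqueue mirrors the enqueueUpload pattern: try a non-blocking send; if the
// buffer is full, evict the oldest element and retry.
func enqueue(out chan batch, b batch) {
	for {
		select {
		case out <- b:
			return // there was room in the buffer
		default:
			// Buffer full: drop the oldest batch (if any), then retry.
			select {
			case <-out:
			default:
			}
		}
	}
}

func main() {
	out := make(chan batch, 5) // cf. outChannelSize = 5
	for i := 1; i <= 8; i++ {
		enqueue(out, batch{id: i})
	}
	close(out)
	for b := range out {
		fmt.Println("still buffered:", b.id) // prints batches 4 through 8
	}
}
```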
6 changes: 5 additions & 1 deletion profiler/upload.go
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ import (
)

// maxRetries specifies the maximum number of retries to have when an error occurs.
- const maxRetries = 2
+ const maxRetries = 2 // <-- this should probably be renamed to maxUploadAttempts

var errOldAgent = errors.New("Datadog Agent is not accepting profiles. Agent-based profiling deployments " +
"require Datadog Agent >= 7.20")
@@ -34,6 +34,9 @@
func (p *profiler) upload(bat batch) error {
statsd := p.cfg.statsd
var err error
+ // explanation: this is the profiler.uploadFunc that is called by
+ // profiler.send. It attempts to upload a profile up to 2 times (i.e. one
+ // retry) before giving up.
for i := 0; i < maxRetries; i++ {
select {
case <-p.exit:
@@ -96,6 +99,7 @@ func (p *profiler) doRequest(bat batch) error {
funcExit := make(chan struct{})
defer close(funcExit)
// uploadTimeout is guaranteed to be >= 0, see newProfiler.
+ // p.cfg.uploadTimeout is 10 seconds by default
ctx, cancel := context.WithTimeout(context.Background(), p.cfg.uploadTimeout)
go func() {
select {
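Finally, the 20 second figure from footnote [1] falls out of bounding each attempt with the upload timeout and capping the number of attempts at maxRetries. The sketch below is a simplification under those assumptions: doRequest here is a stand-in that just waits rather than the real HTTP upload, and the real code additionally stops early when the profiler is stopped (p.exit).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const (
	maxRetries    = 2                // effectively "max attempts", cf. the rename note above
	uploadTimeout = 10 * time.Second // DefaultUploadTimeout
)

// doRequest is a stand-in for the real upload: it simply honours the
// per-attempt deadline carried by ctx.
func doRequest(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(50 * time.Millisecond): // simulate a fast, successful upload
		return nil
	}
}

// upload retries doRequest up to maxRetries times; in the worst case it
// blocks for maxRetries * uploadTimeout = 20s, well under the 60s period at
// which new profiles are produced.
func upload() error {
	var err error
	for i := 0; i < maxRetries; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), uploadTimeout)
		err = doRequest(ctx)
		cancel()
		if err == nil {
			return nil
		}
	}
	return fmt.Errorf("upload failed after %d attempts: %w", maxRetries, err)
}

func main() {
	if err := upload(); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("uploaded")
}
```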