Summary

This PR implements a Salesforce Bulk Upload destination as an open-source contribution to RudderStack. It enables high-volume bulk synchronization of customer data to Salesforce via the Bulk API 2.0 and is optimized for RETL (Reverse ETL) warehouse syncs.

Motivation

Fullscript needs to sync large volumes of data (100K-500K+ records) from our data warehouse to Salesforce. The existing real-time Salesforce destination uses the REST API for per-event syncs, which is inefficient for bulk operations due to:

  • High API call volume (one call per record)
  • Rate limiting issues at scale
  • Poor throughput for bulk operations
  • Increased API usage costs

What This Implements

Core AsyncDestinationManager Implementation

New package: router/batchrouter/asyncdestinationmanager/salesforce-bulk/
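
For orientation, the batch router drives async destinations through a small contract; the method names below match this PR (Transform, Upload, Poll, GetUploadStats), but the signatures are simplified placeholders, not the actual rudder-server types:

package salesforcebulk

// Simplified sketch of the async-destination contract this package
// fulfils. Method names match this PR; parameter and return types are
// illustrative placeholders, not the real rudder-server definitions.
type asyncDestinationManager interface {
	// Transform extracts the already-transformed Salesforce payload
	// from processor output for a single job.
	Transform(job []byte) (payload string, err error)
	// Upload batches jobs into a CSV, creates a Bulk API 2.0 job,
	// uploads the data, and closes the job for processing.
	Upload(jobsFilePath string) (importID string, err error)
	// Poll reports the Salesforce job state (Open, InProgress,
	// JobComplete, Failed, Aborted) for a previous upload.
	Poll(importID string) (state string, err error)
	// GetUploadStats fetches per-record success/failure results.
	GetUploadStats(importID string) error
}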

Key Features

Salesforce Bulk API 2.0 integration

  • Job creation, CSV upload, job closing
  • Status polling and result retrieval
  • Handles all job states (Open, InProgress, JobComplete, Failed, Aborted)
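
A minimal sketch of that job lifecycle, assuming an OAuth token and instance URL from the auth service described below; the endpoints follow Salesforce's public Bulk API 2.0 documentation, and retry/error categorization is elided:

package salesforcebulk

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// runIngestJob sketches the create -> upload CSV -> close sequence.
func runIngestJob(client *http.Client, instanceURL, token string, csvData []byte) (string, error) {
	base := instanceURL + "/services/data/v62.0/jobs/ingest"

	send := func(method, url, contentType string, body []byte) (*http.Response, error) {
		req, err := http.NewRequest(method, url, bytes.NewReader(body))
		if err != nil {
			return nil, err
		}
		req.Header.Set("Authorization", "Bearer "+token)
		req.Header.Set("Content-Type", contentType)
		return client.Do(req)
	}

	// 1. Create the ingest job for the target object and operation.
	create := []byte(`{"object":"Lead","operation":"insert","contentType":"CSV","lineEnding":"LF"}`)
	resp, err := send(http.MethodPost, base, "application/json", create)
	if err != nil {
		return "", fmt.Errorf("create job: %w", err)
	}
	defer resp.Body.Close()
	var job struct {
		ID string `json:"id"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&job); err != nil {
		return "", fmt.Errorf("decode job response: %w", err)
	}

	// 2. PUT the CSV batch onto the job.
	resp2, err := send(http.MethodPut, base+"/"+job.ID+"/batches", "text/csv", csvData)
	if err != nil {
		return "", fmt.Errorf("upload data: %w", err)
	}
	resp2.Body.Close()

	// 3. Mark the job UploadComplete so Salesforce starts processing;
	//    Poll later GETs base+"/"+job.ID until a terminal state.
	resp3, err := send(http.MethodPatch, base+"/"+job.ID, "application/json", []byte(`{"state":"UploadComplete"}`))
	if err != nil {
		return "", fmt.Errorf("close job: %w", err)
	}
	resp3.Body.Close()
	return job.ID, nil
}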

OAuth 2.0 authentication

  • Integrates with RudderStack's OAuth v2 service
  • Token caching with expiry tracking
  • Automatic token refresh on 401 errors
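
A sketch of the expiry-tracked cache this describes; the fetch function stands in for the OAuth v2 service call, and the one-minute refresh buffer and two-hour default expiry match the commit notes further down:

package salesforcebulk

import (
	"sync"
	"time"
)

// tokenCache sketches the expiry-tracked cache; field names are
// illustrative, and fetching is delegated to the OAuth v2 service.
type tokenCache struct {
	mu          sync.Mutex
	accessToken string
	instanceURL string
	expiresAt   time.Time
	fetch       func() (token, instanceURL string, ttl time.Duration, err error)
}

// get returns the cached token, refreshing it once it is within one
// minute of expiry.
func (c *tokenCache) get() (string, string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.accessToken != "" && time.Until(c.expiresAt) > time.Minute {
		return c.accessToken, c.instanceURL, nil
	}
	token, url, ttl, err := c.fetch()
	if err != nil {
		return "", "", err
	}
	if ttl == 0 {
		ttl = 2 * time.Hour // default expiry when the response omits one
	}
	c.accessToken, c.instanceURL, c.expiresAt = token, url, time.Now().Add(ttl)
	return c.accessToken, c.instanceURL, nil
}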

CSV generation

  • Extracts transformed Salesforce fields from processor output
  • 100MB file size limit (Bulk API 2.0 max)
  • Overflow handling for large datasets
  • Hash-based job tracking for success/failure matching
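
A sketch of the size-capped CSV generation, using illustrative names (buildCSV is not necessarily the PR's function); records that would push the file past the 100MB cap are returned as overflow for a follow-up job:

package salesforcebulk

import (
	"bytes"
	"encoding/csv"
)

// 100MB cap this PR applies to a single Bulk API 2.0 upload.
const maxBulkFileSize = 100 * 1024 * 1024

// buildCSV writes records while under the size cap and hands back the
// rest as overflow. The size check is an approximation (field bytes
// plus separators); the real implementation may account for quoting.
func buildCSV(headers []string, records [][]string) (csvData []byte, overflow [][]string, err error) {
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	if err := w.Write(headers); err != nil {
		return nil, nil, err
	}
	w.Flush()

	for i, rec := range records {
		size := len(rec) // roughly one separator or newline per field
		for _, field := range rec {
			size += len(field)
		}
		if buf.Len()+size > maxBulkFileSize {
			return buf.Bytes(), records[i:], w.Error()
		}
		if err := w.Write(rec); err != nil {
			return nil, nil, err
		}
		w.Flush()
	}
	return buf.Bytes(), nil, w.Error()
}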

Error handling

  • Categorized errors: RefreshToken, RateLimit, BadRequest, ServerError
  • Proper retry vs abort logic
  • Detailed error messages
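
A sketch of that mapping by HTTP status (category names match the list above; the string values, and the response-body inspection the real handler performs, are simplified away):

package salesforcebulk

import "net/http"

// errorCategory mirrors the buckets listed above; the router retries
// transient categories and aborts on permanent ones.
type errorCategory string

const (
	categoryRefreshToken errorCategory = "REFRESH_TOKEN" // 401: refresh OAuth token, retry
	categoryRateLimit    errorCategory = "RATE_LIMIT"    // 429: back off, retry
	categoryBadRequest   errorCategory = "BAD_REQUEST"   // other 4xx: abort, do not retry
	categoryServerError  errorCategory = "SERVER_ERROR"  // 5xx: transient, retry
)

func categorize(statusCode int) errorCategory {
	switch {
	case statusCode == http.StatusUnauthorized:
		return categoryRefreshToken
	case statusCode == http.StatusTooManyRequests:
		return categoryRateLimit
	case statusCode >= 500:
		return categoryServerError
	default:
		return categoryBadRequest
	}
}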

Dual use case support

  • RETL warehouse syncs (primary): Uses VDM field mapping, extracts object type from context
  • Event streams (secondary): Uses config object type, batch processing
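
A sketch of the resolution order, with defaults taken from the commit notes below (Lead object, Email external ID field); the types are illustrative:

package salesforcebulk

type destinationConfig struct {
	ObjectType string // event-stream fallback, set in the destination config
}

type objectInfo struct {
	ObjectType      string
	ExternalIDField string
}

// resolveObjectInfo prefers the VDM context (RETL), falls back to the
// destination config (event streams), then to the defaults.
func resolveObjectInfo(vdmObjectType string, cfg destinationConfig) objectInfo {
	info := objectInfo{ObjectType: "Lead", ExternalIDField: "Email"}
	switch {
	case vdmObjectType != "": // RETL: parsed from context.externalId
		info.ObjectType = vdmObjectType
	case cfg.ObjectType != "": // event stream: from config
		info.ObjectType = cfg.ObjectType
	}
	return info
}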

Integration Points

Registered in:

  • asyncdestinationmanager/manager.go - Manager factory routing
  • asyncdestinationmanager/common/utils.go - Added to asyncDestinations list
  • utils/misc/misc.go - Added to BatchDestinations() list
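
The registration itself is mechanical; roughly (a sketch with simplified signatures, not the actual factory code):

package asyncdestinationmanager

import "fmt"

// Sketch of the two registration points: the asyncDestinations list
// (abridged here) and the factory case routing to the new package.
var asyncDestinations = []string{
	"MARKETO_BULK_UPLOAD", "ELOQUA", // existing entries, abridged
	"SALESFORCE_BULK_UPLOAD", // added by this PR
}

func newRegularManager(destType string) (string, error) {
	switch destType {
	case "SALESFORCE_BULK_UPLOAD":
		// the real code returns salesforcebulk.NewManager(...)
		return "salesforce-bulk manager", nil
	}
	return "", fmt.Errorf("unsupported async destination: %s", destType)
}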

Testing

All tests passing. Coverage includes:

  • Transform method (payload extraction)
  • Config parsing with validation
  • Object info extraction (VDM and event stream)
  • CSV file creation
  • Hash calculation for tracking
  • Poll status handling (all job states)
  • API error categorization
  • Manager factory

Test approach:

  • Table-driven tests following warehouse testing guidelines
  • Uses require for assertions
  • t.Parallel() for concurrent execution
  • Proper cleanup with t.Cleanup()
  • Manual mocks (no external dependencies)
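
For flavor, a minimal test in that style (the case data is illustrative, not from the package's real tests):

package salesforcebulk

import (
	"os"
	"testing"

	"github.com/stretchr/testify/require"
)

// Illustrative sketch of the table-driven style used in this package:
// require assertions, t.Parallel(), and t.Cleanup().
func TestCSVFileSketch(t *testing.T) {
	t.Parallel()
	testCases := []struct {
		name   string
		header string
	}{
		{name: "basic header", header: "Email,FirstName\n"},
		{name: "single column", header: "Email\n"},
	}
	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			f, err := os.CreateTemp(t.TempDir(), "sfbulk-*.csv")
			require.NoError(t, err)
			t.Cleanup(func() { _ = f.Close() })

			_, err = f.WriteString(tc.header)
			require.NoError(t, err)

			data, err := os.ReadFile(f.Name())
			require.NoError(t, err)
			require.Equal(t, tc.header, string(data))
		})
	}
}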

Transformer Reuse

We reuse the existing Salesforce transformer instead of creating a new one. This is a proven pattern in RudderStack:

// Already exists:
salesforce_oauth         → 'salesforce' transformer
salesforce_oauth_sandbox → 'salesforce' transformer

// We add:
salesforce_bulk_upload   → 'salesforce' transformer

Benefits:

  • ✅ VDM v1 support for free (visual field mapping UI)
  • ✅ VDM v2 support for free (with recordTransform.js in companion MR)
  • ✅ Zero transformer code duplication
  • ✅ Consistent UX with regular Salesforce destination
  • ✅ Future transformer improvements automatically benefit bulk upload

What's NOT in This PR

Requires RudderStack Team Collaboration

Control Plane changes (not in this repo):

  1. Backend config destination definition
  2. OAuth Connected App setup (can likely reuse existing Salesforce OAuth)
  3. UI configuration schema with VDM flags:
    • supportsVisualMapper: true
    • supportedSourceTypes: ["warehouse", "cloud"]
    • supportedMessageTypes: {"cloud": ["identify", "track", "record"]}
  4. Destination metadata (icon, description, category)

Companion MR in rudder-transformer: See eng/hercules/rudder-transformer!1

Testing Instructions

Run Unit Tests

cd router/batchrouter/asyncdestinationmanager/salesforce-bulk
go test -v

Expected: All tests pass ✅

Integration Testing

Requires:

  • Salesforce sandbox environment
  • OAuth account configured in Control Plane
  • Backend config with destination definition

Related Documentation

Checklist

  • Code follows AsyncDestinationManager pattern
  • All unit tests passing
  • Follows warehouse testing guidelines
  • OAuth v2 service integration
  • Error handling for all scenarios
  • Proper logging and metrics hooks
  • Documentation (RFC) included
  • Integration tests (requires Control Plane setup)
  • Code review from RudderStack team
  • Control Plane configuration coordinated

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

josh.etsenake added 21 commits October 16, 2025 06:05
…pload destination

- Add DestinationConfig with minimal fields for RETL use case
- Add SalesforceBulkUploader struct implementing AsyncDestinationManager
- Add OAuth service interfaces for token management
- Add Salesforce Bulk API 2.0 service interfaces
- Add job response and request types

Following AsyncDestinationManager pattern similar to Marketo/Eloqua.
VDM field mapping will be handled by existing Salesforce transformer.
- Implement NewManager factory function
- Parse minimal config (rudderAccountId, operation, apiVersion)
- Initialize OAuth v2 client integration
- Initialize auth and API services
- Validate operation type
- Default to v57.0 API and insert operation

Config is minimal for RETL - VDM handles field mapping.
- Add Transform method to extract already-transformed payload from body.JSON
- Implement Upload method with Bulk API 2.0 workflow:
  * Read jobs from file
  * Extract object info from VDM context
  * Create CSV file
  * Create Salesforce job
  * Upload data
  * Close job for processing
- Implement Poll method for job status checking
- Implement GetUploadStats for success/failure tracking
- Add error handling for OAuth, rate limits, bad requests
- Add result matching via hash tracking

Follows Marketo/Eloqua patterns for async destinations.
- Add readJobsFromFile to parse job file
- Add extractObjectInfo to get object type from VDM context.externalId
- Add createCSVFile for generating CSV from transformed payloads
- Add hash calculation for job ID tracking
- Implement 100MB file size limit (Bulk API 2.0)
- Handle overflow jobs when file size exceeded

Utilities follow patterns from Marketo/Eloqua implementations.
- Add GetAccessToken using RudderStack OAuth v2 service
- Implement token caching with expiry tracking
- Extract instanceUrl from OAuth response
- Add 1-minute buffer for token expiry
- Validate OAuth response fields
- Default 2-hour expiry if not provided

Follows Bing Ads pattern for OAuth v2 integration.
- Add CreateJob for Bulk API 2.0 job creation
- Add UploadData for CSV upload via PUT /batches
- Add CloseJob to trigger job processing
- Add GetJobStatus for polling
- Add GetFailedRecords/GetSuccessfulRecords for result retrieval
- Add DeleteJob for cleanup on errors
- Implement makeRequest with OAuth bearer token authentication
- Add error categorization (RefreshToken, RateLimit, BadRequest, ServerError)
- Handle CSV parsing for success/failure results

All methods use Bulk API 2.0 endpoints with proper error handling.
- Add SALESFORCE_BULK_UPLOAD to asyncDestinations list
- Add case in newRegularManager to route to salesforcebulk.NewManager
- Import salesforcebulk package

Destination is now registered and will be routed to BatchRouter.
- Add SALESFORCE_BULK_UPLOAD to BatchDestinations()
- This routes Salesforce Bulk jobs to BatchRouter instead of regular Router
- Enables batching and async upload workflow
- Remove unused logger import in auth_service.go
- Add missing time import in api_service.go
- Add TestSalesforceBulk_Transform for payload extraction
- Add TestSalesforceBulk_parseDestinationConfig with validation tests
- Add TestSalesforceBulk_extractObjectInfo for VDM context parsing
- Add TestSalesforceBulk_createCSVFile for CSV generation
- Add TestSalesforceBulk_calculateHashCode for hash tracking
- Add TestSalesforceBulk_Poll for job status polling
- Add TestSalesforceBulk_handleAPIError for error handling
- Add TestSalesforceBulk_NewManager for factory function

Following warehouse testing guidelines:
- Use require for assertions
- Table-driven tests
- t.Parallel() for concurrent execution
- Proper cleanup with t.Cleanup()
- Test both success and error scenarios
- Add local_mock.go with manual mock implementations
- Remove mockgen directives (use manual mocks like Marketo does)
- Fix test setup to use manual mocks instead of gomock
- Implement MockSalesforceAPIService with function fields
- Implement MockSalesforceAuthService
- Implement MockBackendConfig

Tests now compile without external mockgen dependency.
- Fix Get method signature to use context.Context
- Fix Identity method to return identity.Identifier
- Fix Subscribe method signature with proper types
- Remove duplicate StartWithIDs
- Add missing imports for context, identity, pubsub

Mocks now properly implement backendconfig.BackendConfig interface.
- Use correct identity package: services/controlplane/identity
- Return identity.NOOP{} from mock Identity method

Mocks now properly compile.
- Fix jobID extraction in Poll test to handle nil jobStatus
- Use default job ID when jobStatus is not provided

Tests now pass without nil pointer panics.
Make externalId optional to support both RETL and event streams:
- Add ObjectType to DestinationConfig for event streams
- Update extractObjectInfo to try VDM path first, fallback to config
- Extract VDM parsing into extractFromVDM helper
- Default to Lead object type for event streams
- Default to Email as external ID field

Event streams now work alongside RETL with the same CSV generation logic.
The transformer handles field mapping; we just extract and batch.
- Add test for event stream without externalId
- Add test for default object type (Lead)
- Update missing externalId test to expect success with fallback
- All tests now pass for both RETL and event stream use cases
…e quality

Critical fixes:
- Fix non-deterministic map iteration in CSV header generation (sort headers)
- Fix hash matching by using original CSV headers for result comparison
- Add thread-safety with sync.RWMutex for concurrent access protection
- Update default API version from v57.0 to v62.0 (current)

Improvements:
- Add comprehensive tests for calculateHashFromRecord function
- Add integration tests for upload-to-result matching flow
- Clean up unnecessary comments throughout codebase
- Handle user's sf__ fields correctly (not filtering them out)
- Try multiple error column names (sf__Error, Error) for flexibility

All tests passing (10 test functions, 35 test cases)
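
The determinism fix above boils down to sorting map keys before writing headers or hashing; a minimal sketch:

package salesforcebulk

import "sort"

// sortedHeaders sketches the fix: Go map iteration order is random,
// so headers are sorted before writing the CSV and before hashing
// records, keeping uploads and result matching stable.
func sortedHeaders(fields map[string]string) []string {
	headers := make([]string, 0, len(fields))
	for name := range fields {
		headers = append(headers, name)
	}
	sort.Strings(headers)
	return headers
}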
Reorganize test files following Marketo pattern:
- Split salesforce_bulk_test.go into 3 files (main, auth, utils)
- Created auth_service_test.go for authentication tests
- Created utils_test.go for utility function tests

Added missing test coverage:
- Upload() method with 3 test cases (success, API error, rate limit)
- GetUploadStats() method with 3 test cases
- Auth service tests (token caching, validation, error handling)
- Total: 13 test functions, 60 test cases (vs Marketo's 53)

Test improvements:
- Better than Marketo: Transform(), config parsing, hash matching
- Equal to Marketo: Upload(), Poll(), GetUploadStats()
- Added comprehensive hash matching tests (7 cases)
- Added VDM/RETL object extraction tests (7 cases)

All tests passing with no linter errors.
etsenake force-pushed the feat/salesforce-bulk-upload branch from 37a5885 to 623dd9a on October 16, 2025 18:27
etsenake force-pushed the feat/salesforce-bulk-upload branch from f044865 to 56d2273 on October 17, 2025 09:39
etsenake force-pushed the feat/salesforce-bulk-upload branch 2 times, most recently from 67efcd1 to 0fed2ba on October 19, 2025 12:49