Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions internal/impl/salesforce/processor_salesforce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package salesforce provides a Benthos salesforceProcessor that integrates with the Salesforce APIs
// to fetch data based on input messages. It allows querying Salesforce resources
// such as .... TODO

package salesforce

import (
"context"
"errors"
"net/http"
"net/url"

"github.com/redpanda-data/connect/v4/internal/impl/salesforce/salesforcehttp"

"github.com/redpanda-data/benthos/v4/public/service"
)

// salesforceProcessor is the Benthos salesforceProcessor implementation for Salesforce queries.
// It holds the client state and orchestrates calls into the salesforcehttp package.
type salesforceProcessor struct {
log *service.Logger
client *salesforcehttp.Client
}

// SObjectList is the response from all the available sObjects
type SObjectList struct {
Encoding string `json:"encoding"`
MaxBatchSize int `json:"maxBatchSize"`
Sobjects []SObject `json:"sobjects"`
}

// SObject is the minimal representation of an sObject
type SObject struct {
Name string `json:"name"`
}

// DescribeResult sObject result
type DescribeResult struct {
Fields []struct {
Name string `json:"name"`
} `json:"fields"`
}

// QueryResult of the salesforce search query
type QueryResult struct {
TotalSize int `json:"totalSize"`
Done bool `json:"done"`
}

func init() {
if err := service.RegisterProcessor(
"salesforce", newSalesforceProcessorConfigSpec(),
func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
return newSalesforceProcessor(conf, mgr)
},
); err != nil {
panic(err)
}
}

// newSalesforceProcessorConfigSpec creates a new Configuration specification for the Salesforce processor
func newSalesforceProcessorConfigSpec() *service.ConfigSpec {
return service.NewConfigSpec().
Summary("Fetches data from Salesforce based on input messages").
Description(`This salesforceProcessor takes input messages containing Salesforce queries and returns Salesforce data.

Supports the following Salesforce resources:
- todo

Configuration examples:

` + "```configYAML" + `
# Minimal configuration
pipeline:
processors:
- salesforce:
org_url: "https://your-domain.salesforce.com"
client_id: "${SALESFORCE_CLIENT_ID}"
client_secret: "${SALESFORCE_CLIENT_SECRET}"

# Full configuration
pipeline:
processors:
- salesforce:
org_url: "https://your-domain.salesforce.com"
client_id: "${SALESFORCE_CLIENT_ID}"
client_secret: "${SALESFORCE_CLIENT_SECRET}"
restapi_version: "v64.0"
request_timeout: "30s"
max_retries: 50
` + "```").
Field(service.NewStringField("org_url").
Description("Salesforce instance base URL (e.g., https://your-domain.salesforce.com)")).
Field(service.NewStringField("client_id").
Description("Client ID for the Salesforce Connected App")).
Field(service.NewStringField("client_secret").
Description("Client Secret for the Salesforce Connected App").
Secret()).
Field(service.NewStringField("restapi_version").
Description("Salesforce REST API version to use (example: v64.0). Default: v65.0").
Default("v65.0")).
Field(service.NewDurationField("request_timeout").
Description("HTTP request timeout").
Default("30s")).
Field(service.NewIntField("max_retries").
Description("Maximum number of retries in case of 429 HTTP Status Code").
Default(10))
}

func newSalesforceProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*salesforceProcessor, error) {
orgURL, err := conf.FieldString("org_url")
if err != nil {
return nil, err
}

if _, err := url.ParseRequestURI(orgURL); err != nil {
return nil, errors.New("org_url is not a valid URL")
}

clientID, err := conf.FieldString("client_id")
if err != nil {
return nil, err
}

clientSecret, err := conf.FieldString("client_secret")
if err != nil {
return nil, err
}

apiVersion, err := conf.FieldString("restapi_version")
if err != nil {
return nil, err
}

timeout, err := conf.FieldDuration("request_timeout")
if err != nil {
return nil, err
}

maxRetries, err := conf.FieldInt("max_retries")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably validate this for negative retries.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if err != nil {
return nil, err
}
if maxRetries < 0 {
return nil, errors.New("max_retries must not be negative")
}

httpClient := &http.Client{Timeout: timeout}

salesforceHttp, err := salesforcehttp.NewClient(orgURL, clientID, clientSecret, apiVersion, maxRetries, httpClient, mgr.Logger(), mgr.Metrics())
if err != nil {
return nil, err
}

return &salesforceProcessor{
client: salesforceHttp,
log: mgr.Logger(),
}, nil
}

func (s *salesforceProcessor) Process(ctx context.Context, _ *service.Message) (service.MessageBatch, error) {
var batch service.MessageBatch

res, err := s.client.GetAvailableResources(ctx)
if err != nil {
return nil, err
}

m := service.NewMessage(res)
batch = append(batch, m)

return batch, nil
}

func (*salesforceProcessor) Close(context.Context) error { return nil }
149 changes: 149 additions & 0 deletions internal/impl/salesforce/processor_salesforce_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package salesforce

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/benthos/v4/public/service"
)

func TestSalesforceProcessorConfigValidation(t *testing.T) {
t.Parallel()

tests := []struct {
name string
configYAML string
wantErrSub string
}{
{
name: "missing org_url",
configYAML: `
client_id: "abc"
client_secret: "xyz"
`,
wantErrSub: "org_url",
},
{
name: "invalid org_url",
configYAML: `
org_url: "not a url"
client_id: "abc"
client_secret: "xyz"
`,
wantErrSub: "org_url",
},
{
name: "missing client_id",
configYAML: `
org_url: "https://example.com"
client_secret: "xyz"
`,
wantErrSub: "client_id",
},
{
name: "missing client_secret",
configYAML: `
org_url: "https://example.com"
client_id: "abc"
`,
wantErrSub: "client_secret",
},
{
name: "invalid restapi_version type",
configYAML: `
org_url: "https://example.com"
client_id: "abc"
client_secret: "xyz"
restapi_version: 123
`,
wantErrSub: "restapi_version",
},
{
name: "invalid request_timeout",
configYAML: `
org_url: "https://example.com"
client_id: "abc"
client_secret: "xyz"
request_timeout: "not-a-duration"
`,
wantErrSub: "request_timeout",
},
{
name: "invalid max_retries",
configYAML: `
org_url: "https://example.com"
client_id: "abc"
client_secret: "xyz"
max_retries: "not-an-int"
`,
wantErrSub: "max_retries",
},
{
name: "valid minimal config",
configYAML: `
org_url: "https://example.com"
client_id: "abc"
client_secret: "xyz"
`,
wantErrSub: "",
},
{
name: "valid full config",
configYAML: `
org_url: "https://example.com"
client_id: "abc"
client_secret: "xyz"
restapi_version: "v64.0"
request_timeout: "10s"
max_retries: 5
`,
wantErrSub: "",
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
env := service.NewEnvironment()
spec := newSalesforceProcessorConfigSpec()

conf, err := spec.ParseYAML(tc.configYAML, env)

var proc service.Processor
var procErr error
if err == nil {
proc, procErr = newSalesforceProcessor(conf, conf.Resources())
}

if tc.wantErrSub == "" {
require.NoError(t, err, "expected config to be valid")
require.NoError(t, procErr, "expected processor to initialize")
assert.NotNil(t, proc)
} else {
// Either config parsing OR processor creation must fail
if err != nil {
require.Contains(t, err.Error(), tc.wantErrSub)
}
if procErr != nil {
require.Contains(t, procErr.Error(), tc.wantErrSub)
}
}
})
}
}
Loading