apis/fluentd/v1alpha1/plugins/common/buffer_types.go (152 changes: 33 additions & 119 deletions)
@@ -155,27 +155,19 @@ func (b *Buffer) Name() string {
 
 func (b *Buffer) Params(_ plugins.SecretLoader) (*params.PluginStore, error) {
 	ps := params.NewPluginStore(b.Name())
-	if b.Id != nil {
-		ps.InsertPairs("@id", fmt.Sprint(*b.Id))
-	}
+	params.InsertPairs(ps, "@id", b.Id)
 	if b.Type != nil {
 		ps.InsertType(fmt.Sprint(*b.Type))
 	}
-	if b.LogLevel != nil {
-		ps.InsertPairs("@log_level", fmt.Sprint(*b.LogLevel))
-	}
+	params.InsertPairs(ps, "@log_level", b.LogLevel)
 
 	if b.FileBuffer != nil && b.PathSuffix != nil {
 		ps.InsertPairs("path_suffix", *b.PathSuffix)
 	}
 
 	if b.FileSingleBuffer != nil {
-		if b.CalcNumRecords != nil {
-			ps.InsertPairs("calc_num_records", *b.CalcNumRecords)
-		}
-		if b.ChunkFormat != nil {
-			ps.InsertPairs("chunk_format", *b.ChunkFormat)
-		}
+		params.InsertPairs(ps, "calc_num_records", b.CalcNumRecords)
+		params.InsertPairs(ps, "chunk_format", b.ChunkFormat)
 	}
 
 	if b.Path != nil {
@@ -193,123 +185,45 @@ func (b *Buffer) Params(_ plugins.SecretLoader) (*params.PluginStore, error) {
 		}
 	}
 
-	if b.TimeKey != nil {
-		ps.InsertPairs("timekey", fmt.Sprint(*b.TimeKey))
-	}
-
-	if b.TimeKeyWait != nil {
-		ps.InsertPairs("timekey_wait", fmt.Sprint(*b.TimeKeyWait))
-	}
+	params.InsertPairs(ps, "timekey", b.TimeKey)
+	params.InsertPairs(ps, "timekey_wait", b.TimeKeyWait)
 
 	ps.InsertPairs("tag", b.Tag)
 
-	if b.ChunkLimitSize != nil {
-		ps.InsertPairs("chunk_limit_size", *b.ChunkLimitSize)
-	}
-
-	if b.ChunkLimitRecords != nil {
-		ps.InsertPairs("chunk_limit_records", *b.ChunkLimitRecords)
-	}
-
-	if b.TotalLimitSize != nil {
-		ps.InsertPairs("total_limit_size", *b.TotalLimitSize)
-	}
-
-	if b.QueueLimitLength != nil {
-		ps.InsertPairs("queue_limit_length", *b.QueueLimitLength)
-	}
-
-	if b.QueuedChunksLimitSize != nil {
-		ps.InsertPairs("queued_chunks_limit_size", fmt.Sprint(*b.QueuedChunksLimitSize))
-	}
-
-	if b.Compress != nil {
-		ps.InsertPairs("compress", fmt.Sprint(*b.Compress))
-	}
+	params.InsertPairs(ps, "chunk_limit_size", b.ChunkLimitSize)
+	params.InsertPairs(ps, "chunk_limit_records", b.ChunkLimitRecords)
+	params.InsertPairs(ps, "total_limit_size", b.TotalLimitSize)
+	params.InsertPairs(ps, "queue_limit_length", b.QueueLimitLength)
+	params.InsertPairs(ps, "queued_chunks_limit_size", b.QueuedChunksLimitSize)
+	params.InsertPairs(ps, "compress", b.Compress)
 
 	if b.FlushAtShutdown != nil && *b.FlushAtShutdown {
 		ps.InsertPairs("flush_at_shutdown", fmt.Sprint(*b.FlushAtShutdown))
 	}
 
-	if b.FlushMode != nil {
-		ps.InsertPairs("flush_mode", fmt.Sprint(*b.FlushMode))
-	}
-
-	if b.FlushInterval != nil {
-		ps.InsertPairs("flush_interval", fmt.Sprint(*b.FlushInterval))
-	}
-
-	if b.FlushThreadCount != nil {
-		ps.InsertPairs("flush_thread_count", fmt.Sprint(*b.FlushThreadCount))
-	}
-
-	if b.DelayedCommitTimeout != nil {
-		ps.InsertPairs("delayed_commit_timeout", fmt.Sprint(*b.DelayedCommitTimeout))
-	}
-
-	if b.OverflowAction != nil {
-		ps.InsertPairs("overflow_action", fmt.Sprint(*b.OverflowAction))
-	}
-
-	if b.RetryTimeout != nil {
-		ps.InsertPairs("retry_timeout", fmt.Sprint(*b.RetryTimeout))
-	}
-
-	if b.RetrySecondaryThreshold != nil {
-		ps.InsertPairs("retry_secondary_threshold", fmt.Sprint(*b.RetrySecondaryThreshold))
-	}
-
-	if b.RetryType != nil {
-		ps.InsertPairs("retry_type", fmt.Sprint(*b.RetryType))
-	}
-
-	if b.RetryMaxTimes != nil {
-		ps.InsertPairs("retry_max_times", fmt.Sprint(*b.RetryMaxTimes))
-	}
-
-	if b.RetryForever != nil {
-		ps.InsertPairs("retry_forever", fmt.Sprint(*b.RetryForever))
-	}
-
-	if b.RetryWait != nil {
-		ps.InsertPairs("retry_wait", fmt.Sprint(*b.RetryWait))
-	}
-
-	if b.RetryExponentialBackoffBase != nil {
-		ps.InsertPairs("retry_exponential_backoff_base", fmt.Sprint(*b.RetryExponentialBackoffBase))
-	}
-
-	if b.RetryMaxInterval != nil {
-		ps.InsertPairs("retry_max_interval", fmt.Sprint(*b.RetryMaxInterval))
-	}
-
-	if b.RetryRandomize != nil {
-		ps.InsertPairs("retry_randomize", fmt.Sprint(*b.RetryRandomize))
-	}
-
-	if b.DisableChunkBackup != nil {
-		ps.InsertPairs("disable_chunk_backup", fmt.Sprint(*b.DisableChunkBackup))
-	}
+	params.InsertPairs(ps, "flush_mode", b.FlushMode)
+	params.InsertPairs(ps, "flush_interval", b.FlushInterval)
+	params.InsertPairs(ps, "flush_thread_count", b.FlushThreadCount)
+	params.InsertPairs(ps, "delayed_commit_timeout", b.DelayedCommitTimeout)
+	params.InsertPairs(ps, "overflow_action", b.OverflowAction)
+	params.InsertPairs(ps, "retry_timeout", b.RetryTimeout)
+	params.InsertPairs(ps, "retry_secondary_threshold", b.RetrySecondaryThreshold)
+	params.InsertPairs(ps, "retry_type", b.RetryType)
+	params.InsertPairs(ps, "retry_max_times", b.RetryMaxTimes)
+	params.InsertPairs(ps, "retry_forever", b.RetryForever)
+	params.InsertPairs(ps, "retry_wait", b.RetryWait)
+	params.InsertPairs(ps, "retry_exponential_backoff_base", b.RetryExponentialBackoffBase)
+	params.InsertPairs(ps, "retry_max_interval", b.RetryMaxInterval)
+	params.InsertPairs(ps, "retry_randomize", b.RetryRandomize)
+	params.InsertPairs(ps, "disable_chunk_backup", b.DisableChunkBackup)
 
 	if b.Time != nil {
-		if b.Time.TimeType != nil {
-			ps.InsertPairs("time_type", fmt.Sprint(*b.Time.TimeType))
-		}
-		if b.Time.TimeFormat != nil {
-			ps.InsertPairs("time_format", fmt.Sprint(*b.Time.TimeFormat))
-		}
-		if b.Time.Localtime != nil {
-			ps.InsertPairs("localtime", fmt.Sprint(*b.Time.Localtime))
-		}
-		if b.Time.UTC != nil {
-			ps.InsertPairs("utc", fmt.Sprint(*b.Time.UTC))
-		}
-		if b.Time.Timezone != nil {
-			ps.InsertPairs("timezone", fmt.Sprint(*b.Time.Timezone))
-		}
-		if b.Time.TimeFormatFallbacks != nil {
-			ps.InsertPairs("time_format_fallbacks", fmt.Sprint(*b.Time.TimeFormatFallbacks))
-		}
+		params.InsertPairs(ps, "time_type", b.Time.TimeType)
+		params.InsertPairs(ps, "time_format", b.Time.TimeFormat)
+		params.InsertPairs(ps, "localtime", b.Time.Localtime)
+		params.InsertPairs(ps, "utc", b.Time.UTC)
+		params.InsertPairs(ps, "timezone", b.Time.Timezone)
+		params.InsertPairs(ps, "time_format_fallbacks", b.Time.TimeFormatFallbacks)
 	}
 
 	return ps, nil
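Reviewer note (not part of the diff): the change above folds every repeated "if field != nil { ps.InsertPairs(key, fmt.Sprint(*field)) }" block into a single call to params.InsertPairs(ps, key, field). The helper itself is defined elsewhere in this PR; below is only a minimal sketch of what it plausibly looks like, assuming Go generics and the existing PluginStore.InsertPairs(key, value string) method. Apart from those two names, everything in the sketch is an assumption, not the actual implementation.

// Sketch only (assumed): a nil-checking generic wrapper in the params package.
func InsertPairs[T any](ps *PluginStore, key string, value *T) {
	if value == nil {
		return // optional field unset: emit nothing, matching the old if-blocks
	}
	ps.InsertPairs(key, fmt.Sprint(*value)) // same fmt.Sprint conversion the old code used
}

Fields with extra semantics keep their hand-written branches, which is why they remain as unchanged context lines above: @type still goes through InsertType, and flush_at_shutdown is still emitted only when set to true.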
apis/fluentd/v1alpha1/plugins/filter/types.go (16 changes: 10 additions & 6 deletions)
@@ -91,26 +91,30 @@ func (f *Filter) Params(loader plugins.SecretLoader) (*params.PluginStore, error
 
 }
 
-func (f *Filter) grepPlugin(parent *params.PluginStore, loader plugins.SecretLoader) *params.PluginStore {
-	childs := make([]*params.PluginStore, 0)
-	if len(f.Grep.Regexps) > 0 {
-		for _, r := range f.Grep.Regexps {
+func appendChild(childs []*params.PluginStore, loader plugins.SecretLoader, grep *Grep) []*params.PluginStore {
+	if len(grep.Regexps) > 0 {
+		for _, r := range grep.Regexps {
 			if r != nil && r.Key != nil && r.Pattern != nil {
 				child, _ := r.Params(loader)
 				childs = append(childs, child)
 			}
 		}
 	}
 
-	if len(f.Grep.Excludes) > 0 {
-		for _, e := range f.Grep.Excludes {
+	if len(grep.Excludes) > 0 {
+		for _, e := range grep.Excludes {
 			if e != nil && e.Key != nil && e.Pattern != nil {
 				child, _ := e.Params(loader)
 				childs = append(childs, child)
 			}
 		}
 	}
+	return childs
+}
 
+func (f *Filter) grepPlugin(parent *params.PluginStore, loader plugins.SecretLoader) *params.PluginStore {
+	childs := make([]*params.PluginStore, 0)
+	childs = appendChild(childs, loader, f.Grep)
 	if len(f.Grep.Ands) > 0 {
 		for _, e := range f.Grep.Ands {
 			if e != nil && (e.Regexp != nil || e.Exclude != nil) {
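Reviewer note (not part of the diff): appendChild pulls the duplicated <regexp>/<exclude> loops out of grepPlugin so the nil-guarded child collection lives in one place. It takes the child slice as a parameter and returns it instead of mutating it in place, because append may grow the backing array and hand back a new slice header, so the caller must reassign the result. Illustration only, repeating the call grepPlugin now makes:

	childs := make([]*params.PluginStore, 0)
	childs = appendChild(childs, loader, f.Grep) // reassign: append may return a new slice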