diff --git a/mc2mc/internal/config/config.go b/mc2mc/internal/config/config.go index 1590d77..5d3300c 100644 --- a/mc2mc/internal/config/config.go +++ b/mc2mc/internal/config/config.go @@ -16,6 +16,7 @@ type ConfigEnv struct { LoadMethod string `env:"LOAD_METHOD" envDefault:"APPEND"` QueryFilePath string `env:"QUERY_FILE_PATH" envDefault:"/data/in/query.sql"` DestinationTableID string `env:"DESTINATION_TABLE_ID"` + CostAttributionTeam string `env:"COST_ATTRIBUTION_TEAM"` DStart string `env:"DSTART"` DEnd string `env:"DEND"` ExecutionProject string `env:"EXECUTION_PROJECT"` diff --git a/mc2mc/internal/query/builder.go b/mc2mc/internal/query/builder.go index 8678a0e..c27b0bb 100644 --- a/mc2mc/internal/query/builder.go +++ b/mc2mc/internal/query/builder.go @@ -22,10 +22,11 @@ type Builder struct { query string - method Method - destinationTableID string - orderedColumns []string - overridedValues map[string]string + method Method + destinationTableID string + costAttributionTeam string + orderedColumns []string + overridedValues map[string]string enableAutoPartition bool enablePartitionValue bool @@ -54,6 +55,10 @@ func (b *Builder) SetOptions(options ...Option) *Builder { return b } +func getCostAttributionComment(teamName string) string { + return fmt.Sprintf("--cost_attribution_team=%s\n", teamName) +} + // Build constructs the final query with the given options func (b *Builder) Build() (string, error) { if b.query == "" { @@ -64,9 +69,12 @@ func (b *Builder) Build() (string, error) { // split query components hrs, vars, queries := SplitQueryComponents(b.query) if len(queries) <= 1 { + if b.costAttributionTeam != "" { + return fmt.Sprintf("%s\n%s", b.query, getCostAttributionComment(b.costAttributionTeam)), nil + } return b.query, nil } - query := b.constructMergeQuery(hrs, vars, queries) + query := b.constructMergeQuery(hrs, vars, queries, b.costAttributionTeam) return query, nil } @@ -143,7 +151,11 @@ func (b *Builder) Build() (string, error) { if varsAndUDFs != "" { varsAndUDFs += "\n" } + query = fmt.Sprintf("%s%s%s%s", hr, drops, varsAndUDFs, query) + if b.costAttributionTeam != "" { + query = fmt.Sprintf("%s\n%s\n", query, getCostAttributionComment(b.costAttributionTeam)) + } return query, nil } @@ -188,7 +200,7 @@ func (b *Builder) constructOverridedValues(query string) (string, error) { } // constructMergeQueries constructs merge queries with headers and variables -func (b *Builder) constructMergeQuery(hrs, vars, queries []string) string { +func (b *Builder) constructMergeQuery(hrs, vars, queries []string, costAttributionTeam string) string { builder := strings.Builder{} for i, q := range queries { q = strings.TrimSpace(q) @@ -204,6 +216,9 @@ func (b *Builder) constructMergeQuery(hrs, vars, queries []string) string { builder.WriteString(fmt.Sprintf("%s\n", variables)) } builder.WriteString(fmt.Sprintf("%s\n;", q)) + if costAttributionTeam != "" { + builder.WriteString(fmt.Sprintf("\n%s\n", getCostAttributionComment(costAttributionTeam))) + } if i < len(queries)-1 { builder.WriteString(fmt.Sprintf("\n%s\n", BREAK_MARKER)) } diff --git a/mc2mc/internal/query/option.go b/mc2mc/internal/query/option.go index 2dca360..3c03d4c 100644 --- a/mc2mc/internal/query/option.go +++ b/mc2mc/internal/query/option.go @@ -46,3 +46,9 @@ func WithOverridedValue(field, value string) Option { b.overridedValues[field] = value } } + +func WithCostAttributionLabel(teamName string) Option { + return func(b *Builder) { + b.costAttributionTeam = teamName + } +} diff --git a/mc2mc/mc2mc.go b/mc2mc/mc2mc.go index 98ebb69..ca9cf78 100644 --- a/mc2mc/mc2mc.go +++ b/mc2mc/mc2mc.go @@ -82,6 +82,7 @@ func mc2mc(envs []string) error { query.WithOverridedValue("_partitiondate", fmt.Sprintf("DATE(timestamp('%s'))", dstart)), query.WithAutoPartition(cfg.DevEnableAutoPartition == "true"), query.WithPartitionValue(cfg.DevEnablePartitionValue == "true"), + query.WithCostAttributionLabel(cfg.CostAttributionTeam), query.WithColumnOrder(), ).Build() if err != nil { @@ -97,6 +98,7 @@ func mc2mc(envs []string) error { query.WithDestination(cfg.DestinationTableID), query.WithAutoPartition(cfg.DevEnableAutoPartition == "true"), query.WithPartitionValue(cfg.DevEnablePartitionValue == "true"), + query.WithCostAttributionLabel(cfg.CostAttributionTeam), query.WithColumnOrder(), ) @@ -151,6 +153,7 @@ func mc2mc(envs []string) error { l, client.NewODPSClient(l, cfg.GenOdps()), query.WithQuery(string(raw)), + query.WithCostAttributionLabel(cfg.CostAttributionTeam), query.WithMethod(query.MERGE), ).Build() if err != nil {