From 61b8b0627fd755d7843766dbd6a89d16c59b2fdc Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Tue, 9 Dec 2025 14:43:28 +0700 Subject: [PATCH 1/2] feat: add priority set capability --- mc2mc/internal/client/client.go | 1 + mc2mc/internal/client/odps.go | 23 ++++++++++++++++++++++- mc2mc/internal/client/setup.go | 11 +++++++++++ mc2mc/internal/config/config.go | 1 + mc2mc/mc2mc.go | 1 + 5 files changed, 36 insertions(+), 1 deletion(-) diff --git a/mc2mc/internal/client/client.go b/mc2mc/internal/client/client.go index d14ae1e..ff02a2d 100644 --- a/mc2mc/internal/client/client.go +++ b/mc2mc/internal/client/client.go @@ -19,6 +19,7 @@ type OdpsClient interface { SetLogViewRetentionInDays(days int) SetDryRun(dryRun bool) SetRetry(max int, backoffMs int) + SetPriority(priority int) } type Client struct { diff --git a/mc2mc/internal/client/odps.go b/mc2mc/internal/client/odps.go index 788ee00..019d9e8 100644 --- a/mc2mc/internal/client/odps.go +++ b/mc2mc/internal/client/odps.go @@ -10,6 +10,7 @@ import ( "time" "github.com/aliyun/aliyun-odps-go-sdk/odps" + "github.com/aliyun/aliyun-odps-go-sdk/odps/options" "github.com/pkg/errors" ) @@ -17,6 +18,7 @@ type odpsClient struct { logger *slog.Logger client *odps.Odps + priority int logViewRetentionInDays int isDryRun bool retry func(f func() error) error @@ -45,7 +47,7 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string, additionalHints hints := addHints(additionalHints, query) - taskIns, err := c.client.ExecSQlWithHints(query, hints) + taskIns, err := c.execSQLWithHintsAndPriority(query, hints) if err != nil { return errors.WithStack(err) } @@ -99,6 +101,11 @@ func (c *odpsClient) SetRetry(max int, backoffMs int) { } } +// SetPriority sets the priority for the odps client +func (c *odpsClient) SetPriority(priority int) { + c.priority = priority +} + // GetPartitionNames returns the partition names of the given table // by querying the table schema. func (c *odpsClient) GetPartitionNames(_ context.Context, tableID string) ([]string, error) { @@ -129,6 +136,20 @@ func (c *odpsClient) GetOrderedColumns(tableID string) ([]string, error) { return columnNames, nil } +// execSQLWithHintsAndPriority executes the given query with hints and priority +// ref: https://github.com/aliyun/aliyun-odps-go-sdk/blob/4d1188c6ac989acc9cacc9b3e2ed0f3901a3b3ef/odps/odps.go#L131 +func (c *odpsClient) execSQLWithHintsAndPriority(query string, hints map[string]string) (*odps.Instance, error) { + if c.client.DefaultProjectName() == "" { + err := errors.New("default project is not set") + return nil, errors.WithStack(err) + } + option := options.NewSQLTaskOptions() + option.Hints = hints + option.InstanceOption.Priority = c.priority // add priority to instance option + taskIns, err := c.client.ExecSQlWithOption(query, option) + return taskIns, errors.WithStack(err) +} + // generateLogView generates the log view for the given task instance func (c *odpsClient) generateLogView(taskIns *odps.Instance) (string, error) { u, err := c.client.LogView().GenerateLogView(taskIns, c.logViewRetentionInDays*24) diff --git a/mc2mc/internal/client/setup.go b/mc2mc/internal/client/setup.go index 8657f50..1530053 100644 --- a/mc2mc/internal/client/setup.go +++ b/mc2mc/internal/client/setup.go @@ -87,3 +87,14 @@ func SetupRetry(max int, backoffMs int) SetupFn { return nil } } + +func SetupPriority(priority int) SetupFn { + return func(c *Client) error { + if priority < 0 || priority > 9 { + err := errors.New("priority must be between 0 and 9") + return errors.WithStack(err) + } + c.OdpsClient.SetPriority(priority) + return nil + } +} diff --git a/mc2mc/internal/config/config.go b/mc2mc/internal/config/config.go index 1590d77..aff55f9 100644 --- a/mc2mc/internal/config/config.go +++ b/mc2mc/internal/config/config.go @@ -26,6 +26,7 @@ type ConfigEnv struct { DryRun bool `env:"DRY_RUN" envDefault:"false"` RetryMax int `env:"RETRY_MAX" envDefault:"3"` RetryBackoffMs int `env:"RETRY_BACKOFF_MS" envDefault:"1000"` + Priority int `env:"PRIORITY" envDefault:"9"` // TODO: delete this DevEnablePartitionValue string `env:"DEV__ENABLE_PARTITION_VALUE" envDefault:"false"` DevEnableAutoPartition string `env:"DEV__ENABLE_AUTO_PARTITION" envDefault:"false"` diff --git a/mc2mc/mc2mc.go b/mc2mc/mc2mc.go index 98ebb69..546efad 100644 --- a/mc2mc/mc2mc.go +++ b/mc2mc/mc2mc.go @@ -47,6 +47,7 @@ func mc2mc(envs []string) error { client.SetUpLogViewRetentionInDays(cfg.LogViewRetentionInDays), client.SetupDryRun(cfg.DryRun), client.SetupRetry(cfg.RetryMax, cfg.RetryBackoffMs), + client.SetupPriority(cfg.Priority), ) if err != nil { return errors.WithStack(err) From fcb46c02fe774759390156e2de9e1b51aa0b608e Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Tue, 9 Dec 2025 16:58:58 +0700 Subject: [PATCH 2/2] fix: uninitialized instance option --- mc2mc/internal/client/odps.go | 1 + 1 file changed, 1 insertion(+) diff --git a/mc2mc/internal/client/odps.go b/mc2mc/internal/client/odps.go index 019d9e8..9bc2953 100644 --- a/mc2mc/internal/client/odps.go +++ b/mc2mc/internal/client/odps.go @@ -145,6 +145,7 @@ func (c *odpsClient) execSQLWithHintsAndPriority(query string, hints map[string] } option := options.NewSQLTaskOptions() option.Hints = hints + option.InstanceOption = options.NewCreateInstanceOptions() option.InstanceOption.Priority = c.priority // add priority to instance option taskIns, err := c.client.ExecSQlWithOption(query, option) return taskIns, errors.WithStack(err)