diff --git a/experimental/air/cmd/list.go b/experimental/air/cmd/list.go index 360fc97e67..8c716f5556 100644 --- a/experimental/air/cmd/list.go +++ b/experimental/air/cmd/list.go @@ -57,13 +57,11 @@ type listedRun struct { taskRunID int64 } -// listQuery holds the resolved inputs to listAirRuns. +// listQuery holds the resolved inputs to a runFetcher. type listQuery struct { activeOnly bool - allUsers bool userFilter string filters listFilters - limit int fetchMLflow bool } @@ -113,34 +111,47 @@ func newListCommand() *cobra.Command { userFilter = me.UserName } - rows, err := listAirRuns(ctx, w, listQuery{ + fetcher := newRunFetcher(ctx, w, listQuery{ activeOnly: active, - allUsers: allUsers, userFilter: userFilter, filters: f, - limit: limit, fetchMLflow: root.OutputType(cmd) == flags.OutputText, }) - if err != nil { - return err - } - // JSON keeps the air envelope; text renders the table (interactive and - // navigable in a terminal, printed once when piped). + // JSON prints the newest `limit` runs once. Text renders the table: + // navigable in a terminal (paging in older runs on demand), printed once + // when piped. if root.OutputType(cmd) == flags.OutputJSON { + rows, err := fetcher.next(limit) + if err != nil { + return err + } + warnIfTruncated(ctx, fetcher) return renderEnvelope(ctx, listData{Rows: rows}) } - return renderListText(cmd, rows) + return renderListText(cmd, fetcher, limit) } return cmd } -// listAirRuns pages through Jobs runs/list, keeps the AIR runs that match the -// user and filters, and stops once it has enough matches or has scanned -// maxListScan runs. Detail comes straight from runs/list (expand_tasks), so no -// per-run calls are needed. -func listAirRuns(ctx context.Context, w *databricks.WorkspaceClient, q listQuery) ([]listRow, error) { +// runFetcher pages Jobs runs/list on demand, yielding AIR runs that match the +// user and filters. It buffers a page's leftover runs so successive next() calls +// resume where the last stopped — driving both one-shot output and lazy paging. +type runFetcher struct { + ctx context.Context + w *databricks.WorkspaceClient + query map[string]any + userFilter string + filters listFilters + fetchMLflow bool + + pending []jobRun // runs from the last page not yet inspected + scanned int + exhausted bool +} + +func newRunFetcher(ctx context.Context, w *databricks.WorkspaceClient, q listQuery) *runFetcher { query := map[string]any{ "run_type": "SUBMIT_RUN", "expand_tasks": true, @@ -149,51 +160,55 @@ func listAirRuns(ctx context.Context, w *databricks.WorkspaceClient, q listQuery if q.activeOnly { query["active_only"] = true } + return &runFetcher{ + ctx: ctx, + w: w, + query: query, + userFilter: q.userFilter, + filters: q.filters, + fetchMLflow: q.fetchMLflow, + } +} +// next returns up to want more matching rows, paging runs/list (and buffering the +// leftover runs of a page) until it has enough, the server has no more pages, or +// it has scanned maxListScan runs. MLflow links are filled in for text output. +func (f *runFetcher) next(want int) ([]listRow, error) { var entries []listedRun - scanned := 0 - done := false - for !done && scanned < maxListScan { - resp, err := fetchJobRunsPage(ctx, w, query) - if err != nil { - return nil, err - } - - for i := range resp.Runs { - run := &resp.Runs[i] - scanned++ - - if !isAirRun(run) { - continue - } - if q.userFilter != "" && run.CreatorUserName != q.userFilter { - continue - } - if !q.filters.matches(run) { - continue - } - - entries = append(entries, listedRun{row: buildListRow(run), taskRunID: taskRunID(run)}) - if len(entries) >= q.limit { - done = true + for len(entries) < want { + if len(f.pending) == 0 { + if f.exhausted || f.scanned >= maxListScan { break } + if err := f.fetchPage(); err != nil { + return nil, err + } + continue } - if done || resp.NextPageToken == "" { - break + run := &f.pending[0] + f.pending = f.pending[1:] + f.scanned++ + + if !isAirRun(run) { + continue } - query["page_token"] = resp.NextPageToken + if f.userFilter != "" && run.CreatorUserName != f.userFilter { + continue + } + if !f.filters.matches(run) { + continue + } + entries = append(entries, listedRun{row: buildListRow(run), taskRunID: taskRunID(run)}) } - - if !done && scanned >= maxListScan { - log.Warnf(ctx, "air list: stopped after scanning %d runs; results may be incomplete", maxListScan) + if f.scanned >= maxListScan { + f.exhausted = true } // MLflow links appear only in the text table, so the per-run get-output // lookups are skipped for JSON output (which omits the column anyway). - if q.fetchMLflow { - setMLflowLinks(ctx, w, entries) + if f.fetchMLflow { + setMLflowLinks(f.ctx, f.w, entries) } rows := make([]listRow, len(entries)) @@ -203,6 +218,30 @@ func listAirRuns(ctx context.Context, w *databricks.WorkspaceClient, q listQuery return rows, nil } +// fetchPage loads the next runs/list page into the pending buffer, marking the +// fetcher exhausted once the server reports no further pages. +func (f *runFetcher) fetchPage() error { + resp, err := fetchJobRunsPage(f.ctx, f.w, f.query) + if err != nil { + return err + } + f.pending = resp.Runs + if resp.NextPageToken == "" { + f.exhausted = true + } else { + f.query["page_token"] = resp.NextPageToken + } + return nil +} + +// warnIfTruncated logs when the scan hit maxListScan, so one-shot output signals +// its results may be incomplete. +func warnIfTruncated(ctx context.Context, f *runFetcher) { + if f.scanned >= maxListScan { + log.Warnf(ctx, "air list: stopped after scanning %d runs; results may be incomplete", maxListScan) + } +} + // setMLflowLinks fills in each row's MLflow link in parallel, best-effort: a row // whose IDs can't be resolved keeps its "-" placeholder. func setMLflowLinks(ctx context.Context, w *databricks.WorkspaceClient, entries []listedRun) { diff --git a/experimental/air/cmd/list_format.go b/experimental/air/cmd/list_format.go index c728ca3194..5dcb21bfc6 100644 --- a/experimental/air/cmd/list_format.go +++ b/experimental/air/cmd/list_format.go @@ -5,9 +5,8 @@ import ( "time" ) -// buildListRow extracts the columns shown for one run. Optional text columns fall -// back to "-" so the table stays aligned. MLflow links aren't carried by -// runs/list, so the column shows "-". +// buildListRow extracts the columns shown for one run. Optional cells fall back +// to "-"; MLflowURL starts as "-" and setMLflowLinks fills it in for text output. func buildListRow(run *jobRun) listRow { experiment := "-" if e := jobExperiment(run); e != "" { diff --git a/experimental/air/cmd/list_test.go b/experimental/air/cmd/list_test.go index a885893765..11185528a2 100644 --- a/experimental/air/cmd/list_test.go +++ b/experimental/air/cmd/list_test.go @@ -66,10 +66,9 @@ func TestListAirRunsFiltersUserAndType(t *testing.T) { } srv := runsServer(t, runsListBody(t, "", runs...)) - rows, err := listAirRuns(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{ + rows, err := newRunFetcher(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{ userFilter: "me@example.com", - limit: 10, - }) + }).next(10) require.NoError(t, err) require.Len(t, rows, 2) assert.Equal(t, "1", rows[0].RunID) @@ -83,10 +82,9 @@ func TestListAirRunsExperimentFilter(t *testing.T) { } srv := runsServer(t, runsListBody(t, "", runs...)) - rows, err := listAirRuns(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{ - limit: 10, + rows, err := newRunFetcher(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{ filters: listFilters{Experiment: "qwen*"}, - }) + }).next(10) require.NoError(t, err) require.Len(t, rows, 1) assert.Equal(t, "1", rows[0].RunID) @@ -100,7 +98,7 @@ func TestListAirRunsLimitTruncates(t *testing.T) { } srv := runsServer(t, runsListBody(t, "", runs...)) - rows, err := listAirRuns(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{limit: 2}) + rows, err := newRunFetcher(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{}).next(2) require.NoError(t, err) require.Len(t, rows, 2) assert.Equal(t, "1", rows[0].RunID) @@ -112,13 +110,42 @@ func TestListAirRunsPaginates(t *testing.T) { page2 := runsListBody(t, "", airJobRun(2, "me@example.com", "GPU_1xH100", 1, "exp-b")) srv := runsServer(t, page1, page2) - rows, err := listAirRuns(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{limit: 10}) + rows, err := newRunFetcher(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{}).next(10) require.NoError(t, err) require.Len(t, rows, 2) assert.Equal(t, "1", rows[0].RunID) assert.Equal(t, "2", rows[1].RunID) } +// TestRunFetcherResumesAcrossCalls covers the lazy paging the interactive table +// relies on: a next() that stops mid-page must buffer the rest and hand it back on +// the following call, then report exhaustion — without re-fetching. +func TestRunFetcherResumesAcrossCalls(t *testing.T) { + runs := []jobRun{ + airJobRun(1, "me@example.com", "GPU_1xH100", 1, "exp-a"), + airJobRun(2, "me@example.com", "GPU_1xH100", 1, "exp-b"), + airJobRun(3, "me@example.com", "GPU_1xH100", 1, "exp-c"), + } + srv := runsServer(t, runsListBody(t, "", runs...)) + f := newRunFetcher(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{}) + + first, err := f.next(2) + require.NoError(t, err) + require.Len(t, first, 2) + assert.Equal(t, "1", first[0].RunID) + assert.Equal(t, "2", first[1].RunID) + + second, err := f.next(2) + require.NoError(t, err) + require.Len(t, second, 1) // only the buffered leftover remains + assert.Equal(t, "3", second[0].RunID) + assert.True(t, f.exhausted) + + third, err := f.next(2) + require.NoError(t, err) + assert.Empty(t, third) +} + // TestFetchJobRunsParsesAiRuntimeTask pins the raw parse against the real // runs/get shape, since the typed SDK omits ai_runtime_task. func TestFetchJobRunsParsesAiRuntimeTask(t *testing.T) { diff --git a/experimental/air/cmd/list_tui.go b/experimental/air/cmd/list_tui.go index 369205f599..5a2b1e128e 100644 --- a/experimental/air/cmd/list_tui.go +++ b/experimental/air/cmd/list_tui.go @@ -13,9 +13,10 @@ import ( "github.com/spf13/cobra" ) -// renderListText renders the table for text output: an inline navigable table -// in a terminal, otherwise printed once. JSON is handled by the caller. -func renderListText(cmd *cobra.Command, rows []listRow) error { +// renderListText renders the table for text output: an inline navigable table in +// a terminal (paging in older runs on demand), otherwise printed once. JSON is +// handled by the caller. +func renderListText(cmd *cobra.Command, f *runFetcher, limit int) error { ctx := cmd.Context() out := cmd.OutOrStdout() @@ -25,17 +26,24 @@ func renderListText(cmd *cobra.Command, rows []listRow) error { r.SetColorProfile(termenv.Ascii) } - // Navigate only with a full color TTY, at least one row, and no explicit - // --limit (which means "just print these N"). Everything else — piped, - // NO_COLOR, --limit, empty — prints once. - interactive := len(rows) > 0 && - color && + // Navigate only with a full color TTY and no explicit --limit (which means + // "just print these N"). Everything else — piped, NO_COLOR, --limit — prints + // once. + interactive := color && cmdio.IsPagerSupported(ctx) && !cmd.Flags().Changed("limit") if interactive { - _, err := tea.NewProgram( - newListModel(r, rows, color), + first, err := f.next(listPageRows) + if err != nil { + return err + } + if len(first) == 0 { + _, err := io.WriteString(out, "No runs found.\n") + return err + } + _, err = tea.NewProgram( + newListModel(r, f, first, color), tea.WithContext(ctx), tea.WithInput(cmd.InOrStdin()), tea.WithOutput(out), @@ -43,7 +51,12 @@ func renderListText(cmd *cobra.Command, rows []listRow) error { return err } - _, err := io.WriteString(out, staticListTable(r, rows, color)) + rows, err := f.next(limit) + if err != nil { + return err + } + warnIfTruncated(ctx, f) + _, err = io.WriteString(out, staticListTable(r, rows, color)) return err } @@ -65,29 +78,64 @@ func staticListTable(r *lipgloss.Renderer, rows []listRow, links bool) string { return b.String() } -// listModel is the inline, navigable runs table. +// listModel is the inline, navigable runs table. It lazily pages older runs from +// the fetcher as the cursor nears the end of the loaded rows. fetcher is nil for +// a fixed, non-paging table (e.g. in tests). type listModel struct { - rows []listRow - styles listStyles - cols listCols - links bool + rows []listRow + styles listStyles + cols listCols + links bool + fetcher *runFetcher + loading bool + loadErr error cursor int offset int // index of the first visible row height int // terminal height, for windowing } -func newListModel(r *lipgloss.Renderer, rows []listRow, links bool) listModel { +func newListModel(r *lipgloss.Renderer, f *runFetcher, rows []listRow, links bool) listModel { return listModel{ - rows: rows, - styles: newListStyles(r), - cols: computeListCols(rows), - links: links, + rows: rows, + styles: newListStyles(r), + cols: computeListCols(rows), + links: links, + fetcher: f, } } func (m listModel) Init() tea.Cmd { return nil } +// moreRowsMsg carries a lazily fetched batch of rows, or the error that ended paging. +type moreRowsMsg struct { + rows []listRow + err error +} + +// fetchCmd pulls the next batch of rows in the background; guarded by loading so +// only one runs at a time. +func (m *listModel) fetchCmd() tea.Cmd { + m.loading = true + f := m.fetcher + return func() tea.Msg { + rows, err := f.next(listPageRows) + return moreRowsMsg{rows: rows, err: err} + } +} + +// maybeFetch starts a fetch when the cursor nears the end of the loaded rows and +// more runs may still exist. +func (m *listModel) maybeFetch() tea.Cmd { + if m.fetcher == nil || m.loading || m.loadErr != nil || m.fetcher.exhausted { + return nil + } + if m.cursor < len(m.rows)-m.visibleCount() { + return nil + } + return m.fetchCmd() +} + // listPageRows is the most rows shown per page. const listPageRows = 20 @@ -106,6 +154,22 @@ func (m listModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { case tea.WindowSizeMsg: m.height = msg.Height m.offset = m.clampedOffset() + return m, m.maybeFetch() + + case moreRowsMsg: + m.loading = false + if msg.err != nil { + m.loadErr = msg.err + return m, nil + } + m.rows = append(m.rows, msg.rows...) + m.cols = computeListCols(m.rows) + m.offset = m.clampedOffset() + // A page with no matches but more to scan: keep paging so the cursor isn't + // stuck at the end of the loaded rows. + if len(msg.rows) == 0 && !m.fetcher.exhausted { + return m, m.fetchCmd() + } return m, nil case tea.KeyMsg: @@ -130,13 +194,12 @@ func (m listModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { m.cursor = len(m.rows) - 1 case "enter": // Open the selected run's MLflow page in the browser. - if len(m.rows) > 0 { - if url := m.rows[m.cursor].MLflowURL; url != "" && url != "-" { - return m, openURL(url) - } + if url := m.rows[m.cursor].MLflowURL; url != "" && url != "-" { + return m, openURL(url) } } m.offset = m.clampedOffset() + return m, m.maybeFetch() } return m, nil } @@ -165,13 +228,16 @@ func (m listModel) View() string { return strings.Join(lines, "\n") + "\n" } -// renderHint is the faint one-line key legend, with a scroll position when the -// list is windowed. +// renderHint is the faint one-line key legend, with the cursor position and the +// paging state (loading / load failed). func (m listModel) renderHint() string { faint := m.styles.r.NewStyle().Foreground(colN7) - hint := "↑/↓ navigate · ←/→ page · ↵ mlflow · q quit" - if m.visibleCount() < len(m.rows) { - hint += fmt.Sprintf(" · row %d/%d", m.cursor+1, len(m.rows)) + hint := fmt.Sprintf("↑/↓ navigate · ←/→ page · ↵ mlflow · q quit · row %d/%d", m.cursor+1, len(m.rows)) + switch { + case m.loadErr != nil: + hint += " (load failed)" + case m.loading: + hint += " (loading…)" } return faint.Render(hint) } diff --git a/experimental/air/cmd/list_tui_test.go b/experimental/air/cmd/list_tui_test.go index b5b7ca2cbc..c5b1cd00f0 100644 --- a/experimental/air/cmd/list_tui_test.go +++ b/experimental/air/cmd/list_tui_test.go @@ -25,7 +25,7 @@ func testListRows() []listRow { func testListModel() listModel { r := lipgloss.NewRenderer(io.Discard) r.SetColorProfile(termenv.Ascii) - return newListModel(r, testListRows(), false) + return newListModel(r, nil, testListRows(), false) } func key(t *testing.T, m listModel, s string) listModel { @@ -69,7 +69,7 @@ func TestListModelPageCap(t *testing.T) { } r := lipgloss.NewRenderer(io.Discard) r.SetColorProfile(termenv.Ascii) - m := newListModel(r, rows, false) + m := newListModel(r, nil, rows, false) // A tall terminal still shows at most listPageRows per page. next, _ := m.Update(tea.WindowSizeMsg{Width: 80, Height: 100}) @@ -83,7 +83,7 @@ func TestListModelPaging(t *testing.T) { } r := lipgloss.NewRenderer(io.Discard) r.SetColorProfile(termenv.Ascii) - m := newListModel(r, rows, false) + m := newListModel(r, nil, rows, false) // Height 7 leaves a 4-row window (header + hint reserved). next, _ := m.Update(tea.WindowSizeMsg{Width: 80, Height: 7}) diff --git a/experimental/air/cmd/mlflow.go b/experimental/air/cmd/mlflow.go index 10609e97d7..02a7956d7b 100644 --- a/experimental/air/cmd/mlflow.go +++ b/experimental/air/cmd/mlflow.go @@ -12,8 +12,8 @@ import ( "github.com/databricks/databricks-sdk-go/service/jobs" ) -// getRunOutputResponse is the slice of the jobs runs/get-output response we care -// about. The MLflow identifiers live under ai_runtime_task_output (current) or +// getRunOutputResponse is the part of the jobs runs/get-output response we use. +// The MLflow identifiers live under ai_runtime_task_output (current) or // gen_ai_compute_output.run_info (legacy), neither modeled by the typed SDK, so // we call the endpoint directly and parse just these fields. type getRunOutputResponse struct {