diff --git a/.changeset/cursor-pagination-loadsubset.md b/.changeset/cursor-pagination-loadsubset.md new file mode 100644 index 000000000..cba93c7ab --- /dev/null +++ b/.changeset/cursor-pagination-loadsubset.md @@ -0,0 +1,38 @@ +--- +'@tanstack/db': patch +'@tanstack/electric-db-collection': patch +'@tanstack/query-db-collection': patch +--- + +Enhanced LoadSubsetOptions with separate cursor expressions and offset for flexible pagination. + +**⚠️ Breaking Change for Custom Sync Layers / Query Collections:** + +`LoadSubsetOptions.where` no longer includes cursor expressions for pagination. If you have a custom sync layer or query collection that implements `loadSubset`, you must now handle pagination separately: + +- **Cursor-based pagination:** Use the new `cursor` property (`cursor.whereFrom` and `cursor.whereCurrent`) and combine them with `where` yourself +- **Offset-based pagination:** Use the new `offset` property + +Previously, cursor expressions were baked into the `where` clause. Now they are passed separately so sync layers can choose their preferred pagination strategy. + +**Changes:** + +- Added `CursorExpressions` type with `whereFrom`, `whereCurrent`, and optional `lastKey` properties +- Added `cursor` to `LoadSubsetOptions` for cursor-based pagination (separate from `where`) +- Added `offset` to `LoadSubsetOptions` for offset-based pagination support +- Electric sync layer now makes two parallel `requestSnapshot` calls when cursor is present: + - One for `whereCurrent` (all ties at boundary, no limit) + - One for `whereFrom` (rows after cursor, with limit) +- Query collection serialization now includes `offset` for query key generation +- Added `truncate` event to collections, emitted when synced data is truncated (e.g., after `must-refetch`) +- Fixed `setWindow` pagination: cursor expressions are now correctly built when paging through results +- Fixed offset tracking: `loadNextItems` now passes the correct window offset to prevent incorrect deduplication +- `CollectionSubscriber` now listens for `truncate` events to reset cursor tracking state + +**Benefits:** + +- Sync layers can choose between cursor-based or offset-based pagination strategies +- Electric can efficiently handle tie-breaking with two targeted requests +- Better separation of concerns between filtering (`where`) and pagination (`cursor`/`offset`) +- `setWindow` correctly triggers backend loading for subsequent pages in multi-column orderBy queries +- Cursor state is properly reset after truncation, preventing stale cursor data from being used diff --git a/docs/collections/query-collection.md b/docs/collections/query-collection.md index 67331e126..cf13291a8 100644 --- a/docs/collections/query-collection.md +++ b/docs/collections/query-collection.md @@ -521,7 +521,7 @@ All direct write methods are available on `collection.utils`: ## QueryFn and Predicate Push-Down -When using `syncMode: 'on-demand'`, the collection automatically pushes down query predicates (where clauses, orderBy, and limit) to your `queryFn`. This allows you to fetch only the data needed for each specific query, rather than fetching the entire dataset. +When using `syncMode: 'on-demand'`, the collection automatically pushes down query predicates (where clauses, orderBy, limit, and offset) to your `queryFn`. This allows you to fetch only the data needed for each specific query, rather than fetching the entire dataset. ### How LoadSubsetOptions Are Passed @@ -530,9 +530,13 @@ LoadSubsetOptions are passed to your `queryFn` via the query context's `meta` pr ```typescript queryFn: async (ctx) => { // Extract LoadSubsetOptions from the context - const { limit, where, orderBy } = ctx.meta.loadSubsetOptions + const { limit, offset, where, orderBy } = ctx.meta.loadSubsetOptions // Use these to fetch only the data you need + // - where: filter expression (AST) + // - orderBy: sort expression (AST) + // - limit: maximum number of rows + // - offset: number of rows to skip (for pagination) // ... } ``` @@ -572,7 +576,7 @@ const productsCollection = createCollection( syncMode: 'on-demand', // Enable predicate push-down queryFn: async (ctx) => { - const { limit, where, orderBy } = ctx.meta.loadSubsetOptions + const { limit, offset, where, orderBy } = ctx.meta.loadSubsetOptions // Parse the expressions into simple format const parsed = parseLoadSubsetOptions({ where, orderBy, limit }) @@ -605,6 +609,11 @@ const productsCollection = createCollection( params.set('limit', String(parsed.limit)) } + // Add offset for pagination + if (offset) { + params.set('offset', String(offset)) + } + const response = await fetch(`/api/products?${params}`) return response.json() }, @@ -629,6 +638,7 @@ const affordableElectronics = createLiveQueryCollection({ // This triggers a queryFn call with: // GET /api/products?category=electronics&price_lt=100&sort=price:asc&limit=10 +// When paginating, offset is included: &offset=20 ``` ### Custom Handlers for Complex APIs @@ -731,10 +741,11 @@ queryFn: async (ctx) => { Convenience function that parses all LoadSubsetOptions at once. Good for simple use cases. ```typescript -const { filters, sorts, limit } = parseLoadSubsetOptions(ctx.meta?.loadSubsetOptions) +const { filters, sorts, limit, offset } = parseLoadSubsetOptions(ctx.meta?.loadSubsetOptions) // filters: [{ field: ['category'], operator: 'eq', value: 'electronics' }] // sorts: [{ field: ['price'], direction: 'asc', nulls: 'last' }] // limit: 10 +// offset: 20 (for pagination) ``` #### `parseWhereExpression(expr, options)` diff --git a/packages/db/src/collection/events.ts b/packages/db/src/collection/events.ts index 19c28d5ea..f82a42f2c 100644 --- a/packages/db/src/collection/events.ts +++ b/packages/db/src/collection/events.ts @@ -43,10 +43,19 @@ export interface CollectionLoadingSubsetChangeEvent { loadingSubsetTransition: `start` | `end` } +/** + * Event emitted when the collection is truncated (all data cleared) + */ +export interface CollectionTruncateEvent { + type: `truncate` + collection: Collection +} + export type AllCollectionEvents = { 'status:change': CollectionStatusChangeEvent 'subscribers:change': CollectionSubscribersChangeEvent 'loadingSubset:change': CollectionLoadingSubsetChangeEvent + truncate: CollectionTruncateEvent } & { [K in CollectionStatus as `status:${K}`]: CollectionStatusEvent } @@ -56,6 +65,7 @@ export type CollectionEvent = | CollectionStatusChangeEvent | CollectionSubscribersChangeEvent | CollectionLoadingSubsetChangeEvent + | CollectionTruncateEvent export type CollectionEventHandler = ( event: AllCollectionEvents[T], diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index ce112024f..ecd8d70bf 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -365,6 +365,7 @@ export class CollectionImpl< lifecycle: this._lifecycle, changes: this._changes, indexes: this._indexes, + events: this._events, }) this._sync.setDeps({ collection: this, // Required for passing to config.sync callback diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index f183aa84e..b76580c19 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -11,6 +11,7 @@ import type { CollectionImpl } from './index.js' import type { CollectionLifecycleManager } from './lifecycle' import type { CollectionChangesManager } from './changes' import type { CollectionIndexesManager } from './indexes' +import type { CollectionEventsManager } from './events' interface PendingSyncedTransaction< T extends object = Record, @@ -37,6 +38,7 @@ export class CollectionStateManager< public lifecycle!: CollectionLifecycleManager public changes!: CollectionChangesManager public indexes!: CollectionIndexesManager + private _events!: CollectionEventsManager // Core state - make public for testing public transactions: SortedMap> @@ -79,11 +81,13 @@ export class CollectionStateManager< lifecycle: CollectionLifecycleManager changes: CollectionChangesManager indexes: CollectionIndexesManager + events: CollectionEventsManager }) { this.collection = deps.collection this.lifecycle = deps.lifecycle this.changes = deps.changes this.indexes = deps.indexes + this._events = deps.events } /** @@ -525,6 +529,12 @@ export class CollectionStateManager< for (const key of changedKeys) { currentVisibleState.delete(key) } + + // 4) Emit truncate event so subscriptions can reset their cursor tracking state + this._events.emit(`truncate`, { + type: `truncate`, + collection: this.collection, + }) } for (const operation of transaction.operations) { diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 045eb5042..44981d460 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -34,6 +34,8 @@ type RequestLimitedSnapshotOptions = { limit: number /** All column values for cursor (first value used for local index, all values for sync layer) */ minValues?: Array + /** Row offset for offset-based pagination (passed to sync layer) */ + offset?: number } type CollectionSubscriptionOptions = { @@ -63,6 +65,12 @@ export class CollectionSubscription // Keep track of the keys we've sent (needed for join and orderBy optimizations) private sentKeys = new Set() + // Track the count of rows sent via requestLimitedSnapshot for offset-based pagination + private limitedSnapshotRowCount = 0 + + // Track the last key sent via requestLimitedSnapshot for cursor-based pagination + private lastSentKey: string | number | undefined + private filteredCallback: (changes: Array>) => void private orderByIndex: IndexInterface | undefined @@ -258,6 +266,7 @@ export class CollectionSubscription orderBy, limit, minValues, + offset, }: RequestLimitedSnapshotOptions) { if (!limit) throw new Error(`limit is required`) @@ -354,77 +363,75 @@ export class CollectionSubscription keys = index.take(valuesNeeded(), biggestObservedValue, filterFn) } + // Track row count for offset-based pagination (before sending to callback) + // Use the current count as the offset for this load + const currentOffset = this.limitedSnapshotRowCount + this.callback(changes) - // Build the WHERE filter for sync layer loadSubset - // buildCursor handles both single-column and multi-column cases - let whereWithValueFilter = where + // Update the row count and last key after sending (for next call's offset/cursor) + this.limitedSnapshotRowCount += changes.length + if (changes.length > 0) { + this.lastSentKey = changes[changes.length - 1]!.key + } + + // Build cursor expressions for sync layer loadSubset + // The cursor expressions are separate from the main where clause + // so the sync layer can choose cursor-based or offset-based pagination + let cursorExpressions: + | { + whereFrom: BasicExpression + whereCurrent: BasicExpression + lastKey?: string | number + } + | undefined + if (minValues !== undefined && minValues.length > 0) { - const cursor = buildCursor(orderBy, minValues) - if (cursor) { - whereWithValueFilter = where ? and(where, cursor) : cursor + const whereFromCursor = buildCursor(orderBy, minValues) + + if (whereFromCursor) { + const { expression } = orderBy[0]! + const minValue = minValues[0] + + // Build the whereCurrent expression for the first orderBy column + // For Date values, we need to handle precision differences between JS (ms) and backends (μs) + // A JS Date represents a 1ms range, so we query for all values within that range + let whereCurrentCursor: BasicExpression + if (minValue instanceof Date) { + const minValuePlus1ms = new Date(minValue.getTime() + 1) + whereCurrentCursor = and( + gte(expression, new Value(minValue)), + lt(expression, new Value(minValuePlus1ms)), + ) + } else { + whereCurrentCursor = eq(expression, new Value(minValue)) + } + + cursorExpressions = { + whereFrom: whereFromCursor, + whereCurrent: whereCurrentCursor, + lastKey: this.lastSentKey, + } } } // Request the sync layer to load more data // don't await it, we will load the data into the collection when it comes in - const loadOptions1: LoadSubsetOptions = { - where: whereWithValueFilter, + // Note: `where` does NOT include cursor expressions - they are passed separately + // The sync layer can choose to use cursor-based or offset-based pagination + const loadOptions: LoadSubsetOptions = { + where, // Main filter only, no cursor limit, orderBy, + cursor: cursorExpressions, // Cursor expressions passed separately + offset: offset ?? currentOffset, // Use provided offset, or auto-tracked offset subscription: this, } - const syncResult = this.collection._sync.loadSubset(loadOptions1) + const syncResult = this.collection._sync.loadSubset(loadOptions) // Track this loadSubset call - this.loadedSubsets.push(loadOptions1) - - // Make parallel loadSubset calls for values equal to minValue and values greater than minValue - const promises: Array> = [] - - // First promise: load all values equal to minValue - if (typeof minValue !== `undefined`) { - const { expression } = orderBy[0]! - - // For Date values, we need to handle precision differences between JS (ms) and backends (μs) - // A JS Date represents a 1ms range, so we query for all values within that range - let exactValueFilter - if (minValue instanceof Date) { - const minValuePlus1ms = new Date(minValue.getTime() + 1) - exactValueFilter = and( - gte(expression, new Value(minValue)), - lt(expression, new Value(minValuePlus1ms)), - ) - } else { - exactValueFilter = eq(expression, new Value(minValue)) - } - - const loadOptions2: LoadSubsetOptions = { - where: exactValueFilter, - subscription: this, - } - const equalValueResult = this.collection._sync.loadSubset(loadOptions2) - - // Track this loadSubset call - this.loadedSubsets.push(loadOptions2) - - if (equalValueResult instanceof Promise) { - promises.push(equalValueResult) - } - } - - // Second promise: load values greater than minValue - if (syncResult instanceof Promise) { - promises.push(syncResult) - } - - // Track the combined promise - if (promises.length > 0) { - const combinedPromise = Promise.all(promises).then(() => {}) - this.trackLoadSubsetPromise(combinedPromise) - } else { - this.trackLoadSubsetPromise(syncResult) - } + this.loadedSubsets.push(loadOptions) + this.trackLoadSubsetPromise(syncResult) } // TODO: also add similar test but that checks that it can also load it from the collection's loadSubset function diff --git a/packages/db/src/query/index.ts b/packages/db/src/query/index.ts index 979e694a8..e798b283a 100644 --- a/packages/db/src/query/index.ts +++ b/packages/db/src/query/index.ts @@ -65,6 +65,7 @@ export { minusWherePredicates, isOrderBySubset, isLimitSubset, + isOffsetLimitSubset, isPredicateSubset, } from './predicate-utils.js' diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 8c7a01d0f..f368562cf 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -190,6 +190,17 @@ export class CollectionSubscriber< whereExpression, }) + // Listen for truncate events to reset cursor tracking state + // This ensures that after a must-refetch/truncate, we don't use stale cursor data + const truncateUnsubscribe = this.collection.on(`truncate`, () => { + this.biggest = undefined + }) + + // Clean up truncate listener when subscription is unsubscribed + subscription.on(`unsubscribed`, () => { + truncateUnsubscribe() + }) + // Normalize the orderBy clauses such that the references are relative to the collection const normalizedOrderBy = normalizeOrderByPaths(orderBy, this.alias) @@ -285,7 +296,7 @@ export class CollectionSubscriber< if (!orderByInfo) { return } - const { orderBy, valueExtractorForRawRow } = orderByInfo + const { orderBy, valueExtractorForRawRow, offset } = orderByInfo const biggestSentRow = this.biggest // Extract all orderBy column values from the biggest sent row @@ -306,10 +317,12 @@ export class CollectionSubscriber< const normalizedOrderBy = normalizeOrderByPaths(orderBy, this.alias) // Take the `n` items after the biggest sent value + // Pass the current window offset to ensure proper deduplication subscription.requestLimitedSnapshot({ orderBy: normalizedOrderBy, limit: n, minValues, + offset, }) } @@ -338,10 +351,13 @@ export class CollectionSubscriber< comparator: (a: any, b: any) => number, ) { for (const change of changes) { - if (!this.biggest) { - this.biggest = change.value - } else if (comparator(this.biggest, change.value) < 0) { - this.biggest = change.value + // Only track inserts/updates for cursor positioning, not deletes + if (change.type !== `delete`) { + if (!this.biggest) { + this.biggest = change.value + } else if (comparator(this.biggest, change.value) < 0) { + this.biggest = change.value + } } yield change diff --git a/packages/db/src/query/predicate-utils.ts b/packages/db/src/query/predicate-utils.ts index 672078bfd..96162e868 100644 --- a/packages/db/src/query/predicate-utils.ts +++ b/packages/db/src/query/predicate-utils.ts @@ -756,6 +756,9 @@ export function isOrderBySubset( * Check if one limit is a subset of another. * Returns true if the subset limit requirements are satisfied by the superset limit. * + * Note: This function does NOT consider offset. For offset-aware subset checking, + * use `isOffsetLimitSubset` instead. + * * @example * isLimitSubset(10, 20) // true (requesting 10 items when 20 are available) * isLimitSubset(20, 10) // false (requesting 20 items when only 10 are available) @@ -785,7 +788,57 @@ export function isLimitSubset( } /** - * Check if one predicate (where + orderBy + limit) is a subset of another. + * Check if one offset+limit range is a subset of another. + * Returns true if the subset range is fully contained within the superset range. + * + * A query with `{limit: 10, offset: 0}` loads rows [0, 10). + * A query with `{limit: 10, offset: 20}` loads rows [20, 30). + * + * For subset to be satisfied by superset: + * - Superset must start at or before subset (superset.offset <= subset.offset) + * - Superset must end at or after subset (superset.offset + superset.limit >= subset.offset + subset.limit) + * + * @example + * isOffsetLimitSubset({ offset: 0, limit: 5 }, { offset: 0, limit: 10 }) // true + * isOffsetLimitSubset({ offset: 5, limit: 5 }, { offset: 0, limit: 10 }) // true (rows 5-9 within 0-9) + * isOffsetLimitSubset({ offset: 5, limit: 10 }, { offset: 0, limit: 10 }) // false (rows 5-14 exceed 0-9) + * isOffsetLimitSubset({ offset: 20, limit: 10 }, { offset: 0, limit: 10 }) // false (rows 20-29 outside 0-9) + * + * @param subset - The offset+limit requirements to check + * @param superset - The offset+limit that might satisfy the requirements + * @returns true if subset range is fully contained within superset range + */ +export function isOffsetLimitSubset( + subset: { offset?: number; limit?: number }, + superset: { offset?: number; limit?: number }, +): boolean { + const subsetOffset = subset.offset ?? 0 + const supersetOffset = superset.offset ?? 0 + + // Superset must start at or before subset + if (supersetOffset > subsetOffset) { + return false + } + + // If superset is unlimited, it covers everything from its offset onwards + if (superset.limit === undefined) { + return true + } + + // If subset is unlimited but superset has a limit, subset can't be satisfied + if (subset.limit === undefined) { + return false + } + + // Both have limits - check if subset range is within superset range + const subsetEnd = subsetOffset + subset.limit + const supersetEnd = supersetOffset + superset.limit + + return subsetEnd <= supersetEnd +} + +/** + * Check if one predicate (where + orderBy + limit + offset) is a subset of another. * Returns true if all aspects of the subset predicate are satisfied by the superset. * * @example @@ -813,9 +866,9 @@ export function isPredicateSubset( // The top 10 items matching 'search%' might include items outside the overall top 10. // // However, if the where clauses are equal, then the subset relationship can - // be determined by orderBy and limit alone: - // Example: superset = {where: status='active', limit: 10, orderBy: desc} - // subset = {where: status='active', limit: 5, orderBy: desc} + // be determined by orderBy, limit, and offset: + // Example: superset = {where: status='active', limit: 10, offset: 0, orderBy: desc} + // subset = {where: status='active', limit: 5, offset: 0, orderBy: desc} // The top 5 active items ARE contained in the top 10 active items. if (superset.limit !== undefined) { // For limited supersets, where clauses must be equal @@ -824,15 +877,17 @@ export function isPredicateSubset( } return ( isOrderBySubset(subset.orderBy, superset.orderBy) && - isLimitSubset(subset.limit, superset.limit) + isOffsetLimitSubset(subset, superset) ) } // For unlimited supersets, use the normal subset logic + // Still need to consider offset - an unlimited query with offset only covers + // rows from that offset onwards return ( isWhereSubset(subset.where, superset.where) && isOrderBySubset(subset.orderBy, superset.orderBy) && - isLimitSubset(subset.limit, superset.limit) + isOffsetLimitSubset(subset, superset) ) } diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index b6380e7e7..22f0fd75b 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -253,13 +253,52 @@ export interface Subscription extends EventEmitter { readonly status: SubscriptionStatus } +/** + * Cursor expressions for pagination, passed separately from the main `where` clause. + * The sync layer can choose to use cursor-based pagination (combining these with the where) + * or offset-based pagination (ignoring these and using the `offset` parameter). + * + * Neither expression includes the main `where` clause - they are cursor-specific only. + */ +export type CursorExpressions = { + /** + * Expression for rows greater than (after) the cursor value. + * For multi-column orderBy, this is a composite cursor using OR of conditions. + * Example for [col1 ASC, col2 DESC] with values [v1, v2]: + * or(gt(col1, v1), and(eq(col1, v1), lt(col2, v2))) + */ + whereFrom: BasicExpression + /** + * Expression for rows equal to the current cursor value (first orderBy column only). + * Used to handle tie-breaking/duplicates at the boundary. + * Example: eq(col1, v1) or for Dates: and(gte(col1, v1), lt(col1, v1+1ms)) + */ + whereCurrent: BasicExpression + /** + * The key of the last item that was loaded. + * Can be used by sync layers for tracking or deduplication. + */ + lastKey?: string | number +} + export type LoadSubsetOptions = { - /** The where expression to filter the data */ + /** The where expression to filter the data (does NOT include cursor expressions) */ where?: BasicExpression /** The order by clause to sort the data */ orderBy?: OrderBy /** The limit of the data to load */ limit?: number + /** + * Cursor expressions for cursor-based pagination. + * These are separate from `where` - the sync layer should combine them if using cursor-based pagination. + * Neither expression includes the main `where` clause. + */ + cursor?: CursorExpressions + /** + * Row offset for offset-based pagination. + * The sync layer can use this instead of `cursor` if it prefers offset-based pagination. + */ + offset?: number /** * The subscription that triggered the load. * Advanced sync implementations can use this for: diff --git a/packages/db/tests/query/order-by.test.ts b/packages/db/tests/query/order-by.test.ts index b9d28f84d..d5025c1e9 100644 --- a/packages/db/tests/query/order-by.test.ts +++ b/packages/db/tests/query/order-by.test.ts @@ -2568,6 +2568,7 @@ describe(`OrderBy with duplicate values`, () => { it(`should correctly advance window when there are duplicate values loaded from sync layer`, async () => { // Create test data that reproduces the specific bug described: // Items with many duplicates at value 5, then normal progression + // Note: loadSubset now receives cursor expressions (whereFrom/whereCurrent) separately from where const allTestData: Array = [ { id: 1, a: 1, keep: true }, { id: 2, a: 2, keep: true }, @@ -2590,6 +2591,7 @@ describe(`OrderBy with duplicate values`, () => { // Start with only the first 5 items in the local collection const initialData = allTestData.slice(0, 5) let loadSubsetCallCount = 0 + const loadSubsetCursors: Array = [] const duplicateCollection = createCollection( mockSyncCollectionOptions({ @@ -2614,6 +2616,7 @@ describe(`OrderBy with duplicate values`, () => { return { loadSubset: (options) => { loadSubsetCallCount++ + loadSubsetCursors.push(options.cursor) // Simulate async loading from remote source return new Promise((resolve) => { @@ -2640,11 +2643,57 @@ describe(`OrderBy with duplicate values`, () => { } } - // Return a slice from 0 to limit + // Apply cursor expressions if present (cursor-based pagination) + // For proper cursor-based pagination: + // - whereCurrent should load ALL ties (no limit) + // - whereFrom should load with remaining limit + if (options.cursor) { + const { whereFrom, whereCurrent } = options.cursor + const { limit } = options + try { + // Get ALL rows matching whereCurrent (no limit for ties) + const whereCurrentFn = + createFilterFunctionFromExpression(whereCurrent) + const currentData = + filteredData.filter(whereCurrentFn) + + // Get rows matching whereFrom with limit (for next page data) + const whereFromFn = + createFilterFunctionFromExpression(whereFrom) + const fromData = filteredData.filter(whereFromFn) + const limitedFromData = limit + ? fromData.slice(0, limit) + : fromData + + // Combine: current rows + from rows (deduplicated) + const seenIds = new Set() + filteredData = [] + for (const item of currentData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + for (const item of limitedFromData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + // Re-sort after combining + filteredData.sort((a, b) => a.a - b.a) + } catch (error) { + console.log(`Error applying cursor:`, error) + } + } + + // Apply limit for initial page load (no cursor). + // When cursor is present, limit was already applied in the cursor block above. const { limit } = options - const dataToLoad = limit - ? filteredData.slice(0, limit) - : filteredData + const dataToLoad = + limit && !options.cursor + ? filteredData.slice(0, limit) + : filteredData dataToLoad.forEach((item) => { write({ @@ -2692,8 +2741,10 @@ describe(`OrderBy with duplicate values`, () => { { id: 5, a: 5, keep: true }, ]) expect(loadSubsetCallCount).toBe(1) + // First loadSubset call (initial page at offset 0) has no cursor + expect(loadSubsetCursors[0]).toBeUndefined() - // Now move to next page (offset 5, limit 5) - this should trigger loadSubset + // Now move to next page (offset 5, limit 5) - this should trigger loadSubset with a cursor const moveToSecondPage = collection.utils.setWindow({ offset: 5, limit: 5, @@ -2710,8 +2761,12 @@ describe(`OrderBy with duplicate values`, () => { { id: 9, a: 5, keep: true }, { id: 10, a: 5, keep: true }, ]) - // we expect 2 new loadSubset calls (1 for data equal to max value and one for data greater than max value) - expect(loadSubsetCallCount).toBe(3) + // we expect 1 new loadSubset call (cursor expressions for whereFrom/whereCurrent are now combined in single call) + expect(loadSubsetCallCount).toBe(2) + // Second loadSubset call (pagination) has a cursor with whereFrom and whereCurrent + expect(loadSubsetCursors[1]).toBeDefined() + expect(loadSubsetCursors[1]).toHaveProperty(`whereFrom`) + expect(loadSubsetCursors[1]).toHaveProperty(`whereCurrent`) // Now move to third page (offset 10, limit 5) // It should advance past the duplicate 5s @@ -2739,12 +2794,13 @@ describe(`OrderBy with duplicate values`, () => { // We expect no more loadSubset calls because when we loaded the previous page // we asked for all data equal to max value and LIMIT values greater than max value // and the LIMIT values greater than max value already loaded the next page - expect(loadSubsetCallCount).toBe(3) + expect(loadSubsetCallCount).toBe(2) }) it(`should correctly advance window when there are duplicate values loaded from both local collection and sync layer`, async () => { // Create test data that reproduces the specific bug described: // Items with many duplicates at value 5, then normal progression + // Note: loadSubset now receives cursor expressions (whereFrom/whereCurrent) separately from where const allTestData: Array = [ { id: 1, a: 1, keep: true }, { id: 2, a: 2, keep: true }, @@ -2764,9 +2820,10 @@ describe(`OrderBy with duplicate values`, () => { { id: 16, a: 16, keep: true }, ] - // Start with only the first 5 items in the local collection + // Start with the first 10 items in the local collection (includes all duplicates) const initialData = allTestData.slice(0, 10) let loadSubsetCallCount = 0 + const loadSubsetCursors: Array = [] const duplicateCollection = createCollection( mockSyncCollectionOptions({ @@ -2791,6 +2848,7 @@ describe(`OrderBy with duplicate values`, () => { return { loadSubset: (options) => { loadSubsetCallCount++ + loadSubsetCursors.push(options.cursor) // Simulate async loading from remote source return new Promise((resolve) => { @@ -2817,11 +2875,57 @@ describe(`OrderBy with duplicate values`, () => { } } - // Return a slice from 0 to limit + // Apply cursor expressions if present (cursor-based pagination) + // For proper cursor-based pagination: + // - whereCurrent should load ALL ties (no limit) + // - whereFrom should load with remaining limit + if (options.cursor) { + const { whereFrom, whereCurrent } = options.cursor + const { limit } = options + try { + // Get ALL rows matching whereCurrent (no limit for ties) + const whereCurrentFn = + createFilterFunctionFromExpression(whereCurrent) + const currentData = + filteredData.filter(whereCurrentFn) + + // Get rows matching whereFrom with limit (for next page data) + const whereFromFn = + createFilterFunctionFromExpression(whereFrom) + const fromData = filteredData.filter(whereFromFn) + const limitedFromData = limit + ? fromData.slice(0, limit) + : fromData + + // Combine: current rows + from rows (deduplicated) + const seenIds = new Set() + filteredData = [] + for (const item of currentData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + for (const item of limitedFromData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + // Re-sort after combining + filteredData.sort((a, b) => a.a - b.a) + } catch (error) { + console.log(`Error applying cursor:`, error) + } + } + + // Apply limit for initial page load (no cursor). + // When cursor is present, limit was already applied in the cursor block above. const { limit } = options - const dataToLoad = limit - ? filteredData.slice(0, limit) - : filteredData + const dataToLoad = + limit && !options.cursor + ? filteredData.slice(0, limit) + : filteredData dataToLoad.forEach((item) => { write({ @@ -2869,8 +2973,10 @@ describe(`OrderBy with duplicate values`, () => { { id: 5, a: 5, keep: true }, ]) expect(loadSubsetCallCount).toBe(1) + // First loadSubset call (initial page at offset 0) has no cursor + expect(loadSubsetCursors[0]).toBeUndefined() - // Now move to next page (offset 5, limit 5) - this should trigger loadSubset + // Now move to next page (offset 5, limit 5) - this should trigger loadSubset with a cursor const moveToSecondPage = collection.utils.setWindow({ offset: 5, limit: 5, @@ -2887,8 +2993,12 @@ describe(`OrderBy with duplicate values`, () => { { id: 9, a: 5, keep: true }, { id: 10, a: 5, keep: true }, ]) - // we expect 2 new loadSubset calls (1 for data equal to max value and one for data greater than max value) - expect(loadSubsetCallCount).toBe(3) + // we expect 1 new loadSubset call (cursor expressions for whereFrom/whereCurrent are now combined in single call) + expect(loadSubsetCallCount).toBe(2) + // Second loadSubset call (pagination) has a cursor with whereFrom and whereCurrent + expect(loadSubsetCursors[1]).toBeDefined() + expect(loadSubsetCursors[1]).toHaveProperty(`whereFrom`) + expect(loadSubsetCursors[1]).toHaveProperty(`whereCurrent`) // Now move to third page (offset 10, limit 5) // It should advance past the duplicate 5s @@ -2916,7 +3026,7 @@ describe(`OrderBy with duplicate values`, () => { // We expect no more loadSubset calls because when we loaded the previous page // we asked for all data equal to max value and LIMIT values greater than max value // and the LIMIT values greater than max value already loaded the next page - expect(loadSubsetCallCount).toBe(3) + expect(loadSubsetCallCount).toBe(2) }) }) } @@ -2960,8 +3070,9 @@ describe(`OrderBy with Date values and precision differences`, () => { const initialData = testData.slice(0, 5) - // Track the WHERE clauses sent to loadSubset - const loadSubsetWhereClauses: Array = [] + // Track the cursor expressions sent to loadSubset + // Note: cursor expressions are now passed separately from where (whereFrom/whereCurrent/lastKey) + const loadSubsetCursors: Array = [] const sourceCollection = createCollection( mockSyncCollectionOptions({ @@ -2981,8 +3092,8 @@ describe(`OrderBy with Date values and precision differences`, () => { return { loadSubset: (options) => { - // Capture the WHERE clause for inspection - loadSubsetWhereClauses.push(options.where) + // Capture the cursor for inspection (now contains whereFrom/whereCurrent/lastKey) + loadSubsetCursors.push(options.cursor) return new Promise((resolve) => { setTimeout(() => { @@ -3003,6 +3114,42 @@ describe(`OrderBy with Date values and precision differences`, () => { } } + // Apply cursor expressions if present + if (options.cursor) { + const { whereFrom, whereCurrent } = options.cursor + try { + const whereFromFn = + createFilterFunctionFromExpression(whereFrom) + const fromData = filteredData.filter(whereFromFn) + + const whereCurrentFn = + createFilterFunctionFromExpression(whereCurrent) + const currentData = filteredData.filter(whereCurrentFn) + + // Combine and deduplicate + const seenIds = new Set() + filteredData = [] + for (const item of currentData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + for (const item of fromData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + filteredData.sort( + (a, b) => + a.createdAt.getTime() - b.createdAt.getTime(), + ) + } catch (error) { + console.log(`Error applying cursor:`, error) + } + } + const { limit } = options const dataToLoad = limit ? filteredData.slice(0, limit) @@ -3042,21 +3189,22 @@ describe(`OrderBy with Date values and precision differences`, () => { const results = Array.from(collection.values()).sort((a, b) => a.id - b.id) expect(results.map((r) => r.id)).toEqual([1, 2, 3, 4, 5]) - // Clear tracked clauses before moving to next page - loadSubsetWhereClauses.length = 0 + // Clear tracked cursors before moving to next page + loadSubsetCursors.length = 0 // Move to next page - this should trigger the Date precision handling const moveToSecondPage = collection.utils.setWindow({ offset: 5, limit: 5 }) await moveToSecondPage - // Find the WHERE clause that queries for the "equal values" (the minValue query) - // With the fix, this should be: and(gte(createdAt, baseTime), lt(createdAt, baseTime+1ms)) + // Find the cursor that contains the "whereCurrent" expression (the minValue query) + // With the fix, whereCurrent should be: and(gte(createdAt, baseTime), lt(createdAt, baseTime+1ms)) // Without the fix, this would be: eq(createdAt, baseTime) - const equalValuesQuery = loadSubsetWhereClauses.find((clause) => { - if (!clause) return false - // Check if it's an 'and' with 'gte' and 'lt' (the fix) - if (clause.name === `and` && clause.args?.length === 2) { - const [first, second] = clause.args + const cursorWithDateRange = loadSubsetCursors.find((cursor) => { + if (!cursor?.whereCurrent) return false + const whereCurrent = cursor.whereCurrent + // Check if whereCurrent is an 'and' with 'gte' and 'lt' (the fix) + if (whereCurrent.name === `and` && whereCurrent.args?.length === 2) { + const [first, second] = whereCurrent.args return first?.name === `gte` && second?.name === `lt` } return false @@ -3064,7 +3212,8 @@ describe(`OrderBy with Date values and precision differences`, () => { // The fix should produce a range query (and(gte, lt)) for Date values // instead of an exact equality query (eq) - expect(equalValuesQuery).toBeDefined() + expect(cursorWithDateRange).toBeDefined() + const equalValuesQuery = cursorWithDateRange.whereCurrent expect(equalValuesQuery.name).toBe(`and`) expect(equalValuesQuery.args[0].name).toBe(`gte`) expect(equalValuesQuery.args[1].name).toBe(`lt`) diff --git a/packages/db/tests/query/predicate-utils.test.ts b/packages/db/tests/query/predicate-utils.test.ts index 907581fe6..0cb14ec20 100644 --- a/packages/db/tests/query/predicate-utils.test.ts +++ b/packages/db/tests/query/predicate-utils.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it } from 'vitest' import { isLimitSubset, + isOffsetLimitSubset, isOrderBySubset, isPredicateSubset, isWhereSubset, @@ -647,6 +648,86 @@ describe(`isLimitSubset`, () => { }) }) +describe(`isOffsetLimitSubset`, () => { + it(`should return true when subset range is within superset range (same offset)`, () => { + expect( + isOffsetLimitSubset({ offset: 0, limit: 5 }, { offset: 0, limit: 10 }), + ).toBe(true) + expect( + isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 0, limit: 10 }), + ).toBe(true) + }) + + it(`should return true when subset starts later but is still within superset range`, () => { + // superset loads rows [0, 10), subset loads rows [5, 10) - subset is within superset + expect( + isOffsetLimitSubset({ offset: 5, limit: 5 }, { offset: 0, limit: 10 }), + ).toBe(true) + }) + + it(`should return false when subset extends beyond superset range`, () => { + // superset loads rows [0, 10), subset loads rows [5, 15) - subset extends beyond + expect( + isOffsetLimitSubset({ offset: 5, limit: 10 }, { offset: 0, limit: 10 }), + ).toBe(false) + }) + + it(`should return false when subset is completely outside superset range`, () => { + // superset loads rows [0, 10), subset loads rows [20, 30) - no overlap + expect( + isOffsetLimitSubset({ offset: 20, limit: 10 }, { offset: 0, limit: 10 }), + ).toBe(false) + }) + + it(`should return false when superset starts after subset`, () => { + // superset loads rows [10, 20), subset loads rows [0, 10) - superset starts too late + expect( + isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 10, limit: 10 }), + ).toBe(false) + }) + + it(`should return true when superset is unlimited`, () => { + expect(isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 0 })).toBe( + true, + ) + expect(isOffsetLimitSubset({ offset: 20, limit: 10 }, { offset: 0 })).toBe( + true, + ) + }) + + it(`should return false when superset is unlimited but starts after subset`, () => { + // superset loads rows [10, ∞), subset loads rows [0, 10) - superset starts too late + expect(isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 10 })).toBe( + false, + ) + }) + + it(`should return false when subset is unlimited but superset has a limit`, () => { + expect(isOffsetLimitSubset({ offset: 0 }, { offset: 0, limit: 10 })).toBe( + false, + ) + }) + + it(`should return true when both are unlimited and superset starts at or before subset`, () => { + expect(isOffsetLimitSubset({ offset: 10 }, { offset: 0 })).toBe(true) + expect(isOffsetLimitSubset({ offset: 10 }, { offset: 10 })).toBe(true) + }) + + it(`should return false when both are unlimited but superset starts after subset`, () => { + expect(isOffsetLimitSubset({ offset: 0 }, { offset: 10 })).toBe(false) + }) + + it(`should default offset to 0 when undefined`, () => { + expect(isOffsetLimitSubset({ limit: 5 }, { limit: 10 })).toBe(true) + expect(isOffsetLimitSubset({ offset: 0, limit: 5 }, { limit: 10 })).toBe( + true, + ) + expect(isOffsetLimitSubset({ limit: 5 }, { offset: 0, limit: 10 })).toBe( + true, + ) + }) +}) + describe(`isPredicateSubset`, () => { it(`should check all components for unlimited superset`, () => { // For unlimited supersets, where-subset logic applies @@ -756,6 +837,138 @@ describe(`isPredicateSubset`, () => { } expect(isPredicateSubset(subset, superset)).toBe(false) }) + + describe(`with offset`, () => { + it(`should return true when subset offset+limit is within superset range`, () => { + const sameWhere = gt(ref(`age`), val(10)) + const subset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 5, + limit: 5, + } + const superset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 0, + limit: 10, + } + // subset loads rows [5, 10), superset loads rows [0, 10) - subset is within + expect(isPredicateSubset(subset, superset)).toBe(true) + }) + + it(`should return false when subset is at different offset outside superset range`, () => { + const sameWhere = gt(ref(`age`), val(10)) + const subset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 20, + limit: 10, + } + const superset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 0, + limit: 10, + } + // subset loads rows [20, 30), superset loads rows [0, 10) - no overlap + expect(isPredicateSubset(subset, superset)).toBe(false) + }) + + it(`should return false when subset extends beyond superset even with same where`, () => { + const sameWhere = gt(ref(`age`), val(10)) + const subset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 5, + limit: 10, + } + const superset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 0, + limit: 10, + } + // subset loads rows [5, 15), superset loads rows [0, 10) - subset extends beyond + expect(isPredicateSubset(subset, superset)).toBe(false) + }) + + it(`should return true for unlimited superset with any subset offset`, () => { + const sameWhere = gt(ref(`age`), val(10)) + const subset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 100, + limit: 10, + } + const superset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + // No limit - unlimited + } + expect(isPredicateSubset(subset, superset)).toBe(true) + }) + + it(`should return false when superset has offset that starts after subset needs`, () => { + const sameWhere = gt(ref(`age`), val(10)) + const subset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 0, + limit: 10, + } + const superset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 5, + limit: 10, + } + // subset needs rows [0, 10), superset only has rows [5, 15) + expect(isPredicateSubset(subset, superset)).toBe(false) + }) + + it(`should handle pagination correctly - page 2 not subset of page 1`, () => { + const sameWhere = gt(ref(`age`), val(10)) + // Page 1: offset 0, limit 10 + const page1: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 0, + limit: 10, + } + // Page 2: offset 10, limit 10 + const page2: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 10, + limit: 10, + } + // Page 2 is NOT a subset of page 1 (different rows) + expect(isPredicateSubset(page2, page1)).toBe(false) + // Page 1 is NOT a subset of page 2 (different rows) + expect(isPredicateSubset(page1, page2)).toBe(false) + }) + + it(`should return true when superset covers multiple pages`, () => { + const sameWhere = gt(ref(`age`), val(10)) + // Superset: offset 0, limit 30 (covers pages 1-3) + const superset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 0, + limit: 30, + } + // Page 2: offset 10, limit 10 + const page2: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 10, + limit: 10, + } + // Page 2 IS a subset of superset (rows 10-19 within 0-29) + expect(isPredicateSubset(page2, superset)).toBe(true) + }) + }) }) describe(`minusWherePredicates`, () => { diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 73d1e6c05..d8a0e2bcb 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -6,7 +6,7 @@ import { } from '@electric-sql/client' import { Store } from '@tanstack/store' import DebugModule from 'debug' -import { DeduplicatedLoadSubset } from '@tanstack/db' +import { DeduplicatedLoadSubset, and } from '@tanstack/db' import { ExpectedNumberInAwaitTxIdError, StreamAbortedError, @@ -307,7 +307,12 @@ function hasTxids>( * Creates a deduplicated loadSubset handler for progressive/on-demand modes * Returns null for eager mode, or a DeduplicatedLoadSubset instance for other modes. * Handles fetching snapshots in progressive mode during buffering phase, - * and requesting snapshots in on-demand mode + * and requesting snapshots in on-demand mode. + * + * When cursor expressions are provided (whereFrom/whereCurrent), makes two + * requestSnapshot calls: + * - One for whereFrom (rows > cursor) with limit + * - One for whereCurrent (rows = cursor, for tie-breaking) without limit */ function createLoadSubsetDedupe>({ stream, @@ -382,8 +387,50 @@ function createLoadSubsetDedupe>({ return } else { // On-demand mode: use requestSnapshot - const snapshotParams = compileSQL(opts) - await stream.requestSnapshot(snapshotParams) + // When cursor is provided, make two calls: + // 1. whereCurrent (all ties, no limit) + // 2. whereFrom (rows > cursor, with limit) + const { cursor, where, orderBy, limit } = opts + + if (cursor) { + // Make parallel requests for cursor-based pagination + const promises: Array> = [] + + // Request 1: All rows matching whereCurrent (ties at boundary, no limit) + // Combine main where with cursor.whereCurrent + const whereCurrentOpts: LoadSubsetOptions = { + where: where ? and(where, cursor.whereCurrent) : cursor.whereCurrent, + orderBy, + // No limit - get all ties + } + const whereCurrentParams = compileSQL(whereCurrentOpts) + promises.push(stream.requestSnapshot(whereCurrentParams)) + + debug( + `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereCurrent snapshot (all ties)`, + ) + + // Request 2: Rows matching whereFrom (rows > cursor, with limit) + // Combine main where with cursor.whereFrom + const whereFromOpts: LoadSubsetOptions = { + where: where ? and(where, cursor.whereFrom) : cursor.whereFrom, + orderBy, + limit, + } + const whereFromParams = compileSQL(whereFromOpts) + promises.push(stream.requestSnapshot(whereFromParams)) + + debug( + `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereFrom snapshot (with limit ${limit})`, + ) + + // Wait for both requests to complete + await Promise.all(promises) + } else { + // No cursor - standard single request + const snapshotParams = compileSQL(opts) + await stream.requestSnapshot(snapshotParams) + } } } diff --git a/packages/electric-db-collection/tests/electric-live-query.test.ts b/packages/electric-db-collection/tests/electric-live-query.test.ts index ccaef84f3..bcd83d310 100644 --- a/packages/electric-db-collection/tests/electric-live-query.test.ts +++ b/packages/electric-db-collection/tests/electric-live-query.test.ts @@ -609,9 +609,8 @@ describe.each([ // Limited queries are only deduplicated when their where clauses are equal. // Both queries have the same where clause (active = true), but the second query // with limit 6 needs more data than the first query with limit 2 provided. - // The internal query system makes additional requests as it processes the data. - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(6) + // With cursor-based pagination, initial loads (without cursor) make 1 requestSnapshot call each. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) // Check that first it requested a limit of 2 users (from first query) expect(callArgs(0)).toMatchObject({ @@ -877,9 +876,8 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { ) // For limited queries, only requests with identical where clauses can be deduplicated. - // The internal query system may make additional requests as it processes the data. - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(3) + // With cursor-based pagination, initial loads (without cursor) make 1 requestSnapshot call. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) }) it(`should pass correct WHERE clause to requestSnapshot when live query has filters`, async () => { @@ -1189,9 +1187,8 @@ describe(`Electric Collection - loadSubset deduplication`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) // For limited queries, only requests with identical where clauses can be deduplicated. - // The internal query system may make additional requests as it processes data. - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(3) + // With cursor-based pagination, initial loads (without cursor) make 1 requestSnapshot call. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) // Simulate a must-refetch (which triggers truncate and reset) subscriber([{ headers: { control: `must-refetch` } }]) @@ -1201,8 +1198,8 @@ describe(`Electric Collection - loadSubset deduplication`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) // The existing live query re-requests its data after truncate - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(5) + // After must-refetch, the query requests data again (1 initial + 1 after truncate) + expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) // Create the same live query again after reset // This should NOT be deduped because the reset cleared the deduplication state, @@ -1221,8 +1218,8 @@ describe(`Electric Collection - loadSubset deduplication`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) // Should have more calls - the different query triggered a new request - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(6) + // 1 initial + 1 after must-refetch + 1 for new query = 3 + expect(mockRequestSnapshot).toHaveBeenCalledTimes(3) }) it(`should deduplicate unlimited queries regardless of orderBy`, async () => { diff --git a/packages/query-db-collection/e2e/query-filter.ts b/packages/query-db-collection/e2e/query-filter.ts index 4ead26397..43f4d0e87 100644 --- a/packages/query-db-collection/e2e/query-filter.ts +++ b/packages/query-db-collection/e2e/query-filter.ts @@ -69,6 +69,11 @@ export function serializeLoadSubsetOptions( result.limit = options.limit } + // Include offset for pagination support - different offsets need different query keys + if (options.offset !== undefined) { + result.offset = options.offset + } + return JSON.stringify(Object.keys(result).length === 0 ? null : result) } @@ -158,7 +163,7 @@ function isBasicExpression( } /** - * Apply LoadSubsetOptions to data (filter, sort, limit) + * Apply LoadSubsetOptions to data (filter, sort, limit, offset) */ export function applyPredicates( data: Array, @@ -172,6 +177,7 @@ export function applyPredicates( let filters: Array = [] let sorts: Array = [] let limit: number | undefined = undefined + const offset = options.offset ?? 0 // Check if where clause is simple before trying to parse const hasComplexWhere = options.where && !isSimpleExpression(options.where) @@ -232,6 +238,7 @@ export function applyPredicates( expressionSummary: analysis, hasOrderBy: Boolean(orderBy), limit: rawLimit, + offset, filtersCount: filters.length, sortsCount: sorts.length, initialSize: data.length, @@ -261,14 +268,16 @@ export function applyPredicates( } } - // Apply LIMIT - // Note: offset is NOT applied here - it's handled by the live query windowing layer - // The limit passed here already accounts for offset (e.g., offset(20).limit(10) -> limit: 30) - if (limit !== undefined) { - result = result.slice(0, limit) + // Apply OFFSET and LIMIT + // For pagination: offset skips rows, limit caps the result + if (offset > 0 || limit !== undefined) { + const start = offset + const end = limit !== undefined ? offset + limit : undefined + result = result.slice(start, end) if (DEBUG_SUMMARY) { - console.log(`[query-filter] after limit`, { + console.log(`[query-filter] after offset/limit`, { size: result.length, + offset, limit, }) } diff --git a/packages/query-db-collection/src/serialization.ts b/packages/query-db-collection/src/serialization.ts index 0d353db76..9849c4bd3 100644 --- a/packages/query-db-collection/src/serialization.ts +++ b/packages/query-db-collection/src/serialization.ts @@ -1,7 +1,9 @@ import type { IR, LoadSubsetOptions } from '@tanstack/db' /** - * Serializes LoadSubsetOptions into a stable, hashable format for query keys + * Serializes LoadSubsetOptions into a stable, hashable format for query keys. + * Includes where, orderBy, limit, and offset for pagination support. + * Note: cursor expressions are not serialized as they are backend-specific. * @internal */ export function serializeLoadSubsetOptions( @@ -43,6 +45,11 @@ export function serializeLoadSubsetOptions( result.limit = options.limit } + // Include offset for pagination support + if (options.offset !== undefined) { + result.offset = options.offset + } + return Object.keys(result).length === 0 ? undefined : JSON.stringify(result) } diff --git a/packages/react-db/tests/useLiveInfiniteQuery.test.tsx b/packages/react-db/tests/useLiveInfiniteQuery.test.tsx index cd420ccfc..cd95029de 100644 --- a/packages/react-db/tests/useLiveInfiniteQuery.test.tsx +++ b/packages/react-db/tests/useLiveInfiniteQuery.test.tsx @@ -898,8 +898,44 @@ describe(`useLiveInfiniteQuery`, () => { }) } - // Apply limit if provided - if (opts.limit !== undefined) { + // Apply cursor expressions if present (new cursor-based pagination) + if (opts.cursor) { + const { whereFrom, whereCurrent } = opts.cursor + try { + const whereFromFn = + createFilterFunctionFromExpression(whereFrom) + const fromData = filtered.filter(whereFromFn) + + const whereCurrentFn = + createFilterFunctionFromExpression(whereCurrent) + const currentData = filtered.filter(whereCurrentFn) + + // Combine current (ties) with from (next page), deduplicate + const seenIds = new Set() + filtered = [] + for (const item of currentData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filtered.push(item) + } + } + // Apply limit only to fromData + const limitedFromData = opts.limit + ? fromData.slice(0, opts.limit) + : fromData + for (const item of limitedFromData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filtered.push(item) + } + } + // Re-sort after combining + filtered.sort((a, b) => b.createdAt - a.createdAt) + } catch { + // Fallback to original filtered if cursor parsing fails + } + } else if (opts.limit !== undefined) { + // Apply limit only if no cursor (cursor handles limit internally) filtered = filtered.slice(0, opts.limit) }