Skip to content

fractal-solutions/qflow

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

qflow: A Lightweight and Flexible JavaScript Workflow and Agent Library

qflow is a lightweight and flexible JavaScript library designed for creating and managing complex workflows and autonomous agents. It provides a minimalist yet expressive API to define sequences of operations, manage data flow, and orchestrate execution, supporting both synchronous and asynchronous patterns.

Features

  • Modular & Extensible: Easily define custom nodes and compose them into complex, reusable flows.
  • Synchronous & Asynchronous Flows: Supports both blocking and non-blocking execution models.
  • Observability: Monitor workflows with a built-in event system that emits detailed lifecycle events for flows and nodes.
  • Shared State Management: Pass and manipulate data across nodes using a central, mutable shared object.
  • Batch Processing: Efficiently process collections of data through dedicated batch nodes and flows, including parallel execution.
  • Agents: Built upon the qflow core functionality are plug and play agents with extensive tool integrations available.
  • Built-in Integrations: Comes with pre-built nodes for multiple tasks like LLM interactions, browser use, pdf tools, webhooks, spreadsheet manipulation, code interpretation, media manipulation, web scraping, and popular API integrations (GitHub, Git, Open Router, HackerNews, Stripe, Maps).
  • Custom Agent Tools: Build your own Agent tools using the flow registry pattern.

Installation

To get started quickly with a new project, you can use the create-qflow tool (Recommended):

bunx create-qflow@latest <project-name>

or

You can install qflow via npm or Bun:

npm install @fractal-solutions/qflow
# or
bun add @fractal-solutions/qflow

Module Imports

qflow provides different entry points for its core functionalities and built-in nodes to keep your imports clean and specific.

  • Core Classes (Node, Flow, AsyncNode, AsyncFlow, etc.): These are imported directly from the main package:

    import { Node, Flow, AsyncNode, AsyncFlow } from '@fractal-solutions/qflow';
  • Built-in Integration Nodes (LLMNode, DeepSeekLLMNode, GitHubNode, WebScraperNode, etc.): These are imported from the /nodes subpath:

    import { DeepSeekLLMNode, GitHubNode, WebScraperNode } from '@fractal-solutions/qflow/nodes';

Core Abstractions

qflow is built around a few core abstractions that enable powerful and flexible workflow definitions.

Shared State (shared object)

A central, mutable JavaScript object that is passed through the entire flow. Nodes can read from and write to this shared object, making it the primary mechanism for passing data and context between different nodes in a workflow. This is particularly useful for accumulating results or maintaining state across multiple steps.

Node

The fundamental building block of any qflow workflow. A Node represents a single, atomic operation or step in your flow.

  • prep(shared): Prepares data for execution. Receives the shared object.
  • exec(prepRes): Executes the node's primary logic.
  • post(shared, prepRes, execRes): Processes the result of exec. Receives the shared object.
  • setParams(params): Configures the node with specific parameters. Parameters are accessible via this.params.
  • next(node, action = "default"): Chains this node to another, defining the next step in the flow.

Asynchronous Nodes (AsyncNode, AsyncBatchNode, AsyncParallelBatchNode) For operations that involve I/O or are inherently asynchronous, qflow provides AsyncNode and its variants. These nodes leverage async/await for non-blocking execution. When working within AsyncFlows, it's crucial to implement the async versions of the lifecycle methods:

  • prepAsync(shared)
  • execAsync(prepRes, shared)
  • postAsync(shared, prepRes, execRes)
  • preparePrompt(shared) (specifically for LLM nodes, allowing prompt construction based on shared state)

These async methods ensure proper awaiting and data propagation within asynchronous workflows.

Flow

A Flow orchestrates the execution of a sequence of Nodes. It defines the overall path and manages the transitions between nodes.

  • start(startNode): Sets the initial node for the flow. (Note: In practice, you often pass the start node directly to the Flow constructor for conciseness).
  • _orch(shared, params) / _orchAsync(shared, params): Internal methods used to run the flow, especially when passing initial parameters to the starting node. For most use cases, flow.run(sharedState) or await flow.runAsync(sharedState) is sufficient.

Batch Flows (BatchFlow, AsyncBatchFlow, AsyncParallelBatchFlow) These specialized flows are designed to process collections of items. They run a sub-flow for each item in the batch. AsyncParallelBatchFlow is particularly useful for concurrently processing batch items, significantly speeding up operations.

Basic Usage & Examples

1. Simple Node

A basic example of defining and running a single node.

import { Node } from '@fractal-solutions/qflow';

class MySimpleNode extends Node {
  prep(shared) {
    console.log('Preparing data...');
    return shared.inputData * 2;
  }

  exec(prepRes) {
    console.log('Executing with prepared data:', prepRes);
    return prepRes + 10;
  }

  post(shared, prepRes, execRes) {
    console.log('Post-processing result:', execRes);
    return { finalResult: execRes, originalInput: shared.inputData };
  }
}

const node = new MySimpleNode();
const result = node.run({ inputData: 5 });
console.log('Node run result:', result);
// Expected output:
// Preparing data...
// Executing with prepared data: 10
// Post-processing result: 20
// Node run result: { finalResult: 20, originalInput: 5 }

2. Simple Flow

Chaining multiple nodes together to form a basic workflow.

import { Node, Flow } from '@fractal-solutions/qflow';

let count = 0;

class MessageNode extends Node {
    exec() {
        count++;
        console.log(`New Message ${count}`);
        return `default`;
    }
}

class TimeNode extends Node {
    exec() {
        console.log(`Time ${Date.now()}`);
        return `default`;
    }
}

const m1 = new MessageNode();
const t1 = new TimeNode();
const m2 = new MessageNode();

m1.next(t1);
t1.next(m2);

const flow = new Flow(m1);
flow.run({});
// Expected output (approximate):
// New Message 1
// Time <timestamp>
// New Message 2

3. Conditional Flow

Using transition() for dynamic branching based on an action. This example demonstrates configuring a node using setParams for a cleaner API.

import { Node, Flow } from '@fractal-solutions/qflow';

class ConditionalNode extends Node {
  exec() {
    // Access shouldGoLeft from this.params, which is set via setParams
    if (this.params.shouldGoLeft) {
      console.log('ConditionalNode: Going left');
      return 'left';
    } else {
      console.log('ConditionalNode: Going right');
      return 'right';
    }
  }
}

// Helper node for conditional transition test
function MessageNode(message) {
  return new (class extends Node {
    exec() {
      console.log(message);
      return 'default';
    }
  })();
}

const conditionalNode = new ConditionalNode();
conditionalNode.setParams({ shouldGoLeft: true }); // Configure via setParams

const leftNode = MessageNode('Went Left');
const rightNode = MessageNode('Went Right');

conditionalNode.next(leftNode, 'left');
conditionalNode.next(rightNode, 'right');

const conditionalFlow = new Flow(conditionalNode);
conditionalFlow.run({});
// Expected output:
// ConditionalNode: Going left
// Went Left

const conditionalNode2 = new ConditionalNode();
conditionalNode2.setParams({ shouldGoLeft: false }); // Configure via setParams

conditionalNode2.next(leftNode, 'left');
conditionalNode2.next(rightNode, 'right');
const conditionalFlow2 = new Flow(conditionalNode2);
conditionalFlow2.run({});
// Expected output:
// ConditionalNode: Going right
// Went Right

4. Asynchronous Flow

Handling asynchronous operations within a flow.

import { AsyncNode, AsyncFlow } from '@fractal-solutions/qflow';

class MyAsyncNode extends AsyncNode {
  async execAsync() {
    console.log('AsyncNode: Starting...');
    await new Promise(resolve => setTimeout(resolve, 100));
    console.log('AsyncNode: Finished!');
    return 'default';
  }
}

const asyncNode1 = new MyAsyncNode();
const asyncNode2 = new MyAsyncNode();
asyncNode1.next(asyncNode2);

const asyncFlow = new AsyncFlow(asyncNode1);
await asyncFlow.runAsync({});
// Expected output:
// AsyncNode: Starting...
// AsyncNode: Finished!
// AsyncNode: Starting...
// AsyncNode: Finished!

See documentation/examples.md for a full list of examples.

Observability (Event System)

qflow includes a built-in event system for AsyncFlow that provides detailed insights into the execution of your workflows. This allows for powerful, real-time monitoring, logging, and debugging.

You can attach listeners to an AsyncFlow instance to subscribe to key lifecycle events.

Available Events

  • flow:start: Emitted when the flow begins.
    • Payload: { flowId, startTime }
  • flow:end: Emitted when the flow finishes, either successfully or with an error.
    • Payload: { flowId, endTime, duration, status, error }
  • node:start: Emitted just before a node executes.
    • Payload: { flowId, nodeClass, startTime, params }
  • node:end: Emitted right after a node finishes.
    • Payload: { flowId, nodeClass, endTime, duration, status, result, error }

This example demonstrates how to use the built-in event system to monitor the execution of a flow.

import { AsyncFlow, AsyncNode } from '@fractal-solutions/qflow';

const colors = {
    reset: "\x1b[0m",
    cyan: "\x1b[36m",
    green: "\x1b[32m",
    red: "\x1b[31m",
    grey: "\x1b[90m"
};
function colorize(text, color) {
    return `${color}${text}${colors.reset}`;
}

// A simple node that simulates some work
class WorkNode extends AsyncNode {
    async execAsync() {
        const workTime = Math.random() * 100 + 50; // 50-150ms
        await new Promise(resolve => setTimeout(resolve, workTime));
        return { status: 'work_done' };
    }
}

// A node that is designed to fail once before succeeding
let failCount = 0;
class SometimesFailsNode extends AsyncNode {
    constructor() {
        // Configure this node to allow 1 retry (for a total of 2 attempts)
        super(2, 0.1); 
    }
    async execAsync() {
        if (failCount < 1) {
            failCount++;
            throw new Error('Simulating a transient error');
        }
        return { status: 'succeeded_after_failure' };
    }
}

// 1. Define the workflow
const startNode = new WorkNode();
const middleNode = new SometimesFailsNode();
const endNode = new WorkNode();

startNode.next(middleNode);
middleNode.next(endNode);

// 2. Create the flow
const myFlow = new AsyncFlow(startNode);

// 3. Attach listeners for observability
myFlow.on('flow:start', (payload) => {
    const time = new Date(payload.startTime).toLocaleTimeString();
    console.log(`\n${colorize(`[FLOW:START]`, colors.cyan)} ${payload.flowId} at ${time}`);
});

myFlow.on('node:start', (payload) => {
    console.log(colorize(`  [NODE:START] ${payload.nodeClass}`, colors.grey));
});

myFlow.on('node:end', (payload) => {
    const status = payload.status.toUpperCase();
    const color = payload.status === 'success' ? colors.green : colors.red;
    const statusStr = colorize(status.padEnd(7), color); // Pad for alignment
    const duration = `${payload.duration.toFixed(2)}ms`.padStart(10);
    console.log(`  [NODE:END]   ${payload.nodeClass.padEnd(18)} | Status: ${statusStr} | Duration: ${duration}`);
    if (payload.error) {
        console.log(colorize(`    └─ ERROR: ${payload.error.message}`, colors.red));
    }
});

myFlow.on('flow:end', (payload) => {
    const status = payload.status.toUpperCase();
    const color = payload.status === 'success' ? colors.green : colors.red;
    const statusStr = colorize(status.padEnd(7), color);
    const duration = `${payload.duration.toFixed(2)}ms`;
    console.log(`\n${colorize(`[FLOW:END]`, colors.cyan)}   ${payload.flowId} | Status: ${statusStr} | Duration: ${duration}`);
    if (payload.error) {
        console.log(colorize(`  └─ FLOW ERROR: ${payload.error.message}`, colors.red));
    }
});

// 4. Run the flow
await myFlow.runAsync({});

// Expected output:
//
// [FLOW:START] <flow-id> at <time>
//   [NODE:START] WorkNode
//   [NODE:END]   WorkNode           | Status: SUCCESS | Duration:  XX.XXms
//   [NODE:START] SometimesFailsNode
//   [NODE:END]   SometimesFailsNode | Status: ERROR   | Duration:   X.XXms
//     └─ ERROR: Simulating a transient error
//   [NODE:START] SometimesFailsNode
//   [NODE:END]   SometimesFailsNode | Status: SUCCESS | Duration:   X.XXms
//   [NODE:START] WorkNode
//   [NODE:END]   WorkNode           | Status: SUCCESS | Duration:  XX.XXms
//
// [FLOW:END]   <flow-id> | Status: SUCCESS | Duration: XXX.XXms

Logging

qflow provides a simple yet powerful logging mechanism that gives you per-node control over how and where log messages are sent.

Basic Usage

By default, all built-in nodes will log to the console. You don't need to configure anything for this to work.

Per-Node Logging Configuration

To customize the logging for a specific node, you can pass a logging object in its parameters. This gives you fine-grained control over the logging behavior of each node in your flow.

import { HttpRequestNode } from '@fractal-solutions/qflow/nodes';

const httpNode = new HttpRequestNode();

// This node will log to the console by default
httpNode.setParams({
  url: 'https://api.example.com/data'
});

// This node will log to a file
const anotherHttpNode = new HttpRequestNode();
anotherHttpNode.setParams({
  url: 'https://api.example.com/other-data',
  logging: {
    method: 'file',
    params: { filePath: './logs/http.log' }
  }
});

Logging Methods

The following logging methods are available out of the box:

  • console (default): Logs messages to the console.
  • file: Appends log messages to a file.
    • params: { filePath: string }
  • event: Emits log messages through an event emitter.
    • params: { emitter: EventEmitter }
  • remote: Sends log messages to a remote server via a POST request.
    • params: { url: string }

Examples

Logging to a File

import { CodeInterpreterNode } from '@fractal-solutions/qflow/nodes';

const codeNode = new CodeInterpreterNode();
codeNode.setParams({
  code: 'print("Hello from Python!")',
  logging: {
    method: 'file',
    params: { filePath: './logs/code-interpreter.log' }
  }
});

Logging to an Event Emitter

This is useful for integrating qflow's logging with your application's event system.

import { EventEmitter } from 'events';
import { MemoryNode } from '@fractal-solutions/qflow/nodes';

const myEmitter = new EventEmitter();
myEmitter.on('log', (logEntry) => {
  console.log('Received log event:', logEntry);
});

const memoryNode = new MemoryNode();
memoryNode.setParams({
  action: 'store',
  content: 'This is a test memory.',
  logging: {
    method: 'event',
    params: { emitter: myEmitter }
  }
});

Disabling Logging for a Node

You can disable logging for a specific node by setting the type to none.

const silentHttpNode = new HttpRequestNode();
silentHttpNode.setParams({
  url: 'https://api.example.com/silent',
  logging: {
    type: 'none'
  }
});

Global Log Level

You can set a global log level to control the verbosity of all logs in your application. Only messages with a severity level equal to or higher than the current log level will be processed.

import { setLogLevel, LogLevel } from '@fractal-solutions/qflow/logger';

// Set the global log level to INFO. 
// This will show INFO, WARN, and ERROR messages, but hide DEBUG messages.
setLogLevel(LogLevel.INFO);

The available log levels are, in order of severity:

  • LogLevel.DEBUG: Detailed information, useful for debugging.
  • LogLevel.INFO: General information about the application's state.
  • LogLevel.WARN: Indicates a potential problem that does not prevent the application from running.
  • LogLevel.ERROR: Indicates a serious error that prevented a specific operation from completing.
  • LogLevel.NONE: Disables all logging.

Making Your Custom Nodes Loggable

You can make your own custom nodes loggable by following the same pattern as the built-in nodes.

  1. Import the log function:

    import { log } from '@fractal-solutions/qflow/logger';
  2. Call the log function: In your node's methods, call the log function and pass this.params.logging as the second argument. This will allow users of your node to configure its logging behavior.

Here is an example of a custom node that implements loggable behavior:

import { AsyncNode } from '@fractal-solutions/qflow';
import { log } from '@fractal-solutions/qflow/logger';

class MyCustomNode extends AsyncNode {
  async execAsync() {
    log('Starting execution of MyCustomNode', this.params.logging);
    // ... your node's logic ...

    log('Finished execution of MyCustomNode', this.params.logging);

    return 'default';
  }
}

// --- User of your node can now configure logging ---

const myNode = new MyCustomNode();

// This will log to the console (default)
myNode.setParams({ /* ... */ });

// This will log to a file
const anotherNode = new MyCustomNode();
anotherNode.setParams({
  /* ... */,
  logging: {
    method: 'file',
    params: { filePath: './my-custom-node.log' }
  }
});

Agents

For a detailed explanation of the agents, see the Agents documentation. The tools available to agents are documented in the Tools documentation.

Integrated Nodes and their Examples

Here's a comprehensive list of integrated nodes available in qflow, along with a brief description and a link to their detailed documentation.

Data

  • CodeInterpreterNode: Executes Python code snippets.
  • DataExtractorNode: Extracts structured data from HTML, JSON, or plain text.
  • DatabaseNode: Provides a powerful and flexible way to interact with SQL databases.
  • EmbeddingNode: Generates vector embeddings for text (requires Ollama).
  • MemoryNode: Stores and retrieves text memories (keyword-based).
  • SemanticMemoryNode: Stores and retrieves text memories via semantic search (requires Ollama).
  • TransformNode: Transforms input data using a provided JavaScript function.
  • PDFProcessorNode: Extracts text or images from PDF documents.
  • SpreadsheetNode: Reads from and writes to spreadsheet files (.xlsx, .xls, .csv) with advanced manipulation.
  • DataValidationNode: Validates structured data against a JSON Schema.

System

Web

  • HttpRequestNode: Makes a generic HTTP request to any URL.
  • WebScraperNode: Fetches the HTML content of a given URL.
  • DuckDuckGoSearchNode: Performs a web search using DuckDuckGo.
  • GoogleSearchNode: Performs a web search using the Google Custom Search API.
  • BrowserControlNode: Controls a web browser to navigate pages, interact with elements, and take screenshots.
  • WebSocketsNode: Provides real-time, two-way communication with web services.
  • WebhookNode: Exposes an HTTP endpoint to receive webhooks, triggering a specified qflow flow.
  • HttpServerNode: Creates a web server and handles HTTP requests by triggering a qflow flow.

Flow Control

  • SubFlowNode: Executes a sub-flow.
  • IteratorNode: Iterates items, executes sub-flow for each.
  • SchedulerNode: Schedules qflow flows for future or recurring execution using cron syntax or a delay.

Other

  • GitNode: Performs Git operations like clone, add, commit, and push.
  • GISNode: Performs Geographic Information System operations like geocoding and reverse geocoding.
  • GitHubNode: Performs GitHub operations like creating and managing issues.
  • HackerNewsNode: Fetches top stories and item details from Hacker News.
  • StripeNode: Performs Stripe operations like creating charges and retrieving account balances.

LLMs

Agent

  • AgentNode: The core of agentic behavior in qflow. It orchestrates tools and LLM reasoning to achieve complex goals.

Error Handling

qflow provides mechanisms to handle errors gracefully within your workflows.

  • Node-level Error Handling:
    • Synchronous Nodes: If an error occurs in prep or exec, it will be caught by the flow and propagate up. You can implement execFallback(prepRes, error) in your Node subclass to provide a fallback mechanism when exec fails after all retries.
    • Asynchronous AsyncNodes: Similarly, prepAsync or execAsync can throw errors. Implement execFallbackAsync(prepRes, error) for asynchronous fallbacks.
  • Flow-level Error Handling:
    • When you run a flow using flow.run(sharedState) or await flow.runAsync(sharedState), any unhandled errors from within the nodes will propagate up and can be caught using standard JavaScript try...catch blocks around the run or runAsync call. This allows you to manage errors at the workflow level.
import { Node, Flow } from '@fractal-solutions/qflow';

class FailingNode extends Node {
  exec() {
    throw new Error("Something went wrong in FailingNode!");
  }
  execFallback(prepRes, error) {
    console.error("FailingNode fallback triggered:", error.message);
    return "Fallback successful!";
  }
}

const failingNode = new FailingNode();
const errorFlow = new Flow(failingNode);

try {
  const result = errorFlow.run({});
  console.log("Flow completed with result:", result);
} catch (error) {
  console.error("Flow failed with unhandled error:", error.message);
}
// Expected output:
// FailingNode fallback triggered: Something went wrong in FailingNode!
// Flow completed with result: Fallback successful!

Debugging

Debugging qflow workflows can be done using standard JavaScript debugging tools and techniques.

  • console.log: The simplest way to inspect data and execution flow. Strategically place console.log statements within prep, exec, post, and their Async counterparts to trace the shared object, prepRes, and execRes values.
  • Debugger: Use your IDE's built-in debugger (e.g., VS Code's debugger) or Node.js/Bun's inspector (node --inspect or bun --inspect). Set breakpoints within your node's lifecycle methods to step through the execution and examine the state.
  • Error Messages: Pay close attention to the error messages and stack traces. qflow aims to provide clear error messages that point to the source of the problem within your nodes.

Testing

Testing qflow workflows involves unit testing individual nodes and integration testing entire flows.

  • Unit Testing Nodes:
    • Test each Node or AsyncNode subclass in isolation.
    • Mock external dependencies (e.g., API calls for LLMNode, GitHubNode) to ensure tests are fast and reliable.
    • Verify the behavior of prep, exec, post, and their Async counterparts, as well as setParams and execFallback.
  • Integration Testing Flows:
    • Test entire Flows or AsyncFlows to ensure nodes are chained correctly and data flows as expected.
    • Provide controlled shared state inputs and assert on the final shared state or the flow's return value.
    • Use your preferred testing framework (e.g., Jest, Mocha, Bun's built-in test runner).
// Example (using Bun's test runner syntax)
import { test, expect } from "bun:test";
import { Node, Flow } from '@fractal-solutions/qflow';

class TestNodeA extends Node {
  exec(input) { return input + 1; }
}

class TestNodeB extends Node {
  exec(input) { return input * 2; }
}

test("Simple Flow should process data correctly", () => {
  const nodeA = new TestNodeA();
  const nodeB = new TestNodeB();
  nodeA.next(nodeB);

  const flow = new Flow(nodeA);
  const sharedState = { initialValue: 5 };
  const result = flow.run(sharedState); // Assuming run returns the final execRes of the last node

  expect(result).toBe(12); // (5 + 1) * 2 = 12
});

test("Node should handle parameters via setParams", () => {
  class ParamNode extends Node {
    exec() { return this.params.value; }
  }
  const node = new ParamNode();
  node.setParams({ value: "hello" });
  const result = node.run({});
  expect(result).toBe("hello");
});

Integration

qflow is designed to be highly flexible and can be integrated into various application architectures:

  • CLI Tools: Build powerful command-line tools that automate complex tasks.
  • Web Servers (e.g., Express.js, Koa): Implement API endpoints that trigger workflows for data processing, background jobs, or agent-driven responses.
  • Background Services: Run long-running processes or scheduled tasks.
  • Browser Applications: Create interactive, client-side workflows (ensure appropriate polyfills for SharedArrayBuffer if using synchronous wait in Node).

Contributing

We welcome contributions! Please see our GitHub repository for more details on how to contribute.

About

A lightweight and flexible JavaScript library for creating and managing workflows and agents. PocketFlow in JS.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages