FlowGram Runtime API Reference

FlowGram Runtime provides five core APIs for validating, running, monitoring, retrieving results, and canceling workflows. This document details the usage, parameters, and return values of these APIs.

TaskRun API

Function Description

The TaskRun API is used to start a workflow task. It takes a workflow schema and initial inputs, and returns a task ID.

Parameters

TaskRun API accepts a TaskRunInput object as its parameter:

ParameterTypeRequiredDescription
schemastringYesJSON string of the workflow schema, defining nodes and edges
inputsobjectNoInitial input parameters for the workflow, can be empty

The schema parameter is a JSON string that defines the structure of the workflow, including information about nodes and edges. The basic structure of the schema is as follows:

interface WorkflowSchema {
  nodes: WorkflowNodeSchema[];
  edges: WorkflowEdgeSchema[];
}

interface WorkflowNodeSchema {
  id: string;
  type: FlowGramNode;
  name?: string;
  meta: {
    position: {
      x: number;
      y: number;
    };
  };
  data: any;
  blocks?: WorkflowNodeSchema[];
  edges?: WorkflowEdgeSchema[];
}

interface WorkflowEdgeSchema {
  sourceNodeID: string;
  sourcePort: string;
  targetNodeID: string;
  targetPort: string;
}

Return Value

TaskRun API returns a TaskRunOutput object:

FieldTypeDescription
taskIDstringUnique identifier for the task, used for subsequent queries

Error Handling

TaskRun API may throw the following errors:

  • Schema parsing error: When the provided schema is not a valid JSON string
  • Schema structure error: When the schema structure does not match the expected format
  • Node type error: When the schema includes unsupported node types
  • Initialization error: When the workflow fails to initialize

Usage Example

import { TaskRunAPI } from '@flowgram.ai/runtime-js';

const schema = JSON.stringify({
  nodes: [
    {
      id: 'start',
      type: 'start',
      meta: { position: { x: 0, y: 0 } },
      data: {}
    },
    {
      id: 'llm',
      type: 'llm',
      meta: { position: { x: 200, y: 0 } },
      data: {
        modelName: 'gpt-3.5-turbo',
        temperature: 0.7,
        systemPrompt: 'You are an assistant',
        prompt: 'Introduce yourself'
      }
    },
    {
      id: 'end',
      type: 'end',
      meta: { position: { x: 400, y: 0 } },
      data: {}
    }
  ],
  edges: [
    {
      sourceNodeID: 'start',
      sourcePort: 'out',
      targetNodeID: 'llm',
      targetPort: 'in'
    },
    {
      sourceNodeID: 'llm',
      sourcePort: 'out',
      targetNodeID: 'end',
      targetPort: 'in'
    }
  ]
});

const inputs = {
  userInput: 'Please introduce yourself'
};

async function runWorkflow() {
  try {
    const result = await TaskRunAPI({
      schema,
      inputs
    });
    console.log('Task ID:', result.taskID);
    return result.taskID;
  } catch (error) {
    console.error('Failed to start workflow:', error);
  }
}

Notes

  • The schema must be a valid JSON string and conform to the WorkflowSchema structure
  • The workflow must include a start node (type: 'start') and an end node (type: 'end')
  • Connections between nodes must be correctly defined through edges
  • After a task is started, it will execute asynchronously. You can use the TaskReport API and TaskResult API to get the execution status and results

TaskReport API

Function Description

The TaskReport API is used to get the execution report of a workflow task, including the task status and the execution status of each node.

Parameters

TaskReport API accepts a TaskReportInput object as its parameter:

ParameterTypeRequiredDescription
taskIDstringYesUnique identifier for the task, returned by TaskRun API

Return Value

TaskReport API returns a TaskReportOutput object containing the execution report of the task:

FieldTypeDescription
workflowWorkflowStatusOverall status of the workflow
nodesRecord<string, NodeStatus>Execution status of each node

The WorkflowStatus structure is as follows:

interface WorkflowStatus {
  status: 'idle' | 'processing' | 'success' | 'fail' | 'canceled';
  terminated: boolean;
}

The NodeStatus structure is as follows:

interface NodeStatus {
  status: 'idle' | 'processing' | 'success' | 'fail' | 'canceled';
  startTime?: number;
  endTime?: number;
}

Error Handling

TaskReport API may encounter the following error situations:

  • Task does not exist: When the provided taskID does not exist, returns undefined
  • Report generation error: When an error occurs during report generation

Usage Example

import { TaskReportAPI } from '@flowgram.ai/runtime-js';

async function getTaskReport(taskID) {
  try {
    const report = await TaskReportAPI({ taskID });

    if (!report) {
      console.log('Task does not exist or report not generated');
      return;
    }

    console.log('Workflow status:', report.workflow.status);
    console.log('Workflow terminated:', report.workflow.terminated);

    // Print status of each node
    for (const [nodeId, nodeStatus] of Object.entries(report.nodes)) {
      console.log(`Node ${nodeId} status:`, nodeStatus.status);
      if (nodeStatus.startTime) {
        console.log(`Node ${nodeId} start time:`, new Date(nodeStatus.startTime).toLocaleString());
      }
      if (nodeStatus.endTime) {
        console.log(`Node ${nodeId} end time:`, new Date(nodeStatus.endTime).toLocaleString());
      }
    }

    return report;
  } catch (error) {
    console.error('Failed to get task report:', error);
  }
}

Notes

  • The task report is real-time, you can call TaskReport API multiple times to get the latest execution status
  • If the workflow has not terminated (workflow.terminated is false), the workflow is still executing
  • Node status can be 'idle' (not started), 'processing' (executing), 'success' (successful), 'fail' (failed), or 'canceled' (canceled)
  • It is recommended to poll the task report periodically to monitor the progress of the workflow

TaskCancel API

Function Description

The TaskCancel API is used to cancel a running workflow task.

Parameters

TaskCancel API accepts a TaskCancelInput object as its parameter:

ParameterTypeRequiredDescription
taskIDstringYesUnique identifier for the task, returned by TaskRun API

Return Value

TaskCancel API returns a TaskCancelOutput object:

FieldTypeDescription
successbooleanIndicates whether the task was successfully canceled

Error Handling

TaskCancel API may encounter the following error situations:

  • Task does not exist: When the provided taskID does not exist, returns { success: false }
  • Task already completed: When the task has already completed or been canceled, it cannot be canceled again

Usage Example

import { TaskCancelAPI } from '@flowgram.ai/runtime-js';

async function cancelTask(taskID) {
  try {
    const result = await TaskCancelAPI({ taskID });

    if (result.success) {
      console.log('Task successfully canceled');
    } else {
      console.log('Failed to cancel task, task may not exist or is already completed');
    }

    return result.success;
  } catch (error) {
    console.error('Failed to cancel task:', error);
    return false;
  }
}

Notes

  • Task cancellation is asynchronous, after a successful cancellation request, the task may take some time to completely stop
  • Tasks that have already completed cannot be canceled
  • After canceling a task, you can check the final status of the task through TaskReport API, the status of a canceled task will change to 'canceled'
  • Canceling a task does not clear the intermediate results of the task, you can still get the results of the executed part through TaskResult API

TaskResult API

Function Description

The TaskResult API is used to get the final result of a workflow task.

Parameters

TaskResult API accepts a TaskResultInput object as its parameter:

ParameterTypeRequiredDescription
taskIDstringYesUnique identifier for the task, returned by TaskRun API

Return Value

TaskResult API returns a WorkflowOutputs object containing the output results of the workflow:

type WorkflowOutputs = Record<string, any>;

The structure of the returned object depends on the specific implementation and output definition of the workflow.

Error Handling

TaskResult API may encounter the following error situations:

  • Task does not exist: When the provided taskID does not exist, returns undefined
  • Task not completed: When the task has not terminated, returns undefined
  • Result retrieval error: When an error occurs during result retrieval

Usage Example

import { TaskResultAPI } from '@flowgram.ai/runtime-js';

async function getTaskResult(taskID) {
  try {
    const result = await TaskResultAPI({ taskID });

    if (!result) {
      console.log('Task does not exist or is not yet completed');
      return;
    }

    console.log('Task result:', result);
    return result;
  } catch (error) {
    console.error('Failed to get task result:', error);
  }
}

// Usage example: wait for task to complete and get result
async function waitForResult(taskID, pollingInterval = 1000, timeout = 60000) {
  const startTime = Date.now();

  while (Date.now() - startTime < timeout) {
    // Get task report
    const report = await TaskReportAPI({ taskID });

    // If task has terminated, get result
    if (report && report.workflow.terminated) {
      return await TaskResultAPI({ taskID });
    }

    // Wait for a while before checking again
    await new Promise(resolve => setTimeout(resolve, pollingInterval));
  }

  throw new Error('Timeout waiting for task result');
}

Notes

  • Results can only be obtained when the task has terminated (completed, failed, or canceled)
  • If the task has not yet completed, TaskResult API will return undefined
  • It is recommended to check if the task has terminated through TaskReport API before calling TaskResult API to get the result
  • For canceled tasks, only partial results or no results may be available
  • The specific structure of the result depends on the definition of the workflow, and needs to be parsed according to the actual output of the workflow

TaskValidate API

Function Description

The TaskValidate API is used to validate the validity of workflow schema and input parameters before actually running the workflow. This API helps you discover potential configuration errors before starting the workflow.

Parameters

TaskValidate API accepts a TaskValidateInput object as its parameter:

ParameterTypeRequiredDescription
schemastringYesJSON string of the workflow schema, defining nodes and edges
inputsobjectNoInitial input parameters for the workflow, used to validate if inputs meet schema requirements

Return Value

TaskValidate API returns a TaskValidateOutput object:

FieldTypeDescription
validbooleanIndicates whether validation passed, true means validation succeeded, false means validation failed
errorsstring[]Optional field, contains specific error message list when validation fails

Validation Content

TaskValidate API validates the following content:

  • Schema Structure Validation: Checks if the schema conforms to WorkflowSchema format requirements
  • Node Type Validation: Validates if node types in the schema are supported
  • Edge Connection Validation: Checks if connections between nodes are correct
  • Input Parameter Validation: Validates if provided inputs meet the input requirements defined in the schema
  • Workflow Integrity Validation: Checks if the workflow contains necessary start and end nodes

Error Handling

TaskValidate API may return the following types of validation errors:

  • Schema parsing error: When the provided schema is not a valid JSON string
  • Schema structure error: When the schema structure does not match the expected format
  • Node configuration error: When node configuration is incomplete or incorrect
  • Connection error: When there are issues with connections between nodes
  • Input parameter error: When input parameters do not meet requirements

Usage Example

import { TaskValidateAPI } from '@flowgram.ai/runtime-js';

const schema = JSON.stringify({
  nodes: [
    {
      id: 'start',
      type: 'start',
      meta: { position: { x: 0, y: 0 } },
      data: {
        outputs: {
          type: 'object',
          properties: {
            userInput: {
              type: 'string',
              extra: {
                index: 0,
              },
            }
          },
          required: ['userInput'],
        },
      }
    },
    {
      id: 'llm',
      type: 'llm',
      meta: { position: { x: 200, y: 0 } },
      data: {
        title: 'LLM_0',
        inputsValues: {
          prompt: {
            type: 'ref',
            content: ['start_0', 'userInput'],
          }
        },
        inputs: {
          type: 'object',
          required: ['editor'],
          properties: {
            prompt: {
              type: 'string',
              extra: {
                formComponent: 'prompt-editor',
              },
            },
          },
        },
        outputs: {
          type: 'object',
          properties: {
            result: {
              type: 'string',
            },
          },
        },
      },
    },
    {
      id: 'end',
      type: 'end',
      meta: { position: { x: 400, y: 0 } },
      data: {}
    }
  ],
  edges: [
    {
      sourceNodeID: 'start',
      sourcePort: 'out',
      targetNodeID: 'llm',
      targetPort: 'in'
    },
    {
      sourceNodeID: 'llm',
      sourcePort: 'out',
      targetNodeID: 'end',
      targetPort: 'in'
    }
  ]
});

const inputs = {
  userInput: 'Please introduce yourself'
};

async function validateWorkflow() {
  try {
    const result = await TaskValidateAPI({
      schema,
      inputs
    });

    if (result.valid) {
      console.log('Workflow validation passed, safe to run');
      return true;
    } else {
      console.error('Workflow validation failed:');
      result.errors?.forEach(error => {
        console.error('- ' + error);
      });
      return false;
    }
  } catch (error) {
    console.error('Error occurred during validation:', error);
    return false;
  }
}

// Validate before running the workflow
async function safeRunWorkflow() {
  const isValid = await validateWorkflow();
  if (isValid) {
    // Run the workflow after validation passes
    const runResult = await TaskRunAPI({ schema, inputs });
    console.log('Workflow started, Task ID:', runResult.taskID);
  } else {
    console.log('Workflow validation failed, please fix errors and retry');
  }
}

Notes

  • It is recommended to call TaskValidate API before calling TaskRun API for validation
  • Passing validation does not guarantee that the workflow will not encounter errors during runtime, but it can identify most configuration issues in advance
  • TaskValidate API typically executes faster than TaskRun API, making it suitable for real-time validation
  • Error messages in validation results can help quickly locate and fix issues
  • For complex workflows, it is recommended to use this API frequently during development for validation