WorkflowTools API Reference
The WorkflowTools class provides essential utilities for workflow manipulation, port discovery, workflow stitching, and graph analysis. It's the engine behind many of Discomfort's core features.
Class Overview
from custom_nodes.discomfort.discomfort import Discomfort
# Access through Discomfort instance (recommended)
discomfort = await Discomfort.create()
tools = discomfort.Tools
# Or import directly
from custom_nodes.discomfort.workflow_tools import WorkflowTools
tools = WorkflowTools()
Image Utilities
open_image_as_tensor(image_path)
Loads an image file as a ComfyUI-compatible tensor.
Parameters
image_path (str) - Path to the image file
Returns
torch.Tensor - Image tensor in ComfyUI format [batch, height, width, channels]
Example
# Load an image for workflow input
image = tools.open_image_as_tensor("input/my_photo.jpg")
print(f"Image shape: {image.shape}") # [1, 512, 512, 3]
# Use in workflow
with discomfort.Context() as context:
    context.save("input_image", image)
    await discomfort.run(["img2img_workflow.json"], context=context)
Notes
- Automatically converts to RGB format
- Normalizes pixel values to [0, 1] range
- Adds batch dimension if not present
- Supported formats: PNG, JPG, JPEG, BMP, TIFF
save_comfy_image_to_disk(tensor, output_path)
Saves a ComfyUI image tensor to disk.
Parameters
tensor (torch.Tensor) - Image tensor from ComfyUI workflow
output_path (str) - Path where to save the image
Returns
- None
Example
# Save workflow output
results = await discomfort.run(["workflow.json"])
if "output_image" in results:
image_tensor = results["output_image"]
tools.save_comfy_image_to_disk(image_tensor, "outputs/result.png")
print("Image saved successfully!")
Notes
- Handles tensor format conversion automatically
- Clamps values to valid range [0, 255]
- Creates output directory if it doesn't exist
- Supports PNG, JPG output formats
Workflow Analysis
discover_port_nodes(workflow_path)
Analyzes a workflow to identify DiscomfortPort nodes and their types.
Parameters
workflow_path (str) - Path to workflow JSON file
Returns
Dict[str, Any] - Dictionary containing port analysis:
{
    "inputs": {unique_id: port_info, ...},
    "outputs": {unique_id: port_info, ...},
    "passthrus": {unique_id: port_info, ...},
    "execution_order": [node_id, ...],
    "nodes": {node_id: node_data, ...},
    "links": [link_data, ...],
    "unique_id_to_node": {unique_id: node_id, ...}
}
Example
# Analyze a workflow
ports = tools.discover_port_nodes("my_workflow.json")
# Check available inputs
print("Available inputs:")
for uid, info in ports["inputs"].items():
print(f" {uid}: {info['type']} (node {info['node_id']})")
# Check available outputs
print("Available outputs:")
for uid, info in ports["outputs"].items():
print(f" {uid}: {info['type']} (node {info['node_id']})")
# Validate workflow structure
if not ports["inputs"]:
print("Warning: No input ports found!")
Port Info Structure
Each port info contains:
{
    "node_id": 123,              # Node ID in workflow
    "tags": ["tag1", "tag2"],    # User-defined tags
    "type": "IMAGE",             # Inferred data type
    "input_type": "IMAGE",       # Type from input connection
    "output_type": "IMAGE"       # Type from output connection
}
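Port discovery results can be filtered on these fields. For example, the tags list selects ports by a user-defined label; a minimal sketch, assuming a hypothetical "controlnet" tag and filename:
# Filter discovered input ports by a user-defined tag
ports = tools.discover_port_nodes("my_workflow.json")
tagged_inputs = {uid: info for uid, info in ports["inputs"].items()
                 if "controlnet" in info.get("tags", [])}
print(f"Tagged inputs: {list(tagged_inputs.keys())}")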
validate_workflow(workflow)
Validates workflow structure for compatibility with Discomfort.
Parameters
workflow (Dict[str, Any]) - Workflow JSON data
Returns
bool - True if valid, False otherwise
Example
import json
# Load and validate workflow
with open("workflow.json", "r") as f:
workflow_data = json.load(f)
if tools.validate_workflow(workflow_data):
print("✅ Workflow is valid")
else:
print("❌ Workflow validation failed")
Validation Checks
- Required keys: nodes, links
- Link integrity (valid source/target nodes)
- Proper link format: [link_id, source_id, source_slot, target_id, target_slot, type]
- Node ID consistency
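For instance, a workflow dictionary missing the links key should fail validation; a minimal sketch:
# A workflow dict missing required keys should be rejected
bad_workflow = {"nodes": []}
print(tools.validate_workflow(bad_workflow))  # Expected: False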
Workflow Manipulation
stitch_workflows(workflows, delete_input_ports=False, delete_output_ports=False)
Combines multiple workflows into a single executable workflow.
Parameters
workflows (List[Union[str, dict]]) - List of workflow paths or objects
delete_input_ports (bool, optional) - Remove unconnected input ports from result
delete_output_ports (bool, optional) - Remove unconnected output ports from result
Returns
Dict[str, Any] - Stitching result:
{
    "stitched_workflow": workflow_data,
    "inputs": {unique_id: port_info, ...},
    "outputs": {unique_id: port_info, ...},
    "execution_order": [node_id, ...]
}
Example
# Basic workflow stitching
import json

workflows = [
    "load_model.json",      # Loads model and outputs "model", "clip", "vae"
    "prepare_latent.json",  # Uses "model" input, outputs "latent"
    "ksampler.json"         # Uses "model", "latent" inputs, outputs "image"
]
result = tools.stitch_workflows(workflows)
stitched = result["stitched_workflow"]
# Save stitched workflow
with open("stitched_complete.json", "w") as f:
    json.dump(stitched, f, indent=2)
# Use stitched workflow
await discomfort.run([stitched])
Advanced Example
# Stitch with port cleanup
result = tools.stitch_workflows(
    workflows=["partial1.json", "partial2.json", "partial3.json"],
    delete_input_ports=True,   # Remove unused inputs
    delete_output_ports=True   # Remove unused outputs
)
stitched = result["stitched_workflow"]
print(f"Final inputs: {list(result['inputs'].keys())}")
print(f"Final outputs: {list(result['outputs'].keys())}")
# Use cleaned workflow
await discomfort.run([stitched], inputs={"prompt": "A beautiful scene"})
How Stitching Works
- Renumbers all nodes and links to avoid conflicts
- Connects workflows via shared unique_id values
- Preserves execution order through topological sorting
- Validates connections between compatible data types
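As an illustration, assume workflow_a.json exposes an output port with unique_id "latent" and workflow_b.json consumes an input port with the same unique_id (both filenames are hypothetical); stitching merges the two graphs and turns the shared port into an internal connection:
# Hypothetical two-stage stitch joined on the shared "latent" unique_id
result = tools.stitch_workflows(["workflow_a.json", "workflow_b.json"])
print(f"Remaining inputs: {list(result['inputs'].keys())}")
print(f"Remaining outputs: {list(result['outputs'].keys())}")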
Advanced Workflow Operations
remove_reroute_nodes(nodes, links)
Removes Reroute nodes and rewires connections directly.
Parameters
nodes (Dict[int, dict]) - Dictionary of node data by ID
links (List[list]) - List of link data
Returns
Tuple[Dict, List, Dict] - (clean_nodes, clean_links, link_id_map)
Example
import json
# Load workflow
with open("workflow_with_reroutes.json", "r") as f:
workflow = json.load(f)
nodes_dict = {node["id"]: node for node in workflow["nodes"]}
links_list = workflow["links"]
# Remove reroutes
clean_nodes, clean_links, link_map = tools.remove_reroute_nodes(nodes_dict, links_list)
# Create clean workflow
clean_workflow = {
"nodes": list(clean_nodes.values()),
"links": clean_links,
**{k: v for k, v in workflow.items() if k not in ["nodes", "links"]}
}
print(f"Removed {len(nodes_dict) - len(clean_nodes)} reroute nodes")
Internal Methods
_get_workflow_with_reroutes_removed(workflow)
Returns a clean workflow with all Reroute nodes removed.
Parameters
workflow (Dict[str, Any]) - Original workflow data
Returns
Dict[str, Any] - Cleaned workflow
Example
# Internal usage (typically called automatically)
clean_workflow = tools._get_workflow_with_reroutes_removed(original_workflow)
_load_pass_by_rules()
Loads pass-by-reference rules from configuration file.
Returns
Dict[str, str] - Mapping of data types to pass-by methods
Example
# View current rules
rules = tools.pass_by_rules
for data_type, method in rules.items():
    print(f"{data_type}: {method}")
# Output:
# MODEL: ref
# CLIP: ref
# VAE: ref
# IMAGE: val
# LATENT: val
_discover_context_handlers(workflow)
Discovers internal DiscomfortContextLoader/Saver nodes in workflows.
Parameters
workflow (Dict[str, Any]) - Workflow data
Returns
Dict[str, List[Dict]] - Found handlers:
{
    "loaders": [{"node_id": 123, "unique_id": "model"}, ...],
    "savers": [{"node_id": 456, "unique_id": "output"}, ...]
}
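Example
A minimal sketch of inspecting the handlers found in a prepared workflow, assuming workflow_data is already loaded as a dict:
handlers = tools._discover_context_handlers(workflow_data)
for loader in handlers["loaders"]:
    print(f"Loader node {loader['node_id']} reads '{loader['unique_id']}'")
for saver in handlers["savers"]:
    print(f"Saver node {saver['node_id']} writes '{saver['unique_id']}'")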
_prune_workflow_to_output(workflow, target_output_unique_id)
Creates a minimal workflow that generates only a specific output.
Parameters
workflow (Dict[str, Any]) - Original workflow
target_output_unique_id (str) - The output to preserve
Returns
Dict[str, Any] - Pruned workflow containing only necessary nodes
Example
# Create minimal workflow for specific output
import json

with open("complex_workflow.json", "r") as f:
    original = json.load(f)
minimal = tools._prune_workflow_to_output(original, "final_image")
print(f"Original: {len(original['nodes'])} nodes")
print(f"Minimal: {len(minimal['nodes'])} nodes")
# Save minimal workflow for reference storage (assumes an active WorkflowContext)
context.save("model_ref", minimal, pass_by="ref")
_prepare_prompt_for_contextual_run(prompt, port_info, context, pass_by_behaviors, handlers_info=None)
Converts a prompt for contextual execution by swapping DiscomfortPorts.
Parameters
prompt (Dict[str, Any]) - ComfyUI prompt data
port_info (Dict[str, Any]) - Port discovery results
context (WorkflowContext) - Active context instance
pass_by_behaviors (Dict[str, str]) - Pass-by rules for each unique_id
handlers_info (Dict, optional) - Handler discovery results
Returns
Dict[str, Any] - Modified prompt with context loaders/savers
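Example
This method is normally invoked internally by Discomfort.run(); the sketch below only illustrates how the documented arguments fit together, assuming prompt_data is a ComfyUI API-format prompt dict obtained elsewhere:
with discomfort.Context() as context:
    ports = tools.discover_port_nodes("workflow.json")
    # Derive a pass-by behavior per unique_id from the configured rules
    behaviors = {uid: tools.pass_by_rules.get(info["type"], "val")
                 for uid, info in {**ports["inputs"], **ports["outputs"]}.items()}
    prompt = tools._prepare_prompt_for_contextual_run(prompt_data, ports, context, behaviors)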
Configuration and Rules
pass_by_rules
Property
Dictionary defining which data types use pass-by-value vs pass-by-reference.
Structure
{
    "MODEL": "ref",          # Models stored as workflow graphs
    "CLIP": "ref",           # CLIP encoders as workflow graphs
    "VAE": "ref",            # VAE as workflow graphs
    "CONDITIONING": "ref",   # Conditioning as workflow graphs
    "CONTROL_NET": "ref",    # ControlNet as workflow graphs
    "IMAGE": "val",          # Images as direct tensor data
    "LATENT": "val",         # Latents as direct tensor data
    "MASK": "val",           # Masks as direct tensor data
    "STRING": "val",         # Text as direct string data
    "INT": "val",            # Integers as direct values
    "FLOAT": "val",          # Floats as direct values
    "BOOLEAN": "val",        # Booleans as direct values
    "ANY": "val"             # Default to pass-by-value
}
Usage
# Check how a type is handled
image_method = tools.pass_by_rules.get("IMAGE", "val")
print(f"Images are passed by {image_method}") # "val"
model_method = tools.pass_by_rules.get("MODEL", "val")
print(f"Models are passed by {model_method}") # "ref"
Complete Example: Workflow Pipeline
import asyncio
from custom_nodes.discomfort.discomfort import Discomfort
import json

async def build_and_run_pipeline():
    """Example of using WorkflowTools for a complete pipeline."""
    discomfort = await Discomfort.create()
    tools = discomfort.Tools
    try:
        # 1. Load and validate individual workflows
        workflows = ["load_model.json", "img2img.json", "upscale.json"]
        for wf_path in workflows:
            # Validate each workflow
            with open(wf_path, "r") as f:
                wf_data = json.load(f)
            if not tools.validate_workflow(wf_data):
                raise ValueError(f"Invalid workflow: {wf_path}")
            # Analyze ports
            ports = tools.discover_port_nodes(wf_path)
            print(f"{wf_path}:")
            print(f" Inputs: {list(ports['inputs'].keys())}")
            print(f" Outputs: {list(ports['outputs'].keys())}")
        # 2. Stitch workflows together
        print("\nStitching workflows...")
        stitch_result = tools.stitch_workflows(workflows)
        stitched_workflow = stitch_result["stitched_workflow"]
        # Save stitched workflow
        with open("pipeline_stitched.json", "w") as f:
            json.dump(stitched_workflow, f, indent=2)
        # 3. Prepare input data
        input_image = tools.open_image_as_tensor("input/photo.jpg")
        # 4. Run the pipeline
        with discomfort.Context() as context:
            inputs = {
                "input_image": input_image,
                "prompt": "A beautiful enhanced photo, 4k, detailed",
                "model_name": "my_model.safetensors",
                "upscale_factor": 2.0
            }
            results = await discomfort.run([stitched_workflow], inputs=inputs, context=context)
            # 5. Save outputs
            if "upscaled_image" in results:
                output_image = results["upscaled_image"]
                tools.save_comfy_image_to_disk(output_image, "output/enhanced.png")
            print("Pipeline completed successfully!")
            # 6. Check memory usage
            usage = context.get_usage()
            print(f"Memory used: {usage['ram_usage_gb']:.1f}GB")
    finally:
        await discomfort.shutdown()

if __name__ == "__main__":
    asyncio.run(build_and_run_pipeline())
Error Handling
Common Exceptions
ValueError
- Invalid workflow structure
- Missing required workflow keys
- Invalid pass-by rules configuration
FileNotFoundError
- Workflow files don't exist
- Pass-by rules configuration missing
- Image files not found
KeyError
- Target output not found in workflow
- Missing unique_id in port discovery
NetworkXUnfeasible
- Workflow contains cycles (from stitch_workflows)
- Invalid execution order
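A hedged sketch of catching these exceptions around stitching; the filenames are hypothetical, and NetworkXUnfeasible comes from the networkx package:
import networkx as nx

try:
    result = tools.stitch_workflows(["a.json", "b.json"])
except FileNotFoundError as e:
    print(f"Missing workflow file: {e}")
except ValueError as e:
    print(f"Invalid workflow structure: {e}")
except nx.NetworkXUnfeasible:
    print("Stitched graph contains a cycle; check port connections")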
Best Practices
- Always validate workflows before stitching or execution
- Check port discovery results before assuming inputs/outputs exist (see the sketch below)
- Handle file I/O errors when loading workflows or images
- Monitor memory usage when processing large images
- Use descriptive unique_ids for better debugging
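A minimal sketch of the port-discovery check above ("input_image" is a hypothetical unique_id):
ports = tools.discover_port_nodes("workflow.json")
if "input_image" not in ports["inputs"]:
    raise ValueError("Workflow is missing an 'input_image' DiscomfortPort")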
Next Steps
- WorkflowContext API - Data management and persistence
- ComfyConnector API - Server management and communication
- Discomfort Class - Main orchestration API
- Core Concepts - Understanding DiscomfortPorts and Context