1
mirror of https://github.com/comfyanonymous/ComfyUI.git synced 2025-08-02 15:04:50 +08:00

Execution Model Inversion (#2666)

* Execution Model Inversion

This PR inverts the execution model -- from recursively calling nodes to
using a topological sort of the nodes. This change allows for
modification of the node graph during execution. This allows for two
major advantages:

    1. The implementation of lazy evaluation in nodes. For example, if a
    "Mix Images" node has a mix factor of exactly 0.0, the second image
    input doesn't even need to be evaluated (and visa-versa if the mix
    factor is 1.0).

    2. Dynamic expansion of nodes. This allows for the creation of dynamic
    "node groups". Specifically, custom nodes can return subgraphs that
    replace the original node in the graph. This is an incredibly
    powerful concept. Using this functionality, it was easy to
    implement:
        a. Components (a.k.a. node groups)
        b. Flow control (i.e. while loops) via tail recursion
        c. All-in-one nodes that replicate the WebUI functionality
        d. and more
    All of those were able to be implemented entirely via custom nodes,
    so those features are *not* a part of this PR. (There are some
    front-end changes that should occur before that functionality is
    made widely available, particularly around variant sockets.)

The custom nodes associated with this PR can be found at:
https://github.com/BadCafeCode/execution-inversion-demo-comfyui

Note that some of them require that variant socket types ("*") be
enabled.

* Allow `input_info` to be of type `None`

* Handle errors (like OOM) more gracefully

* Add a command-line argument to enable variants

This allows the use of nodes that have sockets of type '*' without
applying a patch to the code.

* Fix an overly aggressive assertion.

This could happen when attempting to evaluate `IS_CHANGED` for a node
during the creation of the cache (in order to create the cache key).

* Fix Pyright warnings

* Add execution model unit tests

* Fix issue with unused literals

Behavior should now match the master branch with regard to undeclared
inputs. Undeclared inputs that are socket connections will be used while
undeclared inputs that are literals will be ignored.

* Make custom VALIDATE_INPUTS skip normal validation

Additionally, if `VALIDATE_INPUTS` takes an argument named `input_types`,
that variable will be a dictionary of the socket type of all incoming
connections. If that argument exists, normal socket type validation will
not occur. This removes the last hurdle for enabling variant types
entirely from custom nodes, so I've removed that command-line option.

I've added appropriate unit tests for these changes.

* Fix example in unit test

This wouldn't have caused any issues in the unit test, but it would have
bugged the UI if someone copy+pasted it into their own node pack.

* Use fstrings instead of '%' formatting syntax

* Use custom exception types.

* Display an error for dependency cycles

Previously, dependency cycles that were created during node expansion
would cause the application to quit (due to an uncaught exception). Now,
we'll throw a proper error to the UI. We also make an attempt to 'blame'
the most relevant node in the UI.

* Add docs on when ExecutionBlocker should be used

* Remove unused functionality

* Rename ExecutionResult.SLEEPING to PENDING

* Remove superfluous function parameter

* Pass None for uneval inputs instead of default

This applies to `VALIDATE_INPUTS`, `check_lazy_status`, and lazy values
in evaluation functions.

* Add a test for mixed node expansion

This test ensures that a node that returns a combination of expanded
subgraphs and literal values functions correctly.

* Raise exception for bad get_node calls.

* Minor refactor of IsChangedCache.get

* Refactor `map_node_over_list` function

* Fix ui output for duplicated nodes

* Add documentation on `check_lazy_status`

* Add file for execution model unit tests

* Clean up Javascript code as per review

* Improve documentation

Converted some comments to docstrings as per review

* Add a new unit test for mixed lazy results

This test validates that when an output list is fed to a lazy node, the
node will properly evaluate previous nodes that are needed by any inputs
to the lazy node.

No code in the execution model has been changed. The test already
passes.

* Allow kwargs in VALIDATE_INPUTS functions

When kwargs are used, validation is skipped for all inputs as if they
had been mentioned explicitly.

* List cached nodes in `execution_cached` message

This was previously just bugged in this PR.
This commit is contained in:
guill
2024-08-15 08:21:11 -07:00
committed by GitHub
parent 0f9c2a7822
commit 5cfe38f41c
23 changed files with 2811 additions and 278 deletions

View File

@@ -0,0 +1,4 @@
# Config for testing nodes
testing:
custom_nodes: tests/inference/testing_nodes

View File

@@ -0,0 +1,461 @@
from io import BytesIO
import numpy
from PIL import Image
import pytest
from pytest import fixture
import time
import torch
from typing import Union, Dict
import json
import subprocess
import websocket #NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
import uuid
import urllib.request
import urllib.parse
import urllib.error
from comfy.graph_utils import GraphBuilder, Node
class RunResult:
def __init__(self, prompt_id: str):
self.outputs: Dict[str,Dict] = {}
self.runs: Dict[str,bool] = {}
self.prompt_id: str = prompt_id
def get_output(self, node: Node):
return self.outputs.get(node.id, None)
def did_run(self, node: Node):
return self.runs.get(node.id, False)
def get_images(self, node: Node):
output = self.get_output(node)
if output is None:
return []
return output.get('image_objects', [])
def get_prompt_id(self):
return self.prompt_id
class ComfyClient:
def __init__(self):
self.test_name = ""
def connect(self,
listen:str = '127.0.0.1',
port:Union[str,int] = 8188,
client_id: str = str(uuid.uuid4())
):
self.client_id = client_id
self.server_address = f"{listen}:{port}"
ws = websocket.WebSocket()
ws.connect("ws://{}/ws?clientId={}".format(self.server_address, self.client_id))
self.ws = ws
def queue_prompt(self, prompt):
p = {"prompt": prompt, "client_id": self.client_id}
data = json.dumps(p).encode('utf-8')
req = urllib.request.Request("http://{}/prompt".format(self.server_address), data=data)
return json.loads(urllib.request.urlopen(req).read())
def get_image(self, filename, subfolder, folder_type):
data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
url_values = urllib.parse.urlencode(data)
with urllib.request.urlopen("http://{}/view?{}".format(self.server_address, url_values)) as response:
return response.read()
def get_history(self, prompt_id):
with urllib.request.urlopen("http://{}/history/{}".format(self.server_address, prompt_id)) as response:
return json.loads(response.read())
def set_test_name(self, name):
self.test_name = name
def run(self, graph):
prompt = graph.finalize()
for node in graph.nodes.values():
if node.class_type == 'SaveImage':
node.inputs['filename_prefix'] = self.test_name
prompt_id = self.queue_prompt(prompt)['prompt_id']
result = RunResult(prompt_id)
while True:
out = self.ws.recv()
if isinstance(out, str):
message = json.loads(out)
if message['type'] == 'executing':
data = message['data']
if data['prompt_id'] != prompt_id:
continue
if data['node'] is None:
break
result.runs[data['node']] = True
elif message['type'] == 'execution_error':
raise Exception(message['data'])
elif message['type'] == 'execution_cached':
pass # Probably want to store this off for testing
history = self.get_history(prompt_id)[prompt_id]
for o in history['outputs']:
for node_id in history['outputs']:
node_output = history['outputs'][node_id]
result.outputs[node_id] = node_output
if 'images' in node_output:
images_output = []
for image in node_output['images']:
image_data = self.get_image(image['filename'], image['subfolder'], image['type'])
image_obj = Image.open(BytesIO(image_data))
images_output.append(image_obj)
node_output['image_objects'] = images_output
return result
#
# Loop through these variables
#
@pytest.mark.execution
class TestExecution:
#
# Initialize server and client
#
@fixture(scope="class", autouse=True, params=[
# (use_lru, lru_size)
(False, 0),
(True, 0),
(True, 100),
])
def _server(self, args_pytest, request):
# Start server
pargs = [
'python','main.py',
'--output-directory', args_pytest["output_dir"],
'--listen', args_pytest["listen"],
'--port', str(args_pytest["port"]),
'--extra-model-paths-config', 'tests/inference/extra_model_paths.yaml',
]
use_lru, lru_size = request.param
if use_lru:
pargs += ['--cache-lru', str(lru_size)]
print("Running server with args:", pargs)
p = subprocess.Popen(pargs)
yield
p.kill()
torch.cuda.empty_cache()
def start_client(self, listen:str, port:int):
# Start client
comfy_client = ComfyClient()
# Connect to server (with retries)
n_tries = 5
for i in range(n_tries):
time.sleep(4)
try:
comfy_client.connect(listen=listen, port=port)
except ConnectionRefusedError as e:
print(e)
print(f"({i+1}/{n_tries}) Retrying...")
else:
break
return comfy_client
@fixture(scope="class", autouse=True)
def shared_client(self, args_pytest, _server):
client = self.start_client(args_pytest["listen"], args_pytest["port"])
yield client
del client
torch.cuda.empty_cache()
@fixture
def client(self, shared_client, request):
shared_client.set_test_name(f"execution[{request.node.name}]")
yield shared_client
@fixture
def builder(self, request):
yield GraphBuilder(prefix=request.node.name)
def test_lazy_input(self, client: ComfyClient, builder: GraphBuilder):
g = builder
input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
input2 = g.node("StubImage", content="WHITE", height=512, width=512, batch_size=1)
mask = g.node("StubMask", value=0.0, height=512, width=512, batch_size=1)
lazy_mix = g.node("TestLazyMixImages", image1=input1.out(0), image2=input2.out(0), mask=mask.out(0))
output = g.node("SaveImage", images=lazy_mix.out(0))
result = client.run(g)
result_image = result.get_images(output)[0]
assert numpy.array(result_image).any() == 0, "Image should be black"
assert result.did_run(input1)
assert not result.did_run(input2)
assert result.did_run(mask)
assert result.did_run(lazy_mix)
def test_full_cache(self, client: ComfyClient, builder: GraphBuilder):
g = builder
input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
input2 = g.node("StubImage", content="NOISE", height=512, width=512, batch_size=1)
mask = g.node("StubMask", value=0.5, height=512, width=512, batch_size=1)
lazy_mix = g.node("TestLazyMixImages", image1=input1.out(0), image2=input2.out(0), mask=mask.out(0))
g.node("SaveImage", images=lazy_mix.out(0))
client.run(g)
result2 = client.run(g)
for node_id, node in g.nodes.items():
assert not result2.did_run(node), f"Node {node_id} ran, but should have been cached"
def test_partial_cache(self, client: ComfyClient, builder: GraphBuilder):
g = builder
input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
input2 = g.node("StubImage", content="NOISE", height=512, width=512, batch_size=1)
mask = g.node("StubMask", value=0.5, height=512, width=512, batch_size=1)
lazy_mix = g.node("TestLazyMixImages", image1=input1.out(0), image2=input2.out(0), mask=mask.out(0))
g.node("SaveImage", images=lazy_mix.out(0))
client.run(g)
mask.inputs['value'] = 0.4
result2 = client.run(g)
assert not result2.did_run(input1), "Input1 should have been cached"
assert not result2.did_run(input2), "Input2 should have been cached"
def test_error(self, client: ComfyClient, builder: GraphBuilder):
g = builder
input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
# Different size of the two images
input2 = g.node("StubImage", content="NOISE", height=256, width=256, batch_size=1)
mask = g.node("StubMask", value=0.5, height=512, width=512, batch_size=1)
lazy_mix = g.node("TestLazyMixImages", image1=input1.out(0), image2=input2.out(0), mask=mask.out(0))
g.node("SaveImage", images=lazy_mix.out(0))
try:
client.run(g)
assert False, "Should have raised an error"
except Exception as e:
assert 'prompt_id' in e.args[0], f"Did not get back a proper error message: {e}"
@pytest.mark.parametrize("test_value, expect_error", [
(5, True),
("foo", True),
(5.0, False),
])
def test_validation_error_literal(self, test_value, expect_error, client: ComfyClient, builder: GraphBuilder):
g = builder
validation1 = g.node("TestCustomValidation1", input1=test_value, input2=3.0)
g.node("SaveImage", images=validation1.out(0))
if expect_error:
with pytest.raises(urllib.error.HTTPError):
client.run(g)
else:
client.run(g)
@pytest.mark.parametrize("test_type, test_value", [
("StubInt", 5),
("StubFloat", 5.0)
])
def test_validation_error_edge1(self, test_type, test_value, client: ComfyClient, builder: GraphBuilder):
g = builder
stub = g.node(test_type, value=test_value)
validation1 = g.node("TestCustomValidation1", input1=stub.out(0), input2=3.0)
g.node("SaveImage", images=validation1.out(0))
with pytest.raises(urllib.error.HTTPError):
client.run(g)
@pytest.mark.parametrize("test_type, test_value, expect_error", [
("StubInt", 5, True),
("StubFloat", 5.0, False)
])
def test_validation_error_edge2(self, test_type, test_value, expect_error, client: ComfyClient, builder: GraphBuilder):
g = builder
stub = g.node(test_type, value=test_value)
validation2 = g.node("TestCustomValidation2", input1=stub.out(0), input2=3.0)
g.node("SaveImage", images=validation2.out(0))
if expect_error:
with pytest.raises(urllib.error.HTTPError):
client.run(g)
else:
client.run(g)
@pytest.mark.parametrize("test_type, test_value, expect_error", [
("StubInt", 5, True),
("StubFloat", 5.0, False)
])
def test_validation_error_edge3(self, test_type, test_value, expect_error, client: ComfyClient, builder: GraphBuilder):
g = builder
stub = g.node(test_type, value=test_value)
validation3 = g.node("TestCustomValidation3", input1=stub.out(0), input2=3.0)
g.node("SaveImage", images=validation3.out(0))
if expect_error:
with pytest.raises(urllib.error.HTTPError):
client.run(g)
else:
client.run(g)
@pytest.mark.parametrize("test_type, test_value, expect_error", [
("StubInt", 5, True),
("StubFloat", 5.0, False)
])
def test_validation_error_edge4(self, test_type, test_value, expect_error, client: ComfyClient, builder: GraphBuilder):
g = builder
stub = g.node(test_type, value=test_value)
validation4 = g.node("TestCustomValidation4", input1=stub.out(0), input2=3.0)
g.node("SaveImage", images=validation4.out(0))
if expect_error:
with pytest.raises(urllib.error.HTTPError):
client.run(g)
else:
client.run(g)
@pytest.mark.parametrize("test_value1, test_value2, expect_error", [
(0.0, 0.5, False),
(0.0, 5.0, False),
(0.0, 7.0, True)
])
def test_validation_error_kwargs(self, test_value1, test_value2, expect_error, client: ComfyClient, builder: GraphBuilder):
g = builder
validation5 = g.node("TestCustomValidation5", input1=test_value1, input2=test_value2)
g.node("SaveImage", images=validation5.out(0))
if expect_error:
with pytest.raises(urllib.error.HTTPError):
client.run(g)
else:
client.run(g)
def test_cycle_error(self, client: ComfyClient, builder: GraphBuilder):
g = builder
input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
input2 = g.node("StubImage", content="WHITE", height=512, width=512, batch_size=1)
mask = g.node("StubMask", value=0.5, height=512, width=512, batch_size=1)
lazy_mix1 = g.node("TestLazyMixImages", image1=input1.out(0), mask=mask.out(0))
lazy_mix2 = g.node("TestLazyMixImages", image1=lazy_mix1.out(0), image2=input2.out(0), mask=mask.out(0))
g.node("SaveImage", images=lazy_mix2.out(0))
# When the cycle exists on initial submission, it should raise a validation error
with pytest.raises(urllib.error.HTTPError):
client.run(g)
def test_dynamic_cycle_error(self, client: ComfyClient, builder: GraphBuilder):
g = builder
input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
input2 = g.node("StubImage", content="WHITE", height=512, width=512, batch_size=1)
generator = g.node("TestDynamicDependencyCycle", input1=input1.out(0), input2=input2.out(0))
g.node("SaveImage", images=generator.out(0))
# When the cycle is in a graph that is generated dynamically, it should raise a runtime error
try:
client.run(g)
assert False, "Should have raised an error"
except Exception as e:
assert 'prompt_id' in e.args[0], f"Did not get back a proper error message: {e}"
assert e.args[0]['node_id'] == generator.id, "Error should have been on the generator node"
def test_custom_is_changed(self, client: ComfyClient, builder: GraphBuilder):
g = builder
# Creating the nodes in this specific order previously caused a bug
save = g.node("SaveImage")
is_changed = g.node("TestCustomIsChanged", should_change=False)
input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
save.set_input('images', is_changed.out(0))
is_changed.set_input('image', input1.out(0))
result1 = client.run(g)
result2 = client.run(g)
is_changed.set_input('should_change', True)
result3 = client.run(g)
result4 = client.run(g)
assert result1.did_run(is_changed), "is_changed should have been run"
assert not result2.did_run(is_changed), "is_changed should have been cached"
assert result3.did_run(is_changed), "is_changed should have been re-run"
assert result4.did_run(is_changed), "is_changed should not have been cached"
def test_undeclared_inputs(self, client: ComfyClient, builder: GraphBuilder):
g = builder
input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
input2 = g.node("StubImage", content="WHITE", height=512, width=512, batch_size=1)
input3 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
input4 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
average = g.node("TestVariadicAverage", input1=input1.out(0), input2=input2.out(0), input3=input3.out(0), input4=input4.out(0))
output = g.node("SaveImage", images=average.out(0))
result = client.run(g)
result_image = result.get_images(output)[0]
expected = 255 // 4
assert numpy.array(result_image).min() == expected and numpy.array(result_image).max() == expected, "Image should be grey"
def test_for_loop(self, client: ComfyClient, builder: GraphBuilder):
g = builder
iterations = 4
input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
input2 = g.node("StubImage", content="WHITE", height=512, width=512, batch_size=1)
is_changed = g.node("TestCustomIsChanged", should_change=True, image=input2.out(0))
for_open = g.node("TestForLoopOpen", remaining=iterations, initial_value1=is_changed.out(0))
average = g.node("TestVariadicAverage", input1=input1.out(0), input2=for_open.out(2))
for_close = g.node("TestForLoopClose", flow_control=for_open.out(0), initial_value1=average.out(0))
output = g.node("SaveImage", images=for_close.out(0))
for iterations in range(1, 5):
for_open.set_input('remaining', iterations)
result = client.run(g)
result_image = result.get_images(output)[0]
expected = 255 // (2 ** iterations)
assert numpy.array(result_image).min() == expected and numpy.array(result_image).max() == expected, "Image should be grey"
assert result.did_run(is_changed)
def test_mixed_expansion_returns(self, client: ComfyClient, builder: GraphBuilder):
g = builder
val_list = g.node("TestMakeListNode", value1=0.1, value2=0.2, value3=0.3)
mixed = g.node("TestMixedExpansionReturns", input1=val_list.out(0))
output_dynamic = g.node("SaveImage", images=mixed.out(0))
output_literal = g.node("SaveImage", images=mixed.out(1))
result = client.run(g)
images_dynamic = result.get_images(output_dynamic)
assert len(images_dynamic) == 3, "Should have 2 images"
assert numpy.array(images_dynamic[0]).min() == 25 and numpy.array(images_dynamic[0]).max() == 25, "First image should be 0.1"
assert numpy.array(images_dynamic[1]).min() == 51 and numpy.array(images_dynamic[1]).max() == 51, "Second image should be 0.2"
assert numpy.array(images_dynamic[2]).min() == 76 and numpy.array(images_dynamic[2]).max() == 76, "Third image should be 0.3"
images_literal = result.get_images(output_literal)
assert len(images_literal) == 3, "Should have 2 images"
for i in range(3):
assert numpy.array(images_literal[i]).min() == 255 and numpy.array(images_literal[i]).max() == 255, "All images should be white"
def test_mixed_lazy_results(self, client: ComfyClient, builder: GraphBuilder):
g = builder
val_list = g.node("TestMakeListNode", value1=0.0, value2=0.5, value3=1.0)
mask = g.node("StubMask", value=val_list.out(0), height=512, width=512, batch_size=1)
input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
input2 = g.node("StubImage", content="WHITE", height=512, width=512, batch_size=1)
mix = g.node("TestLazyMixImages", image1=input1.out(0), image2=input2.out(0), mask=mask.out(0))
rebatch = g.node("RebatchImages", images=mix.out(0), batch_size=3)
output = g.node("SaveImage", images=rebatch.out(0))
result = client.run(g)
images = result.get_images(output)
assert len(images) == 3, "Should have 3 image"
assert numpy.array(images[0]).min() == 0 and numpy.array(images[0]).max() == 0, "First image should be 0.0"
assert numpy.array(images[1]).min() == 127 and numpy.array(images[1]).max() == 127, "Second image should be 0.5"
assert numpy.array(images[2]).min() == 255 and numpy.array(images[2]).max() == 255, "Third image should be 1.0"
def test_output_reuse(self, client: ComfyClient, builder: GraphBuilder):
g = builder
input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
output1 = g.node("PreviewImage", images=input1.out(0))
output2 = g.node("PreviewImage", images=input1.out(0))
result = client.run(g)
images1 = result.get_images(output1)
images2 = result.get_images(output2)
assert len(images1) == 1, "Should have 1 image"
assert len(images2) == 1, "Should have 1 image"

View File

@@ -0,0 +1,23 @@
from .specific_tests import TEST_NODE_CLASS_MAPPINGS, TEST_NODE_DISPLAY_NAME_MAPPINGS
from .flow_control import FLOW_CONTROL_NODE_CLASS_MAPPINGS, FLOW_CONTROL_NODE_DISPLAY_NAME_MAPPINGS
from .util import UTILITY_NODE_CLASS_MAPPINGS, UTILITY_NODE_DISPLAY_NAME_MAPPINGS
from .conditions import CONDITION_NODE_CLASS_MAPPINGS, CONDITION_NODE_DISPLAY_NAME_MAPPINGS
from .stubs import TEST_STUB_NODE_CLASS_MAPPINGS, TEST_STUB_NODE_DISPLAY_NAME_MAPPINGS
# NODE_CLASS_MAPPINGS = GENERAL_NODE_CLASS_MAPPINGS.update(COMPONENT_NODE_CLASS_MAPPINGS)
# NODE_DISPLAY_NAME_MAPPINGS = GENERAL_NODE_DISPLAY_NAME_MAPPINGS.update(COMPONENT_NODE_DISPLAY_NAME_MAPPINGS)
NODE_CLASS_MAPPINGS = {}
NODE_CLASS_MAPPINGS.update(TEST_NODE_CLASS_MAPPINGS)
NODE_CLASS_MAPPINGS.update(FLOW_CONTROL_NODE_CLASS_MAPPINGS)
NODE_CLASS_MAPPINGS.update(UTILITY_NODE_CLASS_MAPPINGS)
NODE_CLASS_MAPPINGS.update(CONDITION_NODE_CLASS_MAPPINGS)
NODE_CLASS_MAPPINGS.update(TEST_STUB_NODE_CLASS_MAPPINGS)
NODE_DISPLAY_NAME_MAPPINGS = {}
NODE_DISPLAY_NAME_MAPPINGS.update(TEST_NODE_DISPLAY_NAME_MAPPINGS)
NODE_DISPLAY_NAME_MAPPINGS.update(FLOW_CONTROL_NODE_DISPLAY_NAME_MAPPINGS)
NODE_DISPLAY_NAME_MAPPINGS.update(UTILITY_NODE_DISPLAY_NAME_MAPPINGS)
NODE_DISPLAY_NAME_MAPPINGS.update(CONDITION_NODE_DISPLAY_NAME_MAPPINGS)
NODE_DISPLAY_NAME_MAPPINGS.update(TEST_STUB_NODE_DISPLAY_NAME_MAPPINGS)

View File

@@ -0,0 +1,194 @@
import re
import torch
class TestIntConditions:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"a": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
"b": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
"operation": (["==", "!=", "<", ">", "<=", ">="],),
},
}
RETURN_TYPES = ("BOOLEAN",)
FUNCTION = "int_condition"
CATEGORY = "Testing/Logic"
def int_condition(self, a, b, operation):
if operation == "==":
return (a == b,)
elif operation == "!=":
return (a != b,)
elif operation == "<":
return (a < b,)
elif operation == ">":
return (a > b,)
elif operation == "<=":
return (a <= b,)
elif operation == ">=":
return (a >= b,)
class TestFloatConditions:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"a": ("FLOAT", {"default": 0, "min": -999999999999.0, "max": 999999999999.0, "step": 1}),
"b": ("FLOAT", {"default": 0, "min": -999999999999.0, "max": 999999999999.0, "step": 1}),
"operation": (["==", "!=", "<", ">", "<=", ">="],),
},
}
RETURN_TYPES = ("BOOLEAN",)
FUNCTION = "float_condition"
CATEGORY = "Testing/Logic"
def float_condition(self, a, b, operation):
if operation == "==":
return (a == b,)
elif operation == "!=":
return (a != b,)
elif operation == "<":
return (a < b,)
elif operation == ">":
return (a > b,)
elif operation == "<=":
return (a <= b,)
elif operation == ">=":
return (a >= b,)
class TestStringConditions:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"a": ("STRING", {"multiline": False}),
"b": ("STRING", {"multiline": False}),
"operation": (["a == b", "a != b", "a IN b", "a MATCH REGEX(b)", "a BEGINSWITH b", "a ENDSWITH b"],),
"case_sensitive": ("BOOLEAN", {"default": True}),
},
}
RETURN_TYPES = ("BOOLEAN",)
FUNCTION = "string_condition"
CATEGORY = "Testing/Logic"
def string_condition(self, a, b, operation, case_sensitive):
if not case_sensitive:
a = a.lower()
b = b.lower()
if operation == "a == b":
return (a == b,)
elif operation == "a != b":
return (a != b,)
elif operation == "a IN b":
return (a in b,)
elif operation == "a MATCH REGEX(b)":
try:
return (re.match(b, a) is not None,)
except:
return (False,)
elif operation == "a BEGINSWITH b":
return (a.startswith(b),)
elif operation == "a ENDSWITH b":
return (a.endswith(b),)
class TestToBoolNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"value": ("*",),
},
"optional": {
"invert": ("BOOLEAN", {"default": False}),
},
}
RETURN_TYPES = ("BOOLEAN",)
FUNCTION = "to_bool"
CATEGORY = "Testing/Logic"
def to_bool(self, value, invert = False):
if isinstance(value, torch.Tensor):
if value.max().item() == 0 and value.min().item() == 0:
result = False
else:
result = True
else:
try:
result = bool(value)
except:
# Can't convert it? Well then it's something or other. I dunno, I'm not a Python programmer.
result = True
if invert:
result = not result
return (result,)
class TestBoolOperationNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"a": ("BOOLEAN",),
"b": ("BOOLEAN",),
"op": (["a AND b", "a OR b", "a XOR b", "NOT a"],),
},
}
RETURN_TYPES = ("BOOLEAN",)
FUNCTION = "bool_operation"
CATEGORY = "Testing/Logic"
def bool_operation(self, a, b, op):
if op == "a AND b":
return (a and b,)
elif op == "a OR b":
return (a or b,)
elif op == "a XOR b":
return (a ^ b,)
elif op == "NOT a":
return (not a,)
CONDITION_NODE_CLASS_MAPPINGS = {
"TestIntConditions": TestIntConditions,
"TestFloatConditions": TestFloatConditions,
"TestStringConditions": TestStringConditions,
"TestToBoolNode": TestToBoolNode,
"TestBoolOperationNode": TestBoolOperationNode,
}
CONDITION_NODE_DISPLAY_NAME_MAPPINGS = {
"TestIntConditions": "Int Condition",
"TestFloatConditions": "Float Condition",
"TestStringConditions": "String Condition",
"TestToBoolNode": "To Bool",
"TestBoolOperationNode": "Bool Operation",
}

View File

@@ -0,0 +1,173 @@
from comfy.graph_utils import GraphBuilder, is_link
from comfy.graph import ExecutionBlocker
from .tools import VariantSupport
NUM_FLOW_SOCKETS = 5
@VariantSupport()
class TestWhileLoopOpen:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
inputs = {
"required": {
"condition": ("BOOLEAN", {"default": True}),
},
"optional": {
},
}
for i in range(NUM_FLOW_SOCKETS):
inputs["optional"][f"initial_value{i}"] = ("*",)
return inputs
RETURN_TYPES = tuple(["FLOW_CONTROL"] + ["*"] * NUM_FLOW_SOCKETS)
RETURN_NAMES = tuple(["FLOW_CONTROL"] + [f"value{i}" for i in range(NUM_FLOW_SOCKETS)])
FUNCTION = "while_loop_open"
CATEGORY = "Testing/Flow"
def while_loop_open(self, condition, **kwargs):
values = []
for i in range(NUM_FLOW_SOCKETS):
values.append(kwargs.get(f"initial_value{i}", None))
return tuple(["stub"] + values)
@VariantSupport()
class TestWhileLoopClose:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
inputs = {
"required": {
"flow_control": ("FLOW_CONTROL", {"rawLink": True}),
"condition": ("BOOLEAN", {"forceInput": True}),
},
"optional": {
},
"hidden": {
"dynprompt": "DYNPROMPT",
"unique_id": "UNIQUE_ID",
}
}
for i in range(NUM_FLOW_SOCKETS):
inputs["optional"][f"initial_value{i}"] = ("*",)
return inputs
RETURN_TYPES = tuple(["*"] * NUM_FLOW_SOCKETS)
RETURN_NAMES = tuple([f"value{i}" for i in range(NUM_FLOW_SOCKETS)])
FUNCTION = "while_loop_close"
CATEGORY = "Testing/Flow"
def explore_dependencies(self, node_id, dynprompt, upstream):
node_info = dynprompt.get_node(node_id)
if "inputs" not in node_info:
return
for k, v in node_info["inputs"].items():
if is_link(v):
parent_id = v[0]
if parent_id not in upstream:
upstream[parent_id] = []
self.explore_dependencies(parent_id, dynprompt, upstream)
upstream[parent_id].append(node_id)
def collect_contained(self, node_id, upstream, contained):
if node_id not in upstream:
return
for child_id in upstream[node_id]:
if child_id not in contained:
contained[child_id] = True
self.collect_contained(child_id, upstream, contained)
def while_loop_close(self, flow_control, condition, dynprompt=None, unique_id=None, **kwargs):
assert dynprompt is not None
if not condition:
# We're done with the loop
values = []
for i in range(NUM_FLOW_SOCKETS):
values.append(kwargs.get(f"initial_value{i}", None))
return tuple(values)
# We want to loop
upstream = {}
# Get the list of all nodes between the open and close nodes
self.explore_dependencies(unique_id, dynprompt, upstream)
contained = {}
open_node = flow_control[0]
self.collect_contained(open_node, upstream, contained)
contained[unique_id] = True
contained[open_node] = True
# We'll use the default prefix, but to avoid having node names grow exponentially in size,
# we'll use "Recurse" for the name of the recursively-generated copy of this node.
graph = GraphBuilder()
for node_id in contained:
original_node = dynprompt.get_node(node_id)
node = graph.node(original_node["class_type"], "Recurse" if node_id == unique_id else node_id)
node.set_override_display_id(node_id)
for node_id in contained:
original_node = dynprompt.get_node(node_id)
node = graph.lookup_node("Recurse" if node_id == unique_id else node_id)
assert node is not None
for k, v in original_node["inputs"].items():
if is_link(v) and v[0] in contained:
parent = graph.lookup_node(v[0])
assert parent is not None
node.set_input(k, parent.out(v[1]))
else:
node.set_input(k, v)
new_open = graph.lookup_node(open_node)
assert new_open is not None
for i in range(NUM_FLOW_SOCKETS):
key = f"initial_value{i}"
new_open.set_input(key, kwargs.get(key, None))
my_clone = graph.lookup_node("Recurse")
assert my_clone is not None
result = map(lambda x: my_clone.out(x), range(NUM_FLOW_SOCKETS))
return {
"result": tuple(result),
"expand": graph.finalize(),
}
@VariantSupport()
class TestExecutionBlockerNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
inputs = {
"required": {
"input": ("*",),
"block": ("BOOLEAN",),
"verbose": ("BOOLEAN", {"default": False}),
},
}
return inputs
RETURN_TYPES = ("*",)
RETURN_NAMES = ("output",)
FUNCTION = "execution_blocker"
CATEGORY = "Testing/Flow"
def execution_blocker(self, input, block, verbose):
if block:
return (ExecutionBlocker("Blocked Execution" if verbose else None),)
return (input,)
FLOW_CONTROL_NODE_CLASS_MAPPINGS = {
"TestWhileLoopOpen": TestWhileLoopOpen,
"TestWhileLoopClose": TestWhileLoopClose,
"TestExecutionBlocker": TestExecutionBlockerNode,
}
FLOW_CONTROL_NODE_DISPLAY_NAME_MAPPINGS = {
"TestWhileLoopOpen": "While Loop Open",
"TestWhileLoopClose": "While Loop Close",
"TestExecutionBlocker": "Execution Blocker",
}

View File

@@ -0,0 +1,335 @@
import torch
from .tools import VariantSupport
from comfy.graph_utils import GraphBuilder
class TestLazyMixImages:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"image1": ("IMAGE",{"lazy": True}),
"image2": ("IMAGE",{"lazy": True}),
"mask": ("MASK",),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "mix"
CATEGORY = "Testing/Nodes"
def check_lazy_status(self, mask, image1, image2):
mask_min = mask.min()
mask_max = mask.max()
needed = []
if image1 is None and (mask_min != 1.0 or mask_max != 1.0):
needed.append("image1")
if image2 is None and (mask_min != 0.0 or mask_max != 0.0):
needed.append("image2")
return needed
# Not trying to handle different batch sizes here just to keep the demo simple
def mix(self, mask, image1, image2):
mask_min = mask.min()
mask_max = mask.max()
if mask_min == 0.0 and mask_max == 0.0:
return (image1,)
elif mask_min == 1.0 and mask_max == 1.0:
return (image2,)
if len(mask.shape) == 2:
mask = mask.unsqueeze(0)
if len(mask.shape) == 3:
mask = mask.unsqueeze(3)
if mask.shape[3] < image1.shape[3]:
mask = mask.repeat(1, 1, 1, image1.shape[3])
result = image1 * (1. - mask) + image2 * mask,
return (result[0],)
class TestVariadicAverage:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"input1": ("IMAGE",),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "variadic_average"
CATEGORY = "Testing/Nodes"
def variadic_average(self, input1, **kwargs):
inputs = [input1]
while 'input' + str(len(inputs) + 1) in kwargs:
inputs.append(kwargs['input' + str(len(inputs) + 1)])
return (torch.stack(inputs).mean(dim=0),)
class TestCustomIsChanged:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"image": ("IMAGE",),
},
"optional": {
"should_change": ("BOOL", {"default": False}),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "custom_is_changed"
CATEGORY = "Testing/Nodes"
def custom_is_changed(self, image, should_change=False):
return (image,)
@classmethod
def IS_CHANGED(cls, should_change=False, *args, **kwargs):
if should_change:
return float("NaN")
else:
return False
class TestCustomValidation1:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"input1": ("IMAGE,FLOAT",),
"input2": ("IMAGE,FLOAT",),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "custom_validation1"
CATEGORY = "Testing/Nodes"
def custom_validation1(self, input1, input2):
if isinstance(input1, float) and isinstance(input2, float):
result = torch.ones([1, 512, 512, 3]) * input1 * input2
else:
result = input1 * input2
return (result,)
@classmethod
def VALIDATE_INPUTS(cls, input1=None, input2=None):
if input1 is not None:
if not isinstance(input1, (torch.Tensor, float)):
return f"Invalid type of input1: {type(input1)}"
if input2 is not None:
if not isinstance(input2, (torch.Tensor, float)):
return f"Invalid type of input2: {type(input2)}"
return True
class TestCustomValidation2:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"input1": ("IMAGE,FLOAT",),
"input2": ("IMAGE,FLOAT",),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "custom_validation2"
CATEGORY = "Testing/Nodes"
def custom_validation2(self, input1, input2):
if isinstance(input1, float) and isinstance(input2, float):
result = torch.ones([1, 512, 512, 3]) * input1 * input2
else:
result = input1 * input2
return (result,)
@classmethod
def VALIDATE_INPUTS(cls, input_types, input1=None, input2=None):
if input1 is not None:
if not isinstance(input1, (torch.Tensor, float)):
return f"Invalid type of input1: {type(input1)}"
if input2 is not None:
if not isinstance(input2, (torch.Tensor, float)):
return f"Invalid type of input2: {type(input2)}"
if 'input1' in input_types:
if input_types['input1'] not in ["IMAGE", "FLOAT"]:
return f"Invalid type of input1: {input_types['input1']}"
if 'input2' in input_types:
if input_types['input2'] not in ["IMAGE", "FLOAT"]:
return f"Invalid type of input2: {input_types['input2']}"
return True
@VariantSupport()
class TestCustomValidation3:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"input1": ("IMAGE,FLOAT",),
"input2": ("IMAGE,FLOAT",),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "custom_validation3"
CATEGORY = "Testing/Nodes"
def custom_validation3(self, input1, input2):
if isinstance(input1, float) and isinstance(input2, float):
result = torch.ones([1, 512, 512, 3]) * input1 * input2
else:
result = input1 * input2
return (result,)
class TestCustomValidation4:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"input1": ("FLOAT",),
"input2": ("FLOAT",),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "custom_validation4"
CATEGORY = "Testing/Nodes"
def custom_validation4(self, input1, input2):
result = torch.ones([1, 512, 512, 3]) * input1 * input2
return (result,)
@classmethod
def VALIDATE_INPUTS(cls, input1, input2):
if input1 is not None:
if not isinstance(input1, float):
return f"Invalid type of input1: {type(input1)}"
if input2 is not None:
if not isinstance(input2, float):
return f"Invalid type of input2: {type(input2)}"
return True
class TestCustomValidation5:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"input1": ("FLOAT", {"min": 0.0, "max": 1.0}),
"input2": ("FLOAT", {"min": 0.0, "max": 1.0}),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "custom_validation5"
CATEGORY = "Testing/Nodes"
def custom_validation5(self, input1, input2):
value = input1 * input2
return (torch.ones([1, 512, 512, 3]) * value,)
@classmethod
def VALIDATE_INPUTS(cls, **kwargs):
if kwargs['input2'] == 7.0:
return "7s are not allowed. I've never liked 7s."
return True
class TestDynamicDependencyCycle:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"input1": ("IMAGE",),
"input2": ("IMAGE",),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "dynamic_dependency_cycle"
CATEGORY = "Testing/Nodes"
def dynamic_dependency_cycle(self, input1, input2):
g = GraphBuilder()
mask = g.node("StubMask", value=0.5, height=512, width=512, batch_size=1)
mix1 = g.node("TestLazyMixImages", image1=input1, mask=mask.out(0))
mix2 = g.node("TestLazyMixImages", image1=mix1.out(0), image2=input2, mask=mask.out(0))
# Create the cyle
mix1.set_input("image2", mix2.out(0))
return {
"result": (mix2.out(0),),
"expand": g.finalize(),
}
class TestMixedExpansionReturns:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"input1": ("FLOAT",),
},
}
RETURN_TYPES = ("IMAGE","IMAGE")
FUNCTION = "mixed_expansion_returns"
CATEGORY = "Testing/Nodes"
def mixed_expansion_returns(self, input1):
white_image = torch.ones([1, 512, 512, 3])
if input1 <= 0.1:
return (torch.ones([1, 512, 512, 3]) * 0.1, white_image)
elif input1 <= 0.2:
return {
"result": (torch.ones([1, 512, 512, 3]) * 0.2, white_image),
}
else:
g = GraphBuilder()
mask = g.node("StubMask", value=0.3, height=512, width=512, batch_size=1)
black = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
white = g.node("StubImage", content="WHITE", height=512, width=512, batch_size=1)
mix = g.node("TestLazyMixImages", image1=black.out(0), image2=white.out(0), mask=mask.out(0))
return {
"result": (mix.out(0), white_image),
"expand": g.finalize(),
}
TEST_NODE_CLASS_MAPPINGS = {
"TestLazyMixImages": TestLazyMixImages,
"TestVariadicAverage": TestVariadicAverage,
"TestCustomIsChanged": TestCustomIsChanged,
"TestCustomValidation1": TestCustomValidation1,
"TestCustomValidation2": TestCustomValidation2,
"TestCustomValidation3": TestCustomValidation3,
"TestCustomValidation4": TestCustomValidation4,
"TestCustomValidation5": TestCustomValidation5,
"TestDynamicDependencyCycle": TestDynamicDependencyCycle,
"TestMixedExpansionReturns": TestMixedExpansionReturns,
}
TEST_NODE_DISPLAY_NAME_MAPPINGS = {
"TestLazyMixImages": "Lazy Mix Images",
"TestVariadicAverage": "Variadic Average",
"TestCustomIsChanged": "Custom IsChanged",
"TestCustomValidation1": "Custom Validation 1",
"TestCustomValidation2": "Custom Validation 2",
"TestCustomValidation3": "Custom Validation 3",
"TestCustomValidation4": "Custom Validation 4",
"TestCustomValidation5": "Custom Validation 5",
"TestDynamicDependencyCycle": "Dynamic Dependency Cycle",
"TestMixedExpansionReturns": "Mixed Expansion Returns",
}

View File

@@ -0,0 +1,105 @@
import torch
class StubImage:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"content": (['WHITE', 'BLACK', 'NOISE'],),
"height": ("INT", {"default": 512, "min": 1, "max": 1024 ** 3, "step": 1}),
"width": ("INT", {"default": 512, "min": 1, "max": 4096 ** 3, "step": 1}),
"batch_size": ("INT", {"default": 1, "min": 1, "max": 1024 ** 3, "step": 1}),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "stub_image"
CATEGORY = "Testing/Stub Nodes"
def stub_image(self, content, height, width, batch_size):
if content == "WHITE":
return (torch.ones(batch_size, height, width, 3),)
elif content == "BLACK":
return (torch.zeros(batch_size, height, width, 3),)
elif content == "NOISE":
return (torch.rand(batch_size, height, width, 3),)
class StubMask:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"value": ("FLOAT", {"default": 0.5, "min": 0.0, "max": 1.0, "step": 0.01}),
"height": ("INT", {"default": 512, "min": 1, "max": 1024 ** 3, "step": 1}),
"width": ("INT", {"default": 512, "min": 1, "max": 4096 ** 3, "step": 1}),
"batch_size": ("INT", {"default": 1, "min": 1, "max": 1024 ** 3, "step": 1}),
},
}
RETURN_TYPES = ("MASK",)
FUNCTION = "stub_mask"
CATEGORY = "Testing/Stub Nodes"
def stub_mask(self, value, height, width, batch_size):
return (torch.ones(batch_size, height, width) * value,)
class StubInt:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"value": ("INT", {"default": 0, "min": -0xffffffff, "max": 0xffffffff, "step": 1}),
},
}
RETURN_TYPES = ("INT",)
FUNCTION = "stub_int"
CATEGORY = "Testing/Stub Nodes"
def stub_int(self, value):
return (value,)
class StubFloat:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"value": ("FLOAT", {"default": 0.0, "min": -1.0e38, "max": 1.0e38, "step": 0.01}),
},
}
RETURN_TYPES = ("FLOAT",)
FUNCTION = "stub_float"
CATEGORY = "Testing/Stub Nodes"
def stub_float(self, value):
return (value,)
TEST_STUB_NODE_CLASS_MAPPINGS = {
"StubImage": StubImage,
"StubMask": StubMask,
"StubInt": StubInt,
"StubFloat": StubFloat,
}
TEST_STUB_NODE_DISPLAY_NAME_MAPPINGS = {
"StubImage": "Stub Image",
"StubMask": "Stub Mask",
"StubInt": "Stub Int",
"StubFloat": "Stub Float",
}

View File

@@ -0,0 +1,53 @@
def MakeSmartType(t):
if isinstance(t, str):
return SmartType(t)
return t
class SmartType(str):
def __ne__(self, other):
if self == "*" or other == "*":
return False
selfset = set(self.split(','))
otherset = set(other.split(','))
return not selfset.issubset(otherset)
def VariantSupport():
def decorator(cls):
if hasattr(cls, "INPUT_TYPES"):
old_input_types = getattr(cls, "INPUT_TYPES")
def new_input_types(*args, **kwargs):
types = old_input_types(*args, **kwargs)
for category in ["required", "optional"]:
if category not in types:
continue
for key, value in types[category].items():
if isinstance(value, tuple):
types[category][key] = (MakeSmartType(value[0]),) + value[1:]
return types
setattr(cls, "INPUT_TYPES", new_input_types)
if hasattr(cls, "RETURN_TYPES"):
old_return_types = cls.RETURN_TYPES
setattr(cls, "RETURN_TYPES", tuple(MakeSmartType(x) for x in old_return_types))
if hasattr(cls, "VALIDATE_INPUTS"):
# Reflection is used to determine what the function signature is, so we can't just change the function signature
raise NotImplementedError("VariantSupport does not support VALIDATE_INPUTS yet")
else:
def validate_inputs(input_types):
inputs = cls.INPUT_TYPES()
for key, value in input_types.items():
if isinstance(value, SmartType):
continue
if "required" in inputs and key in inputs["required"]:
expected_type = inputs["required"][key][0]
elif "optional" in inputs and key in inputs["optional"]:
expected_type = inputs["optional"][key][0]
else:
expected_type = None
if expected_type is not None and MakeSmartType(value) != expected_type:
return f"Invalid type of {key}: {value} (expected {expected_type})"
return True
setattr(cls, "VALIDATE_INPUTS", validate_inputs)
return cls
return decorator

View File

@@ -0,0 +1,364 @@
from comfy.graph_utils import GraphBuilder
from .tools import VariantSupport
@VariantSupport()
class TestAccumulateNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"to_add": ("*",),
},
"optional": {
"accumulation": ("ACCUMULATION",),
},
}
RETURN_TYPES = ("ACCUMULATION",)
FUNCTION = "accumulate"
CATEGORY = "Testing/Lists"
def accumulate(self, to_add, accumulation = None):
if accumulation is None:
value = [to_add]
else:
value = accumulation["accum"] + [to_add]
return ({"accum": value},)
@VariantSupport()
class TestAccumulationHeadNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"accumulation": ("ACCUMULATION",),
},
}
RETURN_TYPES = ("ACCUMULATION", "*",)
FUNCTION = "accumulation_head"
CATEGORY = "Testing/Lists"
def accumulation_head(self, accumulation):
accum = accumulation["accum"]
if len(accum) == 0:
return (accumulation, None)
else:
return ({"accum": accum[1:]}, accum[0])
class TestAccumulationTailNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"accumulation": ("ACCUMULATION",),
},
}
RETURN_TYPES = ("ACCUMULATION", "*",)
FUNCTION = "accumulation_tail"
CATEGORY = "Testing/Lists"
def accumulation_tail(self, accumulation):
accum = accumulation["accum"]
if len(accum) == 0:
return (None, accumulation)
else:
return ({"accum": accum[:-1]}, accum[-1])
@VariantSupport()
class TestAccumulationToListNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"accumulation": ("ACCUMULATION",),
},
}
RETURN_TYPES = ("*",)
OUTPUT_IS_LIST = (True,)
FUNCTION = "accumulation_to_list"
CATEGORY = "Testing/Lists"
def accumulation_to_list(self, accumulation):
return (accumulation["accum"],)
@VariantSupport()
class TestListToAccumulationNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"list": ("*",),
},
}
RETURN_TYPES = ("ACCUMULATION",)
INPUT_IS_LIST = (True,)
FUNCTION = "list_to_accumulation"
CATEGORY = "Testing/Lists"
def list_to_accumulation(self, list):
return ({"accum": list},)
@VariantSupport()
class TestAccumulationGetLengthNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"accumulation": ("ACCUMULATION",),
},
}
RETURN_TYPES = ("INT",)
FUNCTION = "accumlength"
CATEGORY = "Testing/Lists"
def accumlength(self, accumulation):
return (len(accumulation['accum']),)
@VariantSupport()
class TestAccumulationGetItemNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"accumulation": ("ACCUMULATION",),
"index": ("INT", {"default":0, "step":1})
},
}
RETURN_TYPES = ("*",)
FUNCTION = "get_item"
CATEGORY = "Testing/Lists"
def get_item(self, accumulation, index):
return (accumulation['accum'][index],)
@VariantSupport()
class TestAccumulationSetItemNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"accumulation": ("ACCUMULATION",),
"index": ("INT", {"default":0, "step":1}),
"value": ("*",),
},
}
RETURN_TYPES = ("ACCUMULATION",)
FUNCTION = "set_item"
CATEGORY = "Testing/Lists"
def set_item(self, accumulation, index, value):
new_accum = accumulation['accum'][:]
new_accum[index] = value
return ({"accum": new_accum},)
class TestIntMathOperation:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"a": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
"b": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
"operation": (["add", "subtract", "multiply", "divide", "modulo", "power"],),
},
}
RETURN_TYPES = ("INT",)
FUNCTION = "int_math_operation"
CATEGORY = "Testing/Logic"
def int_math_operation(self, a, b, operation):
if operation == "add":
return (a + b,)
elif operation == "subtract":
return (a - b,)
elif operation == "multiply":
return (a * b,)
elif operation == "divide":
return (a // b,)
elif operation == "modulo":
return (a % b,)
elif operation == "power":
return (a ** b,)
from .flow_control import NUM_FLOW_SOCKETS
@VariantSupport()
class TestForLoopOpen:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"remaining": ("INT", {"default": 1, "min": 0, "max": 100000, "step": 1}),
},
"optional": {
f"initial_value{i}": ("*",) for i in range(1, NUM_FLOW_SOCKETS)
},
"hidden": {
"initial_value0": ("*",)
}
}
RETURN_TYPES = tuple(["FLOW_CONTROL", "INT",] + ["*"] * (NUM_FLOW_SOCKETS-1))
RETURN_NAMES = tuple(["flow_control", "remaining"] + [f"value{i}" for i in range(1, NUM_FLOW_SOCKETS)])
FUNCTION = "for_loop_open"
CATEGORY = "Testing/Flow"
def for_loop_open(self, remaining, **kwargs):
graph = GraphBuilder()
if "initial_value0" in kwargs:
remaining = kwargs["initial_value0"]
while_open = graph.node("TestWhileLoopOpen", condition=remaining, initial_value0=remaining, **{(f"initial_value{i}"): kwargs.get(f"initial_value{i}", None) for i in range(1, NUM_FLOW_SOCKETS)})
outputs = [kwargs.get(f"initial_value{i}", None) for i in range(1, NUM_FLOW_SOCKETS)]
return {
"result": tuple(["stub", remaining] + outputs),
"expand": graph.finalize(),
}
@VariantSupport()
class TestForLoopClose:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"flow_control": ("FLOW_CONTROL", {"rawLink": True}),
},
"optional": {
f"initial_value{i}": ("*",{"rawLink": True}) for i in range(1, NUM_FLOW_SOCKETS)
},
}
RETURN_TYPES = tuple(["*"] * (NUM_FLOW_SOCKETS-1))
RETURN_NAMES = tuple([f"value{i}" for i in range(1, NUM_FLOW_SOCKETS)])
FUNCTION = "for_loop_close"
CATEGORY = "Testing/Flow"
def for_loop_close(self, flow_control, **kwargs):
graph = GraphBuilder()
while_open = flow_control[0]
sub = graph.node("TestIntMathOperation", operation="subtract", a=[while_open,1], b=1)
cond = graph.node("TestToBoolNode", value=sub.out(0))
input_values = {f"initial_value{i}": kwargs.get(f"initial_value{i}", None) for i in range(1, NUM_FLOW_SOCKETS)}
while_close = graph.node("TestWhileLoopClose",
flow_control=flow_control,
condition=cond.out(0),
initial_value0=sub.out(0),
**input_values)
return {
"result": tuple([while_close.out(i) for i in range(1, NUM_FLOW_SOCKETS)]),
"expand": graph.finalize(),
}
NUM_LIST_SOCKETS = 10
@VariantSupport()
class TestMakeListNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"value1": ("*",),
},
"optional": {
f"value{i}": ("*",) for i in range(1, NUM_LIST_SOCKETS)
},
}
RETURN_TYPES = ("*",)
FUNCTION = "make_list"
OUTPUT_IS_LIST = (True,)
CATEGORY = "Testing/Lists"
def make_list(self, **kwargs):
result = []
for i in range(NUM_LIST_SOCKETS):
if f"value{i}" in kwargs:
result.append(kwargs[f"value{i}"])
return (result,)
UTILITY_NODE_CLASS_MAPPINGS = {
"TestAccumulateNode": TestAccumulateNode,
"TestAccumulationHeadNode": TestAccumulationHeadNode,
"TestAccumulationTailNode": TestAccumulationTailNode,
"TestAccumulationToListNode": TestAccumulationToListNode,
"TestListToAccumulationNode": TestListToAccumulationNode,
"TestAccumulationGetLengthNode": TestAccumulationGetLengthNode,
"TestAccumulationGetItemNode": TestAccumulationGetItemNode,
"TestAccumulationSetItemNode": TestAccumulationSetItemNode,
"TestForLoopOpen": TestForLoopOpen,
"TestForLoopClose": TestForLoopClose,
"TestIntMathOperation": TestIntMathOperation,
"TestMakeListNode": TestMakeListNode,
}
UTILITY_NODE_DISPLAY_NAME_MAPPINGS = {
"TestAccumulateNode": "Accumulate",
"TestAccumulationHeadNode": "Accumulation Head",
"TestAccumulationTailNode": "Accumulation Tail",
"TestAccumulationToListNode": "Accumulation to List",
"TestListToAccumulationNode": "List to Accumulation",
"TestAccumulationGetLengthNode": "Accumulation Get Length",
"TestAccumulationGetItemNode": "Accumulation Get Item",
"TestAccumulationSetItemNode": "Accumulation Set Item",
"TestForLoopOpen": "For Loop Open",
"TestForLoopClose": "For Loop Close",
"TestIntMathOperation": "Int Math Operation",
"TestMakeListNode": "Make List",
}