API reference
Exported functions
Tissue.start
— Methodstart(graph)
Start all the calculators and begin pulling data out of the source calculator.
All calculators run in their own task. A call to start(graph)
does not block; it returns after spawning all the tasks.
Tissue.stop
— Methodstop(graph)
Stop the graph gracefully.
The graph will stop pulling new data packets out of the source calculator, and all the tasks running calculators will exit after they are done processing the last generated packet.
Tissue.wait_until_done(graph)
can be called in the main thread to block it until every calculator is fully terminated and closed.
Examples
using Tissue
using MyGraphs: CoolestGraph
graph = CoolestGraph()
start(graph)
sleep(5)
stop(graph)
wait_until_done(graph)
Tissue.wait_until_done
— Methodwait_until_done(graph)
Block the main thread until the graph is done.
Can only be called after start(graph)
was called. This waits for all the calculators to be finished processing the last packet, and calls close(calculator)
for each calculator. This can occur either because stop(graph)
was called, or the source calculator indicated that it is done generating data by returning nothing
.
Tissue.Graph
— TypeThe parent type of all graphs.
Exported macros
Tissue.@graph
— Macro@graph <GraphName> begin ... end
Define a graph with a topology as specified in the begin ... end
block.
This defines a struct <GraphName>
with a constructor that takes no arguments. In the begin ... end
block, use @calculator
to define a calculator and @bindstreams
to bind the input streams of a calculator to the output streams of other calculators. All @calculator
declarations must come before any @bindstreams
declaration.
Examples
using Tissue
struct SourceCalculator end
Tissue.process(c::SourceCalculator) = 42
struct WorkerCalculator end
function Tissue.process(c::WorkerCalculator, num)
out = do_work(in_num)
println(out)
end
@graph NumberGraph begin
# 1. Declare calculators.
@calculator source = SourceCalculator()
@calculator worker = WorkerCalculator()
# 2. Declare the streams which connects the source to the worker
@bindstreams worker (num = source)
end
graph = NumberGraph()
Tissue.@calculator
— Macro@calculator calculator_handle = MyCalculator(arg1, arg2)
Create a calculator node in a graph.
Marks the calculator_handle
variable as a new calculator constructed by MyCalculator(arg1, arg2)
, a user-defined struct. calculator_handle
can then be used in a @bindstreams
declaration to bind its input streams to the output streams of other calculators in the graph.
There must be one and only one source calculator in a graph. The source calculator of a graph is the only calculator which has no input streams. It is called the source calculator because it generates the data that will be processed by the rest of the graph.
Must be used in the begin ... end
block of @graph
, before all @bindstreams
.
Examples
using Tissue
mutable struct MySourceCalculator
last::Int64
MySourceCalculator() = new(0)
end
function Tissue.process(c::MySourceCalculator)
c.last += 1
c.last
end
struct SinkCalculator end
function Tissue.process(c::SinkCalculator, number_stream)
println(num)
end
@graph PrinterGraph begin
@calculator source = MySourceCalculator()
@calculator sink = SinkCalculator()
@bindstreams sink (number_stream = source)
end
Tissue.@bindstreams
— Macro@bindstreams calculator_handle (stream1 = calc1) (stream2 = calc2) ...
Bind the streams of calculator_handle
to the output stream of other calculators in the graph.
Multiple Tissue.process(calc)
can be implemented for the same calculator type. @bindstreams
selects the Tissue.process(calc)
method to be used for calculator calculator_handle
in this graph based on which named input streams are bound, and binds the output stream of the specified calculators to these input streams.
calculator_handle
, calc1
and calc2
are variables that were assigned to inside a @calculator
declaration. stream1
and stream2
are named streams that were defined in a process(c::CalculatorType, stream1, stream2)
method definition.
Must be used in the begin ... end
block of @graph
, after all @calculator
declarations.
Examples
using Tissue
struct SourceCalculator end
Tissue.process(c::SourceCalculator) = 42
struct MyCalculator end
# This method will be selected by the `@bindstreams` declaration
function Tissue.process(c::MyCalculator, stream1::Number)
println(stream1)
stream1
end
# This method will not be selected
function Tissue.process(c::MyCalculator, stream1::Number, stream2::Number)
sum = stream1 + stream2
println(sum)
sum
end
# This method will not be selected
function Tissue.process(c::MyCalculator, stream1::Number, stream2::Number, stream3::Number)
sum = stream1 + stream2 + stream3
println(sum)
sum
end
@graph MyGraph begin
@calculator source = SourceCalculator()
@calculator mycalc = MyCalculator()
@bindstreams mycalc (stream1 = source)
end
Functions to be implemented by user
Tissue.process
— Methodprocess(calculator, ...; graph::Graph)
Process the input streams into an output.
The first argument must be the associated calculator. Any other argument defines an input stream to the calculator. An input stream will be named with the exact same name as the corresponding argument. At all times, data coming from mutliple input streams into a process()
method is guaranteed to be derived from the same datum generated by the source calculator.
Optionally, if you need a reference to the graph
object in the process()
method, you can add it as a keyword argument named graph
and Tissue.jl will supply a reference to the active graph object.
The value returned from the method is the value sent to the calculator's output stream. Hence, any calculator that binds one of their input streams with @bindstreams
to this calculator's output stream will receive the return value of this method as an argument. nothing
is treated specially: it means that the calculator will not process the given input. Any downstream calculator that depends on other calculators too will simply drop the value taken from their output stream.
It is important to remember that each calculator runs in its own task. Therefore, best practices about concurrency must be applied. Notably, the stream arguments of process()
could possibly be accessed by multiple threads simultaneously. Therefore, it is highly recommended to make a deepcopy
of an argument before mutating it. This will also make your calculator more reusable across graphs.
Multiple process()
methods can be defined per calculator, allowing the calculator to be used in different graph topologies.
Arguments
calculator
: the associated calculator...
: the input streams
Examples
using Base: deepcopy
function process(calc::MyCalculator, in_stream; graph)
if some_condition
stop(graph)
end
in_stream_copy = deepcopy(in_stream)
mutate(in_stream_copy)
return in_stream_copy
end
Tissue.close
— Methodclose(calculator)
Perform cleanup for the calculator. Optional.
You can define a method for your calculator type to perform any necessary cleanup. Called by wait_until_done(graph)
on each calculator to perform cleanup.
Examples
struct GoofyCalculator
resource
function GoofyCalculator()
resource = acquire_resource()
new(resource)
end
end
function process(c::GoofyCalculator, some_stream)
use_resource(c.resource)
end
function close(c::GoofyCalculator)
release_resource(c.resource)
end