API reference

Exported functions

Tissue.startMethod
start(graph)

Start all the calculators and begin pulling data out of the source calculator.

All calculators run in their own task. A call to start(graph) does not block; it returns after spawning all the tasks.

source
Tissue.stopMethod
stop(graph)

Stop the graph gracefully.

The graph will stop pulling new data packets out of the source calculator, and all the tasks running calculators will exit after they are done processing the last generated packet.

Tissue.wait_until_done(graph) can be called in the main thread to block it until every calculator is fully terminated and closed.

Examples

using Tissue
using MyGraphs: CoolestGraph

graph = CoolestGraph()

start(graph)
sleep(5)
stop(graph)
wait_until_done(graph)
source
Tissue.wait_until_doneMethod
wait_until_done(graph)

Block the main thread until the graph is done.

Can only be called after start(graph) was called. This waits for all the calculators to be finished processing the last packet, and calls close(calculator) for each calculator. This can occur either because stop(graph) was called, or the source calculator indicated that it is done generating data by returning nothing.

source

Exported macros

Tissue.@graphMacro
@graph <GraphName> begin ... end

Define a graph with a topology as specified in the begin ... end block.

This defines a struct <GraphName> with a constructor that takes no arguments. In the begin ... end block, use @calculator to define a calculator and @bindstreams to bind the input streams of a calculator to the output streams of other calculators. All @calculator declarations must come before any @bindstreams declaration.

Examples

using Tissue

struct SourceCalculator end
Tissue.process(c::SourceCalculator) = 42

struct WorkerCalculator end
function Tissue.process(c::WorkerCalculator, num)
    out = do_work(in_num)
    println(out)
end

@graph NumberGraph begin
    # 1. Declare calculators.
    @calculator source  = SourceCalculator()
    @calculator worker  = WorkerCalculator()

    # 2. Declare the streams which connects the source to the worker
    @bindstreams worker (num = source)
end

graph = NumberGraph()
source
Tissue.@calculatorMacro
@calculator calculator_handle = MyCalculator(arg1, arg2)

Create a calculator node in a graph.

Marks the calculator_handle variable as a new calculator constructed by MyCalculator(arg1, arg2), a user-defined struct. calculator_handle can then be used in a @bindstreams declaration to bind its input streams to the output streams of other calculators in the graph.

There must be one and only one source calculator in a graph. The source calculator of a graph is the only calculator which has no input streams. It is called the source calculator because it generates the data that will be processed by the rest of the graph.

Must be used in the begin ... end block of @graph, before all @bindstreams.

Examples

using Tissue

mutable struct MySourceCalculator
    last::Int64
    MySourceCalculator() = new(0)
end

function Tissue.process(c::MySourceCalculator)
    c.last += 1

    c.last
end

struct SinkCalculator end
function Tissue.process(c::SinkCalculator, number_stream)
    println(num)
end

@graph PrinterGraph begin
    @calculator source = MySourceCalculator()
    @calculator sink = SinkCalculator()

    @bindstreams sink (number_stream = source)
end
source
Tissue.@bindstreamsMacro
@bindstreams calculator_handle (stream1 = calc1) (stream2 = calc2) ...

Bind the streams of calculator_handle to the output stream of other calculators in the graph.

Multiple Tissue.process(calc) can be implemented for the same calculator type. @bindstreams selects the Tissue.process(calc) method to be used for calculator calculator_handle in this graph based on which named input streams are bound, and binds the output stream of the specified calculators to these input streams.

calculator_handle, calc1 and calc2 are variables that were assigned to inside a @calculator declaration. stream1 and stream2 are named streams that were defined in a process(c::CalculatorType, stream1, stream2) method definition.

Must be used in the begin ... end block of @graph, after all @calculator declarations.

Examples

using Tissue

struct SourceCalculator end
Tissue.process(c::SourceCalculator) = 42

struct MyCalculator end

# This method will be selected by the `@bindstreams` declaration
function Tissue.process(c::MyCalculator, stream1::Number)
    println(stream1)
    stream1
end

# This method will not be selected
function Tissue.process(c::MyCalculator, stream1::Number, stream2::Number)
    sum = stream1 + stream2
    println(sum)
    sum
end

# This method will not be selected
function Tissue.process(c::MyCalculator, stream1::Number, stream2::Number, stream3::Number)
    sum = stream1 + stream2 + stream3
    println(sum)
    sum
end

@graph MyGraph begin
    @calculator source = SourceCalculator()
    @calculator mycalc = MyCalculator()

    @bindstreams mycalc (stream1 = source)
end
source

Functions to be implemented by user

Tissue.processMethod
process(calculator, ...; graph::Graph)

Process the input streams into an output.

The first argument must be the associated calculator. Any other argument defines an input stream to the calculator. An input stream will be named with the exact same name as the corresponding argument. At all times, data coming from mutliple input streams into a process() method is guaranteed to be derived from the same datum generated by the source calculator.

Optionally, if you need a reference to the graph object in the process() method, you can add it as a keyword argument named graph and Tissue.jl will supply a reference to the active graph object.

The value returned from the method is the value sent to the calculator's output stream. Hence, any calculator that binds one of their input streams with @bindstreams to this calculator's output stream will receive the return value of this method as an argument. nothing is treated specially: it means that the calculator will not process the given input. Any downstream calculator that depends on other calculators too will simply drop the value taken from their output stream.

It is important to remember that each calculator runs in its own task. Therefore, best practices about concurrency must be applied. Notably, the stream arguments of process() could possibly be accessed by multiple threads simultaneously. Therefore, it is highly recommended to make a deepcopy of an argument before mutating it. This will also make your calculator more reusable across graphs.

Multiple process() methods can be defined per calculator, allowing the calculator to be used in different graph topologies.

Arguments

  • calculator: the associated calculator
  • ...: the input streams

Examples

using Base: deepcopy

function process(calc::MyCalculator, in_stream; graph)
    if some_condition
        stop(graph)
    end

    in_stream_copy = deepcopy(in_stream)

    mutate(in_stream_copy)

    return in_stream_copy
end
source
Tissue.closeMethod
close(calculator)

Perform cleanup for the calculator. Optional.

You can define a method for your calculator type to perform any necessary cleanup. Called by wait_until_done(graph) on each calculator to perform cleanup.

Examples

struct GoofyCalculator 
    resource
    function GoofyCalculator() 
        resource = acquire_resource()
        new(resource)
    end
end

function process(c::GoofyCalculator, some_stream)
    use_resource(c.resource)
end

function close(c::GoofyCalculator)
    release_resource(c.resource)
end
source