-
Notifications
You must be signed in to change notification settings - Fork 67
Channel Experiment
Sean Parent edited this page Apr 7, 2016
·
2 revisions
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

#include <stlab/channel.hpp>
#include <stlab/future.hpp>
using namespace std;
using namespace stlab;
/*
sum is an example of an accumulating "co-routine". It awaits values, keeping an
internal sum, until the channel is closed, and then it yields the result (an int).
*/
struct sum {
process_state _state = process_state::await;
int _sum = 0;
void await(int n) { _sum += n; }
int yield() { _state = process_state::await; return _sum; }
void close() { _state = process_state::yield; }
auto state() const { return _state; }
};
int main() {
/*
Create a channel to aggregate our values.
*/
sender<int> aggregate;
receiver<int> receiver;
tie(aggregate, receiver) = channel<int>();
/*
Create a vector to hold all the futures for each result as it is piped to channel.
The future is of type <void> because the value is passed into the channel.
*/
vector<stlab::future<void>> results;
for (int n = 0; n != 10; ++n) {
// Asynchronously generate a bunch of values.
results.emplace_back(async(default_scheduler(), [_n = n]{ return _n; })
// Then send those values into a copy of the channel
.then([_aggregate = aggregate](int n) {
_aggregate(n);
}));
}
// Now it is safe to close (or destruct) this channel, all the copies remain open.
aggregate.close();
auto pipe = receiver
/*
The receiver is our common end point - we attach the vector of futures to it (another)
inefficiency here - this is a lambda whose only purpose is to hold the vector of
futures.
*/
| [ _results = move(results) ](auto x){ return x; }
// Then we can pipe the values to our accumulator
| sum()
// And pipe the final value to a lambda to print it.
// Returning void from the pipe will mark it as ready.
| [](auto x){ cout << x << endl; };
receiver.set_ready(); // close this end of the pipe
// Wait for everything to execute (just for demonstration)
sleep(100);
}