Alejandro Saucedo
@AxSaucedo
in/axsaucedo
[NEXT]
![portrait](images/alejandro.jpg) Alejandro Saucedo
[NEXT]
## http://ethical.institute/
[NEXT]
[NEXT]
[NEXT]
[NEXT]
- A set of templates to support industry practitioners and suppliers in AI tenders.
- Fully open source, built using our "Machine Learning Maturity Model".
[NEXT]
- Overview & caveats of scaling data pipelines
- Introducing the difference between ML & data pipelines
- Airflow components (Celery, ML models, etc.)
- Overview of Airflow + use case
[NEXT]
Today we are...
[NEXT]
Let's jump on the hype-train!
A large-scale crypto-analysis platform: heavy-compute data analysis (fetch, transform, ...)
Going deep: running predictions on LSTMs
#### Can we survive the 2017 crypto-craze?
[NEXT]
All historical data from the top 100 cryptos
Data spans from the earliest records up to 09/2017
563,871 daily price (close) points
[NEXT]
from crypto_ml.data_loader import CryptoLoader as cl
loader = cl()
loader.get_prices("bitcoin")
> array([ 134.21, 144.54, 139. , ..., 3905.95, 3631.04, 3630.7 ])
loader.get_df("bitcoin").head()
> Date Open High Low Close Market Cap
> 1608 2013-04-28 135.30 135.98 132.10 134.21 1,500,520,000
> 1607 2013-04-29 134.44 147.49 134.00 144.54 1,491,160,000
> 1606 2013-04-30 144.00 146.93 134.05 139.00 1,597,780,000
> 1605 2013-05-01 139.00 139.89 107.72 116.99 1,542,820,000
> 1604 2013-05-02 116.38 125.60 92.28 105.21 1,292,190,000
[NEXT]
from crypto_ml.manager import CryptoManager as cm
manager = cm()
manager.send_tasks()
> bitcoin [[4952.28323284 5492.85474648 6033.42626011 6573.99777374 7114.56928738
> 7655.14080101 8195.71231465 8736.28382828 9276.85534192 9817.42685555]]
> bitconnect [[157.70136155 181.86603134 206.03070113 230.19537092 254.36004071
> 278.5247105 302.6893803 326.85405009 351.01871988 375.18338967]]
[NEXT]
https://github.com/axsauze/crypto-ml
http://github.com/axsauze/industrial-airflow
[NEXT]
[NEXT SECTION]
[NEXT]
Crypto-ML Ltd. builds an incredibly accurate model
import random, time

def predict_crypto(crypto_data):
    # pretend to do something
    time.sleep(10)
    return crypto_data * random.uniform(1, 2)
[NEXT]
They found the perfect dataset
from crypto_ml.data_loader import CryptoLoader
btc = CryptoLoader().get_df("bitcoin")
btc.head()
> Date Open High Low Close Market Cap
> 1608 2013-04-28 135.30 135.98 132.10 134.21 1,500,520,000
> 1607 2013-04-29 134.44 147.49 134.00 144.54 1,491,160,000
> 1606 2013-04-30 144.00 146.93 134.05 139.00 1,597,780,000
> 1605 2013-05-01 139.00 139.89 107.72 116.99 1,542,820,000
> 1604 2013-05-02 116.38 125.60 92.28 105.21 1,292,190,000
but they had no idea what to do
[NEXT] The Crypto-ML team realised that copy-pasting code
wasn't enough...
[NEXT]
They had to do it properly,
so they learned how to build their own ML workflow
[NEXT]
Model training/creation, and then prediction with the trained model
[NEXT]
- Machine learning model development
[NEXT]
And then using our model to predict on unseen inputs
[NEXT]
[NEXT]
- Analysing dataset representativeness, class imbalance, etc.
- Focusing on feature engineering and analysis
- Using the right model for the right challenge
- Taking a pragmatic approach to accuracy
[NEXT]
[NEXT]
def train_lstm_model(prices):
    x, y = build_lstm_data(prices, WINDOW_SIZE, training=True)
    model = get_rnn_model()
    model.fit(x, y, batch_size=512, nb_epoch=1, validation_split=0.05)
    model.save("latest_model.h5")
[NEXT]
def train_lstm_model(prices):
    # <- Clean and transform data
    x, y = build_lstm_data(prices, WINDOW_SIZE, training=True)
    model = get_rnn_model()
    model.fit(x, y, batch_size=512, nb_epoch=1, validation_split=0.05)
    model.save("latest_model.h5")
[NEXT]
def train_lstm_model(prices):
    # <- Clean and transform data
    x, y = build_lstm_data(prices, WINDOW_SIZE, training=True)
    # <- Select your model
    model = get_rnn_model()
    model.fit(x, y, batch_size=512, nb_epoch=1, validation_split=0.05)
    model.save("latest_model.h5")
[NEXT]
def train_lstm_model(prices):
    # <- Clean and transform data
    x, y = build_lstm_data(prices, WINDOW_SIZE, training=True)
    # <- Select your model
    model = get_rnn_model()
    # <- Train your model
    model.fit(x, y, batch_size=512, nb_epoch=1, validation_split=0.05)
    model.save("latest_model.h5")
[NEXT]
def train_lstm_model(prices):
    # <- Clean and transform data
    x, y = build_lstm_data(prices, WINDOW_SIZE, training=True)
    # <- Select your model
    model = get_rnn_model()
    # <- Train your model
    model.fit(x, y, batch_size=512, nb_epoch=1, validation_split=0.05)
    # <- Persist your model
    model.save("latest_model.h5")
[NEXT]
def deep_predict(prices):
    x = build_lstm_data(prices, WINDOW_SIZE, training=False)
    model = load_model("latest_model.h5")
    return model.predict(x)
[NEXT]
def deep_predict(prices):
    # <- Transform data
    x = build_lstm_data(prices, WINDOW_SIZE, training=False)
    model = load_model("latest_model.h5")
    return model.predict(x)
[NEXT]
def deep_predict(prices):
    # <- Transform data
    x = build_lstm_data(prices, WINDOW_SIZE, training=False)
    # <- Load model
    model = load_model("latest_model.h5")
    return model.predict(x)
[NEXT]
def deep_predict(prices):
    # <- Transform data
    x = build_lstm_data(prices, WINDOW_SIZE, training=False)
    # <- Load model
    model = load_model("latest_model.h5")
    # <- Predict using your model
    return model.predict(x)
[NEXT]
from crypto_ml.data_loader import CryptoLoader

cl = CryptoLoader()
df = cl.get_df("bitcoin")
prices = df[["Close"]].values

# Train model
train_lstm_model(prices)

# Predict using trained model
result = deep_predict(prices)
[NEXT]
[NEXT]
[NEXT SECTION]
[NEXT]
After Crypto-ML was caught using deep learning...
...they were featured in the top 10 global news stories
[NEXT]
They saw a massive increase in their user base
Users were sending more requests than the system could handle
[NEXT]
[NEXT]
- Distributed
- Asynchronous
- Task Queue
- For Python
[NEXT]
The Crypto-ML devs thought going distributed was too hard
It's not, with Celery.
[NEXT]
def deep_predict(prices):
    x = build_lstm_data(prices, WINDOW_SIZE, training=False)
    model = load_model("latest_model.h5")
    return model.predict(x)
[NEXT]
app = Celery('crypto_ml')

@app.task
def deep_predict(d_prices):
    prices = load(d_prices)
    x = build_lstm_data(prices, WINDOW_SIZE, training=False)
    model = load_model("latest_model.h5")
    return dump(model.predict(x))
[NEXT]
# <- Initialise the Celery app with an id
app = Celery('crypto_ml')

@app.task
def deep_predict(d_prices):
    prices = load(d_prices)
    x = build_lstm_data(prices, WINDOW_SIZE, training=False)
    model = load_model("latest_model.h5")
    return dump(model.predict(x))
[NEXT]
# <- Initialise the Celery app with an id
app = Celery('crypto_ml')

# <- Convert the function into a distributed task
@app.task
def deep_predict(d_prices):
    prices = load(d_prices)
    x = build_lstm_data(prices, WINDOW_SIZE, training=False)
    model = load_model("latest_model.h5")
    return dump(model.predict(x))
[NEXT]
# <- Initialise the Celery app with an id
app = Celery('crypto_ml')

# <- Convert the function into a distributed task
@app.task
def deep_predict(d_prices):
    # <- De-serialise input from the queue
    prices = load(d_prices)
    x = build_lstm_data(prices, WINDOW_SIZE, training=False)
    model = load_model("latest_model.h5")
    return dump(model.predict(x))
[NEXT]
# <- Initialise the Celery app with an id
app = Celery('crypto_ml')

# <- Convert the function into a distributed task
@app.task
def deep_predict(d_prices):
    # <- De-serialise input from the queue
    prices = load(d_prices)
    x = build_lstm_data(prices, WINDOW_SIZE, training=False)
    model = load_model("latest_model.h5")
    # <- Serialise output to return to the queue
    return dump(model.predict(x))
[NEXT]
$ celery -A crypto_ml worker
Darwin-15.6.0-x86_64-i386-64bit 2018-03-10 00:43:28
[config]
.> app: crypto_ml:0x10e81a9e8
.> transport: amqp://user:**@localhost:5672//
.> results: amqp://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
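The broker and results backend shown in the banner come from the app's configuration; a minimal sketch of how this could be set up (the URLs here are hypothetical):

from celery import Celery

# Point the app at a RabbitMQ broker and use AMQP for results
app = Celery('crypto_ml',
             broker='amqp://user:password@localhost:5672//',
             backend='amqp://')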
[NEXT]
Now we just need to make the producer!
**Pretty much just calling the function**
[NEXT]
prices = cl.get_prices("bitcoin")
result = deep_predict(prices)
print(result)
[NEXT]
prices = cl.get_prices("bitcoin")
d_prices = dump(prices)
task = deep_predict.delay(d_prices)
p_result = task.get()
result = load(p_result)
print(result)
[NEXT]
prices = cl.get_prices("bitcoin")
# <- Serialise inputs
d_prices = dump(prices)
task = deep_predict.delay(d_prices)
p_result = task.get()
result = load(p_result)
print(result)
[NEXT]
prices = cl.get_prices("bitcoin")
# <- Serialise inputs
d_prices = dump(prices)
# <- Call function through distributed worker
task = deep_predict.delay(d_prices)
p_result = task.get()
result = load(p_result)
print(result)
[NEXT]
prices = cl.get_prices("bitcoin")
# <- Serialise inputs
d_prices = dump(prices)
# <- Call function through distributed worker
task = deep_predict.delay(d_prices)
# <- Wait for task to finish
p_result = task.get()
result = load(p_result)
print(result)
[NEXT]
prices = cl.get_prices("bitcoin")
# <- Serialise inputs
d_prices = dump(prices)
# <- Call function through distributed worker
task = deep_predict.delay(d_prices)
# <- Wait for task to finish
p_result = task.get()
# <- De-serialise result
result = load(p_result)
print(result)
[NEXT]
It adds the task to the queue
And it's picked up by a worker
[NEXT]
You can visualise the tasks through Celery's "Flower" UI
$ celery -A crypto_ml flower
> INFO: flower.command: Visit me at http://localhost:5555
[NEXT]
You can run workers across multiple servers and scale horizontally with ease
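Each worker only needs to point at the same broker; a sketch of starting a named worker on each server (the worker name is hypothetical):

$ celery -A crypto_ml worker --hostname=worker1@%h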
[NEXT]
We now have ML, and are distributed.
We have surely won.
[NEXT SECTION]
[NEXT]
[NEXT] The Crypto-ML team now has an
ever-growing amount of data
Their data pipeline is getting unmanageable!
[NEXT]
People forget that only a small fraction of real-world machine learning systems is composed of actual ML code
[NEXT]
- There is a growing need to pull data from different sources
- There is a growing need to pre- and post-process data
- As complexity increases, tasks start depending on other tasks
- If a task fails, we don't want to run its child tasks
- Some tasks need to be triggered on a schedule
- Data pipelines can get quite complex
- Having celerized tasks run by Linux cron jobs gets messy
[NEXT]
The Swiss Army knife of data pipelines
[NEXT]
[NEXT]
...but the best out there
[NEXT]
...although it could be programmed to be
[NEXT]
...hence it's in Apache "Incubation"
[NEXT]
...version 2.0 is being developed
[NEXT]
...you could augment it with Storm / Spark Streaming
[NEXT]
[NEXT]
- A data pipeline framework
- Written in Python
- It has an active community
- Provides a UI for management
[NEXT]
These are ETL operations that are executed in order, and only execute if the previous ones are successful
[NEXT]
These contain the execution order for operators
from datetime import datetime
from airflow import DAG

# Define DAG
dag = DAG(dag_id="test",
          start_date=datetime.now(),
          schedule_interval="@once")

# Define operators
operator_1 = ...
operator_2 = ...
operator_3 = ...

# Define order
operator_1 >> operator_2 >> operator_3
As well as the schedule for execution
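Note that schedule_interval also accepts cron-style expressions as well as presets like "@daily"; a minimal sketch (the dag_id here is hypothetical):

from datetime import datetime
from airflow import DAG

# Run every day at midnight (equivalent to the "@daily" preset)
dag = DAG(dag_id="daily_crypto",
          start_date=datetime(2018, 1, 1),
          schedule_interval="0 0 * * *")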
[NEXT]
View all DAGs, together with the schedule of execution
As well as the recent status of executed DAGs
[NEXT]
[NEXT]
# Create python functions
dag = ...

def crypto_prediction():
    data = ...        # gather some data
    prediction = ...  # run some prediction
    db.store(prediction)  # store prediction

# Define operators
crypto_task = PythonOperator(
    task_id='crypto_task',
    python_callable=crypto_prediction,  # <- Add python function
    dag=dag)
Here is an example of a PythonOperator
[NEXT]
You can use other operators such as the BranchPythonOperator, sensors, etc., as sketched below
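For instance, a minimal sensor sketch that waits before running downstream tasks (the dag_id, task ids, and delay are hypothetical; module paths are for Airflow 1.x):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import TimeDeltaSensor

dag = DAG(dag_id="sensor_example",
          start_date=datetime(2018, 1, 1),
          schedule_interval="@once")

# Polls until 5 minutes past the schedule before continuing
wait_task = TimeDeltaSensor(task_id='wait_task',
                            delta=timedelta(minutes=5), dag=dag)
downstream_task = DummyOperator(task_id='downstream_task', dag=dag)

wait_task >> downstream_task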
[NEXT]
We can pass data across operators downstream with XCom

# Create python functions
def push_function():
    return ['a', 'b', 'c']

def pull_function(**kwargs):
    params = kwargs['ti'].xcom_pull(task_ids='push_task')
    print(params)

# Define operators
pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,  # <- we pass the task context
    dag=dag)

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    dag=dag)

# DAG order definition
push_task >> pull_task

This is useful to hold state data, such as DB index IDs
[NEXT]
You can see XCom parameters for each operator run
[NEXT]
Modular separation of code: operators, DAGs, sub-DAGs, etc.
This allows for better testing and re-usability
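For example, a minimal sketch of a DAG integrity test (the dags/ folder name is an assumption):

from airflow.models import DagBag

def test_dags_load_without_errors():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    # Any DAG that fails to import shows up in import_errors
    assert dag_bag.import_errors == {}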
[NEXT]
Airflow also allows you to write your own custom "plugins"
class AirflowPlugin(object):
    # The name of your plugin (str)
    name = None
    # A list of class(es) derived from BaseOperator
    operators = []
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of class(es) derived from BaseExecutor
    executors = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of objects created from a class derived
    # from flask_admin.BaseView
    admin_views = []
    # A list of Blueprint objects created from flask.Blueprint
    flask_blueprints = []
    # A list of menu links (flask_admin.base.MenuLink)
    menu_links = []
It's possible to write your own Operators, hooks, etc
[NEXT] If that's not awesome enough...
[NEXT] It can use Celery as its task runner
As well as the ability to use different executor backends
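A minimal sketch of pointing Airflow at Celery via airflow.cfg (exact config keys vary across Airflow versions; the URLs here are hypothetical):

[core]
executor = CeleryExecutor

[celery]
broker_url = amqp://user:password@localhost:5672//
result_backend = db+postgresql://airflow:airflow@localhost/airflow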
[NEXT]
Crypto-ML wants a workflow where:
- They pull crypto-data every day
- Data is transformed & standardised
- Once ready, a prediction should be computed
- Prediction should be stored in DB
- If relevant, a trade should be executed
[NEXT]
Let's break this down into Airflow terminology
[NEXT]
[NEXT]
[NEXT]
Scheduled task:
- Operator: polls new crypto-data + triggers each sub-DAG
Sub-DAG (sketched below):
- Operator: transforms the data
- Operator: sends data to the crypto-prediction engine
- Sensor: polls until the prediction is finished
- Branch: modifies data & triggers an action based on rules
- Operator: stores data in the database
- Operator: sends request to trade
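A minimal sketch of how this could be wired up as an Airflow DAG (all task ids, callables, and rules here are hypothetical; the sensor step is omitted for brevity):

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import (
    BranchPythonOperator, PythonOperator)

dag = DAG(dag_id="crypto_workflow",
          start_date=datetime(2018, 1, 1),
          schedule_interval="@daily")  # <- pull crypto-data every day

def transform_data():
    pass  # hypothetical: clean & standardise the data

def request_prediction():
    pass  # hypothetical: send data to the prediction engine

def decide_trade():
    worth_trading = True  # hypothetical trading rule
    return 'trade_task' if worth_trading else 'no_trade_task'

transform_task = PythonOperator(task_id='transform_task',
                                python_callable=transform_data, dag=dag)
predict_task = PythonOperator(task_id='predict_task',
                              python_callable=request_prediction, dag=dag)
store_task = DummyOperator(task_id='store_task', dag=dag)
branch_task = BranchPythonOperator(task_id='branch_task',
                                   python_callable=decide_trade, dag=dag)
trade_task = DummyOperator(task_id='trade_task', dag=dag)
no_trade_task = DummyOperator(task_id='no_trade_task', dag=dag)

# Always store the prediction; trade only when the rule says so
transform_task >> predict_task >> store_task
predict_task >> branch_task
branch_task >> trade_task
branch_task >> no_trade_task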
[NEXT] Success!
[NEXT]
[NEXT]
- Luigi
- Pinball
- Seldon Core

## Other similar (but different)
- Dask
- Apache Kafka
[NEXT]
- Docker
- Kubernetes
#### Implementations are in the codebase
[NEXT]
As with every other tech startup
the roller-coaster keeps going!
[NEXT SECTION]
- Got an overview of scaling data pipelines
- Difference between ML & data pipelines
- Airflow components (Celery, ML, etc.)
- Overview of Airflow + use case
[NEXT]
https://github.com/axsauze/crypto-ml
http://github.com/axsauze/industrial-airflow
[NEXT]
![portrait](images/alejandro.jpg)
Alejandro Saucedo