-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathespresso.py
executable file
·2498 lines (2241 loc) · 118 KB
/
espresso.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python
# -*- coding: utf-8 -*-
import pygame
import random
import time
import math
import os
import glob
import subprocess
import Adafruit_GPIO.SPI as SPI
# import Adafruit_MAX31855.MAX31855 as MAX31855
import RPi.GPIO as GPIO
# Note: could switch to gpiozero (instead of RPi.GPIO).
# https://gpiozero.readthedocs.io/en/v1.2.0/index.html
import thread
import signal
import json
import sys
import uptime
import numpy as np
import itertools
import traceback
if len(sys.argv) > 1 and sys.argv[1] == "test":
pass
else:
import plotly.plotly as py
import datetime
from Adafruit_ADS1x15 import ADS1x15
from collections import deque
# Remember that the PiTFT display uses the following GPIO pins: all the hardware SPI pins (SCK, MOSI, MISO, CE0, CE1), and GPIO 24 and 25.
# The 4 microswitches on the side of the display use: GPIO 17, 22, 23, 27 (from top to bottom)
# sys.stdout = open("/home/pi/logs/stdout-" + time.strftime('%Y-%m-%d-%H%M') + ".txt", 'w')
if len(sys.argv) > 1 and sys.argv[1] == "test":
pass
else:
sys.stdout = open("/home/pi/logs/stdout-" + time.strftime('%Y-%m-%d-%H%M') + ".txt", 'w', 1)
sys.stderr = open("/home/pi/logs/stderr-" + time.strftime('%Y-%m-%d-%H%M') + ".txt", 'w', 1)
max_brightness = 30
GPIO.setmode(GPIO.BCM)
button1_pin = 17
button2_pin = 22
button3_pin = 23
button4_pin = 27
button_pins = [button1_pin, button2_pin, button3_pin, button4_pin]
pump_pin = 12
heat_pin = 16
# Will also need to add: heat_pin and threewayvalve_pin
#
# Reading temperature from thermocouple, using software SPI connection
# DO = 26
# CS = 19
# CLK = 13
#
# sensor = MAX31855.MAX31855(CLK, CS, DO)
os.system('modprobe w1-gpio')
os.system('modprobe w1-therm')
base_dir = '/sys/bus/w1/devices/'
device_folder = glob.glob(base_dir + '28*')[0]
device_file = device_folder + '/w1_slave'
def read_sensor_raw():
f = open(device_file, 'r')
lines = f.readlines()
f.close()
return lines
# More reliable by a bit slower
# def read_sensor_raw():
# catdata = subprocess.Popen(['cat',device_file], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# out,err = catdata.communicate()
# out_decode = out.decode('utf-8')
# lines = out_decode.split('\n')
# return lines
def read_sensor():
lines = read_sensor_raw()
while lines[0].strip()[-3:] != 'YES':
# time.sleep(0.2)
lines = read_sensor_raw()
equals_pos = lines[1].find('t=')
if equals_pos != -1:
temp_string = lines[1][equals_pos+2:]
temp_c = float(temp_string) / 1000.0
return temp_c
# Setting up input pins for buttons:
GPIO.setup(button1_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(button2_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(button3_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(button4_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
# Setting the output pins:
GPIO.setup(pump_pin, GPIO.OUT)
# GPIO.setup(threewayvalve_pin, GPIO.OUT)
GPIO.setup(heat_pin, GPIO.OUT)
GPIO.output(pump_pin, 0)
GPIO.output(heat_pin, 0)
# I2C connection to ADC
# ADC connected to potentiometer and load cell.
ADS1115 = 0x01 # 16-bit ADC
adc = ADS1x15(ic=ADS1115)
# ADC for scale voltage readings
sps = 8 # sps can be 8, 16, 32, 64, 128, 250, 475, 860
len_raw_weight_values = 15
adc_resolution = 2048
scaling = [-287.35410, 430.74201]
# print "Scale resolution = %0.4f grams" %(round(adc_resolution,0)/(2**15)*scaling[1]/1000.0)
raw_weight_values=deque([scaling[0] - adc.readADCDifferential01(adc_resolution, sps)/1000.0 * scaling[1]]*len_raw_weight_values, len_raw_weight_values)
tare_weight = 0.0
prev_weight = 0.0
min_pp = 1
max_pp = 10
#####################################################################
# Global variables used to share state across functions and threads #
#####################################################################
steam_on = False
post_shot = False
pot_value = 0
keep_reading_pot = False
last_timer = 0
last_weight = 0
shot_pouring = False
seconds_elapsed = 0.0
end_shot_time = 0.0
keep_reading_scale = False
menu = 0 # Other options: 0 = main menu; 1 = settings menu
backflush_on = False
flush_on = False
# Betas control the total amount of heat to distribute across shot_heat_duration
# Alpha controls how much of the shot total energy should be placed towards the preheat or towards the end of shot_heat_duration
settings_namelist = ["set_temp",
"target_weight","target_time",
"time_t1","time_t2","time_t3","time_t4","profile_01","profile_12","profile_23","profile_34","pp0","pp1","pp2","pp3","pp4",
"flow_mode", "time_auto_flow",
"kP","kI","kD","k0",
"time_preheat","time_shot_heat",
"beta_0","alpha","warmup_temp","warmup_minutes"]
def load_settings():
try:
settings = json.load(open("/home/pi/settings.txt","r"))
for varname in settings_namelist:
globals()[varname] = settings[varname]
except:
reset_settings()
def save_settings():
settings = {}
for varname in settings_namelist:
settings[varname] = globals()[varname]
json.dump(settings, open("/home/pi/settings.txt", "w"))
def reset_settings():
global flow_mode, set_temp,target_weight, target_time, time_t1, time_t2, time_t3, time_t4, profile_01, profile_12, profile_23, profile_34, pp0, pp1, pp2, pp3, pp4, kP, kI, kD, k0, time_preheat, time_shot_heat, beta_0, alpha, warmup_temp, warmup_minutes
set_temp = 89
target_weight = 36
target_time = 31
flow_mode = "Auto"
time_auto_flow = 12
pp0, pp1, pp2, pp3, pp4 = 1, 7, 7, 7, 7
time_t1, time_t2, time_t3, time_t4 = 8, 8, 8, 8
profile_01, profile_12, profile_23, profile_34 = "Gradual", "Flat", "Flat", "Flat"
kP = 0.07
kI = 0.12
kD = 2.50
k0 = 0.035
time_preheat = 3.0 # Preheating lasts 3 seconds
time_shot_heat = 23.0 # No heat in the last 31-23 = 8 seconds
beta_0 = 29.0 # means: 29% heat on average over entire time_preheat+time_shot_heat for a reference 36g shot, where time_preheat+time_shot_heat = 27 seconds.
alpha = -25.0 # means: a 25% drop in heat from preheat to end of shot_heat_duration
warmup_temp = 101
warmup_minutes = 3
load_settings()
steam_temp = 90.0
time_auto_flow = 13
power = 0
pump_power = 0
# These values will be logged and saved in a json file
# y, heat_series, shot_series are directly used for display
start_temp = read_sensor()
y = deque([start_temp]*3, 3600)
y_time = deque([0]*3, 3600)
heat_series = deque([100]*3, 3600)
shot_series = deque([False]*3, 3600)
weight_series = []
weight_series_time = []
filtered_weight_series = []
filtered_flow_series = []
filtered_time = []
adc_values = []
raw_weights = []
# With a refresh speed of 0.5 seconds, 3600 data points = 30 minutes of temperature logging. Perhaps overkill.
# Making sure that y starts with a length of 3, so that the PID function can be run right after read_temp.
trigger_refresh_graph = False
trigger_heat = False
trigger_refresh_timer = False
filter_on = False
trigger_refresh_weight_graph = False
#################################################################################################
# Functions defining actions triggered by buttons (e.g. pour_shot) or automatically (e.g. heat) #
#################################################################################################
length_history = len(y)
def read_temp():
global length_history
current_temp_reading = read_sensor()
y.append(current_temp_reading)
y_time.append(time.time()-start_script_time)
if length_history < 60:
# This variable will be used to adjust how far back the integral term should go in the PID.
length_history += 1
def pid(y, set_temp):
# P: how far from the set_temp is y?
P = set_temp - y[-1]
# I: Mean error over the last 60 measurements (60*.80 = 48 second)
I = 1/length_history * sum(set_temp - y[-i] for i in xrange(1, length_history))
# Avoid integral windup:
I = clip(I, -1, 1)
# D: how has y evolved over the last 4 readings?
if len(y) < 4:
D = 0
else:
D = y[-2] - y[-1]
# D = ((y[-4] + y[-3])/2 - (y[-2] + y[-1])/2)/2
if start_script_time > end_shot_time:
minutes_on = (time.time()-start_script_time)/60
baseline_power = interpolate(minutes_on, (13, k0), (20, k0/4))
else:
minutes_since_last_shot = (time.time()-end_shot_time)/60
baseline_power = interpolate(minutes_since_last_shot, (6, k0), (10, k0/4))
pid_power = (baseline_power + kP*P + kI*I + kD*D)*100
power = clip(pid_power, 0, 100)
return(power)
# Open-loop temperature control during the shot
# Using settings: beta_0, alpha, and:
beta_1 = ((set_temp-20.0)/340.0)/(time_preheat + time_shot_heat)*100.0 # means: average heat during shot increases by about 0.70% for every additional gram of water.
# About beta_1:
# Heating 1 calorie = heating 1 gram of water by 1 degree celsius.
# Heating 1 additional gram from 20 C to 90 C = 70 calories.
# The boiler is 1425 Watt; i.e. it can produce 1425 joules per second (1 watt = 1 joule per second by definition) = 340 calories per second (1 calorie = 4.19 joules).
# Heating 1 additional gram requires turning the boiler on for an additional 70/340 = 0.20 second during the total duration of the shot.
# Spread over the entire duration of heating: 0.20/(time_preheat + time_shot_heat)*100% heat needs to be applied for each heat cycle.
# beta_1 should be hard-coded and not adjustable from the settings menu.
def shot_heat_gain(target_weight, target_time, time_preheat, time_shot_heat, alpha, beta_0, beta_1):
# this function is used to scale heat power during the shot
shot_heat_duration = time_preheat + time_shot_heat
t = -time_preheat
unscaled_total_heat = 0.0
while t < time_shot_heat:
unscaled_total_heat += 1 + alpha/100 * (t+time_preheat)/shot_heat_duration # Could add other stuff here: predicted flow rate, pump power???
t += .82 # This is approximately the time between heat cycles
heat_gain = 27/.82 *(beta_0 + beta_1 * (target_weight - 36))/unscaled_total_heat
# print "Heat gain = %s" %heat_gain
return heat_gain
def adjust_heating_power():
global power, post_shot, length_history
if time.time() - start_script_time < warmup_minutes*60 and start_temp < 50:
# if heat_cycles < warmup_minutes*60/0.79 and start_temp < 50:
early_boost = warmup_temp - set_temp # For the first 3.5 minutes, if the machine starts cold, set_temp will be boosted up to 101 degrees. The machine will stabilize faster around set_temp.
else:
early_boost = 0
if shot_pouring == False:
if (steam_on == False) and (post_shot == False):
if (y[-1] > .90 * (set_temp + early_boost)) and (y[-1] < 110):
# if abs(y[-1] - set_temp) <= .5 and y[-1] <= y[-4] + .25 : # Temp is close to set point and either dropping or increasing very slowly.
# power = max(2, pid(y, set_temp + early_boost)) # Power on will always be at least 2%.
# else:
power = pid(y, set_temp + early_boost)
elif (y[-1] <= .90 * (set_temp + early_boost)):
power = 100
elif (y[-1] >= 110):
power = 0
elif (steam_on == False) and (post_shot == True):
if (y[-1] > set_temp) and (time.time() - end_shot_time < 40) and (y[-1] < 98): # Wait until temperature goes back down and stabilizes somewhat before handing temp control back to the PID
if y[-1] < y[-2]:
power = 16
else:
power = 0
else:
length_history = 2 # Resetting length_history to avoid confusing the PID (integral term) with high post shot temperature values
power = pid(y, set_temp + early_boost)
post_shot = False
elif steam_on == True:
if y[-1] < steam_temp:
power = 100
else:
power = 0
elif shot_pouring == True:
if (y[-1] >= 105):
power = 0
else:
if seconds_elapsed < time_shot_heat:
power = clip(heat_gain * (1 + alpha/100 * (time.time() - start_preheat_time)/(time_preheat + time_shot_heat)), 0, 100)
else:
power = 0
# print round(power, 2)
# Now that everything is calculated: append heat power data, along with whether a shot is being pulled or not, to lists.
heat_series.append(power)
shot_series.append(shot_pouring)
# heat_cycles = 0
def output_heat():
# global heat_cycles
if power > 0:
GPIO.output(heat_pin, True)
time.sleep(.79 * power/100) # remember that the conversion speed of the temperature sensor is always > .82 second. read_temp() and output_heat() work in 2 different threads; they shouldn't get out of sync. output_heat() is called each time a new temperature reading has been made; and the last output_heat() should always end before a new reading becomes available...
if power < 100:
GPIO.output(heat_pin, False)
time.sleep(.79 * (100-power)/100)
GPIO.output(heat_pin, False)
# if heat_cycles <= warmup_minutes*60/0.79 + 1:
# heat_cycles += 1 # Note it's better to count heat cycles than to rely on time.time() as NTP may need to sync if the RPi just booted.
def output_pump():
# Setting the pump pulse sequences corresponding to power levels in 10% increments.
pulse_sequence = [[0,0,0,0,0,0,0,0,0,0], # 0% power
[0,0,0,0,1,0,0,0,0,0], # 10%
[0,0,0,0,1,0,0,0,0,1], # 20%
[0,0,0,1,0,0,1,0,0,1], # 30%
[1,0,0,1,0,1,0,0,1,0], # 40%
[1,0,1,0,1,0,1,0,1,0], # 50%
[0,1,1,0,1,0,1,1,0,1], # 60%
[1,1,1,0,1,1,0,1,1,0], # 70%
[1,1,1,1,0,1,1,1,1,0], # 80%
[1,1,1,1,0,1,1,1,1,1], # 90%
[1,1,1,1,1,1,1,1,1,1]] # 100% power
p = pump_pin
cycle = itertools.cycle(range(0,10))
while pump_power >= 0 and pump_power <= 10 and (flush_on == True or shot_pouring == True):
GPIO.output(p, pulse_sequence[int(pump_power)][cycle.next()])
time.sleep(.02)
if pump_power < 0 or pump_power > 10 or flush_on == False or shot_pouring == False:
GPIO.output(p, 0)
if pump_power < 0 or pump_power > 10:
print "Error, choose pump_power between 0 and 10"
return
adc_singl3 = 0
adc_diff10 = 0
read_adc_thread_already_started = False
def read_adc():
# ADC measurements (scale and pot) have to be read sequentially; can't be done in 2 separate threads running concurrently for scale and pot, or output values will be wrong.
global adc_diff10, adc_singl3, read_adc_thread_already_started
# Make sure that only one read_adc thread is running at the same time.
if read_adc_thread_already_started == True:
print "Another read_adc() thread is already running, no need to open another one."
return
else:
read_adc_thread_already_started = True
print "Started read_adc() thread"
while keep_reading_pot == True or keep_reading_scale == True:
if keep_reading_scale == True:
try:
adc_diff10 = -adc.readADCDifferential01(adc_resolution, sps)
except Exception as e:
print "Exception in read_adc(): adc.readADCDifferential01() " + str(e)
read_scale(adc_diff10)
if keep_reading_pot == True:
try:
adc_singl3 = adc.readADCSingleEnded(3, 4096, 860)
except Exception as e:
print "Exception in read_adc(): adc.readADCSingleEnded() " + str(e)
read_pot(adc_singl3)
if keep_reading_scale == False: # Give the loop a rest if we're only measuring the pot voltage (at a super high sampling rate already); but if we also need to measure the voltage differential from the scale (at a lower sampling rate, for more precision), skip this step...
time.sleep(.1)
if keep_reading_pot == False and keep_reading_scale == False:
read_adc_thread_already_started = False
print "Ended read_adc() thread"
return
trigger_process_weight_reading = False
def read_scale(adc_diff10):
# Running in its own thread
global t1, raw_weight_values, trigger_process_weight_reading
if adc_diff10 > 0.1:
raw_weight_values.append(scaling[0] + adc_diff10/1000.0*scaling[1])
else:
print "Problem with scale: Reading unlikely voltage differential between ADC1 and ADC0 = %sV." %(adc_diff10)
print "Replacing implied raw_weight_value (%s g.) with previous value (%s g.)." %(scaling[0] + adc_diff10/1000.0*scaling[1] ,raw_weight_values[-1])
raw_weight_values.append(raw_weight_values[-1])
t1 = time.time()
trigger_process_weight_reading = True
# Let the volts-gram conversion and all the filtering happen in another thread.
def read_pot(adc_singl3):
global pot_value
old_pot_value = pot_value
pot_value = adc_singl3/3313.0 * 100
if abs(pot_value - old_pot_value) < 2: # To prevent oscillation, any change in pot value that is less than 2% will be discarded.
pot_value = old_pot_value
def mva(x, n1, n2):
# Moving average of last [n1, n2] values of x
return(mean(list(x[i] for i in range(len(list(x))-n1-1,len(list(x))-n2-2, -1))))
def interpolate(x, point1, point2):
# Find y so that (x, y) falls on the line between (x1, y1) and (x2, y2)
# If x < x1, y = y1
# If x > x2, y = y2
x1, y1 = float(point1[0]), float(point1[1])
x2, y2 = float(point2[0]), float(point2[1])
if x1 <= x <= x2:
y_value = y1 + (y2 - y1)/(x2 - x1)*(x - x1)
elif x > x2:
y_value = y2
elif x < x1:
y_value = y1
return y_value
def cut(x, breakpoints):
n = len(breakpoints)
if x < breakpoints[0]:
return 0
for i in range(1, n):
if breakpoints[i-1] <= x < breakpoints[i]:
return i
if x >= breakpoints[n-1]:
return n
def auto_adjust_pump(predicted_end_time, change_predicted_end_time, seconds_elapsed):
deriv_gain = 3.0
deriv_gain0 = min(max(33.0 - seconds_elapsed, 0), deriv_gain)
# For example: if target_time = 31.5:
# breakpoints_now = [interpolate(seconds_elapsed, (10,35), (25,27)),
# interpolate(seconds_elapsed, (10,40), (25,30)),
# interpolate(seconds_elapsed, (10,70), (25,33))]
breakpoints_now = [interpolate(seconds_elapsed, (10, target_time + 3.5), (0.80 * target_time, target_time - 4.5)),
interpolate(seconds_elapsed, (10, target_time + 8.5), (0.80 * target_time, target_time - 1.5)),
interpolate(seconds_elapsed, (10, target_time + 38.5), (0.80 * target_time, target_time + 1.5))]
breakpoints_future = [interpolate(seconds_elapsed + deriv_gain0, (10, target_time + 3.5), (0.80 * target_time, target_time - 4.5)),
interpolate(seconds_elapsed + deriv_gain0, (10, target_time + 8.5), (0.80 * target_time, target_time - 1.5)),
interpolate(seconds_elapsed + deriv_gain0, (10, target_time + 38.5), (0.80 * target_time, target_time + 1.5))]
a = 3 - cut(predicted_end_time, breakpoints_now)
b = 3 - cut(predicted_end_time + deriv_gain0*change_predicted_end_time/dt, breakpoints_future)
# Look up table of pump actions: pump_action[row][column] = pump_action[a][b]
# b is: Too long, OK, Too short, Way too short
# a is:
pump_action = [[ 2, 1, 1, 0], # Too long
[ 1, 0, -1, -1], # OK
[ 0, -1, -1, -2], # Too short
[ -1, -2, -2, -3]] # Way too short
return pump_action[a][b]
def clip(x, low, high):
if x >= low and x <= high:
return x
elif x < low:
return low
elif x > high:
return high
flow_mode = "Auto"
def apply_pump_profile():
global pump_power
if seconds_elapsed >= t_list[len(pp_list) - 1]:
# If seconds elapsed >= t4
pump_power = int(clip(pp_list[len(pp_list) - 1], min_pp, max_pp))
else:
# Otherwise, follow the pump profile:
for i in range(0, len(pp_list) - 1):
# if seconds_elapsed in [t0, t1), [t1, t2), [t2, t3), [t3, t4)
if seconds_elapsed >= t_list[i] and seconds_elapsed < t_list[i + 1]:
if profile_list[i] == "Flat" or t_list[i] == t_list[i+1]:
pump_power = int(clip(pp_list[i], 0, max_pp))
else:
pump_power = int(clip(interpolate(seconds_elapsed, (t_list[i], pp_list[i]), (t_list[i+1], pp_list[i+1])), 0, max_pp))
break
def pour_shot():
# This function will be run in its own thread
print "pour_shot thread started"
# Remember to open the 3-way valve here.
global pump_power, shot_pouring, end_shot_time, trigger_update_log_shot, flow_per_second, predicted_end_time, current_time, dt, keep_reading_pot, trigger_refresh_display_pump, log_shot, pp_list, t_list, profile_list
if flow_mode == "Manual":
keep_reading_pot = True
thread.start_new_thread(read_pot, ())
else:
keep_reading_pot = False
time.sleep(time_preheat+.5)
log_shot = True
trigger_update_log_shot = False
thread.start_new_thread(update_log_shot, ())
last_auto_adjust = 0.0
predicted_end_time = 0
pump_power = 0
previous_pump_power = -1
dt = 1.5
thread.start_new_thread(output_pump, ())
pp_list = [pp0, pp1, pp2, pp3, pp4]
t_list = [ 0, time_t1, time_t2, time_t3, time_t4]
profile_list = [profile_01, profile_12, profile_23, profile_34]
# pp_list.append(pp_list[-1])
# t_list.append(time_auto_flow)
# profile_list.append("Flat")
while shot_pouring == True:
current_time = time.time() - start_script_time
if flow_mode == "Auto":
if (seconds_elapsed < time_auto_flow) and (filtered_weight <= 1):
apply_pump_profile()
else:
if (time.time() - last_auto_adjust >= dt):
flow_per_second = flow_smooth
if (flow_per_second > 0):
predicted_end_time = (time.time() - start_shot_time) + (target_weight - filtered_weight) / flow_per_second - 1.15
try:
old_predicted_end_time
except NameError:
old_predicted_end_time = predicted_end_time
change_predicted_end_time = predicted_end_time - old_predicted_end_time
else: # Should never be in this situation: weight was reported as > 2 grams, or we're after 11 seconds, but coffee is not flowing.
predicted_end_time = 100 # The solution: force a pump power increase by reporting a long predicted time.
old_predicted_end_time = 100
change_predicted_end_time = 0
pump_change = auto_adjust_pump(predicted_end_time, change_predicted_end_time, time.time() - start_shot_time)
pump_power = int(clip(pump_power + pump_change, min_pp, max_pp))
old_predicted_end_time = predicted_end_time
last_auto_adjust = time.time()
elif flow_mode == "Manual":
pump_power = clip(int(round(pot_value / 10, 0)), 0, 10)
elif flow_mode == "Program":
apply_pump_profile()
if filtered_weight > target_weight - 1.15*flow_smooth:
end_shot()
if (pump_power != previous_pump_power) or (time.time()-last_log_time >= dt) or (time.time() - last_auto_adjust < .125):
trigger_update_log_shot = True
if pump_power != previous_pump_power:
trigger_refresh_display_pump = True
previous_pump_power = pump_power
time.sleep(.125)
if shot_pouring == False:
GPIO.output(pump_pin, 0)
keep_reading_pot = False
return
flow_per_second = 0
predicted_end_time = 0
time_too_long = 0
time_too_short = 0
time_way_too_short = 0
current_time = 0
def update_log_shot():
global last_log_time, trigger_update_log_shot, weight_series, weight_series_time, filtered_weight_series, adc_values, raw_weights, filtered_flow_series, filtered_time
log_current_time = []
log_current_weight = []
log_filtered_weight = []
log_flow_per_second = []
log_predicted_end_time = []
log_pump_power = []
log_time_too_long = []
log_time_too_short = []
log_time_way_too_short = []
log_start_shot_time = []
log_raw_weight = []
log_adc_diff10 = []
# Idea: record more parameters here: target_weight, time_t1, max_power, temperature at the beginning of the shot, temperature at the end, power_when_pulling_shot, etc. These could be added to the text of the e-mail.
last_log_time = 0
while log_shot == True:
time_too_long = interpolate(seconds_elapsed, (10, target_time + 38.5), (0.80 * target_time, target_time + 1.5))
time_too_short = interpolate(seconds_elapsed, (10, target_time + 8.5), (0.80 * target_time, target_time - 1.5))
time_way_too_short = interpolate(seconds_elapsed, (10, target_time + 3.5), (0.80 * target_time, target_time - 4.5))
if (trigger_update_log_shot == True):
log_current_time.append(current_time)
log_current_weight.append(current_weight)
log_filtered_weight.append(filtered_weight)
log_flow_per_second.append(flow_per_second)
log_predicted_end_time.append(predicted_end_time)
log_pump_power.append(pump_power)
log_time_too_long.append(time_too_long)
log_time_too_short.append(time_too_short)
log_time_way_too_short.append(time_way_too_short)
last_log_time = time.time()
trigger_update_log_shot = False
time.sleep(.02)
if log_shot == False:
start = start_shot_time - start_script_time
end = end_shot_time - start_script_time
if end - start > 15: # Only save logs of shots >= 15 seconds; don't bother otherwise.
filename = "/home/pi/logs/log_shot" + time.strftime('%Y-%m-%d-%H%M') + ".json"
json.dump({"time": list(log_current_time),
"weight": list(log_current_weight),
"weight_filtered": list(log_filtered_weight),
"flow_per_second": list(log_flow_per_second),
"predicted_end_time": list(log_predicted_end_time),
"pump_power": list(log_pump_power),
"t0": list(log_time_too_long),
"t1": list(log_time_too_short),
"t2": list(log_time_way_too_short),
"full_weight_series": list(weight_series),
"full_weight_series_time": list(weight_series_time),
"filtered_time": list(filtered_time),
"filtered_weight_series": list(filtered_weight_series),
"filtered_flow_series": list(filtered_flow_series),
"start": start,
"target_weight": target_weight,
"target_time": target_time,
"alpha": alpha,
"beta_0": beta_0,
"time_preheat": time_preheat,
"time_shot_heat": time_shot_heat,
"end": end,
"raw_weights": list(raw_weights),
"adc_values" : list(adc_values),
"tare_weight": tare_weight
}, open(filename, "w"))
if len(sys.argv) > 1 and sys.argv[1] == "test":
pass
else:
os.system(str("sudo R --silent --vanilla -f /home/pi/flow_analysis.R --args " + filename))
# flow_analysis.R will generate graphs with ggplot, save them as pdf, and run send_email.py to send the pdf & json files as attachment
# To clean up (but once everything has been dumped into the json file), empty the _series lists so that old values don't get dumped again for the next shot.
weight_series = []
weight_series_time = []
filtered_weight_series = []
filtered_flow_series = []
filtered_time = []
adc_values = []
raw_weights = []
return
def time_shot():
# This function will be run in its own thread
# print "time_shot thread started"
global seconds_elapsed, start_shot_time
time.sleep(time_preheat+.5)
start_shot_time = time.time()
while shot_pouring == True:
seconds_elapsed = math.floor((time.time() - start_shot_time)*10)/10
refresh_timer_display()
time.sleep(.1)
# Make sure that the thread can exit
if shot_pouring == False:
# print "time_shot thread exited"
return
# Several problems with time_shot():
# this function does too much. but it is not a good timer: "seconds_elapsed" is inaccurate: even if time.time() - start_shot_time = 12.199, seconds_elapsed = 12.1.
# Most of what the function does should be called directly by pour_shot.
# seconds_elapsed can only be used for display. for filtering calculations, pump power, logs, each function needs to call time.time() when it needs to.
def end_shot():
global wait, end_shot_time, shot_pouring, pump_power, post_shot, display_end_shot_line_now, flow_per_second, predicted_end_time
end_shot_time = time.time()
shot_pouring = False
wait = True
GPIO.output(pump_pin, 0)
pump_power = 0
flow_per_second = 0
predicted_end_time = 0
display_end_shot_line_now = True
post_shot = True
# Remember to close the 3-way valve here.
thread.start_new_thread(wait_after_shot_and_refresh, ())
last_log_time = time.time()
wait = False
def wait_after_shot_and_refresh():
global wait, trigger_refresh_graph, keep_reading_scale, last_weight, last_timer, log_shot, trigger_update_log_shot, current_time, filter_on, seconds_elapsed, filtered_weight
wait = True
while time.time() - end_shot_time < 6:
if time.time() - last_log_time >= .5:
current_time = time.time() - start_script_time
trigger_update_log_shot = True
time.sleep(.02)
if time.time() - end_shot_time >= 6:
temp_surface = lcd.subsurface(((27, 70), (253, 150)))
pygame.image.save(temp_surface, "/home/pi/lastshot.png")
reset_graph_area()
lcd.fill(col_background, area_timer)
pygame.display.update(area_timer)
refresh_graph(y, graph_top, graph_bottom, area_graph, area_text_temp)
trigger_refresh_graph = True
keep_reading_scale = False
last_weight = current_weight
last_timer = seconds_elapsed
log_shot = False
filter_on = False
seconds_elapsed = 0.0
filtered_weight = 0.0
wait = False
refresh_buttons()
return
# def try_it(f):
# # Quick and dirty way to make sure that we get a clean exit if something goes wrong.
# # Functions that run in their own thread are unsafe: if they crash, the whole script could crash with the GPIO output pins stuck in On position.
# def safe_function():
# try:
# f()
# except BaseException as e:
# clean_exit()
# print "Exiting: something went very wrong in " + str(f) + ". Exception: " + e
# traceback.print_exc()
# return
# return safe_function
# def start_safe_thread(f, *args):
# # print "Starting safe thread: " + str(f)
# thread.start_new_thread(try_it(f), ())
######################################
# DISPLAY INFO / MENUS ON THE SCREEN #
######################################
####################################
# UI area positions and dimensions #
####################################
# Screen resolution is 320 * 240.
# Careful here: pygame.Rect uses these coordinates for rectangles: (left, top, width, height) [not (left, top, right, bottom)]
area_graph = ((0,60),(290,160))
# This is the reduced-size graph window, used when pulling a shot or entering the settings menu.
# area_graph = ((0,65),(150,155))
area_text_temp = ((0,0),(160,60)) # same here, for the refresh_graph function.
area_timer = ((160,0),(120,60)) # same for the refresh_timer_display
area_icons =((290,0),(30,240))
area_belowgraph = ((0,220), (280, 20))
area_menu = ((150, 60), (130, 160))
min_range = 5
graph_top = area_graph[0][1]
graph_bottom = area_graph[0][1] + area_graph[1][1]
graph_left = area_graph[0][0]
graph_right = area_graph[0][0] + area_graph[1][0]
npixels_per_tempreading = 2
# Setting up the screen
os.putenv('SDL_FBDEV', '/dev/fb1')
pygame.display.init()
pygame.font.init()
pygame.mouse.set_visible(False)
# lcd = pygame.display.set_mode((320, 240))
# 24 bit depth required to use anti-aliasing
lcd = pygame.display.set_mode((320, 240),0, 24)
col_lightblue = (0,191,255)
col_orange = (255,165,0)
col_lightgreen = (124,252,0)
col_red = (255,0,0)
col_white = (255,255,255)
col_veryverydarkgrey = (48,48,48)
col_verydarkgrey = (64,64,64)
col_darkgrey = (128,128,128)
col_medgrey = (192,192,192)
col_lightgrey = (224,224,224)
col_black = (0,0,0)
col_lightred = (153,0,76)
col_verylightred = (218,62,140)
col_yellow = (255,255,0)
col_darkred = (204,0,0)
col_blue = (44, 103, 210)
col_background = col_black
col_text = col_lightblue
col_axis_lines = col_lightblue
col_axis_labels = col_lightblue
col_pump_box_full = col_lightblue
col_pump_box_empty = col_verydarkgrey
col_pump_text = col_lightblue
col_temp_line = col_white
col_temp_line_during_shot = col_red
col_heat = col_orange
col_weight_text = col_white
col_weight_line = col_white
col_lastshot = col_darkgrey
col_settemp_line = col_orange
col_menu_text = col_white
col_menu_text_select = col_black
col_menu_text_select_background = col_lightgrey
col_menu_value = col_white
col_flow_polygon = col_lightred
col_flow_line = col_verylightred
col_weight_target = col_orange
col_pump_profile_line = col_white
col_pump_profile_grid = col_verydarkgrey
col_pump_profile_param = col_red
col_intrashot_grid = col_verydarkgrey
col_intrashot_label = col_white
color_scenarios = [col_darkred, col_orange, col_yellow, col_white]
col_lights_outline = col_verydarkgrey
# col_background = col_white
# col_text = col_blue
# col_axis_lines = col_blue
# col_axis_labels = col_blue
# col_pump_box_full = col_blue
# col_pump_box_empty = col_lightgrey
# col_pump_text = col_blue
# col_temp_line = col_verydarkgrey
# col_temp_line_during_shot = col_orange
# col_heat = col_red
# col_weight_text = col_verydarkgrey
# col_weight_line = col_black
# col_lastshot = col_lightgrey
# col_settemp_line = col_red
# col_menu_text = col_black
# col_menu_text_select = col_white
# col_menu_text_select_background = col_blue
# col_menu_value = col_black
# col_flow_polygon = col_lightred
# col_flow_line = col_lightred
# col_weight_target = col_darkred
# col_pump_profile_line = col_black
# col_pump_profile_grid = col_lightgrey
# col_pump_profile_param = col_red
# col_intrashot_grid = col_lightgrey
# col_intrashot_label = col_black
# color_scenarios = [col_darkred, col_orange, col_yellow, col_white]
# col_lights_outline = col_verydarkgrey
lcd.fill(col_background, ((0,0), (320,240)))
pygame.display.update()
def display_brightness(value):
if value > 100 or value < 0:
print "Error: pick a value between 0 and 100"
return
else:
global brightness
brightness = value
v = str(int(value * 1023.0 / 100.0))
os.system(str("gpio -g pwm 18 " + v))
return
def BestTick(largest):
if largest > 150:
tick = 50
elif largest > 75:
tick = 25
elif largest > 40:
tick = 20
elif largest > 20:
tick = 10
elif largest > 10:
tick = 5
else:
tick = 2
return tick
def coordinates(max_values, value, graph_top, graph_bottom):
# graph_top and graph_bottom are the location of the top and bottom of the graph area on the screen in pixels (0 is the very top of the screen, 240 is the very bottom)
# example: graph_top = 80, and graph_bottom = 220.
return (graph_top + (max_values[1] - value) * (graph_bottom - graph_top)/(max_values[1] - max_values[0]))
def axis_min_max(subset_y):
# if max(subset_y) - min(subset_y) < min_range:
# y_range = (min(subset_y) - min_range/2 + (max(subset_y) - min(subset_y))/2, max(subset_y) + min_range/2 - (max(subset_y) - min(subset_y))/2)
# else:
# y_range = (min(subset_y), max(subset_y))
y_range = (min(min(subset_y), set_temp), max(max(subset_y), set_temp))
padding_y_axis = 0.05 * (y_range[1] - y_range[0])
# These are the most extreme values that could be plotted on the y axis, on the top/bottom edges of area_graph
y_axis = (int(min(math.floor(y_range[0] - padding_y_axis), set_temp - 3)), int(max(math.ceil(y_range[1] + padding_y_axis), set_temp + 3)))
return y_axis
def display_text(string, coordinates, size, color, condensed = False):
if condensed == False:
font = pygame.font.Font(None, size)
else:
font = font = pygame.font.Font('/usr/share/fonts/truetype/liberation/LiberationSansNarrow-Bold.ttf', size)
text_surface = font.render(string, True, color)
lcd.blit(text_surface, coordinates)
def draw_axes(y_axis, tick, graph_top, graph_bottom):
for val in xrange(0, y_axis[1], tick) :
coord_val = coordinates(y_axis, val, graph_top, graph_bottom)
if (coord_val < graph_bottom) and (coord_val > graph_top): # remember that y coordinates start at 0 from the top of the display
pygame.draw.line(lcd, col_axis_lines, (graph_left, coord_val), (graph_right-30, coord_val)) # change this 300 value, based on graph_right
display_text(str(val), (graph_right-25, coord_val-10), 20, col_axis_labels) # change this 305 value, based on graph_right, leave space for icons
def draw_lines(y_coord, y_axis, graph_top, graph_bottom, color_series):
coord_set_temp = coordinates(y_axis, set_temp, graph_top, graph_bottom)
if (coord_set_temp < graph_bottom) and (coord_set_temp > graph_top): # remember that y coordinates start at 0 from the top of the display
pygame.draw.line(lcd, col_settemp_line, (graph_left, coord_set_temp), (graph_right-30, coord_set_temp)) # change this 300 value, based on graph_right
# pointlist = [[graph_left + npixels_per_tempreading*j for j in range(0, len(y_coord))],
# [y_coord[j] for j in range(0, len(y_coord))]]
# pointlist = [[pointlist[0][i],pointlist[1][i]] for i in range(0, len(y_coord))]
for j in xrange(1, len(y_coord)):
pygame.draw.aaline(lcd, color_series[j-1], (graph_left + npixels_per_tempreading*(j-1), y_coord[j-1]),(graph_left + npixels_per_tempreading*j, y_coord[j]), 0)
# def draw_lines(subset_y, y_axis, tick, graph_top, graph_bottom, color_series):
# coord_set_temp = coordinates(y_axis, set_temp, graph_top, graph_bottom)
# if (coord_set_temp < graph_bottom) and (coord_set_temp > graph_top): # remember that y coordinates start at 0 from the top of the display
# pygame.draw.line(lcd, col_settemp_line, (graph_left, coord_set_temp), (graph_right-30, coord_set_temp)) # change this 300 value, based on graph_right
# y_coord = [coordinates(y_axis, subset_y[j], graph_top, graph_bottom) for j in xrange(0, len(subset_y))]
# # pointlist = [[[graph_left + npixels_per_tempreading*(j-1), y_coord[j-1]], [graph_left + npixels_per_tempreading*j, y_coord[j]]] for j in range(1, len(y_coord))]
# # pointlist = list(itertools.chain.from_iterable(pointlist))
# # pygame.draw.lines(lcd, col_temp_line, False, pointlist, 1)
# graph_surface.lock()
# for j in xrange(1, len(y_coord)):
# pygame.draw.aaline(graph_surface, color_series[j-1], (npixels_per_tempreading*(j-1), y_coord[j-1] - graph_top),(npixels_per_tempreading*j, y_coord[j] - graph_top)) # 2 is the line thickness here.
# # pygame.draw.line(lcd, color_series[j-1], (graph_left + npixels_per_tempreading*(j-1), y_coord[j-1]),(graph_left + npixels_per_tempreading*j, y_coord[j]), 1) # 2 is the line thickness here.
# graph_surface.unlock()
# lcd.blit(graph_surface, (graph_left, graph_top))
def draw_power(power_data):
for j in xrange(0, len(power_data)):
pygame.draw.line(lcd, col_heat, (graph_left+npixels_per_tempreading*j, 220), (graph_left+npixels_per_tempreading*j, int(220-power_data[j]/4)),1)
# prev_y_axis = (0, 0)
# y_minmax = (65, 215)
# prev_y_minmax = (0, 0)
def refresh_graph(y, graph_top, graph_bottom, area_graph, area_text_temp):
global prev_y_axis, prev_y_minmax, prev_x_range