I use sensors on Pi for temperature and humidity, capturing time / epoch at point reading is taken
Overall approach
- wrapper class that reads sensors and returns a dict of sensor data including time / epoch
- controller on Pi that uses wrapper class and publishes data to a RESTful API (flask)
- active live updating web page (uses AJAX and ChartJS
PiSensor.py
import os, platform
import time, datetime, random
try:
import Adafruit_DHT
except ImportError:
plat = "{sys}.{mc}".format(sys=platform.system(), mc=platform.machine())
if (plat == "Linux.armv7l"):
raise
else:
print(f"Adafruit_DHT not supported on this platform {plat}")
from typing import (Tuple, Optional)
try:
DHT_SENSOR = Adafruit_DHT.DHT11
except NameError:
pass
DHT_PIN = 17
#----------------------------------------------------------------
# Note:
# ds18b20's data pin must be connected to pin7.
#----------------------------------------------------------------
class PiSensor:
__hostname = ""
__latitude = 1.3740981993768249
__longitude = 103.85034994827384
def __init__(self) -> None:
self.__hostname = os.uname()[1].split(".")[0]
def set_gps(self, latitude:float, longitude:float) -> None:
self.__latitude = latitude
self.__longitude = longitude
def get_gps(self) -> Tuple[float, float]:
return self.__longitude, self.__latitude
def sensorData(self, id:str, temp:float, humidity:Optional[float]=None) -> list:
data = [{"id":id, "time":datetime.datetime.now().strftime("%I:%M:%S%p"),
"value":temp, "hostname":self.__hostname, "epoch":int(time.time()*1000),
"latitude":self.__latitude, "longitude":self.__longitude, "type":"temperature"}]
if humidity is not None:
data2 = data[0].copy()
data2["id"] = data2["id"] + "_humidity" # type: ignore
data2["value"] = humidity
data2["type"] = "humidity"
data.append(data2)
return data
# Reads temperature from sensor and prints to stdout
# id is the id of the sensor
def readSensor(self, id:str) -> list:
with open("/sys/bus/w1/devices/"+id+"/w1_slave") as tfile:
text = tfile.read()
secondline = text.split("\n")[1]
temperaturedata = secondline.split(" ")[9]
temperature = float(temperaturedata[2:])
temperature = temperature / 1000
return self.sensorData(id, temperature)
# Reads temperature from all sensors found in /sys/bus/w1/devices/
# starting with "28-...
def readSensors(self) -> list:
count = 0
data:list = []
for file in os.listdir("/sys/bus/w1/devices/"):
if (file.startswith("28-")):
data += self.readSensor(file)
count+=1
if (count == 0):
print("No sensor found! Check connection")
humidity, temp = Adafruit_DHT.read_retry(DHT_SENSOR, DHT_PIN)
if humidity is not None and temp is not None:
data += self.sensorData("DHT11", temp, humidity)
else:
print("oh no DHT11 not working")
return data
controller (just sample code)
def read_sensor(alive, mode:int, ser:SerialInterface, config) -> None:
sess = requests.Session()
sensor = PiSensor()
logenv.alog().info("thread started... sleeping {s}".format(s=config["sensorsleep"]))
data += sensor.readSensors()
try:
resp = sess.post("{u}/sensor_data".format(u=u), json=data, verify=False)
except requests.exceptions.ConnectionError:
# maybe one of servers has gone down....
logenv.alog().warning("no data sent {u} not alive".format(u=u))
finally:
sess.close()
logenv.alog().info("shutting down thread")
flask API
@app.route('/sensor_data', methods=['POST'])
def sensor_data() -> str:
c = mycache()
dfsensor = c.get("dfsensor")
newsensor = json_normalize(request.get_json())
newsensor[["x","y"]] = newsensor[["epoch", "value"]]
newsensor["xy"] = newsensor[['x', 'y']].agg(pd.Series.to_dict, axis=1)
newsensor["amin"] = newsensor["value"]
newsensor["amax"] = newsensor["value"]
newsensor = newsensor.drop(columns=["x","y"])
# add new data from serial interface to start of list (append old data to new data).
# default time as now to new data
dfsensor = newsensor.append(dfsensor, sort=False)
# keep size down - only last 500 observations
c.set("dfsensor", dfsensor[:500])
del dfsensor
return jsonify(result={"status":"ok"})
# used in conjnction with AJAX functionality in home.html template
@app.route('/serial_json', methods=['GET'])
def serial_json() -> Response:
type = request.args.get("type", "temperature")
freq = request.args.get("freq", "")
graphtype = request.args.get("graphtype")
df = mycache().get("dfsensor{f}".format(f=freq))
df = df[df["type"]==type] # type: ignore
# resample data down to proportion of data set and defined number of points
if "sensorcookie" in request.cookies:
ck = json.loads(urllib.parse.unquote(request.cookies.get("sensorcookie")))
if not "pointsslider" in ck:
ck = {"pointsslider": 100, "pointstoplot":20, **ck}
else:
ck = {"pointsslider": 100, "pointstoplot":20}
df, min_epoch, max_epoch = sensor_ts.resample_for_plot(df, int(ck["pointsslider"]), int(ck["pointstoplot"]))
extdata = restdata_source.tickdata(df=df, required="graph", graphtype=graphtype)
del df
resp = make_response(jsonify(result={"data":extdata}))
ck = {"min_epoch":int(min_epoch), "max_epoch":int(max_epoch), **ck}
resp.set_cookie("sensorcookie", json.dumps(ck))
return resp