0

I have a BNO055 sensor connected to my Raspberry Pi via I2C protocol. This sensor can output 9 measurements including acceleration in all 3 axes.(x,y,z)

I'd like to have a live plot of the x_acceleration as a function of time.

I'm trying to run this code for a project on my Raspberry Pi.

def animate(i,t_updt,f,sp1,sensor1):
    initialtime = time.perf_counter()
    lastTime = initialtime;
    times=np.array([])
    x=np.array([])
    while(1): 
        currentTime = time.perf_counter()
        if  (currentTime - lastTime >= 1/f):
            lastTime = currentTime
            times = np.append(times,lastTime - initialtime)
            sens1Acc = sensor1.acceleration     
            x = np.append(x,sens1Acc[0])       
        if len(times) == t_updt*f:
            break
    #Clear and draw new values   
    sp1.clear()
    sp1.plot(times,x)
    sp1.set_title('X - accel - sensor1')
    sp1.set_ylim([-10,10])
import adafruit_bno055
import time
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import datetime as dt
from busio import I2C
from board import SDA, SCL
from time import sleep

##Creating sensor1 object
sensor1 = adafruit_bno055.BNO055_I2C(i2c,40)
sensor1.mode = adafruit_bno055.NDOF_MODE
sleep(2) # wait for system to stabilize

f=50 #Sampling freq

fig = plt.figure()
sp1 = fig.add_subplot(1,1,1)       

ani = animation.FuncAnimation(fig, animate, fargs=(5,f,sp1,sensor1),interval=1000)
plt.show()


As you can see I've implemented my animate function which is being called every 1 sec. During the running of this function, there is a while loop that runs for 5 seconds(because of the fargs[0] argument passed ) , in which we sample 5 seconds of data at a rate of 50 hz.

My only problem is: The graph plot doesn't update. I've printed out and saw that the x values do actually get updated, but the plot itself doesn't update...

Any ideas?

1 Answer 1

1

I use sensors on Pi for temperature and humidity, capturing time / epoch at point reading is taken Overall approach

  1. wrapper class that reads sensors and returns a dict of sensor data including time / epoch
  2. controller on Pi that uses wrapper class and publishes data to a RESTful API (flask)
  3. active live updating web page (uses AJAX and ChartJS

PiSensor.py

import os, platform
import time, datetime, random
try:
    import Adafruit_DHT
except ImportError:
    plat = "{sys}.{mc}".format(sys=platform.system(), mc=platform.machine())
    if (plat == "Linux.armv7l"):
        raise
    else:
        print(f"Adafruit_DHT not supported on this platform {plat}")
from typing import (Tuple, Optional)

try:
    DHT_SENSOR = Adafruit_DHT.DHT11
except NameError:
    pass
DHT_PIN = 17


#----------------------------------------------------------------
#   Note:
#       ds18b20's data pin must be connected to pin7.
#----------------------------------------------------------------

class PiSensor:
    __hostname = ""
    __latitude = 1.3740981993768249
    __longitude = 103.85034994827384

    def __init__(self) -> None:
        self.__hostname = os.uname()[1].split(".")[0]

    def set_gps(self, latitude:float, longitude:float) -> None:
        self.__latitude = latitude
        self.__longitude = longitude

    def get_gps(self) -> Tuple[float, float]:
        return self.__longitude, self.__latitude

    def sensorData(self, id:str, temp:float, humidity:Optional[float]=None) -> list:
        data = [{"id":id, "time":datetime.datetime.now().strftime("%I:%M:%S%p"),
                "value":temp, "hostname":self.__hostname, "epoch":int(time.time()*1000),
                "latitude":self.__latitude, "longitude":self.__longitude, "type":"temperature"}]
        if humidity is not None:
            data2 = data[0].copy()
            data2["id"] = data2["id"] + "_humidity" # type: ignore
            data2["value"] = humidity
            data2["type"] = "humidity"
            data.append(data2)

        return data

    # Reads temperature from sensor and prints to stdout
    # id is the id of the sensor
    def readSensor(self, id:str) -> list:
        with open("/sys/bus/w1/devices/"+id+"/w1_slave") as tfile:
            text = tfile.read()
        secondline = text.split("\n")[1]
        temperaturedata = secondline.split(" ")[9]
        temperature = float(temperaturedata[2:])
        temperature = temperature / 1000

        return self.sensorData(id, temperature)


    # Reads temperature from all sensors found in /sys/bus/w1/devices/
    # starting with "28-...
    def readSensors(self) -> list:
        count = 0
        data:list = []
        for file in os.listdir("/sys/bus/w1/devices/"):
            if (file.startswith("28-")):
                data += self.readSensor(file)
                count+=1
        if (count == 0):
            print("No sensor found! Check connection")


        humidity, temp = Adafruit_DHT.read_retry(DHT_SENSOR, DHT_PIN)
        if humidity is not None and temp is not None:
            data += self.sensorData("DHT11", temp, humidity)
        else:
            print("oh no DHT11 not working")


        return data

controller (just sample code)

def read_sensor(alive, mode:int, ser:SerialInterface, config) -> None:
    sess = requests.Session()
    sensor = PiSensor()

    logenv.alog().info("thread started... sleeping {s}".format(s=config["sensorsleep"]))

    data += sensor.readSensors()

    try:
        resp = sess.post("{u}/sensor_data".format(u=u), json=data, verify=False)
    except requests.exceptions.ConnectionError:
        # maybe one of servers has gone down....
        logenv.alog().warning("no data sent {u} not alive".format(u=u))
    finally:
        sess.close()
        logenv.alog().info("shutting down thread")

flask API

@app.route('/sensor_data', methods=['POST'])
def sensor_data() -> str:
    c = mycache()
    dfsensor = c.get("dfsensor")
    newsensor = json_normalize(request.get_json())
    newsensor[["x","y"]] = newsensor[["epoch", "value"]]
    newsensor["xy"] = newsensor[['x', 'y']].agg(pd.Series.to_dict, axis=1)
    newsensor["amin"] = newsensor["value"]
    newsensor["amax"] = newsensor["value"]
    newsensor = newsensor.drop(columns=["x","y"])

    # add new data from serial interface to start of list (append old data to new data).
    # default time as now to new data
    dfsensor = newsensor.append(dfsensor, sort=False)
    # keep size down - only last 500 observations
    c.set("dfsensor", dfsensor[:500])
    del dfsensor

    return jsonify(result={"status":"ok"})

# used in conjnction with AJAX functionality in home.html template
@app.route('/serial_json', methods=['GET'])
def serial_json() -> Response:
    type = request.args.get("type", "temperature")
    freq = request.args.get("freq", "")
    graphtype = request.args.get("graphtype")

    df = mycache().get("dfsensor{f}".format(f=freq))
    df = df[df["type"]==type] # type: ignore

    # resample data down to proportion of data set and defined number of points
    if "sensorcookie" in request.cookies:
        ck = json.loads(urllib.parse.unquote(request.cookies.get("sensorcookie")))
        if not "pointsslider" in ck:
            ck = {"pointsslider": 100, "pointstoplot":20, **ck}
    else:
        ck = {"pointsslider": 100, "pointstoplot":20}
    df, min_epoch, max_epoch = sensor_ts.resample_for_plot(df, int(ck["pointsslider"]), int(ck["pointstoplot"]))

    extdata = restdata_source.tickdata(df=df, required="graph", graphtype=graphtype)
    del df
    resp = make_response(jsonify(result={"data":extdata}))
    ck = {"min_epoch":int(min_epoch), "max_epoch":int(max_epoch), **ck}
    resp.set_cookie("sensorcookie", json.dumps(ck))

    return resp

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.