Commit 21c59e5b authored by dhomm's avatar dhomm
Browse files

add Thesis code to main

parent 065f00fc
#!/usr/bin/env python
# Step One of the PLOE algorithm
import numpy as np
import statistics
def paired_acc_gyr(sensor_list, position_key):
for s in sensor_list:
if s.info.metadata["NAME"].lower() in "accelerometer":
acc = s
for s2 in sensor_list:
if s2.info.metadata["NAME"].lower() in "gyroscope" and \
s2.info.metadata[position_key] == acc.info.metadata[position_key]:
gyr = s2
yield acc, gyr
def get_none_wear_times(acc_data, gyro_data, freq):
"""
Based on PLOE, a time frame will be recognized
as non wear time, if it is at least 30mins holding following
conditions:
1. the standard deviation of acceleration is less than 13mg
2. the value range for 2 out of 3 axes is less than 50mg
:args:
acceleration_data: data recordings of acceleration as list or np array, [[x1, y1, z1], [x2, y2, z2], ....]
gyro_data: analog to acceleration_data but including gyroscope recordings
freq: the Frequency in Hertz (Hz) as integer
:return:
None, if there is no non wear time
A list of tuples with start and end index of acceleration with non wear time
"""
if ((len(acc_data)/freq)/60) < 30:
return []
# nonwear_times
non_wear = []
non_wear_15 = None
# non_wear = None
#using 15min intervalls searching for 15min non wear periods
start, end = 0, freq*60*15
while end < len(acc_data):
non_wear_15 = check_timeframe(acc_data, gyro_data, start, end)
# check 1 minute time frames before and after to check if the non wear time can accumulate to 30min
if non_wear_15:
while check_timeframe(acc_data, gyro_data, start, end):
start = start - freq*60
start = start + freq*60
while check_timeframe(acc_data, gyro_data, start, end):
end = end + freq*60
end = end - freq*60
if end - start >= freq*60*30:
non_wear.append((start, end))
else:
pass
# could add that even if the time frame is shorter 30min
# the new start will be set afterwards
else:
start = end
end = end + freq*60*15
return non_wear
def check_accelerometer(acc_data, start, end):
"""
This function checks acceleration recording[start:end]
if in this period of time it included acceleration above
two thresholds
"""
# standart deviation for time frame
std_x0 = np.std([comp[0] for comp in acc_data[start:end]])
std_x1 = np.std([comp[1] for comp in acc_data[start:end]])
std_x2 = np.std([comp[2] for comp in acc_data[start:end]])
# ranges for all components in the time frame
min_x0, max_x0 = np.inf, -np.inf
min_x1, max_x1 = np.inf, -np.inf
min_x2, max_x2 = np.inf, -np.inf
for sample in acc_data[start:end]:
if sample[0] < min_x0:
min_x0 = sample[0]
elif sample[0] > max_x0:
max_x0 = sample[0]
if sample[1] < min_x1:
min_x1 = sample[1]
elif sample[1] > max_x1:
max_x1 = sample[1]
if sample[2] < min_x2:
min_x2 = sample[2]
elif sample[2] > max_x2:
max_x2 = sample[2]
# the ranges:
range_x0 = max_x0 - min_x0
range_x1 = max_x1 - min_x1
range_x2 = max_x2 - min_x2
# what are 13mg and what are 50mg?
# 13g = 127.48645m/s² => 13mg = 0.1275m/s²
# 50g = 490.3325m/s² => 50mg = 0.4903m/s²
mg_13 = 0.1275
mg_50 = 0.4903
# I think these thresholds are far to low =>
# Threshold for std = 0.5m/s², for range of acceleration = 1m/s²
std_threshold = 0.5
range_threshold = 1
#print("Standartdde:", [std_x0, std_x1, std_x2])
#print("Rages:", [range_x0, range_x1, range_x2])
if len([std for std in [std_x0, std_x1, std_x2] if std < std_threshold]) > 1 and\
len([j for j in [range_x0, range_x1, range_x2] if j < range_threshold]) > 1:
non_movement = (start, end)
else:
#print([std_x0, std_x1, std_x2], [range_x0, range_x1, range_x2])
non_movement = None
return non_movement
def check_gyroscope(gyro_data, start, end):
"""
This function checks a gyroscope recording[start:end],
if in this period of time angular velocity has taken place
"""
# standart deviation for time frame
std_x0 = np.std([comp[0] for comp in gyro_data[start:end]])
std_x1 = np.std([comp[1] for comp in gyro_data[start:end]])
std_x2 = np.std([comp[2] for comp in gyro_data[start:end]])
# ranges for all components in the time frame
min_x0, max_x0 = np.inf, -np.inf
min_x1, max_x1 = np.inf, -np.inf
min_x2, max_x2 = np.inf, -np.inf
for sample in gyro_data[start:end]:
if sample[0] < min_x0:
min_x0 = sample[0]
elif sample[0] > max_x0:
max_x0 = sample[0]
if sample[1] < min_x1:
min_x1 = sample[1]
elif sample[1] > max_x1:
max_x1 = sample[1]
if sample[2] < min_x2:
min_x2 = sample[2]
elif sample[2] > max_x2:
max_x2 = sample[2]
# the ranges:
range_x0 = max_x0 - min_x0
range_x1 = max_x1 - min_x1
range_x2 = max_x2 - min_x2
std_threshold = 0.2
range_threshold = 0.5
#print("Standartdde:", [std_x0, std_x1, std_x2])
#print("Rages:", [range_x0, range_x1, range_x2])
if len([std for std in [std_x0, std_x1, std_x2] if std < std_threshold]) > 1 and\
len([j for j in [range_x0, range_x1, range_x2] if j < range_threshold]) > 1:
non_movement = (start, end)
else:
#print([std_x0, std_x1, std_x2], [range_x0, range_x1, range_x2])
non_movement = None
return non_movement
def check_timeframe(acc_data, gyro_data, start, end):
"""
This function checks if a given time span beginning with start
and ending with end index, is a static activity and therefore not recording any
acceleration except the gravity and only almost 0 angular velocity
:args:
acc_data: complete acceleration data
gyro_data: complete gyroscope data
start: first index
end: last index
:return:
"""
if check_accelerometer(acc_data, start, end) and check_gyroscope(gyro_data, start, end):
return (start, end)
else:
return None
# Step 2 of the PLOE algorithm
def get_sit_stand(acc_data, gyro_data, freq, timespan, non_wear, thresholds=(9, 4)):
"""
This function estimates the sitting data and the standing
data given the acceleration or the subtitles.
:args:
subtitle:
acc_data:
freq:
timespan: Integer indicating how long a static activity needs to take place to be estimated as such
(PLOE uses 15seconds)
non_wear:
:return:
sitting1, sitting2, standing: three tuples holding each two integer representing start and end index
of this static activity:
sitting1 is when the thumb is pointing left and right, respectively
sitting2 is when the thumb is pointing up
standing is when the hands pointing to the ground (right now they could point up in the air as well)
"""
sitting1 = []
sitting2 = []
standing = []
# if sitting or standing activities are labeled
"""
for act in subtitle[0]:
# since labels can vary, decision that the probability of sit and stand in
# sitting or standing
if "sit" in act[2].lower():
sitting.append((int(act[0]/(1000/freq)), int(act[1]/(1000/freq))))
if "stand" in act[2].lower():
standing.append((int(act[0]/(1000/freq)), int(act[1]/(1000/freq))))
# if subtitle include such patterns return them
#if sitting and standing:
# return sitting, standing
"""
# assuming that sitting, lying or standing still data
# has the same properties as non-wear time but is shorter than 30mins
# checking for 15sec time frames
if timespan < 0 or freq*timespan < 1:
raise Exception("timespan needs to be an integer > 0 and frequency*timespan needs to be > 1 ")
start, end = 0, int(freq*timespan)
while end < len(acc_data):
no_movement = check_timeframe(acc_data, gyro_data, start, end)
# for both standing and sitting non-wear properties should hold
if no_movement:
# check if it is included in a non_wear timespan
worn = True
for timespan in non_wear:
# check the static activity is recognized during a period of not wear time
if timespan[0] > no_movement[0] and no_movement[1] < timespan[1]:
worn = False
#check if it is already in sitting or standing (probably not needed in the end)
if worn:
# decide whether it is sitting or standing
activity = sit_or_stand(acc_data, start, end, thresholds)
#print(activity, start, end)
if int(activity) == 0:
if activity == 0.1:
sitting1.append((start, end))
elif activity == 0.2:
sitting2.append((start, end))
elif activity == 1:
standing.append((start, end))
elif activity == 2:
pass
#print("Static, but not usefull:", (start, end))
else:
raise Exception("Somehow the static activity seems weird please, check Index:", start, end)
start = end
end = end + int(freq*timespan)
return sitting1, sitting2, standing
def sit_or_stand(acc_data, start, end, thresholds=(9, 4)):
"""
This function estimates if the given static acceleration data
belongs to sitting or standing data
:args:
acc_data:
:return:
0 for standing
1 for sitting
2 for static activity where the wrist is not oriented as we want it to be
"""
acc_data = acc_data[start:end]
median_x = abs(statistics.median([sample[0] for sample in acc_data]))
median_y = abs(statistics.median([sample[1] for sample in acc_data]))
median_z = abs(statistics.median([sample[2] for sample in acc_data]))
#print("Mediaaaan:",median_y)
# if either Y or Z is 1g and X is nearly 0: sitting
if median_x < thresholds[1] and median_y < thresholds[1] and median_z > thresholds[0]:
# first sitting with thumbs pointing left and right
return 0.1
elif median_x < thresholds[1] and median_y > thresholds[0] and median_z < thresholds[1]:
# second sitting with thumbs pointing up
#print("hallöle222222222222")
return 0.2
# if X-Axis (directed to the palm or ellbow) == 1g
# and both other are more near to 0: standing
elif median_x > thresholds[0] and median_y < thresholds[1] and median_z < thresholds[1]:
#print("hallöle33333333333333")
return 1
else:
# otherwise the orientation is not usefull for PLOE
return 2
def direction_decider(median_x_standing, median_y_sitting, median_z_sitting):
if median_x_standing > 0:
x = "X -> Palm"
else:
x = "X -> Elbow"
if median_y_sitting > 0 :
y = "Y -> Not Thumb"
else:
y = "Y -> Thumb"
if median_z_sitting > 0:
z = "Z -> Palm"
else:
z = "Z -> Backhand"
return x + ", " + y + ", " + z
def placement_decider(median_x_standing, median_y_sitting, median_z_sitting):
"""
This placement_decider is based on the orientation of the accelerometer's
axes in the Paper:
On Placement, Location and Orientation of Wrist-Worn Tri-Axial Accelerometers during
Free-Living Measurements
This does not hold for any given Sensor in every data set, therefore a new placement_decider needs to
be developed dependend on one given orientation.
"""
# ++-
if median_x_standing > 0 and median_y_sitting > 0 and median_z_sitting < 0:
return "Position L1"
# ---
elif median_x_standing < 0 and median_y_sitting < 0 and median_z_sitting < 0:
return "Position L2"
# +-+
elif median_x_standing > 0 and median_y_sitting < 0 and median_z_sitting > 0:
return "Position L3"
# -++
elif median_x_standing < 0 and median_y_sitting > 0 and median_z_sitting > 0:
return "Position L4"
# +--
elif median_x_standing > 0 and median_y_sitting < 0 and median_z_sitting < 0:
return "Position R1"
#-+-
elif median_x_standing < 0 and median_y_sitting > 0 and median_z_sitting < 0:
return "Position R2"
# +++
elif median_x_standing > 0 and median_y_sitting > 0 and median_z_sitting > 0:
return "Position R3"
# --+
elif median_x_standing < 0 and median_y_sitting < 0 and median_z_sitting > 0:
return "Position R4"
else:
raise Exception("This case is actually impossible but just in case")
def placement_l1_dependend(l1_prop, median_x_standing, median_y_sitting2, median_z_sitting1):
"""
This function returns the placement, location and orientation of
an accelerometer given different characteristics dependent
on the characteristics of the Placement L1
:args:
l1_prop: (maybe aswell a list with medians), string with since of the needed medians
median_x_standing:
median_y_sitting2:
median_z_sitting1:
:return:
the placement and directions as string
"""
# medians = [median_x_standing, median_y_sitting2, median_z_sitting1]
directions = direction_decider(median_x_standing, median_y_sitting2, median_z_sitting1)
# Positions L1 is as in PLOE paper
# print("Actual orientation of axes:", ['+' if el > 0 else "-" for el in medians])
if l1_prop == "++-":
pl = placement_decider(median_x_standing, median_y_sitting2, median_z_sitting1)
# other combinations
elif l1_prop == "+++":
pl = placement_decider(median_x_standing, median_y_sitting2, -median_z_sitting1)
elif l1_prop == "+-+":
pl = placement_decider(median_x_standing, -median_y_sitting2, -median_z_sitting1)
elif l1_prop == "+--":
pl = placement_decider(median_x_standing, -median_y_sitting2, median_z_sitting1)
elif l1_prop == "-++":
pl = placement_decider(-median_x_standing, median_y_sitting2, -median_z_sitting1)
elif l1_prop == "-+-":
pl = placement_decider(-median_x_standing, median_y_sitting2, median_z_sitting1)
elif l1_prop == "--+":
pl = placement_decider(-median_x_standing, -median_y_sitting2, -median_z_sitting1)
elif l1_prop == "---":
pl = placement_decider(-median_x_standing, -median_y_sitting2, median_z_sitting1)
return pl + ", " + directions
# Step 4 of the PLOE algorithm since Step 3 will be applied within this step on the segments given
def wrist_localization_orientation(sitting1_data, sitting2_data, standing_data, l1_prop="++-"):
"""
This function considers the common orientation of an accelerometer as following:
if the sensor is placed on the left wrist:
if the sensor is wearn on top of the wrist:
if the sensor is placed like common watch:
Position L1:
X-Axis directed to the palm,
Y-Axis directed in opposite direction of the thumb ,
Z-Axis directed as the back of the hand
according to this all other positions can be calculated.
X-Axis can be orientated as following:
1. directed to the hand
2. directed to the ellbow
Y-Axis can be orientated as following:
1. in direction of the thumb
2. in the opposite direction of the thumb
Z-Axis can be orientated as following:
1. same direction as the backhand
2. directed as the palm
and further computes sever static activities to show the accelerometer
placement, location and orientation
:args:
sitting1_data: array with recorded accelerometer data of the activity: sitting1
sitting2_data: array with recorded accelerometer data of the activity: sitting2
standing_data: array with recorded accelerometer data of the activity: standing
l1_prop: if the orientation of X,Y and Z axis is not as in the for Position L1 in
the function describtion
:return:
the placement, location and orientation as following: probably X, Y and Z direction
"""
# starting with the distingtion between left and right wrist:
# only used Feature in: On Placement, Location and Orientation of Wrist-Worn
# Tri-Axial Accelerometers during Free-Living Measurements
# calculate median for every component
# during standing
median_x_standing = statistics.median([sample[0] for sample in standing_data])
#median_y_standing = statistics.median([sample[1] for sample in standing_data])
#median_z_standing = statistics.median([sample[2] for sample in standing_data])
# during sitting1: backhand pointing up (most often occurence)
#median_x_sitting 1= statistics.median([sample[0] for sample in sitting1_data])
#median_y_sitting1 = statistics.median([sample[1] for sample in sitting1_data])
median_z_sitting1 = statistics.median([sample[2] for sample in sitting1_data])
#during sitting2: thumbs pointing up
#median_x_sitting2 = statistics.median([sample[0] for sample in sitting2_data])
median_y_sitting2 = statistics.median([sample[1] for sample in sitting2_data])
#median_z_sitting2 = statistics.median([sample[2] for sample in sitting2_data])
# not used but could come in handy for better evaluation
# calculating the mean of every component
"""
mean_x_standing = sum([sample[0] for sample in standing_data])/len(standing_data)
mean_y_standing = sum([sample[1] for sample in standing_data])/len(standing_data)
mean_z_standing = sum([sample[2] for sample in standing_data])/len(standing_data)
"""
# sum for every component how often they are the highest value
"""
sum_high_x = count([sample[0] if max(sample) == sample[0] for sample in standing_data])
sum_high_y = count([sample[1] if max(sample) == sample[1] for sample in standing_data])
sum_high_z = count([sample[2] if max(sample) == sample[2] for sample in standing_data])
"""
return placement_l1_dependend(l1_prop, median_x_standing, median_y_sitting2, median_z_sitting1)
#!/usr/bin/env python
#from wrist_placement.PLOE import *
from PLOE import *
from av.io import read
# perfetkt für Handwashing: mit 15/20 /23 richtigen:)
def most_frequent(arr):
"""
This function returns the most frequent item in the array List, based on
https://www.geeksforgeeks.org/python-find-most-frequent-element-in-a-list/
"""
return max(set(arr), key=arr.count)
def ploe_adaption_max(acc_stream, gyro_stream, sit1=[], sit2=[], stand=[], sub_stream=0, l1_prop="--+", thresholds=(9, 3.5), frequency=100):
"""
Look ploe_adaption, the only difference is that here
if sit1, sit2 or stand is already including data
for the next lower timespan and threshold, respectivly
it will not be overwritten
:args:
l1_prop: Default "--+" is the standard calibration for android smart watches (Uni-Mannheim and Handwashing data set)
"""
try:
freq = acc_stream.info.sample_rate
except AttributeError:
freq = frequency
timespan = 15
non_wear_time = get_none_wear_times(acc_stream, gyro_stream, freq)
# sit1, sit2, stand = get_sit_stand(acc_stream, gyro_stream, freq, timespan, non_wear_time, thresholds)
while (not sit1 or not sit2 or not stand) and timespan > 0.5:
# if a list has a bigger timespan take it and don't splitt it again
# if a list was already set with a better threshold don't overwrite it
if sit1:
if sit2:
_, _, stand = get_sit_stand(acc_stream, gyro_stream, freq, timespan, non_wear_time, thresholds)
elif stand:
_, sit2, _ = get_sit_stand(acc_stream, gyro_stream, freq, timespan, non_wear_time, thresholds)
else:
_, sit2, stand = get_sit_stand(acc_stream, gyro_stream, freq, timespan, non_wear_time, thresholds)
elif sit2:
if stand:
sit1, _, _ = get_sit_stand(acc_stream, gyro_stream, freq, timespan, non_wear_time, thresholds)
else:
sit1, _, stand = get_sit_stand(acc_stream, gyro_stream, freq, timespan, non_wear_time, thresholds)
elif stand:
sit1, sit2, _ = get_sit_stand(acc_stream, gyro_stream, freq, timespan, non_wear_time, thresholds)
else:
sit1, sit2, stand = get_sit_stand(acc_stream, gyro_stream, freq, timespan, non_wear_time, thresholds)
# reduce timespan to become able to find data segements of sit1, sit2 and stand
if timespan > 1:
timespan = timespan - 1
else:
timespan = timespan - 0.1
if not sit1 or not sit2 or not stand:
#print(thresholds)
#print(sit1, sit2, stand)
if thresholds[0]-1.5 > thresholds[1]:
if thresholds[0] > 7.5:
tr0 = thresholds[0]-0.5
else:
tr0 = 7.5
return ploe_adaption_max(acc_stream, gyro_stream, sit1=sit1, sit2=sit2, stand=stand, l1_prop=l1_prop, thresholds=(tr0, thresholds[1]+0.5), frequency=freq)
return "The adapted PLOE alogrithm could not find the placement of the sensor, since there are no static sequences longer than 0.4sec", 0
print("Chosen timespan=", timespan, "Chosen Threshold:", thresholds)
placements = []
for sSit1, eSit1 in sit1:
for sSit2, eSit2 in sit2:
for sStand, eStand in stand:
placements.append(wrist_localization_orientation(acc_stream[sSit1:eSit1], acc_stream[sSit2:eSit2], acc_stream[sStand:eStand], l1_prop=l1_prop))
placement = most_frequent(placements), placements.count(most_frequent(placements))/len(placements)
return placement, timespan, thresholds, 1
if __name__ == "__main__":
"""
# one test case
res20 = []
import os
#for file in os.listdir("../../../data/2020-Handwashing"):
parent = "../../../data/My_Recordings/Daily_Living"
for file in os.listdir(parent):
f1 = file
file = parent+ "/"+ f1
#audio = read("a:", file=file)
#sub = read("s:", file=file)
#acc_stream = audio[1]
#gyro_stream = audio[2]
#sub_stream = sub
f = np.load(file)
acc_stream = [i[:3] for i in f]
gyro_stream = [i[3:] for i in f]
res20.append((f1, ploe_adaption_max(acc_stream, gyro_stream, l1_prop="--+")))
print(res20)
res = res20
# in res2020 or res are all data stored for handwashing 2020
r2 = [i for i in res if "Position R2" in i[1][0][0]]
r = [i for i in res if "Position R" in i[1][0][0]]
l = [i for i in res if "Position L" in i[1][0][0]]
al = [i for i in res if "Position L" in i[1][0][0] or "Position R" in i[1][0][0]]