#!/usr/bin/python3 # # clean, sort, merge raw data # for ecolux and ecotrack Projects # output one huge table with all necessary fields for further processing/vizualization # ############################ # TODO ############################ # sanityParse of Temperature >> sometimes get erratic values (eg -14C°) # Thomas' ControlTag: 049E4C5EF2F44880CE # > ecotrack time is in CEST (both track and temp) => UTC+2 => C'est une problème? # > rewrite getXData to initXData # # > parse moonmap and skyglowdict from skyglow Project sources # -> maybe integrate skyglow project as submodule # > integrate with ST # > ST-Filenames should include 'unit-X'! # > how to process skylog error in merge() # > Speed Up! (takes 2min per 1mio lines) # 1. Use Threads for # https://realpython.com/python-concurrency/#multiprocessing-version # > reading files (easy) # > merging (intermediate) # * parse chunks of trackData. call sort at end # * calc indices of data dependent on numThreads # 2. rewrite animalmerge for loops # 3. Consider using Pandas, NumPy, Cython, Pypy, Koalas, inline C++/C/Bash, sql (SQLAlchemy) #Q: wieviel track-events wurden von clean rausgefiltert? import sys, getopt, os, re, string, time, platform, json, datetime from pprint import pprint as pp from datetime import datetime as dt import dataFilter TIME_FMT='%Y-%m-%d %H:%M:%S' noTime = dt.fromtimestamp(0) TAG_LEN=len("04B94A7F7288588022") RULER=f"{'#'*100}\n" rootPath = os.path.abspath( os.path.dirname( __file__ ) ) #uses cwd(), which is set to scriptdir at start N_UNITS=12 confFileName="./conf.json" min_temp=12 def initFileStructure(): ## trackFiles global T_DELIM, TIMESTAMP_LEN, T_COL_MS, T_COL_X, T_COL_Y, T_COL_TAG, T_MINLEN, T_MAXLEN, T_NCOLS T_DELIM=';' TIMESTAMP_LEN=10 #t_col_timestamp=0 T_COL_MS=1 T_COL_X=2 T_COL_Y=3 T_COL_TAG=4 #t_col_signalstrength=5 if PROJ == "ecolux": #track+light T_MINLEN = 38 # without newline. T_MAXLEN = 40 # ms are variable in length (0-999). Will be padded later for uniform len T_NCOLS = 6 elif PROJ == "ecotrack": #track+temp 2019 T_MINLEN = 36 T_MAXLEN = 38 T_NCOLS = 5 # elif PROJ == "schrebatron": #SchrebaTron: ms is padded with zeros if hasLight(): ## lightFile global L_DELIM, L_COL_MOON_REAL, L_COL_MOON_DMX, L_COL_SKYGLOW, L_MINLEN, L_MAXLEN, L_NCOLS L_DELIM='\t' L_COL_MOON_REAL=1 L_COL_MOON_DMX=2 L_COL_SKYGLOW=3 L_MINLEN = 61 #2020-07-21 15:15:00 0.0 0 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] L_MAXLEN = 76 #2020-07-21 15:09:00 0.0 0 [15, 54, 4, 19, 0, 1, 5, 0, 40, 12, 123, 123] L_NCOLS = 4 if hasTemp(): ## tempFile (C for Celsius) global C_TIME_FMT, C_COL_TIME, C_COL_UNIT, C_COL_HABITAT, C_COL_TEMP, C_MIN_LEN, C_MAX_LEN, C_DELIM, C_NCOLS C_TIME_FMT = "%Y-%m-%d %H:%M:%S.%f" C_COL_TIME = 0 C_COL_UNIT = 1 C_COL_HABITAT = 2 C_COL_TEMP = 3 C_MIN_LEN = 30 #coarse approximation C_MAX_LEN = 40 C_DELIM = ';' C_NCOLS = 4 ## animalFile global A_DELIM, A_MINLEN, A_MAXLEN, A_NCOLS, A_COL_SPECIES, A_COL_SPECIES_IND, A_COL_WEIGHT_WO_TAG_MG, A_COL_TAG if PROJ == "ecolux": A_DELIM='\t' A_MINLEN = 25 A_MAXLEN = 30 A_NCOLS = 4 A_COL_SPECIES=0 A_COL_SPECIES_IND=1 A_COL_WEIGHT_WO_TAG_MG=2 A_COL_TAG=3 elif PROJ == "ecotrack": A_DELIM=';' A_MINLEN = 30 A_MAXLEN = 45 A_NCOLS = 7 #/w comments A_COL_SPECIES=1 A_COL_SPECIES_IND=0 A_COL_WEIGHT_WO_TAG_MG=2 A_COL_TAG=5 # out file ########### global OUT_FMT, DELIM, COL_UID, COL_HID, COL_TAG OUT_FMT='%Y%m%d' DELIM=',' COL_UID=4 COL_TAG=8 #mlx (idx is also dmx-byte value) moonMap = [ 1.409, 8.369, 12.333, 16.210, 20.563, 24.716, 28.907, 33.363, 36.843, 41.137, 46.527, 50.880, 54.727, 59.617, 63.967, 67.817, 73.173, 77.213, 82.827, 87.037, 91.033, 95.320, 99.470, 104.366, 108.833, 113.667, #25 123.733, 128.567, 133.2, 138.3, 143.367, 148.233, 152.233, 156.933, 162.5, 167.5, 172.1, 176.767, 181.567, 187, 192.067, 196.433, 201.467, 206.5, 211.733, 216.4, 221.033, 226, 231.1, 235.733, 240.4, #50 245.6, 250.433, 254.567, 259.5, 263.933, 268.3, 272.6, #274 == new max 277.267, 281.433, 287.467, #60 ] #Mapping skyglow dmx+nled to lux skyglowDict = { 15: 0.08, 54: 0.3, 4: 1, 19: 0.1, 0: 0, 1: 0.01, 5: 0.03, 0: 0, 40: 10, 12: 3, 123: 30, 123: 30, } def hasLight(): return PROJ == "ecolux" def hasTemp(): return PROJ == "ecotrack" def getUID(fileName): """ extract and return unit-id from fileName """ f=fileName.split('-') idx=f.index("unit")+1 uid=int(f[int(idx)]) return 1 if uid == 13 else uid class Data: def __init__(self, animalFile, skyglowFile, tempFile, trackFiles): self.animalData = self.getAnimalData(animalFile) self.trackData = [] for file in trackFiles: self.trackData.extend( self.getTrackData(file) ) self.trackData.sort() self.startTrack = self.getTrackTime( self.trackData[0] ) self.endTrack = self.getTrackTime( self.trackData[-1] ) if hasLight(): self.lightData = self.getLightData(skyglowFile) self.startLight = self.getLightTime( self.lightData[0] ) self.endLight = self.getLightTime( self.lightData[-1] ) if hasTemp(): self.initTempData(tempFile) global OUT_FILE OUT_FILE = f"{PROJ}-data-{self.startTrack.strftime(OUT_FMT)}-{self.endTrack.strftime(OUT_FMT)}" pp(self.header()) def getTrackData(self, fileName): """ get lines from all tracklogs, clean, merge and add/rm columns""" pp(f"processing: {fileName}") with open(INPUTDIR+os.sep+fileName) as f: lines = f.readlines() lines = self.clean(lines, T_MINLEN, T_MAXLEN, T_NCOLS, T_DELIM, self.getTrackTime) lines = [l for l in lines if not l.split(DELIM)[T_COL_TAG].isdigit()] if not lines: pp("-> empty // non-usable") return [] uid=getUID(fileName) idx_after_ms = TIMESTAMP_LEN + len('999') + len(DELIM) for idx, line in enumerate(lines): if PROJ in ["ecolux","ecotrack"]: # pad ms column with zeros to fit three digits cols=line.split(DELIM) ms_digits=len(cols[T_COL_MS]) if ms_digits < 3: line = line[:TIMESTAMP_LEN+1] + '0'*(3-ms_digits) + line[TIMESTAMP_LEN+1:] trackTime = self.getTrackTime(line) date = trackTime.strftime('%Y-%m-%d') time = trackTime.strftime('%H:%M') x = int(cols[T_COL_X]) y = int(cols[T_COL_Y]) habitat = self.calcHabitat(x,y) # add columns (date,time,uid,habitat) line = line[:idx_after_ms] + DELIM + date + DELIM + time + DELIM + str(uid) + DELIM + str(habitat) + line[idx_after_ms:] if PROJ == "ecolux": line = line[:line.rindex(DELIM)] # remove signalstrength column lines[idx] = line assert (lines), "trackData: Not valid. Check times / formatting." return lines def calcHabitat(self,x,y): if PROJ == "ecolux": if x in (1,2) and y in (1,2): return 1 elif x in (5,6) and y in (1,2): return 2 elif x in (1,2) and y in (5,6): return 3 elif x in (5,6) and y in (5,6): return 4 else: return 0 elif PROJ == "ecotrack": if x in (1,2) and y in (1,2,5,6): return 2 elif x in (1,2) and y in (3,4,7,8): return 3 elif x in (3,4) and y in (1,2,5,6): return 3 elif x in (3,4) and y in (3,4,7,8): return 4 else: return "NA" else: return "NA" def getLightData(self,file): with open(file) as f: lines = f.readlines() lines = self.clean(lines, L_MINLEN, L_MAXLEN, L_NCOLS, L_DELIM, self.getLightTime) pattern = re.compile(r"\s\t") # skylog used to have an xtra space (changed 200915) TODO remove lines = [re.sub(pattern, "\t", x) for x in lines ] assert (lines), "skyglow: Not valid. Check times / formatting." return lines def initTempData(self,file): """ get temp data and save in a list of list of tuples >> tempData[unit][habitat] returns (time,temp)""" pp(f"processing: {file}") with open(file) as f: lines = f.readlines() lines = self.clean(lines, C_MIN_LEN, C_MAX_LEN, C_NCOLS, C_DELIM, self.getTempTime) lines.sort() self.startTemp = self.getTempTime( lines[0] ) self.endTemp = self.getTempTime( lines[-1] ) self.lenTemp = len(lines) #list of empty lists self.tempData = [[] for _ in range(N_UNITS) ] for l in lines: cols = l.split(DELIM) uid = int( cols[C_COL_UNIT] ) - 1 time = dt.strptime( cols[C_COL_TIME],C_TIME_FMT ) temp = float( cols[C_COL_TEMP] ) if temp < min_temp: continue lastTime = self.tempData[uid][-1][0] if time == lastTime: self.tempData[uid][-1].append( temp ) else: self.tempData[uid].append( [time,temp] ) #calc mean temp for uid in range(N_UNITS): for i,x in enumerate( self.tempData[uid] ): n=0 mean_temp=0 for temp in x[1:]: mean_temp+=temp n+=1 mean_temp/=n self.tempData[uid][i] = [time,mean_temp] if temp < min_temp: for x in range(N_HABITATS): temptime, temp = self.tempData[uid][i] pp(f" {temptime} : {temp}") input() if temp >= min_temp: break def getAnimalData(self,file): with open(file) as f: lines = f.readlines() lines = self.clean(lines, A_MINLEN, A_MAXLEN, A_NCOLS, A_DELIM) #TODO: check for duplicate tags and print warning / ignore? assert (lines), "tags: Not valid. Check times / formatting." return lines def clean(self, lines, minLen, maxLen, nCols, sep, timeFunc=None): """ remove trailing newline, empty, comments and all-digits and remove lines before startTime or after endTime""" lines = [l.strip() for l in lines] lines = [l for l in lines if minLen <= len(l) <= maxLen] lines = list(filter(lambda q: q and q[0] != '#', lines)) lines = [l for l in lines if len(l.split(sep)) == nCols] lines = [l.replace(sep,DELIM) for l in lines] lines = [l.replace('\ufeff','') for l in lines] #cutout BOM (ByteOrderMark). Showed up in ecotrack temp-csv. if timeFunc: lines = list(filter(lambda q: startTime <= timeFunc(q) < endTime, lines)) #only between start and endTime return lines def parseLightLine(self, line, uid): if not hasLight(): return cols=line.split(DELIM) moon_real = float(cols[L_COL_MOON_REAL])*1000 #convert to mLux moon_dmx = int(cols[L_COL_MOON_DMX]) moon_eco = moonMap[moon_dmx] #int list from string format like '[1, 2, 3, 4, 5]' skyglowList = cols[L_COL_SKYGLOW:] skyglowList[0] = skyglowList[0][1:] #remove '[' skyglowList[-1] = skyglowList[-1][:-1] #remove ']' skyglow = skyglowDict.get(int(skyglowList[uid-1]),'') return (moon_real, moon_eco, skyglow) def merge(self): self.mergeAnimals() if hasLight(): self.mergeLight() if hasTemp(): self.mergeTemp() def mergeAnimals(self): """ merge AnimalData into TrackData on TagID """ pp("merging animal data into track data. This might take a while. Time to move around. Or grab a Coffee.") #TODO SPEED UP!!! # ttags_not_found=[] notFound=0 for idx, tLine in enumerate(self.trackData): found=False ttag = tLine.split(DELIM)[COL_TAG] for aLine in self.animalData: aLine = aLine.split(DELIM) atag = aLine[A_COL_TAG] if ttag == atag: found=True aLine = DELIM + aLine[A_COL_SPECIES_IND] + DELIM + aLine[A_COL_SPECIES] + DELIM + aLine[A_COL_WEIGHT_WO_TAG_MG] self.trackData[idx] += aLine break; if not found: self.trackData[idx] += f'{DELIM}NA{DELIM}NA{DELIM}NA' notFound+=1 # ttags_not_found.append(ttag) pp(f"Didn't find Tags for {notFound} tracking events!") def mergeLight(self): """merge LightData into TrackData on Time""" pp("merging light data into track data") track_idx=0 light_idx=0 lightTime = self.startLight trackTime = self.startTrack while trackTime < lightTime: self.trackData[track_idx] += f'{DELIM}NA{DELIM}NA{DELIM}NA' track_idx+=1 if track_idx >= len(self.trackData): pp("skyglow.log starts after track.log...") return trackTime = self.getTrackTime(self.trackData[track_idx]) pp(f"start light merge @ {trackTime} -- idx {track_idx}") for trackLine in self.trackData[track_idx:]: trackTime = self.getTrackTime(trackLine) uid = int(trackLine.split(DELIM)[COL_UID]) while lightTime < trackTime and light_idx < len(self.lightData) - 1: # -> minute precision light_idx+=1 lightTime = self.getLightTime(self.lightData[light_idx]) #add light cols moon_real, moon_eco, skyglow = self.parseLightLine(self.lightData[light_idx],uid) trackLine += DELIM + "{:.3f}".format(moon_real) + DELIM + "{:.3f}".format(moon_eco) + DELIM + str(skyglow) self.trackData[track_idx] = trackLine track_idx+=1 def mergeTemp(self): """merge tempData into TrackData on Time""" pp("merging temperature data into track data") track_idx = 0 trackTime = self.startTrack tids = [0 for _ in range(N_UNITS) ] #fill NA's for tracking events starting before temperature while trackTime < self.startTemp: self.trackData[track_idx] += f'{DELIM}NA' track_idx+=1 if track_idx >= len(self.trackData): pp("temperature starts after trackdata...") return trackTime = self.getTrackTime(self.trackData[track_idx]) pp(f"start temperature merge @ {trackTime} -- idx {track_idx}") for trackLine in self.trackData[track_idx:]: cols = trackLine.split(DELIM) uid = int(cols[COL_UID]) - 1 trackTime = self.getTrackTime(trackLine) # get temp idx for temp within 2:30min. TempSensors record once every 5 minutes. delta = datetime.timedelta(minutes=2, seconds=30) i = tids[uid] while self.tempData[uid][i][0] < trackTime-delta and i < len(self.tempData[uid]): # pp(f"{self.tempData[uid][hid][i][0]} < {trackTime} - {delta} ?? {self.tempData[uid][hid][i][0] < trackTime-delta}") i += 1 tids[uid]= i tempTime, temp = self.tempData[uid][i] # add temp col trackLine += DELIM + str(temp) self.trackData[track_idx] = trackLine track_idx+=1 def getLightTime(self,line): if not hasLight(): return cols = line.strip().split(DELIM) sTime = cols[0].strip() if "Error" in sTime: return noTime lightTime = dt.strptime(sTime,TIME_FMT) #convert to UTC from local BerlinTZ and respect DST sun 25 october 3AM -> -1 hour = 2AM again winterTime = dt.strptime("2020-10-25 03:00:00",TIME_FMT) hours= ( 2 if lightTime < winterTime else 1 ) lightTime -= datetime.timedelta(hours=hours) # -2h in summmer, -1h in winter return lightTime def getTrackTime(self,line): """ convert timestamp string to datetime """ try: timestamp=int(line[:TIMESTAMP_LEN]) except ValueError as e: print("Caught:",e) print("Presumably binary data in txt file... or some other corruption") return noTime return dt.fromtimestamp(timestamp) def getTempTime(self,line): cols = line.split(DELIM) return dt.strptime(cols[C_COL_TIME],C_TIME_FMT) def header(self): h= f"# {PROJ} data\n" \ f"{RULER}" \ f"# len(Track): {len(self.trackData)}\n" \ f"# startTrack: {self.startTrack}\n" \ f"# endTrack: {self.endTrack}\n" \ f"# len(animal): {len(self.animalData)}\n" if hasLight(): h += f"# len(Light): {len(self.lightData)}\n" \ f"# startLight: {self.startLight}\n" \ f"# endLight: {self.endLight}\n" if hasTemp(): h += f"# len(Temperature): {self.lenTemp}\n" \ f"# startTemp: {self.startTemp}\n" \ f"# endTemp: {self.endTemp}\n" h += RULER h += f"#timestamp, ms, date, time, unit, habitat, x, y, tag, species, speciesnumber, weight_without_tag[mg]" if hasLight(): h+=', moon_real[mLux], moon_eco[mLux], skyglow[Lux]' if hasTemp(): h+=', temp[C°]' h +="\n" h += RULER return h def write(self): os.makedirs(OUTPUTDIR, exist_ok=True) os.chdir(OUTPUTDIR) pp("delete old data") cmd_delete_data=f"rm *-data-*" os.system(cmd_delete_data) pp("write to " + OUT_FILE) with open(OUT_FILE, "w") as f: f.writelines(self.header()) self.trackData=map(lambda x:x+'\n', self.trackData) f.writelines(self.trackData) pp("compress") cmd_compress=f"tar -C {OUTPUTDIR} -zcvf {OUT_FILE}.tgz {OUT_FILE}" # ">/dev/null 2>/dev/null" os.system(cmd_compress) def xtract(): for root, dirs, files in os.walk(INPUTDIR): #walk recursively for file in files: if file.endswith(".tgz") or file.endswith(".tar.gz"): print(f"xtracting {file}\n") cmd_xtract=f"tar -zxvf {INPUTDIR}{os.sep}{file} -C {INPUTDIR}" os.system(cmd_xtract) return def getFileList(): print( "Get files." ) animalFile, skyglowFile, tempFile, trackFiles = '','','',[] animalFiles=["tags","list-individuals.csv"] skyglowFiles=["skyglow.log"] for root, dirs, files in os.walk(INPUTDIR): #walk recursively for file in files: if file.endswith(".tgz") or file.endswith(".tar.gz"): continue elif "unit-" in file: trackFiles.append(file) elif file in animalFiles: animalFile = INPUTDIR+os.sep+file print(f"Found animalFile: {file}") elif hasLight() and file in skyglowFiles: skyglowFile = INPUTDIR+os.sep+file print(f"Found logfile: {file}") elif hasTemp() and "emp" in file : tempFile = INPUTDIR+os.sep+file print(f"Found tempFile: {file}") else: pp(f"Ignoring ({file})") if hasTemp() and not tempFile: sys.exit(f"Couldn't find temperature data (filename containing 'temp' or 'Temp')") if not trackFiles: sys.exit(f"No tracking data found. (filenames containing 'unit-X', where X is a number)") if hasLight() and not skyglowFile: sys.exit(f"Couldn't find {skyglowFiles}") if not animalFile: sys.exit(f"Couldn't find {animalFiles}") return animalFile, skyglowFile, tempFile, trackFiles def make_rel_abs_path(path): path = os.path.expanduser(path) if not os.path.isabs(path): path = os.path.abspath(rootPath+os.sep+path) return os.path.normpath(path) def main(): initFileStructure() # xtract() animalFile, skyglowFile, tempFile, trackFiles = getFileList() data = Data(animalFile,skyglowFile,tempFile,trackFiles) data.merge() data.write() dataFilter.main(OUT_FILE) if platform.system() == "Linux": os.system(rootPath + "/updateGitIgnore.sh") os.system(rootPath + "/unknown-tags.sh " + OUTPUTDIR + " > " + OUTPUTDIR + "/tagInfo") os.system(rootPath + "/unknown-tags.sh " + OUTPUTDIR + " listUnkownTags") def readConf(): global PROJ, OUTPUTDIR, INPUTDIR, startTime, endTime, blox confFile = make_rel_abs_path(confFileName) with open(confFile, 'r') as f: content = json.load(f) conf = content["conf"] known_projects=["ecotrack","ecolux","schrebatron"] print(f" Process raw data for which configuration?\n {RULER}") for idx, c in enumerate(conf): print(f" {idx+1}) Project {c[0]}") print(f" from: {c[3]}") print(f" to: {c[4]}") print(f" in: {c[1]}") print(f" out: {c[2]}\n") try: n=int(input())-1 PROJ = conf[n][0].lower() INPUTDIR = make_rel_abs_path( conf[n][1] ) OUTPUTDIR = make_rel_abs_path( conf[n][2] ) startTime = dt.strptime(f"{conf[n][3]} 00:00:00", TIME_FMT) endTime = dt.strptime(f"{conf[n][4]} 00:00:00", TIME_FMT) if PROJ not in known_projects: raise Exception(f"Unknown Project '{PROJ}'") blox = content["blox"][PROJ] for b in blox: b[1]=dt.strptime(f"{b[1]} 00:00:00",TIME_FMT) b[2]=dt.strptime(f"{b[2]} 00:00:00",TIME_FMT) except Exception as e: print(f"Error: {e}\n") print(f"You can add a configuration to {confFileName}") print(f"It has following form:") print(f"") print(f" ['Project', 'inDir', 'outDir', 'start', 'end']") print(f"") print(f" > Project must be in {known_projects}") print(f" > dirs can be relative. eg: ('./data','../data','~/data')") print(f" > times must have Format like '{dt.now().strftime('%Y-%m-%d')}'") exit() if __name__ == "__main__" : readConf() start = time.time() main() end = time.time() print(f"Took {end - start} seconds")