#!/usr/bin/python3
"""Clean, sort and merge raw data.

Reads the track logs (one per unit), the animal/tag table and, for projects
with light control, the skyglow log. The output is one large delimited table
with all fields needed for further processing/visualization, written to
OUTPUTDIR and additionally compressed with ``tar``.
"""
#
############################
# TODO *most important
############################
# parse moonmap and skyglowdict from skyglow Project sources (submodule ... ?)
#
#
# include temperature and humidity data?
# integrate with ST
#   > make Skyglow optional -> TEST
#   > ST-Filenames should include 'unit-X'!
#
# how to process skylog error in merge()
#
# Speed Up! (takes 16minutes for 8mio lines...)
#   1. Threads
#   > see https://realpython.com/python-concurrency/#multiprocessing-version
#   use threads for
#       > reading files (easy)
#       > merging (intermediate)
#           * parse chunks of trackData. call sort at end
#           * calc indices of data dependent on numThreads
#
#   2. rewrite animalmerge for loops
#   3. Consider using Pandas, NumPy, Cython, Pypy, Koalas, ...
#   4. inline C++/C/Bash
#   5. sql (SQLAlchemy)
#
# how to profile to find bottlenecks?
#
# for lists: use append() instead of + / += operators
#
# Experiment Times
####################
# Track start 7.7
# Light start 20.7
# Dark: 10.7 - 20.7
# Block1: 21.7 - 18.8
# Block2: 15.9 - 13.10
#

import sys
import getopt
import os
import re
import datetime
import string
import subprocess
import time
from pprint import pprint as pp
from datetime import datetime as dt

PROJ = "ecotron"
# PROJ = "schrebatron"

# Input/output directories. Overridden from the command line when run as a
# script; the defaults keep the module importable.
INPUTDIR = "."
OUTPUTDIR = "."

TIME_FMT = '%Y-%m-%d %H:%M:00'
startTime = dt.strptime('2020-07-07 00:00:00', TIME_FMT)
endTime = dt.strptime('2020-11-30 00:00:00', TIME_FMT)
# endTime = dt.strptime('2020-08-18 00:00:00', TIME_FMT)
noTime = dt.fromtimestamp(0)

# Light log is in local Berlin time. DST ended sun 25 october 3AM -> -1 hour = 2AM again.
_WINTER_TIME = dt.strptime("2020-10-25 03:00:00", TIME_FMT)

TAG_LEN = len("04B94A7F7288588022")

# trackfile
T_DELIM = ';'
TIMESTAMP_LEN = 10
T_COL_MS = 1
T_COL_DATE = 2
T_COL_TAG = 4
T_MINLEN = 38  # without newline.
T_MAXLEN = 40  # ms column varies 1-3. Will be padded later for uniform len
T_NCOLS = 6

# lightfile
L_DELIM = '\t'
L_COL_MOON_REAL = 1
L_COL_MOON_DMX = 2
L_COL_SKYGLOW = 3
L_MINLEN = 61  # 2020-07-21 15:15:00 0.0 0 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
L_MAXLEN = 76  # 2020-07-21 15:09:00 0.0 0 [15, 54, 4, 19, 0, 1, 5, 0, 40, 12, 123, 123]
L_NCOLS = 4

# animalfile
A_DELIM = '\t'
A_MINLEN = 25  # guess
A_MAXLEN = 30
A_NCOLS = 4
A_COL_SPECIES = 0
A_COL_SPECIES_IND = 1
A_COL_WEIGHT_WO_TAG_MG = 2
A_COL_TAG = 3

# out file
OUT_FMT = '%Y%m%d'
DELIM = ','
COL_UID = 4
COL_TAG = 7

# mlx (idx is also dmx-byte value)
moonMap = [
    1.409, 8.369, 12.333, 16.210, 20.563, 24.716, 28.907, 33.363, 36.843, 41.137,
    46.527, 50.880, 54.727, 59.617, 63.967, 67.817, 73.173, 77.213, 82.827, 87.037,
    91.033, 95.320, 99.470, 104.366, 108.833, 113.667,  # 25
    123.733, 128.567, 133.2, 138.3, 143.367, 148.233, 152.233, 156.933, 162.5, 167.5,
    172.1, 176.767, 181.567, 187, 192.067, 196.433, 201.467,  # 42 > max-moon is 201.9mlx
    206.5, 211.733, 216.4, 221.033, 226, 231.1, 235.733, 240.4,  # 50
    # 274 # > new max!
]

# Mapping skyglow dmx+nled to lux
# (the original literal listed 0 and 123 twice; the resulting dict is the same)
skyglowDict = {
    15: 0.08,
    54: 0.3,
    4: 1,
    19: 0.1,
    0: 0,
    1: 0.01,
    5: 0.03,
    40: 10,
    12: 3,
    123: 30,
}

# 'unit-<number>' as its own dash-separated part of a file name
_UNIT_RE = re.compile(r"(?:^|-)unit-(\d+)")


def hasLight():
    """Return True when the configured project records light (skyglow) data."""
    return PROJ.lower() == "ecotron"


def getUID(fileName):
    """Extract and return the unit id from fileName.

    The name must contain ``unit-<number>`` as a dash-separated part, e.g.
    ``track-unit-3`` or ``track-unit-3.log``. Directory components are
    ignored. Unit 13 is reported as unit 1.

    Raises:
        ValueError: if no unit number can be found in the file name.
    """
    match = _UNIT_RE.search(os.path.basename(fileName))
    if match is None:
        raise ValueError(f"no 'unit-<number>' found in file name {fileName!r}")
    uid = int(match.group(1))
    # NOTE(review): unit 13 is mapped onto 1 -- reason not visible here, confirm.
    return 1 if uid == 13 else uid


def _run_tar(args):
    """Run ``tar`` with the argument list *args* without going through a shell.

    Failures are reported but not raised, matching the best-effort behavior
    of the former ``os.system`` calls. Returns True on success.
    """
    try:
        result = subprocess.run(["tar", *args], check=False)
    except OSError as e:
        print(f"Could not run tar: {e}")
        return False
    if result.returncode != 0:
        print(f"tar exited with status {result.returncode}")
    return result.returncode == 0


class Data:
    """Holds the cleaned track, animal and light lines and merges them.

    All data is kept as lists of DELIM-separated strings. ``trackData`` is the
    table that the merge steps extend with additional columns.
    """

    def __init__(self, animalFile, skyglowFile, trackFiles):
        """Read and clean all input files.

        Args:
            animalFile: path of the tag/animal table.
            skyglowFile: path of the skyglow log (only read if hasLight()).
            trackFiles: track log names, relative to INPUTDIR.

        Raises:
            ValueError: if any of the inputs contains no usable line.
        """
        self.trackData = []
        for file in trackFiles:
            self.trackData.extend(self.getTrackData(file))
        if not self.trackData:
            raise ValueError("no usable tracking lines found in the track files")
        # lines start with the fixed-width timestamp, so a plain string sort is chronological
        self.trackData.sort()
        self.startTrack = self.getTrackTime(self.trackData[0])
        self.endTrack = self.getTrackTime(self.trackData[-1])

        self.animalData = self.getAnimalData(animalFile)

        if hasLight():
            self.lightData = self.getLightData(skyglowFile)
            self.startLight = self.getLightTime(self.lightData[0])
            self.endLight = self.getLightTime(self.lightData[-1])

        pp(self.header())

    def getTrackData(self, fileName):
        """Get lines from a tracklog and add date, time and unit number columns.

        The milliseconds column is left-padded with zeros to three digits and
        the trailing signal-strength column is dropped. The resulting columns
        are: timestamp, ms, date, time, unit, x, y, tagID.

        Args:
            fileName: track log name relative to INPUTDIR (absolute paths work too).

        Returns:
            List of processed lines; empty if the file has no usable line.
        """
        pp(f"processing: {fileName}")
        # errors="replace": corrupted bytes must not abort the read; such lines
        # fail the timestamp parse and are dropped in clean().
        with open(os.path.join(INPUTDIR, fileName), errors="replace") as f:
            lines = f.readlines()

        lines = self.clean(lines, T_MINLEN, T_MAXLEN, T_NCOLS, T_DELIM, self.getTrackTime)
        if not lines:
            pp("-> empty!")
            return []

        uid = str(getUID(fileName))
        for idx, line in enumerate(lines):
            cols = line.split(DELIM)
            # pad ms column with zeros to fit three digits (ecotron)
            cols[T_COL_MS] = cols[T_COL_MS].rjust(3, '0')
            trackTime = self.getTrackTime(line)
            stamp = [trackTime.strftime('%Y-%m-%d'), trackTime.strftime('%H:%M'), uid]
            # insert date,time,uid after the ms column; drop the signalstrength column (last)
            lines[idx] = DELIM.join(cols[:T_COL_MS + 1] + stamp + cols[T_COL_MS + 1:-1])
        return lines

    def getLightData(self, file):
        """Read and clean the skyglow log.

        Raises:
            ValueError: if no usable line remains.
        """
        with open(file) as f:
            lines = f.readlines()
        lines = self.clean(lines, L_MINLEN, L_MAXLEN, L_NCOLS, L_DELIM, self.getLightTime)
        # skylog used to have an xtra space before the separator (changed 200915) TODO remove
        # clean() has already turned the tabs into DELIM, so match against DELIM here.
        pattern = re.compile(r"\s+" + re.escape(DELIM))
        lines = [pattern.sub(DELIM, x) for x in lines]
        if not lines:
            raise ValueError(f"no usable light data in {file}")
        return lines

    def getAnimalData(self, file):
        """Read and clean the tag/animal table.

        Raises:
            ValueError: if no usable line remains.
        """
        with open(file) as f:
            lines = f.readlines()
        lines = self.clean(lines, A_MINLEN, A_MAXLEN, A_NCOLS, A_DELIM)
        if not lines:
            raise ValueError(f"no usable animal data in {file}")
        return lines

    def clean(self, lines, minLen, maxLen, nCols, sep, timeFunc=None):
        """Normalize raw lines and drop everything that is not a data row.

        Strips surrounding whitespace, keeps only lines whose length is within
        [minLen, maxLen] and that have exactly nCols columns, replaces sep with
        DELIM and removes comment lines (starting with '#'). If timeFunc is
        given, only lines with startTime <= timeFunc(line) < endTime are kept.

        Returns:
            The list of cleaned, DELIM-separated lines.
        """
        cleaned = []
        for raw in lines:
            line = raw.strip()
            # EcoTron: ms are variable in length (0-999), SchrebaTron: ms is padded with zeros
            if not minLen <= len(line) <= maxLen:
                continue
            if len(line.split(sep)) != nCols:
                continue
            line = line.replace(sep, DELIM)
            if not line or line[0] == '#':
                continue
            # only between start and endTime
            if timeFunc and not startTime <= timeFunc(line) < endTime:
                continue
            cleaned.append(line)
        return cleaned

    def parseLightLine(self, line, uid):
        """Extract the light values of one cleaned skyglow line for unit uid.

        Returns:
            Tuple (moon_real [mLux], moon_eco [mLux from moonMap], skyglow),
            where skyglow is the skyglowDict value for the unit's dmx value or
            '' if that value is not in the mapping. None if the project has no
            light data.
        """
        if not hasLight():
            return

        cols = line.split(DELIM)
        moon_real = float(cols[L_COL_MOON_REAL]) * 1000  # convert to mLux
        moon_dmx = int(cols[L_COL_MOON_DMX])
        moon_eco = moonMap[moon_dmx]

        # int list from string format like '[1, 2, 3, 4, 5]'; it was split on DELIM too
        skyglowList = cols[L_COL_SKYGLOW:]
        skyglowList[0] = skyglowList[0][1:]     # remove '['
        skyglowList[-1] = skyglowList[-1][:-1]  # remove ']'
        skyglow = skyglowDict.get(int(skyglowList[uid - 1]), '')

        return (moon_real, moon_eco, skyglow)

    def merge(self):
        """Append the animal columns and, if available, the light columns to trackData."""
        self.merge_animals()
        if hasLight():
            self.merge_light()

    def merge_animals(self):
        """Merge AnimalData into TrackData on TagID.

        Three columns are appended to every track line, taken from the first
        animal entry with a matching tag; a placeholder is appended when the
        tag is unknown.
        """
        pp("merging animal data into track data (this might take a while)")

        dflt = f'{DELIM}X{DELIM}X00{DELIM}0'

        # Index the animal table once instead of scanning it for every track line.
        suffix_by_tag = {}
        duplicates = 0
        for aLine in self.animalData:
            cols = aLine.split(DELIM)
            tag = cols[A_COL_TAG]
            if tag in suffix_by_tag:
                duplicates += 1  # first entry wins, as with the former linear scan
                continue
            suffix_by_tag[tag] = (DELIM + cols[A_COL_SPECIES_IND]
                                  + DELIM + cols[A_COL_SPECIES]
                                  + DELIM + cols[A_COL_WEIGHT_WO_TAG_MG])
        if duplicates:
            pp(f"Ignoring {duplicates} duplicate tag entries in animal data!")

        notFound = 0
        for idx, tLine in enumerate(self.trackData):
            suffix = suffix_by_tag.get(tLine.split(DELIM)[COL_TAG])
            if suffix is None:
                suffix = dflt
                notFound += 1
            self.trackData[idx] = tLine + suffix

        pp(f"Didn't find Tags for {notFound} tracking events!")

    def merge_light(self):
        """Merge LightData into TrackData on Time.

        Track lines before the first light entry get zeros. Every later track
        line gets the values of the first light entry whose time is not before
        the track time (or of the last light entry once the log is exhausted).
        """
        pp("merging light data into track data")
        n_track = len(self.trackData)
        last_light = len(self.lightData) - 1
        no_light = f'{DELIM}0{DELIM}0{DELIM}0'

        track_idx = 0
        light_idx = 0
        lightTime = self.startLight
        trackTime = self.startTrack

        while trackTime < lightTime:
            self.trackData[track_idx] += no_light
            track_idx += 1
            if track_idx >= n_track:
                pp("skyglow.log starts after track.log...")
                return
            trackTime = self.getTrackTime(self.trackData[track_idx])

        pp(f"start merge @ {trackTime} -- idx {track_idx}")
        # iterate by index; slicing would copy the (potentially huge) remaining list
        for idx in range(track_idx, n_track):
            trackLine = self.trackData[idx]
            trackTime = self.getTrackTime(trackLine)
            uid = int(trackLine.split(DELIM)[COL_UID])

            while lightTime < trackTime and light_idx < last_light:  # -> minute precision
                light_idx += 1
                lightTime = self.getLightTime(self.lightData[light_idx])

            # add light cols
            moon_real, moon_eco, skyglow = self.parseLightLine(self.lightData[light_idx], uid)
            self.trackData[idx] = (trackLine
                                   + DELIM + "{:.3f}".format(moon_real)
                                   + DELIM + "{:.3f}".format(moon_eco)
                                   + DELIM + str(skyglow))

    def getLightTime(self, line):
        """Return the time of a skyglow line, converted from Berlin local time to UTC.

        Returns noTime for lines whose first column contains "Error" or cannot
        be parsed with TIME_FMT, and None if the project has no light data.
        """
        if not hasLight():
            return
        sTime = line.strip().split(DELIM)[0].strip()
        if "Error" in sTime:
            return noTime
        try:
            lightTime = dt.strptime(sTime, TIME_FMT)
        except ValueError as e:
            print("Caught:", e)
            return noTime
        # convert to UTC from local BerlinTZ and respect DST
        hours = 2 if lightTime < _WINTER_TIME else 1
        # keyword is required: the first positional argument of timedelta is days
        return lightTime - datetime.timedelta(hours=hours)

    def getTrackTime(self, line):
        """Convert the leading timestamp of a track line to a datetime.

        Returns noTime if the first TIMESTAMP_LEN characters are not a valid
        timestamp.
        """
        # NOTE(review): fromtimestamp() yields the local time of the machine running
        # this script, while light times are shifted to UTC -- confirm this runs in UTC.
        try:
            return dt.fromtimestamp(int(line[:TIMESTAMP_LEN]))
        except (ValueError, OverflowError, OSError) as e:
            print("Caught:", e)
            print("Presumably binary data in txt file... or some other corruption")
            return noTime

    def header(self):
        """Return the comment header (statistics and column names) of the output file."""
        h = f"# {PROJ.upper()} TRACK DATA\n" \
            f"#################################################################################################\n" \
            f"# len(Track): {len(self.trackData)}\n" \
            f"# startTrack: {self.startTrack}\n" \
            f"# endTrack: {self.endTrack}\n" \
            f"# len(animal): {len(self.animalData)}\n"
        if hasLight():
            h += f"# len(Light): {len(self.lightData)}\n" \
                 f"# startLight: {self.startLight}\n" \
                 f"# endLight: {self.endLight}\n"
        h += f"#################################################################################################\n" \
             f"#timestamp, milliseconds, date, time, unit, x, y, tagID, species, speciesnumber, weight_without_tag[mg]{', moon_real[mLux], moon_eco[mLux], skyglow[Lux]' if hasLight() else ''}\n" \
             f"#################################################################################################\n"
        return h

    def write(self):
        """Write header and track table to OUTPUTDIR and compress the file with tar."""
        fileName = OUTPUTDIR + f"/{PROJ}-data-{self.startTrack.strftime(OUT_FMT)}-{self.endTrack.strftime(OUT_FMT)}"
        pp("write to " + fileName)
        os.makedirs(os.path.dirname(fileName), exist_ok=True)
        with open(fileName, "w") as f:
            f.write(self.header())
            # generator keeps self.trackData intact and avoids a second copy in memory
            f.writelines(line + '\n' for line in self.trackData)

        _run_tar(["-zcvf", fileName + ".tgz", fileName])


def xtract():
    """Extract every .tgz archive below INPUTDIR into the directory it is located in."""
    own_output = f"{PROJ}-data-"
    # collect first: extracting while walking would change the tree being walked
    archives = [
        os.path.join(root, file)
        for root, _dirs, files in os.walk(INPUTDIR)  # walk recursively
        for file in files
        # results written by write() are not input data
        if file.endswith(".tgz") and not file.startswith(own_output)
    ]
    for archive in archives:
        print(f"xtracting {archive}\n")
        _run_tar(["-zxvf", archive, "-C", os.path.dirname(archive) or "."])


def getFileList():
    """Find the input files below INPUTDIR.

    Returns:
        Tuple (animalFile, skyglowFile, trackFiles): the paths of 'tags' and
        'skyglow.log' and the list of track logs relative to INPUTDIR. Exits
        the program if a required file is missing.
    """
    animalFile, skyglowFile, trackFiles = '', '', []
    xtract()
    print("Looking for files containing 'unit-X' or named 'skyglow.log' or 'tags'...")
    for root, _dirs, files in os.walk(INPUTDIR):  # walk recursively
        for file in files:
            path = os.path.join(root, file)
            if file.endswith(".tgz"):
                # archives are extracted by xtract(), never parsed as logs
                pp(f"Ignoring ({file})")
            elif "unit" in file:
                trackFiles.append(os.path.relpath(path, INPUTDIR))
            elif file == "tags":
                animalFile = path
                print(f"Found animalFile: {animalFile}")
            elif file == "skyglow.log":
                skyglowFile = path
                print(f"Found logfile: {skyglowFile}")
            else:
                pp(f"Ignoring ({file})")

    if not trackFiles:
        sys.exit("No tracking data found. (filename containing unit-X, where X is in [0-9])")
    if hasLight() and not skyglowFile:
        sys.exit("Couldn't find skyglow.log")
    if not animalFile:
        sys.exit("Couldn't find tags")

    return animalFile, skyglowFile, trackFiles


def main():
    """Locate the input files, merge them and write the result."""
    print(" INPUTDIR: ", INPUTDIR)
    print("OUTPUTDIR: ", OUTPUTDIR)

    animalFile, skyglowFile, trackFiles = getFileList()
    data = Data(animalFile, skyglowFile, trackFiles)
    data.merge()
    data.write()


if __name__ == "__main__":
    if len(sys.argv) == 1:
        this = os.path.basename(__file__)
        sys.exit(f' {this} <dir> - search for "skyglow.log"/"tags" and trackfiles in dir and write processed file to dir\n'
                 f'or\n {this} <indir> <outdir> - search files in <indir> and write to <outdir>')
    elif len(sys.argv) == 2:
        INPUTDIR = sys.argv[1]
        OUTPUTDIR = INPUTDIR
    elif len(sys.argv) >= 3:
        INPUTDIR = sys.argv[1]
        OUTPUTDIR = sys.argv[2]

    start = time.time()
    main()
    end = time.time()
    print(f"Took {end - start} seconds")