Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
dataSync.py 12.18 KiB
#!/usr/bin/python3
# clean, sort, merge raw data
# output should be one huge table with all necessary fields for further processing/vizualization
#
############################
# TODO 	     *most important
############################
#	parse moonmap and skyglowdict from skyglow Project sources (submodule ... ?)
#
#
#	include temperature and humidity data?
#	integrate with ST
#		> make Skyglow optional -> TEST
#		> ST-Filenames should include 'unit-X'!
#
# how to process skylog error in merge()
#
# Speed Up! (takes 16minutes for 8mio lines...)
# 	1. Threads 
# 	> see https://realpython.com/python-concurrency/#multiprocessing-version
# 	use threads for 
# 	> reading files (easy)
# 	> merging (intermediate)
#		* parse chunks of trackData. call sort at end
#		* calc indices of data dependent on numThreads
#
#	2. rewrite animalmerge for loops
#	3. Consider using Pandas, NumPy, Cython, Pypy, Koalas, ...
#	4. inline C++/C/Bash
#	5. sql (SQLAlchemy)
#
#	how to profile to find bottlenecks?
#
#	for lists: use append() instead of + / += operators
#
# Experiment Times
####################
# Track start 	 7.7
# Light start 	20.7
# Dark: 10.7 - 20.7 
# Block1: 21.7 - 18.8
# Block2: 15.9 - 13.10
#

import sys, getopt, os, re, datetime, string
from pprint import pprint as pp
from datetime import datetime as dt
import time

PROJ = "ecoTron"
# PROJ = "schrebatron"

TIME_FMT='%Y-%m-%d %H:%M:00'
startTime = dt.strptime('2020-07-07 00:00:00', TIME_FMT)
endTime = dt.strptime('2020-11-30 00:00:00', TIME_FMT)
# endTime = dt.strptime('2020-08-18 00:00:00', TIME_FMT)
noTime = dt.fromtimestamp(0)

TAG_LEN=len("04B94A7F7288588022")

# trackfile
T_DELIM=';'
TIMESTAMP_LEN=10
T_COL_MS=1
T_COL_DATE=2
T_COL_TAG=4
T_MINLEN = 38 # without newline.
T_MAXLEN = 40 # ms column varies 1-3. Will be padded later for uniform len
T_NCOLS = 6

# lightfile
L_DELIM='\t'
L_COL_MOON_REAL=1
L_COL_MOON_DMX=2
L_COL_SKYGLOW=3
L_MINLEN = 61 #2020-07-21 15:15:00 	0.0	0	[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
L_MAXLEN = 76 #2020-07-21 15:09:00 	0.0	0	[15, 54, 4, 19, 0, 1, 5, 0, 40, 12, 123, 123]
L_NCOLS = 4

# animalfile
A_DELIM='\t'
A_MINLEN = 25 #guess
A_MAXLEN = 30
A_NCOLS = 4
A_COL_SPECIES=0
A_COL_SPECIES_IND=1
A_COL_WEIGHT_WO_TAG_MG=2
A_COL_TAG=3

# out file
OUT_FMT='%Y%m%d'
DELIM=','
COL_UID=4
COL_TAG=7

#mlx (idx is also dmx-byte value)
moonMap = [ 1.409,
		 8.369,
		12.333,
		16.210,
		20.563,
		24.716,
		28.907,
		33.363,
		36.843,
		41.137,
		46.527,
		50.880,
		54.727,
		59.617,
		63.967,
		67.817,
		73.173,
		77.213,
		82.827,
		87.037,
		91.033,
		95.320,
		99.470,
		104.366,
		108.833,
		113.667, #25
		123.733,
		128.567,
		133.2,
		138.3,
		143.367,
		148.233,
		152.233,
		156.933,
		162.5,
		167.5,
		172.1,
		176.767,
		181.567,
		187,
		192.067,
		196.433,
		201.467, #42 > max-moon is 201.9mlx
		206.5,
		211.733,
		216.4,
		221.033,
		226,
		231.1,
		235.733,
		240.4, #50
		# 274 # > new max!
	]

#Mapping skyglow dmx+nled to lux
skyglowDict = { 
			 15: 0.08,
			 54: 0.3,
			  4: 1,
			 19: 0.1,
			  0: 0,
			  1: 0.01,
			  5: 0.03,
			  0: 0,
			 40: 10,
			 12: 3,
			123: 30,
			123: 30,
}

def hasLight():
	return PROJ.lower() == "ecotron"

def getUID(fileName):
	""" extract and return unit-id from fileName """
	f=fileName.split('-')
	idx=f.index("unit")+1
	uid=int(f[int(idx)])
	return 1 if uid == 13 else uid

class Data:

	def __init__(self,animalFile, skyglowFile, trackFiles):

		self.trackData = []
		for file in trackFiles:
			self.trackData.extend( self.getTrackData(file) )

		self.trackData.sort()
		self.startTrack = self.getTrackTime( self.trackData[0] )
		self.endTrack = self.getTrackTime( self.trackData[-1] )

		self.animalData = self.getAnimalData(animalFile)

		if hasLight():
			self.lightData = self.getLightData(skyglowFile)
			self.startLight = self.getLightTime( self.lightData[0] )
			self.endLight = self.getLightTime( self.lightData[-1] )

		pp(self.header())

	def getTrackData(self, fileName):
		""" get lines from tracklog and add unit number as column"""

		pp(f"processing: {fileName}")
		with open(INPUTDIR+'/'+fileName) as f:
			lines = f.readlines()
			lines = self.clean(lines, T_MINLEN, T_MAXLEN, T_NCOLS, T_DELIM, self.getTrackTime)

			if not lines:
				pp("-> empty!")
				return []

		uid=getUID(fileName)
		idx_after_ms = TIMESTAMP_LEN + len('999') + len(DELIM)
		for idx, line in enumerate(lines):

			# pad ms column with zeros to fit three digits (ecotron)
			ms_digits=len(line.split(DELIM)[T_COL_MS])
			if ms_digits < 3:
				line = line[:TIMESTAMP_LEN+1] + '0'*(3-ms_digits) + line[TIMESTAMP_LEN+1:]

			trackTime = self.getTrackTime(line)
			date = trackTime.strftime('%Y-%m-%d')
			time = trackTime.strftime('%H:%M')

			# add date,time,uid columns
			line = line[:idx_after_ms] + DELIM + date + DELIM + time + DELIM + str(uid) + line[idx_after_ms:]

			# remove signalstrength column
			line = line[:line.rindex(DELIM)]

			lines[idx] = line

		assert (lines)
		return lines

	def getLightData(self,file):
		
		with open(file) as f:
			lines = f.readlines()
		
			lines = self.clean(lines, L_MINLEN, L_MAXLEN, L_NCOLS, L_DELIM, self.getLightTime)
			pattern = re.compile(r"\s\t") # skylog used to have an xtra space (changed 200915) TODO remove
			lines = [re.sub(pattern, "\t", x) for x in lines ]

		assert (lines)
		return lines

	def getAnimalData(self,file):
		
		with open(file) as f:
			lines = f.readlines()

		lines = self.clean(lines, A_MINLEN, A_MAXLEN, A_NCOLS, A_DELIM)
		
		#TODO: check for duplicate tags and print warning / ignore?

		assert (lines)
		return lines

	def clean(self, lines, minLen, maxLen, nCols, sep, timeFunc=None):
		""" remove trailing newline, empty and comments and remove lines before startTime or after endTime"""

		lines = [l.strip() for l in lines]
		lines = [l for l in lines if minLen <= len(l) <= maxLen] # EcoTron: ms are variable in length (0-999), SchrebaTron: ms is padded with zeros
		lines = [l for l in lines if len(l.split(sep)) == nCols]
		lines = [l.replace(sep,DELIM) for l in lines]
		lines = list(filter(lambda q: q and q[0] != '#', lines))
		if timeFunc:
			lines = list(filter(lambda q: startTime <= timeFunc(q) < endTime, lines)) #only between start and endTime

		return lines

	def parseLightLine(self, line, uid):
		if not hasLight():
			return

		cols=line.split(DELIM)
		moon_real = float(cols[L_COL_MOON_REAL])*1000 #convert to mLux
		moon_dmx = int(cols[L_COL_MOON_DMX])
		moon_eco = moonMap[moon_dmx]

		#int list from string format like '[1, 2, 3, 4, 5]'
		skyglowList = cols[L_COL_SKYGLOW:]
		skyglowList[0] = skyglowList[0][1:] 	#remove '['
		skyglowList[-1] = skyglowList[-1][:-1]	#remove ']'
		skyglow = skyglowDict.get(int(skyglowList[uid-1]),'')

		return (moon_real, moon_eco, skyglow)

	def merge(self):
		
		self.merge_animals()
		if hasLight():
			self.merge_light()

	def merge_animals(self):
		""" merge AnimalData into TrackData on TagID """
		pp("merging animal data into track data (this might take a while)")

		#TODO SPEED UP!!! 8Mio lines take 16Minutes... 32mins

		# ttags_not_found=[]
		dflt=f'{DELIM}X{DELIM}X00{DELIM}0'
		notFound=0
		for idx, tLine in enumerate(self.trackData):
			found=False
			ttag = tLine.split(DELIM)[COL_TAG]
			
			for aLine in self.animalData:
				# print(tLine)
				# print(aLine)
				# print(atag)
				aLine = aLine.split(DELIM)
				atag = aLine[A_COL_TAG]

				if ttag == atag:
					found=True
					aLine = DELIM + aLine[A_COL_SPECIES_IND] + DELIM + aLine[A_COL_SPECIES] + DELIM + aLine[A_COL_WEIGHT_WO_TAG_MG]
					self.trackData[idx] += aLine
					break;

			if not found:
				self.trackData[idx] += dflt
				notFound+=1
				# ttags_not_found.append(ttag)

		pp(f"Didn't find Tags for {notFound} tracking events!")

	def merge_light(self):
		"""merge LightData into TrackData on Time"""
		pp("merging light data into track data")

		track_idx=0
		light_idx=0
		lightTime = self.startLight
		trackTime = self.startTrack
		
		while( trackTime < lightTime ):
			self.trackData[track_idx] += f'{DELIM}0{DELIM}0{DELIM}0'
			track_idx+=1
			if( track_idx >= len(self.trackData) ):
				pp("skyglow.log starts after track.log...")
				return
			trackTime = self.getTrackTime(self.trackData[track_idx])

		pp(f"start merge @ {trackTime} -- idx {track_idx}")

		for trackLine in self.trackData[track_idx:]:
			trackTime = self.getTrackTime(trackLine)
			
			uid = int(trackLine.split(DELIM)[COL_UID])

			while lightTime < trackTime and light_idx < len(self.lightData) - 1: # -> minute precision
				light_idx+=1
				lightTime = self.getLightTime(self.lightData[light_idx])

			#add light cols
			moon_real, moon_eco, skyglow = self.parseLightLine(self.lightData[light_idx],uid)
			trackLine += DELIM + "{:.3f}".format(moon_real) + DELIM + "{:.3f}".format(moon_eco) + DELIM + str(skyglow)

			self.trackData[track_idx] = trackLine
			track_idx+=1

	def getLightTime(self,line):
		if not hasLight():
			return

		cols = line.strip().split(DELIM)
		sTime = cols[0].strip()
		if "Error" in sTime:
			return noTime
		
		lightTime = dt.strptime(sTime,TIME_FMT)

		#convert to UTC from local BerlinTZ and respect DST sun 25 october 3AM -> -1 hour = 2AM again
		winterTime = dt.strptime("2020-10-25 03:00:00",TIME_FMT)
		hours= ( 2 if lightTime < winterTime else 1 )

		lightTime -= datetime.timedelta(hours)
		return lightTime


	""" convert timestamp string to datetime """
	def getTrackTime(self,line):
		try:
			timestamp=int(line[:TIMESTAMP_LEN])
		except ValueError as e:
			print("Caught:",e)
			print("Presumably binary data in txt file... or some other corruption")
			return noTime

		return dt.fromtimestamp(timestamp)

	def header(self):
		h= f"# {PROJ.upper()} TRACK DATA\n" \
		f"#################################################################################################\n" \
		f"#     len(Track): {len(self.trackData)}\n" \
		f"#     startTrack: {self.startTrack}\n" \
		f"#       endTrack: {self.endTrack}\n" \
		f"#    len(animal): {len(self.animalData)}\n"
		if hasLight():
			h += 	f"#     len(Light): {len(self.lightData)}\n" \
					f"#     startLight: {self.startLight}\n" \
					f"#       endLight: {self.endLight}\n"
		h+= f"#################################################################################################\n" \
			f"#timestamp, milliseconds, date, time, unit, x, y, tagID, species, speciesnumber, weight_without_tag[mg]{', moon_real[mLux], moon_eco[mLux], skyglow[Lux]' if hasLight() else ''}\n" \
			f"#################################################################################################\n"
		return  h

	def write(self):

		fileName = OUTPUTDIR+f"/{PROJ}-data-{self.startTrack.strftime(OUT_FMT)}-{self.endTrack.strftime(OUT_FMT)}"
		pp("write to " + fileName)
		
		os.makedirs(os.path.dirname(fileName), exist_ok=True)
		with open(fileName, "w") as f:
			f.writelines(self.header())
			self.trackData=map(lambda x:x+'\n', self.trackData)
			f.writelines(self.trackData)

def getFileList():
	animalFile, skyglowFile, trackFiles = '','',[]
	print( "Looking for files containing 'unit-X' or named 'skyglow.log' or 'tags'..." )
	for root, dirs, files in os.walk(INPUTDIR): #walk recursively
		for file in files:
			if "unit" in file:
				trackFiles.append(file)
			elif file == "tags":
				animalFile = INPUTDIR+'/'+file
				print(f"Found animalFile: {animalFile}")
			elif file == "skyglow.log":
				skyglowFile = INPUTDIR+'/'+file
				print(f"Found logfile: {skyglowFile}")
			else: 
				pp(f"Ignoring ({file})")

	if not trackFiles:
		sys.exit("No tracking data found. (filename containing unit-X, where X is in [0-9])")
	if hasLight() and not skyglowFile:
		sys.exit("Couldn't find skyglow.log")
	if not animalFile:
		sys.exit(f"Couldn't find tags")

	return animalFile, skyglowFile, trackFiles

def main():

	print(" INPUTDIR: ", INPUTDIR)
	print("OUTPUTDIR: ", OUTPUTDIR)

	animalFile, skyglowFile, trackFiles = getFileList()
	data = Data(animalFile,skyglowFile,trackFiles)
	data.merge()
	data.write()

if __name__ == "__main__" :
	if len(sys.argv) == 1:
		this = os.path.basename(__file__)
		sys.exit(f'   {this} <dir> - search for "skyglow.log"/"tags" and trackfiles in dir and write processed file to dir'
		f'or\n   {this} <indir> <outdir> - search files in <indir> and write to <outdir>')
	elif len(sys.argv) == 2:
		INPUTDIR = sys.argv[1]
		OUTPUTDIR = INPUTDIR
	elif len(sys.argv) >= 3:
		INPUTDIR = sys.argv[1]
		OUTPUTDIR = sys.argv[2]

	start = time.time()
	main()
	end = time.time()
	print(f"Took {end - start} seconds")