#!/usr/bin/python
# encoding: utf-8
#
# EitSupport
# Copyright (C) 2011 betonme
# Copyright (C) 2016 Wolfgang Fahl
#
# This EITParser is based on:
# https://github.com/betonme/e2openplugin-EnhancedMovieCenter/blob/master/src/EitSupport.py
#
# In case of reuse of this source code please do not remove this copyright.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# For more information on the GNU General Public License see:
# <http://www.gnu.org/licenses/>.
#
import pathlib
import os
import re
import struct
import sys
import time
import chardet
from datetime import datetime
#from Components.config import config
#from Components.Language import language
#from EMCTasker import emcDebugOut
#from IsoFileSupport import IsoSupport
#from MetaSupport import getInfoFile
def parseMJD(MJD):
    '''
    Convert a Modified Julian Date into a calendar date.

    The value is expected to be the 16 bit unsigned integer date field
    used by DVB-SI.

    :param MJD: the Modified Julian Date
    :return: tuple (year, month, day)
    '''
    years = int((MJD - 15078.2) / 365.25)
    days_in_years = int(years * 365.25)
    months = int((MJD - 14956.1 - days_in_years) / 30.6001)
    day = MJD - 14956 - days_in_years - int(months * 30.6001)
    # month values 14 and 15 belong to January/February of the following year
    carry = 1 if months in (14, 15) else 0
    return (1900 + years + carry), (months - 1 - carry * 12), day
def unBCD(byte):
    '''
    Decode one packed BCD byte: the high nibble holds the tens digit,
    the low nibble the ones digit.

    :param byte: the BCD encoded value
    :return: the decoded integer
    '''
    tens = byte >> 4
    ones = byte & 0x0f
    return tens * 10 + ones
#from Tools.ISO639 import LanguageCodes
# -*- coding: iso-8859-2 -*-
# Language code -> (language name, language family).
# Every alias of a language shares one tuple; the three letter codes are
# listed first so that they are found first when the table is iterated.
LanguageCodes = {
    **dict.fromkeys(("deu", "ger", "de"), ("German", "Germanic")),
    **dict.fromkeys(("fra", "fre", "fr"), ("French", "Romance")),
}
def language_iso639_2to3(alpha2):
    '''
    Translate a language code into the first three letter code that
    LanguageCodes lists for the same language.

    :param alpha2: the language code to look up
    :return: the three letter code, or alpha2 unchanged when the code is
             unknown or no three letter alias exists
    '''
    if alpha2 not in LanguageCodes:
        return alpha2
    wanted = LanguageCodes[alpha2]
    for code, entry in LanguageCodes.items():
        if entry == wanted and len(code) == 3:
            return code
    return alpha2
def bord(b):
    '''
    Compatibility shim standing in for ord(): the value is handed back
    unchanged.

    :param b: the value read from the data
    :return: b itself
    '''
    return b
class Bytes(object):
    '''
    Small mutable byte buffer wrapping a bytearray in the attribute "bytes".
    '''

    def __init__(self):
        self.bytes = bytearray()

    def append(self, b):
        '''
        add the single byte value b to the end of the buffer
        '''
        self.bytes.append(b)

    def toString(self):
        '''
        decode the buffer with the default codec and return the text
        '''
        return self.bytes.decode()

    @staticmethod
    def join(blist):
        '''
        concatenate the buffers of all Bytes objects in blist into a new
        Bytes object
        '''
        joined = Bytes()
        for part in blist:
            joined.bytes.extend(part.bytes)
        return joined

    def strip(self):
        '''
        remove leading and trailing whitespace in place and return self
        '''
        # https://stackoverflow.com/questions/9560759/python-3-how-to-make-strip-work-for-bytes
        stripped = self.bytes.strip()
        self.bytes = stripped
        return self
class Event(object):
    '''
    Collector for one text field (e.g. name, short or extended description)
    of an event stored in an EIT file.

    The text of every descriptor is read into "description" and queued in
    "descriptor" (texts in the requested language) and "descriptor_multi"
    (all texts). joinDescriptor() merges the queued parts and fixEncoding()
    turns the merged bytes into a str.
    '''

    # value of the first text byte -> character encoding selected by it
    CODEPAGES = {
        1: 'iso-8859-5',
        2: 'iso-8859-6',
        3: 'iso-8859-7',
        4: 'iso-8859-8',
        5: 'iso-8859-9',
        6: 'iso-8859-10',
        7: 'iso-8859-11',
        9: 'iso-8859-13',
        10: 'iso-8859-14',
        11: 'iso-8859-15',
        21: 'utf-8',
    }

    def __init__(self, eitList, name):
        '''
        :param eitList: owner of this event, used for logging
        :param name: label of this field, used in log messages
        '''
        self.eitList = eitList
        self.name = name
        self.description = Bytes()
        self.descriptor = []
        self.descriptor_multi = []
        self.codepage = None
        # set by readDescription when it is called without ofsEnd
        self.event_name_length = 0

    def log(self, msg):
        '''
        forward msg to the log of the owning eitList
        '''
        self.eitList.log(msg)

    @staticmethod
    def readLanguageCode(data, ofs):
        '''
        read the language code from the given offset in the data

        :param data: the raw bytes of the EIT file
        :param ofs: offset of the first of the three language code bytes
        :return: the language code as upper case text
        '''
        # The original built the code but never returned it, so every caller
        # received None. Undecodable bytes are replaced instead of raising.
        code = bytes(data[ofs:ofs + 3])
        return code.decode("ascii", errors="replace").upper()

    def readDescription(self, data, ofsStart, ofsEnd=None):
        '''
        read my description from the given offset

        Only line feeds (10) and bytes above 31 are kept, which also drops a
        leading codepage selector byte.

        :param data: the raw bytes of the EIT file
        :param ofsStart: offset of the first text byte; when ofsEnd is None
                         it is the offset of a length byte that precedes the
                         text
        :param ofsEnd: offset just behind the last text byte, or None to
                       derive it from the length byte at ofsStart
        '''
        # Every descriptor gets its own buffer. Re-using one buffer made the
        # queued parts alias each other and duplicated the text on join.
        self.description = Bytes()
        if ofsEnd is None:
            self.event_name_length = bord(data[ofsStart])
            ofsStart = ofsStart + 1
            # the text ends event_name_length bytes behind its first byte;
            # the byte after that already belongs to the following field
            ofsEnd = ofsStart + self.event_name_length
        for i in range(ofsStart, ofsEnd):
            try:
                byte = bord(data[i])
            except IndexError as e:
                self.log("[META] Exception in readEitFile: " + str(e))
                # all further offsets are out of range as well
                break
            if byte == 10 or byte > 31:
                self.description.append(byte)

    def appendDescription(self, lang, ISO_639_language_code, prev1_ISO_639_language_code, delim="\n\n"):
        '''
        queue the description read last

        :param lang: the requested language code
        :param ISO_639_language_code: language code of the current descriptor
        :param prev1_ISO_639_language_code: language code of the previous
               descriptor of the same kind, "x" if there was none
        :param delim: separator put in front of the text when the language
               changes
        '''
        if ISO_639_language_code == lang:
            self.descriptor.append(self.description)
        sameLanguage = ISO_639_language_code == prev1_ISO_639_language_code
        if sameLanguage or prev1_ISO_639_language_code == "x":
            self.descriptor_multi.append(self.description)
        else:
            # str + Bytes is not defined, so build the separated part as Bytes
            separated = Bytes()
            separated.bytes.extend(delim.encode() if isinstance(delim, str) else delim)
            separated.bytes.extend(self.description.bytes)
            self.descriptor_multi.append(separated)

    def joinDescriptor(self):
        '''
        merge the queued parts into a single Bytes object stored in
        "descriptor"; the texts in the requested language win, otherwise the
        texts of all languages are used
        '''
        if self.descriptor:
            self.descriptor = Bytes.join(self.descriptor)
        else:
            self.descriptor = Bytes.join(self.descriptor_multi).strip()

    def fixEncoding(self):
        '''
        decode the joined descriptor bytes

        The codepage found by readCodepage is used if there is one, otherwise
        the encoding is guessed with chardet.

        :return: the decoded text; a descriptor that is not a Bytes object
                 (not joined yet or already decoded) is returned unchanged
        '''
        if not isinstance(self.descriptor, Bytes):
            return self.descriptor
        # get back the raw bytes
        raw = bytes(self.descriptor.bytes)
        if not raw:
            self.descriptor = ""
            return self.descriptor
        enc = self.codepage
        if not enc:
            encdata = chardet.detect(raw)
            # chardet reports None when it cannot decide
            enc = (encdata.get('encoding') or 'utf-8').lower()
            confidence = str(encdata.get('confidence'))
            self.log("[META] Detected %s event encoding-type: %s ( %s )" % (self.name, enc, confidence))
        try:
            self.descriptor = raw.decode(enc)
        except LookupError as e:
            # encoding name unknown to Python
            self.log("[META] Exception in readEitFile: " + str(e))
            self.descriptor = raw.decode('utf-8', errors='replace')
        except UnicodeDecodeError as e:
            # always hand back text so that callers can keep working with str
            self.log("[META] Exception in readEitFile: " + str(e))
            self.descriptor = raw.decode(enc, errors='replace')
        return self.descriptor

    def readCodepage(self, data, ofs):
        '''
        detect the codepage from the byte at the given offset; a codepage
        that was found once is kept

        :param data: the raw bytes of the EIT file
        :param ofs: offset of the first text byte
        '''
        if self.codepage:
            return
        try:
            marker = bord(data[ofs])
        except (IndexError, TypeError):
            return
        self.codepage = self.CODEPAGES.get(marker)
        if self.codepage:
            self.log("[META] Found %s encoding-type: %s" % (self.name, self.codepage))
# Eit File support class
# Description
# http://de.wikipedia.org/wiki/Event_Information_Table
class EitList():
    '''
    Reader for one .eit (Event Information Table) file.

    The file is parsed on construction. The results are kept in the
    dictionary "eit" under the keys 'when', 'startdate', 'starttime',
    'duration', 'name', 'short_description' and 'description' and are
    available via the get functions. The dictionary is empty when there is
    no file or the file is too short.
    '''

    EIT_SHORT_EVENT_DESCRIPTOR = 0x4d
    # the misspelled name is kept for backward compatibility
    EIT_EXTENDED_EVENT_DESCRIPOR = 0x4e
    EIT_EXTENDED_EVENT_DESCRIPTOR = EIT_EXTENDED_EVENT_DESCRIPOR

    # Inserts the missing line breaks in extended event descriptions of the
    # RTL group. Besides the mojibake form \xC3\xA4 the real a-umlaut \xE4 is
    # accepted, since the pattern is applied to decoded text.
    _RTL_LINE_BREAK = re.compile('((?:Moderat(?:ion:|or(?:in){0,1})|Vorsitz: |Jur(?:isten|y): |G(?:\xC3\xA4|\xE4|a)st(?:e){0,1}: |Mit (?:Staatsanwalt|Richter(?:in){0,1}|den Schadenregulierern) |Julia Leisch).*?[a-z]+)(\'{0,1}[0-9A-Z\'])')

    def __init__(self, path=None, debug=False):
        '''
        :param path: path of the .eit file to read
        :param debug: if True log messages are printed to stderr
        '''
        self.eit_file = None
        self.eit_mtime = 0
        self.debug = debug
        #TODO
        # The dictionary implementation could be very slow
        self.eit = {}
        self.iso = None
        self.__newPath(path)
        self.__readEitFile()

    def log(self, msg):
        '''
        print msg to stderr if debugging is on
        '''
        if self.debug:
            print(msg, file=sys.stderr)

    @staticmethod
    def readeit(eitroot, debug=False):
        '''
        print the content of the given .eit file, or of all .eit files
        directly inside the given directory

        :param eitroot: path of a file or directory
        :param debug: if True log messages are printed to stderr
        '''
        if os.path.isdir(eitroot):
            for p in pathlib.Path(eitroot).iterdir():
                if p.is_file() and p.name.endswith(".eit"):
                    EitList.readeitFile(p, debug)
        elif os.path.isfile(eitroot):
            EitList.readeitFile(eitroot, debug)

    @staticmethod
    def readeitFile(eitfile, debug=False):
        '''
        read the given .eit file and print name, start date and description
        '''
        eitlist = EitList(eitfile, debug=debug)
        print(eitlist.getEitName())
        print(eitlist.getEitStartDate())
        print(eitlist.getEitDescription())

    def __newPath(self, path):
        '''
        remember path as the file to read; a changed path resets the stored
        modification time so that the file is read again
        '''
        #TODO the lookup of the .eit file belonging to a movie or an .iso
        # file done by the original plugin is not available here
        if path and self.eit_file != path:
            self.eit_file = path
            self.eit_mtime = 0

    def __mk_int(self, s):
        return int(s) if s else 0

    def __toDate(self, d, t):
        '''
        build a datetime from a (year, month, day) and an (hour, minute, ...)
        sequence; None if a part is missing or the values are no valid date
        '''
        if not (d and t):
            return None
        #TODO Is there another fast and safe way to get the datetime
        try:
            return datetime(int(d[0]), int(d[1]), int(d[2]), int(t[0]), int(t[1]))
        except ValueError:
            return None

    ##############################################################################
    ## Get Functions
    def getEitsid(self):
        return self.eit.get('service', "") #TODO

    def getEitTsId(self):
        return self.eit.get('transportstream', "") #TODO

    def getEitWhen(self):
        return self.eit.get('when', "")

    def getEitStartDate(self):
        return self.eit.get('startdate', "")

    def getEitStartTime(self):
        return self.eit.get('starttime', "")

    def getEitDuration(self):
        return self.eit.get('duration', "")

    def getEitName(self):
        return self.eit.get('name', "").strip()

    def getEitDescription(self):
        return self.eit.get('description', "").strip()

    def getEitShortDescription(self):
        return self.eit.get('short_description', "").strip()

    def getEitExtendedDescription(self):
        return self.getEitDescription()

    def getEitLengthInSeconds(self):
        '''
        :return: the duration in seconds, 0 if no duration is known
        '''
        length = self.eit.get('duration', "")
        #TODO Is there another fast and safe way to get the length
        if len(length) > 2:
            return self.__mk_int((length[0]*60 + length[1])*60 + length[2])
        elif len(length) > 1:
            return self.__mk_int(length[0]*60 + length[1])
        else:
            return self.__mk_int(length)

    def getEitDate(self):
        '''
        :return: start date and time as datetime or None
        '''
        return self.__toDate(self.getEitStartDate(), self.getEitStartTime())

    ##############################################################################
    ## File IO Functions
    def __readEitFile(self, lang='de'):
        '''
        read and parse my file if it is new or has changed

        :param lang: language code of the preferred language for the texts
        '''
        path = self.eit_file
        if not (path and os.path.exists(path)):
            # No path or no file clear all
            self.eit = {}
            return
        mtime = os.path.getmtime(path)
        if self.eit_mtime == mtime:
            # File has not changed
            return
        # New path or file has changed
        self.eit_mtime = mtime
        data = b""
        try:
            with open(path, 'rb') as f:
                data = f.read()
        except OSError as e:
            self.log("[META] Exception in readEitFile: " + str(e))
        if len(data) < 12:
            # No data clear all
            self.eit = {}
            return
        lang = language_iso639_2to3(lang).upper()
        self.__parseHeader(data)
        self.__parseDescriptors(data, lang)

    def __parseHeader(self, data):
        '''
        parse the fixed 12 byte header: event id, start date (MJD),
        start time and duration (BCD), status bits and descriptor length
        '''
        header = struct.unpack(">HHBBBBBBH", data[0:12])
        startDate = parseMJD(header[1]) # Y, M, D
        startTime = unBCD(header[2]), unBCD(header[3]), unBCD(header[4]) # HH, MM, SS
        duration = unBCD(header[5]), unBCD(header[6]), unBCD(header[7]) # HH, MM, SS
        # the top three bits of the last field are the running status
        running_status = (header[8] & 0xe000) >> 13
        if running_status in (1, 2):
            self.eit['when'] = "NEXT"
        elif running_status in (3, 4):
            self.eit['when'] = "NOW"
        self.eit['startdate'] = startDate
        self.eit['starttime'] = startTime
        self.eit['duration'] = duration

    def __parseDescriptors(self, data, lang):
        '''
        walk the descriptors behind the header and collect name, short and
        extended description; descriptors of other types are skipped
        '''
        name_event = Event(self, "name")
        short_event = Event(self, "short")
        extended_event = Event(self, "extended")
        pos = 12
        endpos = len(data) - 1
        prev1_ISO_639_language_code = "x"
        prev2_ISO_639_language_code = "x"
        while pos < endpos:
            rec = bord(data[pos])
            if pos + 1 >= endpos:
                break
            # the length byte does not count the tag and the length byte itself
            length = bord(data[pos + 1]) + 2
            if rec == self.EIT_SHORT_EVENT_DESCRIPTOR:
                # str() of bytes yields the repr ("b'deu'") on Python 3, so
                # the language code has to be decoded
                ISO_639_language_code = data[pos + 2:pos + 5].decode("ascii", errors="replace").upper()
                name_event.readDescription(data, pos + 5)
                name_event.readCodepage(data, pos + 6)
                textOfs = pos + 7 + name_event.event_name_length
                short_event.readCodepage(data, textOfs)
                short_event.readDescription(data, textOfs, pos + length)
                short_event.appendDescription(lang, ISO_639_language_code, prev1_ISO_639_language_code)
                name_event.appendDescription(lang, ISO_639_language_code, prev1_ISO_639_language_code, " ")
                prev1_ISO_639_language_code = ISO_639_language_code
            elif rec == self.EIT_EXTENDED_EVENT_DESCRIPOR:
                ISO_639_language_code = Event.readLanguageCode(data, pos + 3)
                extended_event.readCodepage(data, pos + 8)
                extended_event.readDescription(data, pos + 8, pos + length)
                extended_event.appendDescription(lang, ISO_639_language_code, prev2_ISO_639_language_code)
                prev2_ISO_639_language_code = ISO_639_language_code
            pos += length
        name_event.joinDescriptor()
        short_event.joinDescriptor()
        extended_event.joinDescriptor()
        # A joined descriptor object is always truthy, so its content has to
        # be checked to detect a missing extended description.
        if not extended_event.descriptor.bytes:
            extended_event.descriptor = short_event.descriptor
            extended_event.codepage = short_event.codepage
        self.eit['name'] = name_event.fixEncoding()
        self.eit['short_description'] = short_event.fixEncoding()
        # This will fix EIT data of RTL group with missing line breaks in extended event description
        description = extended_event.fixEncoding()
        if description and isinstance(description, str):
            description = self._RTL_LINE_BREAK.sub(r'\1\n\n\2', description)
        self.eit['description'] = description
"""Module docstring.
Read Eit File and show the information.
"""
import getopt
def main():
    '''
    Command line entry point: print the content of every .eit file or
    directory of .eit files given as argument.

    Exits with status 2 on invalid options and with status 0 after showing
    the help for -h/--help.
    '''
    # parse command line options
    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help"])
    except getopt.error as msg:
        print(msg)
        print("for help use --help")
        sys.exit(2)
    # process options
    for option, _value in opts:
        if option in ("-h", "--help"):
            # __doc__ is None when the module has no leading docstring;
            # fall back to a fixed text instead of printing "None"
            print(__doc__ or "Read Eit File and show the information.")
            sys.exit(0)
    # process arguments
    for arg in args:
        EitList.readeit(arg)


if __name__ == "__main__":
    main()