import codecs import re import types import sys from constants import EOF, spaceCharacters, asciiLetters, asciiUppercase from constants import encodings, ReparseException import utils #Non-unicode versions of constants for use in the pre-parser spaceCharactersBytes = frozenset([str(item) for item in spaceCharacters]) asciiLettersBytes = frozenset([str(item) for item in asciiLetters]) asciiUppercaseBytes = frozenset([str(item) for item in asciiUppercase]) spacesAngleBrackets = spaceCharactersBytes | frozenset([">", "<"]) invalid_unicode_re = re.compile(u"[\u0001-\u0008\u000B\u000E-\u001F\u007F-\u009F\uD800-\uDFFF\uFDD0-\uFDEF\uFFFE\uFFFF\U0001FFFE\U0001FFFF\U0002FFFE\U0002FFFF\U0003FFFE\U0003FFFF\U0004FFFE\U0004FFFF\U0005FFFE\U0005FFFF\U0006FFFE\U0006FFFF\U0007FFFE\U0007FFFF\U0008FFFE\U0008FFFF\U0009FFFE\U0009FFFF\U000AFFFE\U000AFFFF\U000BFFFE\U000BFFFF\U000CFFFE\U000CFFFF\U000DFFFE\U000DFFFF\U000EFFFE\U000EFFFF\U000FFFFE\U000FFFFF\U0010FFFE\U0010FFFF]") non_bmp_invalid_codepoints = set([0x1FFFE, 0x1FFFF, 0x2FFFE, 0x2FFFF, 0x3FFFE, 0x3FFFF, 0x4FFFE, 0x4FFFF, 0x5FFFE, 0x5FFFF, 0x6FFFE, 0x6FFFF, 0x7FFFE, 0x7FFFF, 0x8FFFE, 0x8FFFF, 0x9FFFE, 0x9FFFF, 0xAFFFE, 0xAFFFF, 0xBFFFE, 0xBFFFF, 0xCFFFE, 0xCFFFF, 0xDFFFE, 0xDFFFF, 0xEFFFE, 0xEFFFF, 0xFFFFE, 0xFFFFF, 0x10FFFE, 0x10FFFF]) ascii_punctuation_re = re.compile(ur"[\u0009-\u000D\u0020-\u002F\u003A-\u0040\u005B-\u0060\u007B-\u007E]") # Cache for charsUntil() charsUntilRegEx = {} class BufferedStream: """Buffering for streams that do not have buffering of their own The buffer is implemented as a list of chunks on the assumption that joining many strings will be slow since it is O(n**2) """ def __init__(self, stream): self.stream = stream self.buffer = [] self.position = [-1,0] #chunk number, offset def tell(self): pos = 0 for chunk in self.buffer[:self.position[0]]: pos += len(chunk) pos += self.position[1] return pos def seek(self, pos): assert pos < self._bufferedBytes() offset = pos i = 0 while len(self.buffer[i]) < offset: offset -= pos i += 1 self.position = [i, offset] def read(self, bytes): if not self.buffer: return self._readStream(bytes) elif (self.position[0] == len(self.buffer) and self.position[1] == len(self.buffer[-1])): return self._readStream(bytes) else: return self._readFromBuffer(bytes) def _bufferedBytes(self): return sum([len(item) for item in self.buffer]) def _readStream(self, bytes): data = self.stream.read(bytes) self.buffer.append(data) self.position[0] += 1 self.position[1] = len(data) return data def _readFromBuffer(self, bytes): remainingBytes = bytes rv = [] bufferIndex = self.position[0] bufferOffset = self.position[1] while bufferIndex < len(self.buffer) and remainingBytes != 0: assert remainingBytes > 0 bufferedData = self.buffer[bufferIndex] if remainingBytes <= len(bufferedData) - bufferOffset: bytesToRead = remainingBytes self.position = [bufferIndex, bufferOffset + bytesToRead] else: bytesToRead = len(bufferedData) - bufferOffset self.position = [bufferIndex, len(bufferedData)] bufferIndex += 1 data = rv.append(bufferedData[bufferOffset: bufferOffset + bytesToRead]) remainingBytes -= bytesToRead bufferOffset = 0 if remainingBytes: rv.append(self._readStream(remainingBytes)) return "".join(rv) class HTMLInputStream: """Provides a unicode stream of characters to the HTMLTokenizer. This class takes care of character encoding and removing or replacing incorrect byte-sequences and also provides column and line tracking. """ _defaultChunkSize = 10240 def __init__(self, source, encoding=None, parseMeta=True, chardet=True): """Initialises the HTMLInputStream. HTMLInputStream(source, [encoding]) -> Normalized stream from source for use by html5lib. source can be either a file-object, local filename or a string. The optional encoding parameter must be a string that indicates the encoding. If specified, that encoding will be used, regardless of any BOM or later declaration (such as in a meta element) parseMeta - Look for a element containing encoding information """ #Craziness if len(u"\U0010FFFF") == 1: self.reportCharacterErrors = self.characterErrorsUCS4 self.replaceCharactersRegexp = re.compile(u"[\uD800-\uDFFF]") else: self.reportCharacterErrors = self.characterErrorsUCS2 self.replaceCharactersRegexp = re.compile(u"([\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?= self.chunkSize: if not self.readChunk(): return EOF chunkOffset = self.chunkOffset char = self.chunk[chunkOffset] self.chunkOffset = chunkOffset + 1 return char def readChunk(self, chunkSize=None): if chunkSize is None: chunkSize = self._defaultChunkSize self.prevNumLines, self.prevNumCols = self._position(self.chunkSize) self.chunk = u"" self.chunkSize = 0 self.chunkOffset = 0 data = self.dataStream.read(chunkSize) #Deal with CR LF and surrogates broken across chunks if self._bufferedCharacter: data = self._bufferedCharacter + data self._bufferedCharacter = None elif not data: # We have no more data, bye-bye stream return False if len(data) > 1: lastv = ord(data[-1]) if lastv == 0x0D or 0xD800 <= lastv <= 0xDBFF: self._bufferedCharacter = data[-1] data = data[:-1] self.reportCharacterErrors(data) # Replace invalid characters # Note U+0000 is dealt with in the tokenizer data = self.replaceCharactersRegexp.sub(u"\ufffd", data) data = data.replace(u"\r\n", u"\n") data = data.replace(u"\r", u"\n") self.chunk = data self.chunkSize = len(data) return True def characterErrorsUCS4(self, data): for i in xrange(len(invalid_unicode_re.findall(data))): self.errors.append("invalid-codepoint") def characterErrorsUCS2(self, data): #Someone picked the wrong compile option #You lose skip = False import sys for match in invalid_unicode_re.finditer(data): if skip: continue codepoint = ord(match.group()) pos = match.start() #Pretty sure there should be endianness issues here if utils.isSurrogatePair(data[pos:pos+2]): #We have a surrogate pair! char_val = utils.surrogatePairToCodepoint(data[pos:pos+2]) if char_val in non_bmp_invalid_codepoints: self.errors.append("invalid-codepoint") skip = True elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and pos == len(data) - 1): self.errors.append("invalid-codepoint") else: skip = False self.errors.append("invalid-codepoint") def charsUntil(self, characters, opposite = False): """ Returns a string of characters from the stream up to but not including any character in 'characters' or EOF. 'characters' must be a container that supports the 'in' method and iteration over its characters. """ # Use a cache of regexps to find the required characters try: chars = charsUntilRegEx[(characters, opposite)] except KeyError: if __debug__: for c in characters: assert(ord(c) < 128) regex = u"".join([u"\\x%02x" % ord(c) for c in characters]) if not opposite: regex = u"^%s" % regex chars = charsUntilRegEx[(characters, opposite)] = re.compile(u"[%s]+" % regex) rv = [] while True: # Find the longest matching prefix m = chars.match(self.chunk, self.chunkOffset) if m is None: # If nothing matched, and it wasn't because we ran out of chunk, # then stop if self.chunkOffset != self.chunkSize: break else: end = m.end() # If not the whole chunk matched, return everything # up to the part that didn't match if end != self.chunkSize: rv.append(self.chunk[self.chunkOffset:end]) self.chunkOffset = end break # If the whole remainder of the chunk matched, # use it all and read the next chunk rv.append(self.chunk[self.chunkOffset:]) if not self.readChunk(): # Reached EOF break r = u"".join(rv) return r def unget(self, char): # Only one character is allowed to be ungotten at once - it must # be consumed again before any further call to unget if char is not None: if self.chunkOffset == 0: # unget is called quite rarely, so it's a good idea to do # more work here if it saves a bit of work in the frequently # called char and charsUntil. # So, just prepend the ungotten character onto the current # chunk: self.chunk = char + self.chunk self.chunkSize += 1 else: self.chunkOffset -= 1 assert self.chunk[self.chunkOffset] == char class EncodingBytes(str): """String-like object with an associated position and various extra methods If the position is ever greater than the string length then an exception is raised""" def __new__(self, value): return str.__new__(self, value.lower()) def __init__(self, value): self._position=-1 def __iter__(self): return self def next(self): p = self._position = self._position + 1 if p >= len(self): raise StopIteration elif p < 0: raise TypeError return self[p] def previous(self): p = self._position if p >= len(self): raise StopIteration elif p < 0: raise TypeError self._position = p = p - 1 return self[p] def setPosition(self, position): if self._position >= len(self): raise StopIteration self._position = position def getPosition(self): if self._position >= len(self): raise StopIteration if self._position >= 0: return self._position else: return None position = property(getPosition, setPosition) def getCurrentByte(self): return self[self.position] currentByte = property(getCurrentByte) def skip(self, chars=spaceCharactersBytes): """Skip past a list of characters""" p = self.position # use property for the error-checking while p < len(self): c = self[p] if c not in chars: self._position = p return c p += 1 self._position = p return None def skipUntil(self, chars): p = self.position while p < len(self): c = self[p] if c in chars: self._position = p return c p += 1 self._position = p return None def matchBytes(self, bytes): """Look for a sequence of bytes at the start of a string. If the bytes are found return True and advance the position to the byte after the match. Otherwise return False and leave the position alone""" p = self.position data = self[p:p+len(bytes)] rv = data.startswith(bytes) if rv: self.position += len(bytes) return rv def jumpTo(self, bytes): """Look for the next sequence of bytes matching a given sequence. If a match is found advance the position to the last byte of the match""" newPosition = self[self.position:].find(bytes) if newPosition > -1: # XXX: This is ugly, but I can't see a nicer way to fix this. if self._position == -1: self._position = 0 self._position += (newPosition + len(bytes)-1) return True else: raise StopIteration class EncodingParser(object): """Mini parser for detecting character encoding from meta elements""" def __init__(self, data): """string - the data to work on for encoding detection""" self.data = EncodingBytes(data) self.encoding = None def getEncoding(self): methodDispatch = ( ("") def handleMeta(self): if self.data.currentByte not in spaceCharactersBytes: #if we have ") def getAttribute(self): """Return a name,value pair for the next attribute in the stream, if one is found, or None""" data = self.data # Step 1 (skip chars) c = data.skip(spaceCharactersBytes | frozenset("/")) # Step 2 if c in (">", None): return None # Step 3 attrName = [] attrValue = [] #Step 4 attribute name while True: if c == "=" and attrName: break elif c in spaceCharactersBytes: #Step 6! c = data.skip() c = data.next() break elif c in ("/", ">"): return "".join(attrName), "" elif c in asciiUppercaseBytes: attrName.append(c.lower()) elif c == None: return None else: attrName.append(c) #Step 5 c = data.next() #Step 7 if c != "=": data.previous() return "".join(attrName), "" #Step 8 data.next() #Step 9 c = data.skip() #Step 10 if c in ("'", '"'): #10.1 quoteChar = c while True: #10.2 c = data.next() #10.3 if c == quoteChar: data.next() return "".join(attrName), "".join(attrValue) #10.4 elif c in asciiUppercaseBytes: attrValue.append(c.lower()) #10.5 else: attrValue.append(c) elif c == ">": return "".join(attrName), "" elif c in asciiUppercaseBytes: attrValue.append(c.lower()) elif c is None: return None else: attrValue.append(c) # Step 11 while True: c = data.next() if c in spacesAngleBrackets: return "".join(attrName), "".join(attrValue) elif c in asciiUppercaseBytes: attrValue.append(c.lower()) elif c is None: return None else: attrValue.append(c) class ContentAttrParser(object): def __init__(self, data): self.data = data def parse(self): try: #Check if the attr name is charset #otherwise return self.data.jumpTo("charset") self.data.position += 1 self.data.skip() if not self.data.currentByte == "=": #If there is no = sign keep looking for attrs return None self.data.position += 1 self.data.skip() #Look for an encoding between matching quote marks if self.data.currentByte in ('"', "'"): quoteMark = self.data.currentByte self.data.position += 1 oldPosition = self.data.position if self.data.jumpTo(quoteMark): return self.data[oldPosition:self.data.position] else: return None else: #Unquoted value oldPosition = self.data.position try: self.data.skipUntil(spaceCharactersBytes) return self.data[oldPosition:self.data.position] except StopIteration: #Return the whole remaining value return self.data[oldPosition:] except StopIteration: return None def codecName(encoding): """Return the python codec name corresponding to an encoding or None if the string doesn't correspond to a valid encoding.""" if (encoding is not None and type(encoding) in types.StringTypes): canonicalName = ascii_punctuation_re.sub("", encoding).lower() return encodings.get(canonicalName, None) else: return None