import codecs
import re
import types
import sys
from constants import EOF, spaceCharacters, asciiLetters, asciiUppercase
from constants import encodings, ReparseException
import utils
# Non-unicode versions of constants for use in the pre-parser.
# Each set holds str() copies of the items of the corresponding set imported
# from the constants module.
spaceCharactersBytes = frozenset([str(item) for item in spaceCharacters])
asciiLettersBytes = frozenset([str(item) for item in asciiLetters])
asciiUppercaseBytes = frozenset([str(item) for item in asciiUppercase])
# Space characters plus both angle brackets; EncodingParser.getAttribute uses
# this set to decide where an unquoted attribute value ends.
spacesAngleBrackets = spaceCharactersBytes | frozenset([">", "<"])
# Matches one code point that is reported as "invalid-codepoint" by
# characterErrorsUCS4 / characterErrorsUCS2 (C0/C1 controls, surrogates,
# U+FDD0-U+FDEF and the U+xFFFE / U+xFFFF code points of every plane).
invalid_unicode_re = re.compile(u"[\u0001-\u0008\u000B\u000E-\u001F\u007F-\u009F\uD800-\uDFFF\uFDD0-\uFDEF\uFFFE\uFFFF\U0001FFFE\U0001FFFF\U0002FFFE\U0002FFFF\U0003FFFE\U0003FFFF\U0004FFFE\U0004FFFF\U0005FFFE\U0005FFFF\U0006FFFE\U0006FFFF\U0007FFFE\U0007FFFF\U0008FFFE\U0008FFFF\U0009FFFE\U0009FFFF\U000AFFFE\U000AFFFF\U000BFFFE\U000BFFFF\U000CFFFE\U000CFFFF\U000DFFFE\U000DFFFF\U000EFFFE\U000EFFFF\U000FFFFE\U000FFFFF\U0010FFFE\U0010FFFF]")
# Integer values of the code points above U+FFFF that appear in
# invalid_unicode_re; characterErrorsUCS2 looks a combined surrogate pair up
# in this set.
non_bmp_invalid_codepoints = set([0x1FFFE, 0x1FFFF, 0x2FFFE, 0x2FFFF, 0x3FFFE,
0x3FFFF, 0x4FFFE, 0x4FFFF, 0x5FFFE, 0x5FFFF,
0x6FFFE, 0x6FFFF, 0x7FFFE, 0x7FFFF, 0x8FFFE,
0x8FFFF, 0x9FFFE, 0x9FFFF, 0xAFFFE, 0xAFFFF,
0xBFFFE, 0xBFFFF, 0xCFFFE, 0xCFFFF, 0xDFFFE,
0xDFFFF, 0xEFFFE, 0xEFFFF, 0xFFFFE, 0xFFFFF,
0x10FFFE, 0x10FFFF])
# ASCII whitespace and punctuation; codecName strips every match from an
# encoding label before looking it up.
ascii_punctuation_re = re.compile(ur"[\u0009-\u000D\u0020-\u002F\u003A-\u0040\u005B-\u0060\u007B-\u007E]")
# Cache for charsUntil(): maps a (characters, opposite) pair to the compiled
# regular expression built for it.
charsUntilRegEx = {}
class BufferedStream:
    """Buffering for streams that do not have buffering of their own

    Everything read from the wrapped stream is kept in a list of chunks so
    that earlier data can be re-read after a seek().  The buffer is a list
    of chunks on the assumption that joining many strings will be slow since
    it is O(n**2).
    """

    def __init__(self, stream):
        """Wrap `stream`, an object providing read(n)."""
        self.stream = stream
        self.buffer = []
        # [chunk number, offset inside that chunk]; the chunk number is -1
        # until the first chunk has been read.
        self.position = [-1, 0]

    def tell(self):
        """Return the current position as an offset from the stream start."""
        pos = 0
        for chunk in self.buffer[:self.position[0]]:
            pos += len(chunk)
        pos += self.position[1]
        return pos

    def seek(self, pos):
        """Move to absolute offset `pos`, which must already be buffered.

        Any offset up to and including the end of the buffered data is
        accepted, so seek(tell()) always succeeds.
        """
        assert pos <= self._bufferedBytes()
        if not self.buffer:
            # Nothing has been read yet, so the only valid target is the
            # initial position.
            self.position = [-1, 0]
            return
        offset = pos
        i = 0
        while len(self.buffer[i]) < offset:
            # Step over a whole chunk: consume its length, not `pos`.
            offset -= len(self.buffer[i])
            i += 1
        self.position = [i, offset]

    def read(self, bytes):
        """Return up to `bytes` items, from the buffer first, then the stream."""
        if not self.buffer:
            return self._readStream(bytes)
        elif (self.position[0] == len(self.buffer) and
              self.position[1] == len(self.buffer[-1])):
            return self._readStream(bytes)
        else:
            return self._readFromBuffer(bytes)

    def _bufferedBytes(self):
        """Return the total length of all buffered chunks."""
        return sum([len(item) for item in self.buffer])

    def _readStream(self, bytes):
        """Read from the wrapped stream, remember the data, and return it."""
        data = self.stream.read(bytes)
        self.buffer.append(data)
        self.position[0] += 1
        self.position[1] = len(data)
        return data

    def _readFromBuffer(self, bytes):
        """Serve a read from buffered chunks, topping up from the stream."""
        remainingBytes = bytes
        rv = []
        bufferIndex = self.position[0]
        bufferOffset = self.position[1]
        while bufferIndex < len(self.buffer) and remainingBytes != 0:
            assert remainingBytes > 0
            bufferedData = self.buffer[bufferIndex]

            if remainingBytes <= len(bufferedData) - bufferOffset:
                # The request ends inside this chunk.
                bytesToRead = remainingBytes
                self.position = [bufferIndex, bufferOffset + bytesToRead]
            else:
                # Take the rest of this chunk and continue with the next.
                bytesToRead = len(bufferedData) - bufferOffset
                self.position = [bufferIndex, len(bufferedData)]
                bufferIndex += 1
            rv.append(bufferedData[bufferOffset:bufferOffset + bytesToRead])
            remainingBytes -= bytesToRead

            bufferOffset = 0

        if remainingBytes:
            # The buffer is exhausted; fetch what is still missing.
            rv.append(self._readStream(remainingBytes))

        return "".join(rv)
class HTMLInputStream:
"""Provides a unicode stream of characters to the HTMLTokenizer.
This class takes care of character encoding and removing or replacing
incorrect byte-sequences and also provides column and line tracking.
"""
_defaultChunkSize = 10240
def __init__(self, source, encoding=None, parseMeta=True, chardet=True):
"""Initialises the HTMLInputStream.
HTMLInputStream(source, [encoding]) -> Normalized stream from source
for use by html5lib.
source can be either a file-object, local filename or a string.
The optional encoding parameter must be a string that indicates
the encoding. If specified, that encoding will be used,
regardless of any BOM or later declaration (such as in a meta
element)
parseMeta - Look for a <meta> element containing encoding information
"""
#Craziness
if len(u"\U0010FFFF") == 1:
self.reportCharacterErrors = self.characterErrorsUCS4
self.replaceCharactersRegexp = re.compile(u"[\uD800-\uDFFF]")
else:
self.reportCharacterErrors = self.characterErrorsUCS2
self.replaceCharactersRegexp = re.compile(u"([\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?= self.chunkSize:
if not self.readChunk():
return EOF
chunkOffset = self.chunkOffset
char = self.chunk[chunkOffset]
self.chunkOffset = chunkOffset + 1
return char
def readChunk(self, chunkSize=None):
    """Load the next chunk of decoded text into self.chunk.

    chunkSize is the amount requested from self.dataStream; when it is None
    self._defaultChunkSize is used.  Returns False when no data is left
    (and no character was held back from the previous chunk), True
    otherwise.
    """
    size = self._defaultChunkSize if chunkSize is None else chunkSize

    # Record the position reached at the end of the outgoing chunk before
    # its bookkeeping is reset.
    self.prevNumLines, self.prevNumCols = self._position(self.chunkSize)

    self.chunk, self.chunkSize, self.chunkOffset = u"", 0, 0

    text = self.dataStream.read(size)

    # A trailing CR or lead surrogate held back from the previous chunk is
    # glued onto the front of this one, so CR LF and surrogate pairs that
    # straddle a chunk boundary are seen whole.
    held = self._bufferedCharacter
    if held:
        text = held + text
        self._bufferedCharacter = None
    elif not text:
        # We have no more data, bye-bye stream
        return False

    if len(text) > 1:
        tail = ord(text[-1])
        if tail == 0x0D or 0xD800 <= tail <= 0xDBFF:
            # Hold the last character back until the next chunk arrives.
            self._bufferedCharacter, text = text[-1], text[:-1]

    self.reportCharacterErrors(text)

    # Replace invalid characters
    # Note U+0000 is dealt with in the tokenizer
    text = self.replaceCharactersRegexp.sub(u"\ufffd", text)

    # Normalise newlines: CR LF first, then any remaining lone CR.
    text = text.replace(u"\r\n", u"\n").replace(u"\r", u"\n")

    self.chunk = text
    self.chunkSize = len(text)
    return True
def characterErrorsUCS4(self, data):
    """Record one "invalid-codepoint" error per invalid code point in data.

    Every match of invalid_unicode_re in `data` adds one entry to
    self.errors.
    """
    hits = invalid_unicode_re.findall(data)
    self.errors.extend(["invalid-codepoint"] * len(hits))
def characterErrorsUCS2(self, data):
    """Record "invalid-codepoint" errors for data on a narrow unicode build.

    On such builds a code point above U+FFFF is stored as two surrogate
    code units, and each unit matches invalid_unicode_re on its own.  A
    well-formed pair is therefore combined and only reported when the
    resulting code point is in non_bmp_invalid_codepoints; every other
    match is reported directly.
    """
    #Someone picked the wrong compile option
    #You lose
    skip = False
    for match in invalid_unicode_re.finditer(data):
        if skip:
            # This match is the second half of the pair handled in the
            # previous iteration.  Skip it once and then resume checking,
            # otherwise every later invalid code point would be ignored.
            skip = False
            continue
        codepoint = ord(match.group())
        pos = match.start()
        pair = data[pos:pos+2]
        #Pretty sure there should be endianness issues here
        if utils.isSurrogatePair(pair):
            #We have a surrogate pair!
            char_val = utils.surrogatePairToCodepoint(pair)
            if char_val in non_bmp_invalid_codepoints:
                self.errors.append("invalid-codepoint")
            skip = True
        elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and
              pos == len(data) - 1):
            # Unpaired surrogate at the very end of the data.
            self.errors.append("invalid-codepoint")
        else:
            self.errors.append("invalid-codepoint")
def charsUntil(self, characters, opposite = False):
    """Consume and return a run of characters from the stream.

    With opposite False the run stops before the first character that is
    in 'characters' (or at EOF); with opposite True the run consists only
    of characters that are in 'characters'.  'characters' must be hashable,
    iterable, and contain only ASCII characters.
    """
    # One compiled pattern per (characters, opposite) pair is kept in the
    # module-level cache.
    cacheKey = (characters, opposite)
    pattern = charsUntilRegEx.get(cacheKey)
    if pattern is None:
        assert all(ord(c) < 128 for c in characters)
        body = u"".join([u"\\x%02x" % ord(c) for c in characters])
        if not opposite:
            body = u"^%s" % body
        pattern = re.compile(u"[%s]+" % body)
        charsUntilRegEx[cacheKey] = pattern

    pieces = []

    while True:
        # Find the longest matching prefix
        found = pattern.match(self.chunk, self.chunkOffset)
        if found is not None:
            stop = found.end()
            if stop != self.chunkSize:
                # Only part of the chunk matched: take that part and stop.
                pieces.append(self.chunk[self.chunkOffset:stop])
                self.chunkOffset = stop
                break
        elif self.chunkOffset != self.chunkSize:
            # Nothing matched and the chunk is not exhausted, so the run
            # ends right here.
            break
        # The rest of the chunk belongs to the run; keep it and carry on
        # with the next chunk.
        pieces.append(self.chunk[self.chunkOffset:])
        if not self.readChunk():
            # Reached EOF
            break

    return u"".join(pieces)
def unget(self, char):
    """Push `char` back so that it is the next character read.

    Only one character may be ungotten at a time; it must be consumed
    again before unget is called once more.  Passing None does nothing.
    """
    if char is None:
        return

    if self.chunkOffset != 0:
        # Common case: step back inside the current chunk.
        self.chunkOffset -= 1
        assert self.chunk[self.chunkOffset] == char
        return

    # At the very start of a chunk there is nothing to step back over.
    # unget is rare, so doing the extra work of prepending here keeps the
    # frequently called char and charsUntil simple.
    self.chunk = char + self.chunk
    self.chunkSize += 1
class EncodingBytes(str):
    """Lower-cased byte string that carries a read position.

    The position starts before the first byte.  Once it has moved to or
    past the end of the string, the position accessors raise
    StopIteration."""

    def __new__(self, value):
        lowered = value.lower()
        return str.__new__(self, lowered)

    def __init__(self, value):
        # -1 means "before the first byte".
        self._position = -1

    def __iter__(self):
        return self

    def next(self):
        """Advance one byte and return the byte now under the position."""
        self._position += 1
        index = self._position
        if index >= len(self):
            raise StopIteration
        if index < 0:
            raise TypeError
        return self[index]

    def previous(self):
        """Step back one byte and return the byte now under the position."""
        index = self._position
        if index >= len(self):
            raise StopIteration
        if index < 0:
            raise TypeError
        index -= 1
        self._position = index
        return self[index]

    def setPosition(self, position):
        # The check is made against the position being left, not the one
        # being set.
        if self._position >= len(self):
            raise StopIteration
        self._position = position

    def getPosition(self):
        current = self._position
        if current >= len(self):
            raise StopIteration
        if current < 0:
            return None
        return current

    position = property(getPosition, setPosition)

    def getCurrentByte(self):
        return self[self.position]

    currentByte = property(getCurrentByte)

    def skip(self, chars=spaceCharactersBytes):
        """Skip past a list of characters

        Returns the first byte that is not in chars, leaving the position
        on it, or None if the end of the string was reached."""
        index = self.position  # use property for the error-checking
        size = len(self)
        while index < size:
            byte = self[index]
            if byte not in chars:
                self._position = index
                return byte
            index += 1
        self._position = index
        return None

    def skipUntil(self, chars):
        """Advance to the first byte that is in chars and return it, or
        return None if the end of the string was reached."""
        index = self.position
        size = len(self)
        while index < size:
            byte = self[index]
            if byte in chars:
                self._position = index
                return byte
            index += 1
        self._position = index
        return None

    def matchBytes(self, bytes):
        """Check whether the data at the current position starts with
        bytes.  On a match return True and move the position past the
        matched bytes; otherwise return False and leave the position
        alone"""
        start = self.position
        matched = self[start:start + len(bytes)].startswith(bytes)
        if matched:
            self.position += len(bytes)
        return matched

    def jumpTo(self, bytes):
        """Search forward from the current position for bytes.  If found,
        move the position to the last byte of the match and return True;
        otherwise raise StopIteration"""
        found = self[self.position:].find(bytes)
        if found < 0:
            raise StopIteration
        # XXX: This is ugly, but I can't see a nicer way to fix this.
        if self._position == -1:
            self._position = 0
        self._position += found + len(bytes) - 1
        return True
class EncodingParser(object):
"""Mini parser for detecting character encoding from meta elements"""
def __init__(self, data):
    """data - the string to search for an encoding declaration"""
    wrapped = EncodingBytes(data)
    self.data = wrapped
    # No encoding has been found yet.
    self.encoding = None
def getEncoding(self):
methodDispatch = (
("")
def handleMeta(self):
if self.data.currentByte not in spaceCharactersBytes:
#if we have ")
def getAttribute(self):
    """Return a name,value pair for the next attribute in the stream,
    if one is found, or None

    Both name and value are lower-cased.  An attribute without a value is
    returned with an empty string as its value.  StopIteration raised by
    self.data when it runs out of bytes is not caught here.
    """
    data = self.data
    # Step 1 (skip chars)
    c = data.skip(spaceCharactersBytes | frozenset("/"))
    # Step 2
    if c in (">", None):
        return None
    # Step 3
    attrName = []
    attrValue = []
    #Step 4 attribute name
    while True:
        if c == "=" and attrName:
            break
        elif c in spaceCharactersBytes:
            #Step 6!
            # Skip the whitespace after the name and stay on the first
            # non-space byte: step 7 must test that byte, not the one
            # following it, or "name = value" loses its value.
            c = data.skip()
            break
        elif c in ("/", ">"):
            return "".join(attrName), ""
        elif c in asciiUppercaseBytes:
            attrName.append(c.lower())
        elif c is None:
            return None
        else:
            attrName.append(c)
        #Step 5
        c = data.next()
    #Step 7
    if c != "=":
        # Not a name=value attribute; step back so the byte just examined
        # is seen again by the next call.
        data.previous()
        return "".join(attrName), ""
    #Step 8
    data.next()
    #Step 9
    c = data.skip()
    #Step 10
    if c in ("'", '"'):
        #10.1
        quoteChar = c
        while True:
            #10.2
            c = data.next()
            #10.3
            if c == quoteChar:
                data.next()
                return "".join(attrName), "".join(attrValue)
            #10.4
            elif c in asciiUppercaseBytes:
                attrValue.append(c.lower())
            #10.5
            else:
                attrValue.append(c)
    elif c == ">":
        return "".join(attrName), ""
    elif c in asciiUppercaseBytes:
        attrValue.append(c.lower())
    elif c is None:
        return None
    else:
        attrValue.append(c)
    # Step 11
    while True:
        c = data.next()
        if c in spacesAngleBrackets:
            return "".join(attrName), "".join(attrValue)
        elif c in asciiUppercaseBytes:
            attrValue.append(c.lower())
        elif c is None:
            return None
        else:
            attrValue.append(c)
class ContentAttrParser(object):
    """Extracts the charset value from the text held in `data`.

    `data` must provide jumpTo, skip, skipUntil, position and currentByte
    and support slicing, as EncodingBytes does.
    """

    def __init__(self, data):
        self.data = data

    def parse(self):
        """Return the text following "charset=", or None if there is none.

        A quoted value is returned without its quotes; an unquoted value
        runs up to the next space character or the end of the data.
        """
        stream = self.data
        try:
            #Check if the attr name is charset
            #otherwise return
            stream.jumpTo("charset")
            stream.position += 1
            stream.skip()
            if stream.currentByte != "=":
                #If there is no = sign keep looking for attrs
                return None
            stream.position += 1
            stream.skip()

            opener = stream.currentByte
            if opener in ('"', "'"):
                #Look for an encoding between matching quote marks
                stream.position += 1
                start = stream.position
                if stream.jumpTo(opener):
                    return stream[start:stream.position]
                return None

            #Unquoted value
            start = stream.position
            try:
                stream.skipUntil(spaceCharactersBytes)
                return stream[start:stream.position]
            except StopIteration:
                #Return the whole remaining value
                return stream[start:]
        except StopIteration:
            return None
def codecName(encoding):
    """Map an encoding label to the python codec name registered for it.

    Returns None when the argument is not a string or when the label,
    with every ascii_punctuation_re match removed and lower-cased, is not
    a key of `encodings`."""
    if encoding is None or type(encoding) not in types.StringTypes:
        return None
    stripped = ascii_punctuation_re.sub("", encoding)
    return encodings.get(stripped.lower(), None)