SickRage/lib/cachecontrol/caches/file_cache.py

82 lines
2.4 KiB
Python

import hashlib
import os
from lockfile import FileLock
def _secure_open_write(filename, fmode):
# We only want to write to this file, so open it in write only mode
flags = os.O_WRONLY
# os.O_CREAT | os.O_EXCL will fail if the file already exists, so we only
# will open *new* files.
# We specify this because we want to ensure that the mode we pass is the
# mode of the file.
flags |= os.O_CREAT | os.O_EXCL
# Do not follow symlinks to prevent someone from making a symlink that
# we follow and insecurely open a cache file.
if hasattr(os, "O_NOFOLLOW"):
flags |= os.O_NOFOLLOW
# On Windows we'll mark this file as binary
if hasattr(os, "O_BINARY"):
flags |= os.O_BINARY
# Before we open our file, we want to delete any existing file that is
# there
try:
os.remove(filename)
except (IOError, OSError):
# The file must not exist already, so we can just skip ahead to opening
pass
# Open our file, the use of os.O_CREAT | os.O_EXCL will ensure that if a
# race condition happens between the os.remove and this line, that an
# error will be raised. Because we utilize a lockfile this should only
# happen if someone is attempting to attack us.
fd = os.open(filename, flags, fmode)
try:
return os.fdopen(fd, "wb")
except:
# An error occurred wrapping our FD in a file object
os.close(fd)
raise
class FileCache(object):
def __init__(self, directory, forever=False, filemode=0o0600,
dirmode=0o0700):
self.directory = directory
self.forever = forever
self.filemode = filemode
if not os.path.isdir(self.directory):
os.makedirs(self.directory, dirmode)
@staticmethod
def encode(x):
return hashlib.sha224(x.encode()).hexdigest()
def _fn(self, name):
return os.path.join(self.directory, self.encode(name))
def get(self, key):
name = self._fn(key)
if not os.path.exists(name):
return None
with open(name, 'rb') as fh:
return fh.read()
def set(self, key, value):
name = self._fn(key)
with FileLock(name) as lock:
with _secure_open_write(lock.path, self.filemode) as fh:
fh.write(value)
def delete(self, key):
name = self._fn(key)
if not self.forever:
os.remove(name)