- #!/usr/bin/env python
- #
- # Connector for elFinder File Manager
- # author Troex Nevelin
- import hashlib
- import mimetypes
- import os
- import os.path
- import re
- import shutil
- import time
- from datetime import datetime
- class connector():
- """Connector for elFinder"""
- _options = {
- 'root': '',
- 'URL': '',
- 'rootAlias': 'Home',
- 'dotFiles': False,
- 'dirSize': True,
- 'fileMode': 0644,
- 'dirMode': 0755,
- 'imgLib': 'auto',
- 'tmbDir': '.tmb',
- 'tmbAtOnce': 5,
- 'tmbSize': 48,
- 'fileURL': True,
- 'uploadMaxSize': 256,
- 'uploadWriteChunk': 8192,
- 'uploadAllow': [],
- 'uploadDeny': [],
- 'uploadOrder': ['deny', 'allow'],
- # 'aclObj': None, # TODO
- # 'aclRole': 'user', # TODO
- 'defaults': {
- 'read': True,
- 'write': True,
- 'rm': True
- },
- 'perms': {},
- 'archiveMimes': {},
- 'archivers': {},
- 'disabled': [],
- 'debug': False
- }
- _commands = {
- 'open': '__open',
- 'reload': '__reload',
- 'mkdir': '__mkdir',
- 'mkfile': '__mkfile',
- 'rename': '__rename',
- 'upload': '__upload',
- 'paste': '__paste',
- 'rm': '__rm',
- 'duplicate': '__duplicate',
- 'read': '__read',
- 'edit': '__edit',
- 'extract': '__extract',
- 'archive': '__archive',
- 'resize': '__resize',
- 'tmb': '__thumbnails',
- 'ping': '__ping'
- }
- _mimeType = {
- # text
- 'txt': 'text/plain',
- 'conf': 'text/plain',
- 'ini': 'text/plain',
- 'php': 'text/x-php',
- 'html': 'text/html',
- 'htm': 'text/html',
- 'js' : 'text/javascript',
- 'css': 'text/css',
- 'rtf': 'text/rtf',
- 'rtfd': 'text/rtfd',
- 'py' : 'text/x-python',
- 'java': 'text/x-java-source',
- 'rb' : 'text/x-ruby',
- 'sh' : 'text/x-shellscript',
- 'pl' : 'text/x-perl',
- 'sql': 'text/x-sql',
- # apps
- 'doc': 'application/msword',
- 'ogg': 'application/ogg',
- '7z': 'application/x-7z-compressed',
- # video
- 'ogm': 'appllication/ogm',
- 'mkv': 'video/x-matroska'
- }
- _time = 0
- _request = {}
- _response = {}
- _errorData = {}
- _form = {}
- _im = None
- _sp = None
- _today = 0
- _yesterday = 0
- # public variables
- httpAllowedParameters = ('cmd', 'target', 'targets[]', 'current', 'tree', 'name',
- 'content', 'src', 'dst', 'cut', 'init', 'type', 'width', 'height', 'upload[]')
- # return variables
- httpStatusCode = 0
- httpHeader = {}
- httpResponse = None
- def __init__(self, opts):
- for opt in opts:
- self._options[opt] = opts.get(opt)
- self._response['debug'] = {}
- self._options['URL'] = self._options['URL'].rstrip('/')
- self._options['root'] = self._options['root'].rstrip(os.sep)
- self.__debug('URL', self._options['URL'])
- self.__debug('root', self._options['root'])
- for cmd in self._options['disabled']:
- if cmd in self._commands:
- del self._commands[cmd]
- if self._options['tmbDir']:
- self._options['tmbDir'] = os.path.join(self._options['root'], self._options['tmbDir'])
- if not os.path.exists(self._options['tmbDir']):
- self._options['tmbDir'] = False
- def __reset(self):
- """Flush per request variables"""
- self.httpStatusCode = 0
- self.httpHeader = {}
- self.httpResponse = None
- self._request = {}
- self._response = {}
- self._errorData = {}
- self._form = {}
- self._time = time.time()
- t = datetime.fromtimestamp(self._time)
- self._today = time.mktime(datetime(t.year, t.month, t.day).timetuple())
- self._yesterday = self._today - 86400
- self._response['debug'] = {}
- def run(self, httpRequest = []):
- """main function"""
- self.__reset()
- rootOk = True
- if not os.path.exists(self._options['root']) or self._options['root'] == '':
- rootOk = False
- self._response['error'] = 'Invalid backend configuration'
- elif not self.__isAllowed(self._options['root'], 'read'):
- rootOk = False
- self._response['error'] = 'Access denied'
- for field in self.httpAllowedParameters:
- if field in httpRequest:
- self._request[field] = httpRequest[field]
- if rootOk is True:
- if 'cmd' in self._request:
- if self._request['cmd'] in self._commands:
- cmd = self._commands[self._request['cmd']]
- func = getattr(self, '_' + self.__class__.__name__ + cmd, None)
- if callable(func):
- try:
- func()
- except Exception, e:
- self._response['error'] = 'Command Failed'
- self.__debug('exception', str(e))
- else:
- self._response['error'] = 'Unknown command'
- else:
- self.__open()
- if 'init' in self._request:
- self.__checkArchivers()
- self._response['disabled'] = self._options['disabled']
- if not self._options['fileURL']:
- url = ''
- else:
- url = self._options['URL']
- self._response['params'] = {
- 'dotFiles': self._options['dotFiles'],
- 'uplMaxSize': str(self._options['uploadMaxSize']) + 'M',
- 'archives': self._options['archiveMimes'],
- 'extract': self._options['archivers']['extract'].keys(),
- 'url': url
- }
- if self._errorData:
- self._response['errorData'] = self._errorData
- if self._options['debug']:
- self.__debug('time', (time.time() - self._time))
- else:
- if 'debug' in self._response:
- del self._response['debug']
- if self.httpStatusCode < 100:
- self.httpStatusCode = 200
- if not 'Content-type' in self.httpHeader:
- if ('cmd' in self._request and self._request['cmd'] == 'upload') or self._options['debug']:
- self.httpHeader['Content-type'] = 'text/html'
- else:
- self.httpHeader['Content-type'] = 'application/json'
- self.httpResponse = self._response
- return self.httpStatusCode, self.httpHeader, self.httpResponse
- def __open(self):
- """Open file or directory"""
- # try to open file
- if 'current' in self._request:
- curDir = self.__findDir(self._request['current'], None)
- curFile = self.__find(self._request['target'], curDir)
- if not curDir or not curFile or os.path.isdir(curFile):
- self.httpStatusCode = 404
- self.httpHeader['Content-type'] = 'text/html'
- self.httpResponse = 'File not found'
- return
- if not self.__isAllowed(curDir, 'read') or not self.__isAllowed(curFile, 'read'):
- self.httpStatusCode = 403
- self.httpHeader['Content-type'] = 'text/html'
- self.httpResponse = 'Access denied'
- return
- if os.path.islink(curFile):
- curFile = self.__readlink(curFile)
- if not curFile or os.path.isdir(curFile):
- self.httpStatusCode = 404
- self.httpHeader['Content-type'] = 'text/html'
- self.httpResponse = 'File not found'
- return
- if (
- not self.__isAllowed(os.path.dirname(curFile), 'read')
- or not self.__isAllowed(curFile, 'read')
- ):
- self.httpStatusCode = 403
- self.httpHeader['Content-type'] = 'text/html'
- self.httpResponse = 'Access denied'
- return
- mime = self.__mimetype(curFile)
- parts = mime.split('/', 2)
- if parts[0] == 'image': disp = 'image'
- elif parts[0] == 'text': disp = 'inline'
- else: disp = 'attachments'
- self.httpStatusCode = 200
- self.httpHeader['Content-type'] = mime
- self.httpHeader['Content-Disposition'] = disp + '; filename=' + os.path.basename(curFile)
- self.httpHeader['Content-Location'] = curFile.replace(self._options['root'], '')
- self.httpHeader['Content-Transfer-Encoding'] = 'binary'
- self.httpHeader['Content-Length'] = str(os.lstat(curFile).st_size)
- self.httpHeader['Connection'] = 'close'
- self._response['file'] = open(curFile, 'r')
- return
- # try dir
- else:
- path = self._options['root']
- if 'target' in self._request:
- target = self.__findDir(self._request['target'], None)
- if not target:
- self._response['error'] = 'Invalid parameters'
- elif not self.__isAllowed(target, 'read'):
- self._response['error'] = 'Access denied'
- else:
- path = target
- self.__content(path, 'tree' in self._request)
- pass
- def __rename(self):
- """Rename file or dir"""
- current = name = target = None
- curDir = curName = newName = None
- if 'name' in self._request and 'current' in self._request and 'target' in self._request:
- name = self._request['name']
- current = self._request['current']
- target = self._request['target']
- curDir = self.__findDir(current, None)
- curName = self.__find(target, curDir)
- newName = os.path.join(curDir, name)
- if not curDir or not curName:
- self._response['error'] = 'File not found'
- elif not self.__isAllowed(curDir, 'write') and self.__isAllowed(curName, 'rm'):
- self._response['error'] = 'Access denied'
- elif not self.__checkName(name):
- self._response['error'] = 'Invalid name'
- elif os.path.exists(newName):
- self._response['error'] = 'File or folder with the same name already exists'
- else:
- self.__rmTmb(curName)
- try:
- os.rename(curName, newName)
- self._response['select'] = [self.__hash(newName)]
- self.__content(curDir, os.path.isdir(newName))
- except:
- self._response['error'] = 'Unable to rename file'
- def __mkdir(self):
- """Create new directory"""
- current = None
- path = None
- newDir = None
- if 'name' in self._request and 'current' in self._request:
- name = self._request['name']
- current = self._request['current']
- path = self.__findDir(current, None)
- newDir = os.path.join(path, name)
- if not path:
- self._response['error'] = 'Invalid parameters'
- elif not self.__isAllowed(path, 'write'):
- self._response['error'] = 'Access denied'
- elif not self.__checkName(name):
- self._response['error'] = 'Invalid name'
- elif os.path.exists(newDir):
- self._response['error'] = 'File or folder with the same name already exists'
- else:
- try:
- os.mkdir(newDir, int(self._options['dirMode']))
- self._response['select'] = [self.__hash(newDir)]
- self.__content(path, True)
- except:
- self._response['error'] = 'Unable to create folder'
- def __mkfile(self):
- """Create new file"""
- name = current = None
- curDir = newFile = None
- if 'name' in self._request and 'current' in self._request:
- name = self._request['name']
- current = self._request['current']
- curDir = self.__findDir(current, None)
- newFile = os.path.join(curDir, name)
- if not curDir or not name:
- self._response['error'] = 'Invalid parameters'
- elif not self.__isAllowed(curDir, 'write'):
- self._response['error'] = 'Access denied'
- elif not self.__checkName(name):
- self._response['error'] = 'Invalid name'
- elif os.path.exists(newFile):
- self._response['error'] = 'File or folder with the same name already exists'
- else:
- try:
- open(newFile, 'w').close()
- self._response['select'] = [self.__hash(newFile)]
- self.__content(curDir, False)
- except:
- self._response['error'] = 'Unable to create file'
- def __rm(self):
- """Delete files and directories"""
- current = rmList = None
- curDir = rmFile = None
- if 'current' in self._request and 'targets[]' in self._request:
- current = self._request['current']
- rmList = self._request['targets[]']
- curDir = self.__findDir(current, None)
- if not rmList or not curDir:
- self._response['error'] = 'Invalid parameters'
- return False
- if not isinstance(rmList, list):
- rmList = [rmList]
- for rm in rmList:
- rmFile = self.__find(rm, curDir)
- if not rmFile: continue
- self.__remove(rmFile)
- # TODO if errorData not empty return error
- self.__content(curDir, True)
- def __upload(self):
- """Upload files"""
- try: # Windows needs stdio set for binary mode.
- import msvcrt
- msvcrt.setmode (0, os.O_BINARY) # stdin = 0
- msvcrt.setmode (1, os.O_BINARY) # stdout = 1
- except ImportError:
- pass
- if 'current' in self._request:
- curDir = self.__findDir(self._request['current'], None)
- if not curDir:
- self._response['error'] = 'Invalid parameters'
- return
- if not self.__isAllowed(curDir, 'write'):
- self._response['error'] = 'Access denied'
- return
- if not 'upload[]' in self._request:
- self._response['error'] = 'No file to upload'
- return
- upFiles = self._request['upload[]']
- # invalid format
- # must be dict('filename1': 'filedescriptor1', 'filename2': 'filedescriptor2', ...)
- if not isinstance(upFiles, dict):
- self._response['error'] = 'Invalid parameters'
- return
- self._response['select'] = []
- total = 0
- upSize = 0
- maxSize = self._options['uploadMaxSize'] * 1024 * 1024
- for name, data in upFiles.iteritems():
- if name:
- total += 1
- name = os.path.basename(name)
- if not self.__checkName(name):
- self.__errorData(name, 'Invalid name')
- else:
- name = os.path.join(curDir, name)
- try:
- f = open(name, 'wb', self._options['uploadWriteChunk'])
- for chunk in self.__fbuffer(data):
- f.write(chunk)
- f.close()
- upSize += os.lstat(name).st_size
- if self.__isUploadAllow(name):
- os.chmod(name, self._options['fileMode'])
- self._response['select'].append(self.__hash(name))
- else:
- self.__errorData(name, 'Not allowed file type')
- try:
- os.unlink(name)
- except:
- pass
- except:
- self.__errorData(name, 'Unable to save uploaded file')
- if upSize > maxSize:
- try:
- os.unlink(name)
- self.__errorData(name, 'File exceeds the maximum allowed filesize')
- except:
- pass
- # TODO ?
- self.__errorData(name, 'File was only partially uploaded')
- break
- if self._errorData:
- if len(self._errorData) == total:
- self._response['error'] = 'Unable to upload files'
- else:
- self._response['error'] = 'Some files was not uploaded'
- self.__content(curDir, False)
- return
- def __paste(self):
- """Copy or cut files/directories"""
- if 'current' in self._request and 'src' in self._request and 'dst' in self._request:
- curDir = self.__findDir(self._request['current'], None)
- src = self.__findDir(self._request['src'], None)
- dst = self.__findDir(self._request['dst'], None)
- if not curDir or not src or not dst or not 'targets[]' in self._request:
- self._response['error'] = 'Invalid parameters'
- return
- files = self._request['targets[]']
- if not isinstance(files, list):
- files = [files]
- cut = False
- if 'cut' in self._request:
- if self._request['cut'] == '1':
- cut = True
- if not self.__isAllowed(src, 'read') or not self.__isAllowed(dst, 'write'):
- self._response['error'] = 'Access denied'
- return
- for fhash in files:
- f = self.__find(fhash, src)
- if not f:
- self._response['error'] = 'File not found'
- return
- newDst = os.path.join(dst, os.path.basename(f))
- if dst.find(f) == 0:
- self._response['error'] = 'Unable to copy into itself'
- return
- if cut:
- if not self.__isAllowed(f, 'rm'):
- self._response['error'] = 'Move failed'
- self._errorData(f, 'Access denied')
- self.__content(curDir, True)
- return
- # TODO thumbs
- if os.path.exists(newDst):
- self._response['error'] = 'Unable to move files'
- self._errorData(f, 'File or folder with the same name already exists')
- self.__content(curDir, True)
- return
- try:
- os.rename(f, newDst)
- self.__rmTmb(f)
- continue
- except:
- self._response['error'] = 'Unable to move files'
- self._errorData(f, 'Unable to move')
- self.__content(curDir, True)
- return
- else:
- if not self.__copy(f, newDst):
- self._response['error'] = 'Unable to copy files'
- self.__content(curDir, True)
- return
- continue
- self.__content(curDir, True)
- else:
- self._response['error'] = 'Invalid parameters'
- return
- def __duplicate(self):
- """Create copy of files/directories"""
- if 'current' in self._request and 'target' in self._request:
- curDir = self.__findDir(self._request['current'], None)
- target = self.__find(self._request['target'], curDir)
- if not curDir or not target:
- self._response['error'] = 'Invalid parameters'
- return
- if not self.__isAllowed(target, 'read') or not self.__isAllowed(curDir, 'write'):
- self._response['error'] = 'Access denied'
- newName = self.__uniqueName(target)
- if not self.__copy(target, newName):
- self._response['error'] = 'Unable to create file copy'
- return
- self.__content(curDir, True)
- return
- def __resize(self):
- """Scale image size"""
- if not (
- 'current' in self._request and 'target' in self._request
- and 'width' in self._request and 'height' in self._request
- ):
- self._response['error'] = 'Invalid parameters'
- return
- width = int(self._request['width'])
- height = int(self._request['height'])
- curDir = self.__findDir(self._request['current'], None)
- curFile = self.__find(self._request['target'], curDir)
- if width < 1 or height < 1 or not curDir or not curFile:
- self._response['error'] = 'Invalid parameters'
- return
- if not self.__isAllowed(curFile, 'write'):
- self._response['error'] = 'Access denied'
- return
- if not self.__mimetype(curFile).find('image') == 0:
- self._response['error'] = 'File is not an image'
- return
- self.__debug('resize ' + curFile, str(width) + ':' + str(height))
- self.__initImgLib()
- try:
- im = self._im.open(curFile)
- imResized = im.resize((width, height), self._im.ANTIALIAS)
- imResized.save(curFile)
- except Exception, e:
- self.__debug('resizeFailed_' + path, str(e))
- self._response['error'] = 'Unable to resize image'
- return
- self._response['select'] = [self.__hash(curFile)]
- self.__content(curDir, False)
- return
- def __thumbnails(self):
- """Create previews for images"""
- if 'current' in self._request:
- curDir = self.__findDir(self._request['current'], None)
- if not curDir or curDir == self._options['tmbDir']:
- return False
- else:
- return False
- self.__initImgLib()
- if self.__canCreateTmb():
- if self._options['tmbAtOnce'] > 0:
- tmbMax = self._options['tmbAtOnce']
- else:
- tmbMax = 5
- self._response['current'] = self.__hash(curDir)
- self._response['images'] = {}
- i = 0
- for f in os.listdir(curDir):
- path = os.path.join(curDir, f)
- fhash = self.__hash(path)
- if self.__canCreateTmb(path) and self.__isAllowed(path, 'read'):
- tmb = os.path.join(self._options['tmbDir'], fhash + '.png')
- if not os.path.exists(tmb):
- if self.__tmb(path, tmb):
- self._response['images'].update({
- fhash: self.__path2url(tmb)
- })
- i += 1
- if i >= tmbMax:
- self._response['tmb'] = True
- break
- else:
- return False
- return
- def __content(self, path, tree):
- """CWD + CDC + maybe(TREE)"""
- self.__cwd(path)
- self.__cdc(path)
- if tree:
- self._response['tree'] = self.__tree(self._options['root'])
- def __cwd(self, path):
- """Current Working Directory"""
- name = os.path.basename(path)
- if path == self._options['root']:
- name = self._options['rootAlias']
- root = True
- else:
- root = False
- if self._options['rootAlias']:
- basename = self._options['rootAlias']
- else:
- basename = os.path.basename(self._options['root'])
- rel = basename + path[len(self._options['root']):]
- self._response['cwd'] = {
- 'hash': self.__hash(path),
- 'name': self.__checkUtf8(name),
- 'mime': 'directory',
- 'rel': self.__checkUtf8(rel),
- 'size': 0,
- 'date': datetime.fromtimestamp(os.stat(path).st_mtime).strftime("%d %b %Y %H:%M"),
- 'read': True,
- 'write': self.__isAllowed(path, 'write'),
- 'rm': not root and self.__isAllowed(path, 'rm')
- }
- def __cdc(self, path):
- """Current Directory Content"""
- files = []
- dirs = []
- for f in sorted(os.listdir(path)):
- if not self.__isAccepted(f): continue
- pf = os.path.join(path, f)
- info = {}
- info = self.__info(pf)
- info['hash'] = self.__hash(pf)
- if info['mime'] == 'directory':
- dirs.append(info)
- else:
- files.append(info)
- dirs.extend(files)
- self._response['cdc'] = dirs
- def __info(self, path):
- mime = ''
- filetype = 'file'
- if os.path.isfile(path): filetype = 'file'
- if os.path.isdir(path): filetype = 'dir'
- if os.path.islink(path): filetype = 'link'
- stat = os.lstat(path)
- statDate = datetime.fromtimestamp(stat.st_mtime)
- fdate = ''
- if stat.st_mtime >= self._today:
- fdate = 'Today ' + statDate.strftime("%H:%M")
- elif stat.st_mtime >= self._yesterday and stat.st_mtime < self._today:
- fdate = 'Yesterday ' + statDate.strftime("%H:%M")
- else:
- fdate = statDate.strftime("%d %b %Y %H:%M")
- info = {
- 'name': self.__checkUtf8(os.path.basename(path)),
- 'hash': self.__hash(path),
- 'mime': 'directory' if filetype == 'dir' else self.__mimetype(path),
- 'date': fdate,
- 'size': self.__dirSize(path) if filetype == 'dir' else stat.st_size,
- 'read': self.__isAllowed(path, 'read'),
- 'write': self.__isAllowed(path, 'write'),
- 'rm': self.__isAllowed(path, 'rm')
- }
- if filetype == 'link':
- lpath = self.__readlink(path)
- if not lpath:
- info['mime'] = 'symlink-broken'
- return info
- if os.path.isdir(lpath):
- info['mime'] = 'directory'
- else:
- info['parent'] = self.__hash(os.path.dirname(lpath))
- info['mime'] = self.__mimetype(lpath)
- if self._options['rootAlias']:
- basename = self._options['rootAlias']
- else:
- basename = os.path.basename(self._options['root'])
- info['link'] = self.__hash(lpath)
- info['linkTo'] = basename + lpath[len(self._options['root']):]
- info['read'] = info['read'] and self.__isAllowed(lpath, 'read')
- info['write'] = info['write'] and self.__isAllowed(lpath, 'write')
- info['rm'] = self.__isAllowed(lpath, 'rm')
- else:
- lpath = False
- if not info['mime'] == 'directory':
- if self._options['fileURL'] and info['read'] is True:
- if lpath:
- info['url'] = self.__path2url(lpath)
- else:
- info['url'] = self.__path2url(path)
- if info['mime'][0:5] == 'image':
- if self.__canCreateTmb():
- dim = self.__getImgSize(path)
- if dim:
- info['dim'] = dim
- info['resize'] = True
- # if we are in tmb dir, files are thumbs itself
- if os.path.dirname(path) == self._options['tmbDir']:
- info['tmb'] = self.__path2url(path)
- return info
- tmb = os.path.join(self._options['tmbDir'], info['hash'] + '.png')
- if os.path.exists(tmb):
- tmbUrl = self.__path2url(tmb)
- info['tmb'] = tmbUrl
- else:
- self._response['tmb'] = True
- return info
- def __tree(self, path):
- """Return directory tree starting from path"""
- if not os.path.isdir(path): return ''
- if os.path.islink(path): return ''
- if path == self._options['root'] and self._options['rootAlias']:
- name = self._options['rootAlias']
- else:
- name = os.path.basename(path)
- tree = {
- 'hash': self.__hash(path),
- 'name': self.__checkUtf8(name),
- 'read': self.__isAllowed(path, 'read'),
- 'write': self.__isAllowed(path, 'write'),
- 'dirs': []
- }
- if self.__isAllowed(path, 'read'):
- for d in sorted(os.listdir(path)):
- pd = os.path.join(path, d)
- if os.path.isdir(pd) and not os.path.islink(pd) and self.__isAccepted(d):
- tree['dirs'].append(self.__tree(pd))
- return tree
- def __uniqueName(self, path, copy = ' copy'):
- """Generate unique name for file copied file"""
- curDir = os.path.dirname(path)
- curName = os.path.basename(path)
- lastDot = curName.rfind('.')
- ext = newName = ''
- if not os.path.isdir(path) and re.search(r'\..{3}\.(gz|bz|bz2)$', curName):
- pos = -7
- if curName[-1:] == '2':
- pos -= 1
- ext = curName[pos:]
- oldName = curName[0:pos]
- newName = oldName + copy
- elif os.path.isdir(path) or lastDot <= 0:
- oldName = curName
- newName = oldName + copy
- pass
- else:
- ext = curName[lastDot:]
- oldName = curName[0:lastDot]
- newName = oldName + copy
- pos = 0
- if oldName[-len(copy):] == copy:
- newName = oldName
- elif re.search(r'' + copy +'\s\d+$', oldName):
- pos = oldName.rfind(copy) + len(copy)
- newName = oldName[0:pos]
- else:
- newPath = os.path.join(curDir, newName + ext)
- if not os.path.exists(newPath):
- return newPath
- # if we are here then copy already exists or making copy of copy
- # we will make new indexed copy *black magic*
- idx = 1
- if pos > 0: idx = int(oldName[pos:])
- while True:
- idx += 1
- newNameExt = newName + ' ' + str(idx) + ext
- newPath = os.path.join(curDir, newNameExt)
- if not os.path.exists(newPath):
- return newPath
- # if idx >= 1000: break # possible loop
- return
- def __remove(self, target):
- """Internal remove procedure"""
- if not self.__isAllowed(target, 'rm'):
- self.__errorData(target, 'Access denied')
- if not os.path.isdir(target):
- try:
- os.unlink(target)
- return True
- except:
- self.__errorData(target, 'Remove failed')
- return False
- else:
- for i in os.listdir(target):
- if self.__isAccepted(i):
- self.__remove(os.path.join(target, i))
- try:
- os.rmdir(target)
- return True
- except:
- self.__errorData(target, 'Remove failed')
- return False
- pass
- def __copy(self, src, dst):
- """Internal copy procedure"""
- dstDir = os.path.dirname(dst)
- if not self.__isAllowed(src, 'read'):
- self.__errorData(src, 'Access denied')
- return False
- if not self.__isAllowed(dstDir, 'write'):
- self.__errorData(dstDir, 'Access denied')
- return False
- if os.path.exists(dst):
- self.__errorData(dst, 'File or folder with the same name already exists')
- return False
- if not os.path.isdir(src):
- try:
- shutil.copyfile(src, dst)
- shutil.copymode(src, dst)
- return True
- except:
- self.__errorData(src, 'Unable to copy files')
- return False
- else:
- try:
- os.mkdir(dst)
- shutil.copymode(src, dst)
- except:
- self.__errorData(src, 'Unable to copy files')
- return False
- for i in os.listdir(src):
- newSrc = os.path.join(src, i)
- newDst = os.path.join(dst, i)
- if not self.__copy(newSrc, newDst):
- self.__errorData(newSrc, 'Unable to copy files')
- return False
- return True
- def __checkName(self, name):
- """Check for valid file/dir name"""
- pattern = r'[\/\\\:\<\>]'
- if re.search(pattern, name):
- return False
- return True
- def __findDir(self, fhash, path):
- """Find directory by hash"""
- fhash = str(fhash)
- if not path:
- path = self._options['root']
- if fhash == self.__hash(path):
- return path
- if not os.path.isdir(path):
- return None
- for d in os.listdir(path):
- pd = os.path.join(path, d)
- if os.path.isdir(pd) and not os.path.islink(pd):
- if fhash == self.__hash(pd):
- return pd
- else:
- ret = self.__findDir(fhash, pd)
- if ret:
- return ret
- return None
- def __find(self, fhash, parent):
- """Find file/dir by hash"""
- fhash = str(fhash)
- if os.path.isdir(parent):
- for i in os.listdir(parent):
- path = os.path.join(parent, i)
- if fhash == self.__hash(path):
- return path
- return None
- def __read(self):
- if 'current' in self._request and 'target' in self._request:
- curDir = self.__findDir(self._request['current'], None)
- curFile = self.__find(self._request['target'], curDir)
- if curDir and curFile:
- if self.__isAllowed(curFile, 'read'):
- self._response['content'] = open(curFile, 'r').read()
- else:
- self._response['error'] = 'Access denied'
- return
- self._response['error'] = 'Invalid parameters'
- return
- def __edit(self):
- """Save content in file"""
- error = ''
- if 'current' in self._request and 'target' in self._request and 'content' in self._request:
- curDir = self.__findDir(self._request['current'], None)
- curFile = self.__find(self._request['target'], curDir)
- error = curFile
- if curFile and curDir:
- if self.__isAllowed(curFile, 'write'):
- try:
- f = open(curFile, 'w+')
- f.write(self._request['content'])
- f.close()
- self._response['target'] = self.__info(curFile)
- except:
- self._response['error'] = 'Unable to write to file'
- else:
- self._response['error'] = 'Access denied'
- return
- self._response['error'] = 'Invalid parameters'
- return
- def __archive(self):
- """Compress files/directories to archive"""
- self.__checkArchivers()
- if (
- not self._options['archivers']['create'] or not 'type' in self._request
- or not 'current' in self._request
- or not 'targets[]' in self._request
- or not 'name' in self._request
- ):
- self._response['error'] = 'Invalid parameters'
- return
- curDir = self.__findDir(self._request['current'], None)
- archiveType = self._request['type']
- if (
- not archiveType in self._options['archivers']['create']
- or not archiveType in self._options['archiveMimes']
- or not curDir
- or not self.__isAllowed(curDir, 'write')
- ):
- self._response['error'] = 'Unable to create archive'
- return
- files = self._request['targets[]']
- if not isinstance(files, list):
- files = [files]
- realFiles = []
- for fhash in files:
- curFile = self.__find(fhash, curDir)
- if not curFile:
- self._response['error'] = 'File not found'
- return
- realFiles.append(os.path.basename(curFile))
- arc = self._options['archivers']['create'][archiveType]
- if len(realFiles) > 1:
- archiveName = self._request['name']
- else:
- archiveName = realFiles[0]
- archiveName += '.' + arc['ext']
- archiveName = self.__uniqueName(archiveName, '')
- archivePath = os.path.join(curDir, archiveName)
- cmd = [arc['cmd']]
- for a in arc['argc'].split():
- cmd.append(a)
- cmd.append(archiveName)
- for f in realFiles:
- cmd.append(f)
- curCwd = os.getcwd()
- os.chdir(curDir)
- self.__runSubProcess(cmd)
- os.chdir(curCwd)
- if os.path.exists(archivePath):
- self.__content(curDir, False)
- self._response['select'] = [self.__hash(archivePath)]
- else:
- self._response['error'] = 'Unable to create archive'
- return
- def __extract(self):
- """Uncompress archive"""
- if not 'current' in self._request or not 'target' in self._request:
- self._response['error'] = 'Invalid parameters'
- return
- curDir = self.__findDir(self._request['current'], None)
- curFile = self.__find(self._request['target'], curDir)
- mime = self.__mimetype(curFile)
- self.__checkArchivers()
- if (
- not mime in self._options['archivers']['extract']
- or not curDir
- or not curFile
- or not self.__isAllowed(curDir, 'write')
- ):
- self._response['error'] = 'Invalid parameters'
- return
- arc = self._options['archivers']['extract'][mime]
- cmd = [arc['cmd']]
- for a in arc['argc'].split():
- cmd.append(a)
- cmd.append(curFile)
- curCwd = os.getcwd()
- os.chdir(curDir)
- ret = self.__runSubProcess(cmd)
- os.chdir(curCwd)
- if ret:
- self.__content(curDir, True)
- else:
- self._response['error'] = 'Unable to extract files from archive'
- return
- def __ping(self):
- """Workaround for Safari"""
- self.httpStatusCode = 200
- self.httpHeader['Connection'] = 'close'
- return
- def __mimetype(self, path):
- """Detect mimetype of file"""
- mime = mimetypes.guess_type(path)[0] or 'unknown'
- ext = path[path.rfind('.') + 1:]
- if mime == 'unknown' and ('.' + ext) in mimetypes.types_map:
- mime = mimetypes.types_map['.' + ext]
- if mime == 'text/plain' and ext == 'pl':
- mime = self._mimeType[ext]
- if mime == 'application/vnd.ms-office' and ext == 'doc':
- mime = self._mimeType[ext]
- if mime == 'unknown':
- if os.path.basename(path) in ['README', 'ChangeLog']:
- mime = 'text/plain'
- else:
- if ext in self._mimeType:
- mime = self._mimeType[ext]
- # self.__debug('mime ' + os.path.basename(path), ext + ' ' + mime)
- return mime
- def __tmb(self, path, tmb):
- """Internal thumbnail create procedure"""
- try:
- im = self._im.open(path).copy()
- size = self._options['tmbSize'], self._options['tmbSize']
- box = self.__cropTuple(im.size)
- if box:
- im = im.crop(box)
- im.thumbnail(size, self._im.ANTIALIAS)
- im.save(tmb, 'PNG')
- except Exception, e:
- self.__debug('tmbFailed_' + path, str(e))
- return False
- return True
- def __rmTmb(self, path):
- tmb = self.__tmbPath(path)
- if self._options['tmbDir']:
- if os.path.exists(tmb):
- try:
- os.unlink(tmb)
- except:
- pass
- def __cropTuple(self, size):
- w, h = size
- if w > h: # landscape
- l = int((w - h) / 2)
- u = 0
- r = l + h
- d = h
- return (l, u, r, d)
- elif h > w: # portrait
- l = 0
- u = int((h - w) / 2)
- r = w
- d = u + w
- return (l, u, r, d)
- else: # cube
- pass
- return False
- def __readlink(self, path):
- """Read link and return real path if not broken"""
- target = os.readlink(path);
- if not target[0] == '/':
- target = os.path.join(os.path.dirname(path), target)
- target = os.path.normpath(target)
- if os.path.exists(target):
- if not target.find(self._options['root']) == -1:
- return target
- return False
- def __dirSize(self, path):
- total_size = 0
- if self._options['dirSize']:
- for dirpath, dirnames, filenames in os.walk(path):
- for f in filenames:
- fp = os.path.join(dirpath, f)
- if os.path.exists(fp):
- total_size += os.stat(fp).st_size
- else:
- total_size = os.lstat(path).st_size
- return total_size
- def __fbuffer(self, f, chunk_size = _options['uploadWriteChunk']):
- while True:
- chunk = f.read(chunk_size)
- if not chunk: break
- yield chunk
- def __canCreateTmb(self, path = None):
- if self._options['imgLib'] and self._options['tmbDir']:
- if path is not None:
- mime = self.__mimetype(path)
- if not mime[0:5] == 'image':
- return False
- return True
- else:
- return False
- def __tmbPath(self, path):
- tmb = False
- if self._options['tmbDir']:
- if not os.path.dirname(path) == self._options['tmbDir']:
- tmb = os.path.join(self._options['tmbDir'], self.__hash(path) + '.png')
- return tmb
- def __isUploadAllow(self, name):
- allow = False
- deny = False
- mime = self.__mimetype(name)
- if 'all' in self._options['uploadAllow']:
- allow = True
- else:
- for a in self._options['uploadAllow']:
- if mime.find(a) == 0:
- allow = True
- if 'all' in self._options['uploadDeny']:
- deny = True
- else:
- for d in self._options['uploadDeny']:
- if mime.find(d) == 0:
- deny = True
- if self._options['uploadOrder'][0] == 'allow': # ,deny
- if deny is True:
- return False
- elif allow is True:
- return True
- else:
- return False
- else: # deny,allow
- if allow is True:
- return True
- elif deny is True:
- return False
- else:
- return True
- def __isAccepted(self, target):
- if target == '.' or target == '..':
- return False
- if target[0:1] == '.' and not self._options['dotFiles']:
- return False
- return True
- def __isAllowed(self, path, access):
- if not os.path.exists(path):
- return False
- if access == 'read':
- if not os.access(path, os.R_OK):
- self.__errorData(path, access)
- return False
- elif access == 'write':
- if not os.access(path, os.W_OK):
- self.__errorData(path, access)
- return False
- elif access == 'rm':
- if not os.access(os.path.dirname(path), os.W_OK):
- self.__errorData(path, access)
- return False
- else:
- return False
- path = path[len(os.path.normpath(self._options['root'])):]
- for ppath in self._options['perms']:
- regex = r'' + ppath
- if re.search(regex, path) and access in self._options['perms'][ppath]:
- return self._options['perms'][ppath][access]
- return self._options['defaults'][access]
- def __hash(self, path):
- """Hash of the path"""
- m = hashlib.md5()
- m.update(path)
- return str(m.hexdigest())
- def __path2url(self, path):
- curDir = path
- length = len(self._options['root'])
- url = str(self._options['URL'] + curDir[length:]).replace(os.sep, '/')
- try:
- import urllib
- url = urllib.quote(url, '/:~')
- except:
- pass
- return url
- def __errorData(self, path, msg):
- """Collect error/warning messages"""
- self._errorData[path] = msg
- def __initImgLib(self):
- if not self._options['imgLib'] is False and self._im is None:
- try:
- import Image
- Image
- self._im = Image
- self._options['imgLib'] = 'PIL'
- except:
- self._options['imgLib'] = False
- self._im = False
- self.__debug('imgLib', self._options['imgLib'])
- return self._options['imgLib']
- def __getImgSize(self, path):
- self.__initImgLib();
- if self.__canCreateTmb():
- try:
- im = self._im.open(path)
- return str(im.size[0]) + 'x' + str(im.size[1])
- except:
- pass
- return False
- def __debug(self, k, v):
- if self._options['debug']:
- self._response['debug'].update({k: v})
- return
- def __checkArchivers(self):
- # import subprocess
- # sp = subprocess.Popen(['tar', '--version'], shell = False,
- # stdout = subprocess.PIPE, stderr=subprocess.PIPE)
- # out, err = sp.communicate()
- # print 'out:', out, '\nerr:', err, '\n'
- archive = { 'create': {}, 'extract': {} }
- c = archive['create']
- e = archive['extract']
- tar = self.__runSubProcess(['tar', '--version'])
- gzip = self.__runSubProcess(['gzip', '--version'])
- bzip2 = self.__runSubProcess(['bzip2', '--version'])
- zipc = self.__runSubProcess(['zip', '--version'])
- unzip = self.__runSubProcess(['unzip', '--help'])
- rar = self.__runSubProcess(['rar', '--version'], validReturn = [0, 7])
- unrar = self.__runSubProcess(['unrar'], validReturn = [0, 7])
- p7z = self.__runSubProcess(['7z', '--help'])
- p7za = self.__runSubProcess(['7za', '--help'])
- p7zr = self.__runSubProcess(['7zr', '--help'])
- # tar = False
- # tar = gzip = bzip2 = zipc = unzip = rar = unrar = False
- # print tar, gzip, bzip2, zipc, unzip, rar, unrar, p7z, p7za, p7zr
- if tar:
- mime = 'application/x-tar'
- c.update({mime: {'cmd': 'tar', 'argc': '-cf', 'ext': 'tar'}})
- e.update({mime: {'cmd': 'tar', 'argc': '-xf', 'ext': 'tar'}})
- if tar and gzip:
- mime = 'application/x-gzip'
- c.update({mime: {'cmd': 'tar', 'argc': '-czf', 'ext': 'tar.gz'}})
- e.update({mime: {'cmd': 'tar', 'argc': '-xzf', 'ext': 'tar.gz'}})
- if tar and bzip2:
- mime = 'application/x-bzip2'
- c.update({mime: {'cmd': 'tar', 'argc': '-cjf', 'ext': 'tar.bz2'}})
- e.update({mime: {'cmd': 'tar', 'argc': '-xjf', 'ext': 'tar.bz2'}})
- mime = 'application/zip'
- if zipc:
- c.update({mime: {'cmd': 'zip', 'argc': '-r9', 'ext': 'zip'}})
- if unzip:
- e.update({mime: {'cmd': 'unzip', 'argc': '', 'ext': 'zip'}})
- mime = 'application/x-rar'
- if rar:
- c.update({mime: {'cmd': 'rar', 'argc': 'a -inul', 'ext': 'rar'}})
- e.update({mime: {'cmd': 'rar', 'argc': 'x -y', 'ext': 'rar'}})
- elif unrar:
- e.update({mime: {'cmd': 'unrar', 'argc': 'x -y', 'ext': 'rar'}})
- p7zip = None
- if p7z:
- p7zip = '7z'
- elif p7za:
- p7zip = '7za'
- elif p7zr:
- p7zip = '7zr'
- if p7zip:
- mime = 'application/x-7z-compressed'
- c.update({mime: {'cmd': p7zip, 'argc': 'a -t7z', 'ext': '7z'}})
- e.update({mime: {'cmd': p7zip, 'argc': 'e -y', 'ext': '7z'}})
- mime = 'application/x-tar'
- if not mime in c:
- c.update({mime: {'cmd': p7zip, 'argc': 'a -ttar', 'ext': 'tar'}})
- if not mime in e:
- e.update({mime: {'cmd': p7zip, 'argc': 'e -y', 'ext': 'tar'}})
- mime = 'application/x-gzip'
- if not mime in c:
- c.update({mime: {'cmd': p7zip, 'argc': 'a -tgzip', 'ext': 'gz'}})
- if not mime in e:
- e.update({mime: {'cmd': p7zip, 'argc': 'e -y', 'ext': 'tar.gz'}})
- mime = 'application/x-bzip2'
- if not mime in c:
- c.update({mime: {'cmd': p7zip, 'argc': 'a -tbzip2', 'ext': 'bz2'}})
- if not mime in e:
- e.update({mime: {'cmd': p7zip, 'argc': 'e -y', 'ext': 'tar.bz2'}})
- mime = 'application/zip'
- if not mime in c:
- c.update({mime: {'cmd': p7zip, 'argc': 'a -tzip', 'ext': 'zip'}})
- if not mime in e:
- e.update({mime: {'cmd': p7zip, 'argc': 'e -y', 'ext': 'zip'}})
- if not self._options['archiveMimes']:
- self._options['archiveMimes'] = c.keys()
- else:
- pass
- self._options['archivers'] = archive
- pass
- def __runSubProcess(self, cmd, validReturn = [0]):
- if self._sp is None:
- import subprocess
- self._sp = subprocess
- try:
- sp = self._sp.Popen(cmd, shell = False, stdout = self._sp.PIPE, stderr = self._sp.PIPE, stdin = self._sp.PIPE)
- out, err = sp.communicate('')
- ret = sp.returncode
- # print cmd, ret, out, err
- except:
- return False
- if not ret in validReturn:
- return False
- return True
- def __checkUtf8(self, name):
- try:
- name.decode('utf-8')
- except UnicodeDecodeError:
- name = unicode(name, 'utf-8', 'replace')
- self.__debug('invalid encoding', name)
- #name += ' (invalid encoding)'
- return name