| ## @file | |
| # This file hooks file and directory creation and removal | |
| # | |
| # Copyright (c) 2014 - 2018, Intel Corporation. All rights reserved.<BR> | |
| # | |
| # SPDX-License-Identifier: BSD-2-Clause-Patent | |
| # | |
| ''' | |
| File hook | |
| ''' | |
| import os | |
| import stat | |
| import time | |
| import zipfile | |
| from time import sleep | |
| from Library import GlobalData | |
| __built_in_remove__ = os.remove | |
| __built_in_mkdir__ = os.mkdir | |
| __built_in_rmdir__ = os.rmdir | |
| __built_in_chmod__ = os.chmod | |
| __built_in_open__ = open | |
| _RMFILE = 0 | |
| _MKFILE = 1 | |
| _RMDIR = 2 | |
| _MKDIR = 3 | |
| _CHMOD = 4 | |
| gBACKUPFILE = 'file.backup' | |
| gEXCEPTION_LIST = ['Conf'+os.sep+'DistributionPackageDatabase.db', '.tmp', gBACKUPFILE] | |
| class _PathInfo: | |
| def __init__(self, action, path, mode=-1): | |
| self.action = action | |
| self.path = path | |
| self.mode = mode | |
| class RecoverMgr: | |
| def __init__(self, workspace): | |
| self.rlist = [] | |
| self.zip = None | |
| self.workspace = os.path.normpath(workspace) | |
| self.backupfile = gBACKUPFILE | |
| self.zipfile = os.path.join(self.workspace, gBACKUPFILE) | |
| def _createzip(self): | |
| if self.zip: | |
| return | |
| self.zip = zipfile.ZipFile(self.zipfile, 'w', zipfile.ZIP_DEFLATED) | |
| def _save(self, tmp, path): | |
| if not self._tryhook(path): | |
| return | |
| self.rlist.append(_PathInfo(tmp, path)) | |
| def bkrmfile(self, path): | |
| arc = self._tryhook(path) | |
| if arc and os.path.isfile(path): | |
| self._createzip() | |
| self.zip.write(path, arc.encode('utf_8')) | |
| sta = os.stat(path) | |
| oldmode = stat.S_IMODE(sta.st_mode) | |
| self.rlist.append(_PathInfo(_CHMOD, path, oldmode)) | |
| self.rlist.append(_PathInfo(_RMFILE, path)) | |
| __built_in_remove__(path) | |
| def bkmkfile(self, path, mode, bufsize): | |
| if not os.path.exists(path): | |
| self._save(_MKFILE, path) | |
| return __built_in_open__(path, mode, bufsize) | |
| def bkrmdir(self, path): | |
| if os.path.exists(path): | |
| sta = os.stat(path) | |
| oldmode = stat.S_IMODE(sta.st_mode) | |
| self.rlist.append(_PathInfo(_CHMOD, path, oldmode)) | |
| self._save(_RMDIR, path) | |
| __built_in_rmdir__(path) | |
| def bkmkdir(self, path, mode): | |
| if not os.path.exists(path): | |
| self._save(_MKDIR, path) | |
| __built_in_mkdir__(path, mode) | |
| def bkchmod(self, path, mode): | |
| if self._tryhook(path) and os.path.exists(path): | |
| sta = os.stat(path) | |
| oldmode = stat.S_IMODE(sta.st_mode) | |
| self.rlist.append(_PathInfo(_CHMOD, path, oldmode)) | |
| __built_in_chmod__(path, mode) | |
| def rollback(self): | |
| if self.zip: | |
| self.zip.close() | |
| self.zip = None | |
| index = len(self.rlist) - 1 | |
| while index >= 0: | |
| item = self.rlist[index] | |
| exist = os.path.exists(item.path) | |
| if item.action == _MKFILE and exist: | |
| #if not os.access(item.path, os.W_OK): | |
| # os.chmod(item.path, S_IWUSR) | |
| __built_in_remove__(item.path) | |
| elif item.action == _RMFILE and not exist: | |
| if not self.zip: | |
| self.zip = zipfile.ZipFile(self.zipfile, 'r', zipfile.ZIP_DEFLATED) | |
| arcname = os.path.normpath(item.path) | |
| arcname = arcname[len(self.workspace)+1:].encode('utf_8') | |
| if os.sep != "/" and os.sep in arcname: | |
| arcname = arcname.replace(os.sep, '/') | |
| mtime = self.zip.getinfo(arcname).date_time | |
| content = self.zip.read(arcname) | |
| filep = __built_in_open__(item.path, "wb") | |
| filep.write(content) | |
| filep.close() | |
| intime = time.mktime(mtime + (0, 0, 0)) | |
| os.utime(item.path, (intime, intime)) | |
| elif item.action == _MKDIR and exist: | |
| while True: | |
| try: | |
| __built_in_rmdir__(item.path) | |
| break | |
| except IOError: | |
| # Sleep a short time and try again | |
| # The anti-virus software may delay the file removal in this directory | |
| sleep(0.1) | |
| elif item.action == _RMDIR and not exist: | |
| __built_in_mkdir__(item.path) | |
| elif item.action == _CHMOD and exist: | |
| try: | |
| __built_in_chmod__(item.path, item.mode) | |
| except EnvironmentError: | |
| pass | |
| index -= 1 | |
| self.commit() | |
| def commit(self): | |
| if self.zip: | |
| self.zip.close() | |
| __built_in_remove__(self.zipfile) | |
| # Check if path needs to be hooked | |
| def _tryhook(self, path): | |
| path = os.path.normpath(path) | |
| works = self.workspace if str(self.workspace).endswith(os.sep) else (self.workspace + os.sep) | |
| if not path.startswith(works): | |
| return '' | |
| for exceptdir in gEXCEPTION_LIST: | |
| full = os.path.join(self.workspace, exceptdir) | |
| if full == path or path.startswith(full + os.sep) or os.path.split(full)[0] == path: | |
| return '' | |
| return path[len(self.workspace)+1:] | |
| def _hookrm(path): | |
| if GlobalData.gRECOVERMGR: | |
| GlobalData.gRECOVERMGR.bkrmfile(path) | |
| else: | |
| __built_in_remove__(path) | |
| def _hookmkdir(path, mode=0o777): | |
| if GlobalData.gRECOVERMGR: | |
| GlobalData.gRECOVERMGR.bkmkdir(path, mode) | |
| else: | |
| __built_in_mkdir__(path, mode) | |
| def _hookrmdir(path): | |
| if GlobalData.gRECOVERMGR: | |
| GlobalData.gRECOVERMGR.bkrmdir(path) | |
| else: | |
| __built_in_rmdir__(path) | |
| def _hookmkfile(path, mode='r', bufsize=-1): | |
| if GlobalData.gRECOVERMGR: | |
| return GlobalData.gRECOVERMGR.bkmkfile(path, mode, bufsize) | |
| return __built_in_open__(path, mode, bufsize) | |
| def _hookchmod(path, mode): | |
| if GlobalData.gRECOVERMGR: | |
| GlobalData.gRECOVERMGR.bkchmod(path, mode) | |
| else: | |
| __built_in_chmod__(path, mode) | |
| def SetRecoverMgr(mgr): | |
| GlobalData.gRECOVERMGR = mgr | |
| os.remove = _hookrm | |
| os.mkdir = _hookmkdir | |
| os.rmdir = _hookrmdir | |
| os.chmod = _hookchmod | |
| __FileHookOpen__ = _hookmkfile |