Source code for pacifica.archiveinterface.backends.posix.archive

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Posix Backend Archive Module.

Module that implements the abstract_backend_archive class for a posix
backend.
"""
import os
import stat
import shutil
from ...archive_utils import un_abs_path, bytes_type
from ...config import get_config
from ...id2filename import id2filename
from ...exception import ArchiveInterfaceError
from .extendedfile import extended_file_factory
from ..abstract.archive import AbstractBackendArchive


[docs]class PosixBackendArchive(AbstractBackendArchive): """Posix Backend Archive Class. Class that implements the abstract base class for the posix archive interface backend. """
[docs] def __init__(self, prefix): """Constructor for Posix Backend Archive.""" super(PosixBackendArchive, self).__init__(prefix) self._prefix = prefix self._file = None self._filepath = None self._id2filename = lambda x: x if get_config().getboolean("posix", "use_id2filename"): self._id2filename = lambda x: id2filename(int(x))
[docs] def open(self, filepath, mode): """Open a posix file.""" # want to close any open files first try: self.close() except ArchiveInterfaceError as ex: err_str = ( "Can't close previous posix file before opening new " "one with error: " + str(ex) ) raise ArchiveInterfaceError(err_str) try: fpath = un_abs_path(self._id2filename(filepath)) filename = os.path.join(self._prefix, fpath) dirname = os.path.dirname(filename) if not os.path.isdir(dirname): os.makedirs(dirname, 0o755) self._filepath = filename self._file = extended_file_factory(self._filepath, mode) return self except Exception as ex: err_str = "Can't open posix file with error: " + str(ex) raise ArchiveInterfaceError(err_str)
[docs] def close(self): """Close a posix file.""" try: if self._file: self._file.close() self._file = None except Exception as ex: err_str = "Can't close posix file with error: " + str(ex) raise ArchiveInterfaceError(err_str)
[docs] def read(self, blocksize): """Read a posix file.""" try: if self._file: return self._file.read(blocksize) except Exception as ex: err_str = "Can't read posix file with error: " + str(ex) raise ArchiveInterfaceError(err_str) err_str = "Internal file handle invalid" raise ArchiveInterfaceError(err_str)
[docs] def seek(self, offset): """Seek in the file to the offset.""" try: if self._file: return self._file.seek(offset) except Exception as ex: err_str = "Can't seek posix file with error: " + str(ex) raise ArchiveInterfaceError(err_str) err_str = "Internal file handle invalid" raise ArchiveInterfaceError(err_str)
[docs] def write(self, buf): """Write a posix file to the archive.""" try: if self._file: # pylint: disable=too-many-function-args return self._file.write(bytes_type(buf)) # pylint: enable=too-many-function-args except Exception as ex: err_str = "Can't write posix file with error: " + str(ex) raise ArchiveInterfaceError(err_str) err_str = "Internal file handle invalid" raise ArchiveInterfaceError(err_str)
[docs] def set_mod_time(self, mod_time): """Set the mod time on a posix file.""" try: if self._filepath: os.utime(self._filepath, (mod_time, mod_time)) except Exception as ex: err_str = "Can't set posix file mod time with error: " + str(ex) raise ArchiveInterfaceError(err_str)
[docs] def set_file_permissions(self): """Set the file permissions for a posix file.""" try: if self._filepath: os.chmod(self._filepath, 0o444) except Exception as ex: err_str = "Can't set posix file permissions with error: " + str(ex) raise ArchiveInterfaceError(err_str)
[docs] def stage(self): """Stage a posix file (no-opt essentially).""" try: if self._file: return self._file.stage() except Exception as ex: err_str = "Can't stage posix file with error: " + str(ex) raise ArchiveInterfaceError(err_str) err_str = "Internal file handle invalid" raise ArchiveInterfaceError(err_str)
[docs] def status(self): """Get the status of a posix file.""" try: if self._file: return self._file.status() except Exception as ex: err_str = "Can't get posix file status with error: " + str(ex) raise ArchiveInterfaceError(err_str) err_str = "Internal file handle invalid" raise ArchiveInterfaceError(err_str)
[docs] def patch(self, file_id, old_path): """Move a posix file.""" try: fpath = un_abs_path(self._id2filename(file_id)) new_filepath = os.path.join(self._prefix, fpath) new_directories = os.path.dirname(new_filepath) if not os.path.exists(new_directories): os.makedirs(new_directories) shutil.move(old_path, new_filepath) except Exception as ex: err_str = "Can't move posix file with error: " + str(ex) raise ArchiveInterfaceError(err_str)
[docs] def remove(self): """Remove the file permissions for a posix file.""" try: if self._filepath: os.chmod(self._filepath, stat.S_IWRITE) os.unlink(self._filepath) self._filepath = None except Exception as ex: err_str = "Can't remove posix file with error: {}".format(ex) raise ArchiveInterfaceError(err_str)