create distinct PE Helper implementation classes
parent
f93393f838
commit
f27026a990
@ -0,0 +1,156 @@
|
|||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import stat
|
||||||
|
import subprocess
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
|
from .exceptions import PEHelperException, ExternalPEHelperException
|
||||||
|
from .validation import validate_dataset_path, validate_pool_name, validate_property_value
|
||||||
|
|
||||||
|
|
||||||
|
class PEHelperBase:
|
||||||
|
'''
|
||||||
|
Base class for Privilege Escalation (PE) helper implementations.
|
||||||
|
'''
|
||||||
|
def __init__(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return '<PEHelperBase>'
|
||||||
|
|
||||||
|
def zfs_mount(self, fileset: str, mountpoint: Optional[str] = None) -> None:
|
||||||
|
'''
|
||||||
|
Tries to mount ``fileset``. An optional ``mountpoint`` can be given, otherwise it relies on inheritance to set
|
||||||
|
the mountpoint. Note that some implementations may need an explicit mountpoint.
|
||||||
|
|
||||||
|
:raises ValidationError: If parameters do not validate.
|
||||||
|
:raises PEHelperException: If errors are encountered when running the helper.
|
||||||
|
'''
|
||||||
|
raise NotImplementedError(f'{self} has not implemented this function')
|
||||||
|
|
||||||
|
def zfs_set_mountpoint(self, fileset: str, mountpoint: str) -> None:
|
||||||
|
'''
|
||||||
|
Sets the ``mountpoint`` property of the given ``fileset``.
|
||||||
|
|
||||||
|
:raises ValidationError: If parameters do not validate.
|
||||||
|
:raises PEHelperException: If errors are encountered when running the helper.
|
||||||
|
'''
|
||||||
|
raise NotImplementedError(f'{self} has not implemented this function')
|
||||||
|
|
||||||
|
|
||||||
|
class ExternalPEHelper(PEHelperBase):
|
||||||
|
'''
|
||||||
|
Implementation using an external script to safeguard the operations.
|
||||||
|
'''
|
||||||
|
def __init__(self, executable: str) -> None:
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
|
self.log = logging.getLogger('simplezfs.pe_helper.external')
|
||||||
|
self.executable = executable
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return f'<ExternalPEHelper(executable={self.executable})>'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def executable(self) -> str:
|
||||||
|
return self.__exe
|
||||||
|
|
||||||
|
@executable.setter
|
||||||
|
def executable(self, new_exe: str) -> None:
|
||||||
|
candidate = new_exe.strip()
|
||||||
|
|
||||||
|
mode = os.lstat(candidate).st_mode
|
||||||
|
if not stat.S_ISREG(mode):
|
||||||
|
raise FileNotFoundError('PE helper must be a file')
|
||||||
|
if not os.access(candidate, os.X_OK):
|
||||||
|
raise FileNotFoundError('PE helper must be executable')
|
||||||
|
self.log.debug(f'Setting privilege escalation helper to "{candidate}"')
|
||||||
|
self.__exe = candidate
|
||||||
|
|
||||||
|
def _execute_cmd(self, cmd: List[str]) -> None:
|
||||||
|
|
||||||
|
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
|
||||||
|
if proc.returncode != 0 or len(proc.stderr) > 0:
|
||||||
|
|
||||||
|
if proc.returncode == 1:
|
||||||
|
self.log.error('General error in PE executable: Wrong parameters or configuration problem')
|
||||||
|
msg = 'General error'
|
||||||
|
elif proc.returncode == 2:
|
||||||
|
msg = 'Parent directory does not exist or is not a directory'
|
||||||
|
self.log.error(msg)
|
||||||
|
elif proc.returncode == 3:
|
||||||
|
msg = 'Parent dataset does not exist'
|
||||||
|
self.log.error(msg)
|
||||||
|
elif proc.returncode == 4:
|
||||||
|
msg = 'Target fileset is not a (grand)child of parent or parent does not exist'
|
||||||
|
self.log.error(msg)
|
||||||
|
elif proc.returncode == 5:
|
||||||
|
msg = 'Mountpoint is not inside the parent directory or otherwise invalid'
|
||||||
|
self.log.error(msg)
|
||||||
|
elif proc.returncode == 6:
|
||||||
|
msg = 'Calling the zfs utility failed'
|
||||||
|
self.log.error(msg)
|
||||||
|
else:
|
||||||
|
msg = f'Unknown / Unhandled error with returncode {proc.returncode}'
|
||||||
|
self.log.error(msg)
|
||||||
|
|
||||||
|
raise ExternalPEHelperException(msg, proc.returncode, proc.stdout, proc.stderr)
|
||||||
|
else:
|
||||||
|
self.log.info('PE Helper successful')
|
||||||
|
self.log.debug(f'Return code: {proc.returncode}')
|
||||||
|
self.log.debug(f'Stdout: {proc.stdout}')
|
||||||
|
|
||||||
|
def zfs_set_mountpoint(self, fileset: str, mountpoint: str) -> None:
|
||||||
|
cmd = [self.__exe, 'set_mountpoint', fileset, mountpoint]
|
||||||
|
self._execute_cmd(cmd)
|
||||||
|
|
||||||
|
|
||||||
|
class SudoPEHelper(PEHelperBase):
|
||||||
|
'''
|
||||||
|
Implementation using ``sudo(8)``.
|
||||||
|
'''
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
|
self.log = logging.getLogger('simplezfs.pe_helper.sudo')
|
||||||
|
|
||||||
|
self._find_executable()
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return f'<SudoPEHelper(executable={self.__exe})>'
|
||||||
|
|
||||||
|
def _find_executable(self) -> None:
|
||||||
|
'''
|
||||||
|
Tries to find an executable named ``sudo``.
|
||||||
|
|
||||||
|
:raises FileNotFoundError: if no executable can be found.
|
||||||
|
'''
|
||||||
|
name = 'sudo'
|
||||||
|
|
||||||
|
candidate = shutil.which(cmd=name)
|
||||||
|
if not candidate:
|
||||||
|
raise FileNotFoundError('Could not find sudo executable')
|
||||||
|
self.__exe = candidate
|
||||||
|
|
||||||
|
def _execute_cmd(self, cmd: List[str]) -> None:
|
||||||
|
'''
|
||||||
|
Executes the given command through sudo. The call to sudo must not be included in ``cmd``.
|
||||||
|
'''
|
||||||
|
args = [self.__exe] + cmd
|
||||||
|
if len(cmd) < 4:
|
||||||
|
raise PEHelperException('Command suspicously short')
|
||||||
|
|
||||||
|
proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
|
||||||
|
if proc.returncode != 0 or len(proc.stderr) > 0:
|
||||||
|
raise PEHelperException(f'Error running command: {proc.stderr}')
|
||||||
|
|
||||||
|
def zfs_set_mountpoint(self, fileset: str, mountpoint: str) -> None:
|
||||||
|
if '/' in fileset:
|
||||||
|
validate_dataset_path(fileset)
|
||||||
|
else:
|
||||||
|
validate_pool_name(fileset)
|
||||||
|
# TODO validate mountpoint fs
|
||||||
|
|
||||||
|
self._execute_cmd(['zfs', 'set', f'mountpoint={mountpoint}', fileset])
|
Loading…
Reference in New Issue