main
svalouch 5 years ago
commit 27de066f27

.gitignore

@ -0,0 +1,147 @@
### Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
### VIM
# Swap
[._]*.s[a-v][a-z]
[._]*.sw[a-p]
[._]s[a-rt-v][a-z]
[._]ss[a-gi-z]
[._]sw[a-p]
# Session
Session.vim
Sessionx.vim
# Temporary
.netrwhist
*~
# Auto-generated tag files
tags
# Persistent undo
[._]*.un~

LICENSE
@ -0,0 +1,31 @@
BSD 3-Clause License
Copyright (c) 2019, Andreas Gonschorek (agonschorek) <simplezfs@andreasgonschorek.de>
Copyright (c) 2019, Stefan Valouch (svalouch) <svalouch@valouch.com>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

README.rst
@ -0,0 +1,50 @@
################
Python-SimpleZFS
################
A thin wrapper around ZFS from the `ZFS on Linux <https://zfsonlinux.org/>`_ project.
The library aims to provide a simple, low-level interface for working with ZFS, either by wrapping the ``zfs(8)`` and ``zpool(8)`` CLI utilities or by accessing the native Python API.
It does not provide a high-level interface, however, and does not aim to. It also tries to keep as little state as possible.
Two interface classes make up the API, ``ZFS`` and ``ZPool``, which are wrappers around the functionality of the CLI tools of the same name. They come with two implementations:
* The CLI implementation wraps the executables
* The Native implementation uses the native API released with ZoL 0.8.
In this early stage, the native implementation has not been written.
Usage
*****
One can either get a concrete implementation by calling ``ZFSCli``/``ZFSNative`` or ``ZPoolCli``/``ZPoolNative``, or more conveniently use the functions ``get_zfs(implementation_name)`` or ``get_zpool(implementation_name)``.
First, get an instance:
.. code-block:: pycon
>>> from simplezfs import get_zfs
>>> zfs = get_zfs('cli') # or "native" for the native API
>>> zfs
<simplezfs.zfs_cli.ZFSCli object at 0x7ffbca7fb9e8>
>>>
>>> for ds in zfs.list_datasets():
... print(ds.name)
...
tank
tank/system
tank/system/rootfs
Compatibility
*************
The library targets `Python` 3.6 or higher, which is available as a stable release in the major Linux distributions we care about (Debian Buster, Ubuntu 18.04 LTS, RHEL 8, Gentoo).
On the ZoL_ side, the code is developed mostly against version ``0.8`` and takes some validation values from that release. The library doesn't make a lot of assumptions, so the code should work on ``0.7``, too. If you spot an incompatibility, please let us know via the issue tracker.
Testing
*******
An extensive set of tests lives in the ``tests/`` subfolder; it can be run using ``pytest`` from the root of the repository. At this time, only the validation functions and the ZFS CLI API are tested. The tests are non-destructive: they don't run the actual commands, but mock away the ``subprocess`` invocations and supply dummy commands (usually ``/bin/true``) in case the code changes in a way that isn't caught by the test framework. Nevertheless, keep in mind that if commands are run for whatever reason, they will most likely result in unrecoverable data loss.
It is planned to add a separate set of `destructive` tests that need to be specially activated, for testing whether the code works when run against an actual Linux system. This can't be done using most of the CI providers, as the nature of ZFS requires an operating system with loaded modules that may be destroyed during the test run.
.. _ZoL: https://zfsonlinux.org/

docs/Makefile
@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

docs/api.rst
@ -0,0 +1,90 @@
###
API
###
Enumerations
************
.. autoclass:: simplezfs.types.DatasetType
:members:
.. autoclass:: simplezfs.types.PropertySource
:members:
.. autoclass:: simplezfs.types.ZPoolHealth
:members:
Types
*****
.. autoclass:: simplezfs.types.Dataset
:members:
.. autoclass:: simplezfs.types.Property
:members:
Interfaces
**********
ZFS
===
.. autofunction:: simplezfs.zfs.get_zfs
.. autoclass:: simplezfs.ZFS
:members:
ZPool
=====
.. autofunction:: simplezfs.zpool.get_zpool
.. autoclass:: simplezfs.ZPool
:members:
Implementations
***************
.. autoclass:: simplezfs.zfs_cli.ZFSCli
:members:
.. autoclass:: simplezfs.zfs_native.ZFSNative
:members:
.. autoclass:: simplezfs.zpool_cli.ZPoolCli
:members:
.. autoclass:: simplezfs.zpool_native.ZPoolNative
:members:
Validation functions
********************
A set of validation functions exist to validate names and other data. All of them raise a
:class:`simplezfs.exceptions.ValidationError` as a result of a failed validation and return nothing if everything is okay.
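For example, reserved pool names are rejected (output abbreviated, in the style of the other examples in these docs):

.. code-block:: pycon

    >>> from simplezfs.validation import validate_pool_name
    >>> validate_pool_name('tank')
    >>> validate_pool_name('mirror')
    simplezfs.exceptions.ValidationError: reserved name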
.. autofunction:: simplezfs.validation.validate_dataset_name
.. autofunction:: simplezfs.validation.validate_dataset_path
.. autofunction:: simplezfs.validation.validate_native_property_name
.. autofunction:: simplezfs.validation.validate_metadata_property_name
.. autofunction:: simplezfs.validation.validate_pool_name
.. autofunction:: simplezfs.validation.validate_property_value
Exceptions
**********
.. autoexception:: simplezfs.exceptions.ZFSException
.. autoexception:: simplezfs.exceptions.DatasetNotFound
.. autoexception:: simplezfs.exceptions.PermissionError
.. autoexception:: simplezfs.exceptions.PoolNotFound
.. autoexception:: simplezfs.exceptions.PropertyNotFound
.. autoexception:: simplezfs.exceptions.ValidationError
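All of these derive from :class:`~simplezfs.exceptions.ZFSException`, so a broad catch-all is possible. A small
sketch (the dataset name is made up):

.. code-block:: pycon

    >>> from simplezfs import get_zfs
    >>> from simplezfs.exceptions import DatasetNotFound, ZFSException
    >>> zfs = get_zfs('cli')
    >>> try:
    ...     zfs.get_property('tank/does/not/exist', 'mountpoint')
    ... except DatasetNotFound:
    ...     print('no such dataset')
    ... except ZFSException as err:
    ...     print(f'other ZFS error: {err}')
    no such dataset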

docs/conf.py
@ -0,0 +1,57 @@
# Configuration file for the Sphinx documentation builder.
#
# This file only contains a selection of the most common options. For a full
# list see the documentation:
# http://www.sphinx-doc.org/en/master/config
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
# import os
# import sys
# sys.path.insert(0, os.path.abspath('.'))
# -- Project information -----------------------------------------------------
project = 'Python-SimpleZFS'
copyright = '2019, Andreas Gonschorek, Stefan Valouch'
author = 'Andreas Gonschorek, Stefan Valouch'
# The full version, including alpha/beta/rc tags
release = '0.0.1'
# -- General configuration ---------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx_autodoc_typehints',
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']

docs/guide.rst
@ -0,0 +1,310 @@
#########################
Guide to Python-SimpleZFS
#########################
Overview
********
The interfaces are similar to calling the ZFS toolset on the command line. That is, there are no state-holding classes
representing filesets or pools; each call must include the entire dataset path or pool name. There is also no way to
collect actions and run them at the end: each action is carried out immediately.
There are, however, two implementations of the functionality: one using the ``cli`` tools and another using the
`libzfs_core` ``native`` library. We'll focus on the ``cli`` version here.
Most of the functions raise a :class:`~simplezfs.exceptions.ValidationError` with some helpful text if any of the data turns
out not to be valid. For example, including invalid or reserved strings in the dataset name raises this exception.
Let's take a look at the two interfaces **ZFS** and **ZPool**...
.. warning:: All of the commands here attempt to modify something in the pool or dataset given as parameters. If run
with high enough permissions (usually ``root``, though ``zfs allow`` can delegate some tasks to lower-
privileged users, too), these commands can and will **delete** data! Always run these against pools, disks
and datasets that bear no important data! You have been warned.
The ZFS interface
*****************
The :class:`~simplezfs.zfs.ZFS` class is the interface that corresponds to the ``zfs(8)`` program. It holds very little state,
and it is recommended to get an instance through the function :func:`~simplezfs.zfs.get_zfs`. It selects the desired
implementation and passes the required parameters. At the very least, it requires the ``api`` parameter, which is a
string that selects the actual implementation, either ``cli`` or ``native``.
All our examples use the ``cli`` implementation for simplicity.
.. code-block:: pycon
>>> from simplezfs import get_zfs
>>> zfs = get_zfs('cli')
>>> zfs
<simplezfs.zfs_cli.ZFSCli object at 0x7f9f00faa9b0>
For the remainder of this guide, we're going to assume that the variable ``zfs`` always holds a
:class:`~simplezfs.zfs_cli.ZFSCli` object.
Viewing data
============
To get an overview over the interface, we'll dive in and inspect our running system. Information is returned in the
form of :class:`~simplezfs.types.Dataset` instances, a named tuple containing a set of fields. For simplicity,
we'll output only a few of its fields to not clobber the screen, so don't be alarmed if there seems to be information
missing: we just omitted the boring parts.
Listing datasets
----------------
By default when listing datasets, all of them are returned regardless of their type. That means that it includes
* volumes
* filesets
* snapshots
* bookmarks
.. code-block:: pycon
>>> zfs.list_datasets()
<Dataset pool/system/root>
<Dataset pool/system/root@pre-distupgrade>
<Dataset tank/vol>
<Dataset tank/vol@test>
This is often unnecessary; the listing can be limited by ``type``, restricted to datasets that are children of
another one, or both at the same time:
.. code-block:: pycon
>>> zfs.list_datasets(type=DatasetType.SNAPSHOT)
<Dataset pool/system/root@pre-distupgrade>
<Dataset tank/vol@test>
>>> zfs.list_datasets(parent='pool/system')
<Dataset pool/system/root>
<Dataset pool/system/root@pre-distupgrade>
>>> zfs.list_datasets(parent='pool/system', type=DatasetType.SNAPSHOT)
<Dataset pool/system/root@pre-distupgrade>
Creating something new
======================
There are functions for creating the four different types of datasets with nice interfaces:
* :func:`~simplezfs.ZFS.create_fileset` for ordinary filesets; the most commonly used parameter is ``mountpoint``,
which tells it where it should be mounted.
* :func:`~simplezfs.ZFS.create_volume` creates volumes, or ZVols; this features a parameter ``sparse`` for creating
thin-provisioned (sparse) volumes.
* :func:`~simplezfs.ZFS.create_snapshot` creates a snapshot of a volume or fileset.
* :func:`~simplezfs.ZFS.create_bookmark` creates a bookmark (on recent versions of ZFS).
These essentially call :func:`~simplezfs.ZFS.create_dataset`, which can be called directly, but its interface is not
as nice as the special-purpose create functions.
Filesets
--------
Creating a fileset requires the dataset path, like this:
.. code-block:: pycon
>>> zfs.create_fileset('pool/test', mountpoint='/tmp/test')
<Dataset pool/test>
:todo: add create_dataset
Volumes
-------
Volumes are created similarly to filesets; this example creates a sparse (thin-provisioned) volume of 1 GiB:
.. code-block:: pycon
>>> zfs.create_volume('pool/vol', size=1024**3, sparse=True)
<Dataset pool/vol>
:todo: add create_dataset
Snapshots
---------
Snapshots are, like bookmarks, created on an existing fileset or volume, hence the first parameter to the function is
the dataset that is our base, and the second parameter is the name of the snapshot.
.. code-block:: pycon
>>> zfs.create_snapshot('pool/test', 'pre-distupgrade')
<Dataset pool/test@pre-distupgrade>
Bookmarks
---------
Like snapshots above, bookmarks are created on an existing fileset or volume.
.. code-block:: pycon
>>> zfs.create_bookmark('pool/test', 'book-20190723')
<Dataset pool/test#book-20190723>
Destroying things
=================
After creating some datasets of various kinds and playing around with some of their properties, it's time to clean up.
We'll use the ``destroy_*`` family of methods.
.. warning:: Bear in mind that things happening here are final and cannot be undone. When playing around, always make
sure not to run this on pools containing important data!
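The per-type sections below are still to be written. As a rough sketch of what such a call will look like: the method
name and signature here (``destroy_dataset``) are assumptions derived from the ``destroy_*`` naming above, not yet
published API:

.. code-block:: pycon

    >>> zfs.destroy_dataset('pool/test@pre-distupgrade')  # hypothetical signature
    >>> zfs.destroy_dataset('pool/test')                  # hypothetical signature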
Filesets
--------
Volumes
-------
Snapshots
---------
Bookmarks
---------
Properties
==========
Properties are one of the many cool and useful features of ZFS. They control its behaviour (like ``compression``) or
expose information about internal state (like the ``creation`` time).
.. note:: The Python library does not validate the names of native properties, as these are subject to change between
ZFS versions, and the library would need an update every time a new ZFS version changes some of them. Instead,
it validates the syntax of the input based on the documentation of the ZFS on Linux (ZoL) project, and
otherwise relies on ZFS itself rejecting names it does not like.
A word on metadata/user properties
----------------------------------
The API allows getting and setting properties, both for ``native`` properties (the ones defined by ZFS, exposing
information or altering how it works) and for ``user`` properties, which we call **metadata properties** in this API.
When working with metadata properties, you need to supply a ``namespace`` to distinguish them from native properties.
This works by separating the namespace and the property name using a ``:`` character, so a property ``myprop``
in the namespace ``com.company.department`` becomes ``com.company.department:myprop`` in the ZFS property system. This
is done automatically for you if you supply a ``metadata_namespace`` when creating the ZFS instance, and it can be
overwritten when working with the get and set functions. It is also possible not to define a default namespace and to
pass it to the functions every time instead.
When you want to get or set a metadata property, set ``metadata`` to **True** when calling
:func:`~simplezfs.ZFS.get_property` or :func:`~simplezfs.ZFS.set_property`. This will cause it to automatically prepend the
namespace given on instantiation, or the one given in ``overwrite_metadata_namespace`` when calling the
functions. The name of the property **must not** include the namespace, though it may contain ``:`` characters of its
own; properties of the form ``zfs:is:cool`` are valid after all. ``:`` characters are never valid in the context of
native properties, and this is the reason why there is a separate switch to turn on metadata properties when using
these functions.
Error handling
--------------
If a property name is not valid or the value exceeds certain bounds, a :class:`~simplezfs.exceptions.ValidationError` is
raised. This includes specifying a namespace in the property name if ``metadata`` is **False**, or exceeding the
length allowed for a metadata property (8192 - 1 bytes).
Though not an error for the ``zfs(8)`` utility, getting a non-existing metadata property also raises the above
exception to indicate that the property does not exist.
Getting a property
------------------
Getting properties is fairly straight-forward, especially for native properties:
.. code-block:: pycon
>>> zfs.get_property('tank/system/root', 'mountpoint')
Property(key='mountpoint', value='/', source='local', namespace=None)
For **metadata** properties, one needs to enable their usage by setting ``metadata`` to True. With a globally saved
namespace, it looks like this:
.. code-block:: pycon
>>> zfs = get_zfs('cli', metadata_namespace='com.company')
>>> zfs.get_property('tank/system/root', 'do_backup', metadata=True)
Property(key='do_backup', value='true', source='local', namespace='com.company')
If you don't specify a namespace when calling :func:`~simplezfs.zfs.get_zfs` or if you want to use a different namespace for
one call, specify the desired namespace in ``overwrite_metadata_namespace`` like so:
.. code-block:: pycon
>>> zfs.get_property('tank/system/root', 'requires', metadata=True, overwrite_metadata_namespace='user')
Property(key='requires', value='coffee', source='local', namespace='user')
This is the equivalent of calling ``zfs get user:requires tank/system/root`` on the shell.
Asking it to get a native property that does not exist results in an error:
.. code-block:: pycon
>>> zfs.get_property('tank/system/root', 'notexisting', metadata=False)
simplezfs.exceptions.PropertyNotFound: invalid property on dataset tank/system/root
Setting a property
------------------
The interface for setting both native and metadata properties works exactly like the get interface shown earlier,
though it obviously needs a value to set. We won't go into the ZFS delegation system (``zfs allow``) and assume the
following is run with **root** privileges.
.. code-block:: pycon
>>> zfs.set_property('tank/service/backup', 'mountpoint', '/backup')
Setting a metadata property works like this (again, analogous to the get example above):
.. code-block:: pycon
>>> zfs.set_property('tank/system/root', 'requires', 'tea', metadata=True, overwrite_metadata_namespace='user')
Listing properties
------------------
:todo: ``zfs.get_properties``
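While this section still needs to be written, the method :func:`~simplezfs.ZFS.get_properties` already exists; a short
sketch (output abbreviated, the values shown are illustrative):

.. code-block:: pycon

    >>> zfs.get_properties('tank/system/root', include_metadata=True)
    [Property(key='mountpoint', value='/', source='local', namespace=None), ...]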
The ZPool interface
*******************
The :class:`~simplezfs.zpool.ZPool` class is the interface that corresponds to the ``zpool(8)`` program. It holds very little state,
and it is recommended to get an instance through the function :func:`~simplezfs.zpool.get_zpool`. It selects the desired
implementation and passes the required parameters. At the very least, it requires the ``api`` parameter, which is a
string that selects the actual implementation, either ``cli`` or ``native``.
All our examples use the ``cli`` implementation for simplicity.
.. code-block:: pycon
>>> from simplezfs import get_zpool
>>> zpool = get_zpool('cli')
>>> zpool
<simplezfs.zpool_cli.ZPoolCli object at 0x7f67d5254940>
For the remainder of this guide, we're going to assume that the variable ``zpool`` always holds a
:class:`~simplezfs.zpool_cli.ZPoolCli` object.
Error handling
**************
We kept the most important part for last: handling errors. The module defines its own hierarchy with
:class:`~simplezfs.exceptions.ZFSException` as the toplevel exception, and various specific exceptions are based on it.
When working with :class:`~simplezfs.zfs.ZFS`, the three most common ones are:
* :class:`~simplezfs.exceptions.ValidationError`, which indicates that a name (e.g. a dataset name) was invalid.
* :class:`~simplezfs.exceptions.DatasetNotFound` indicates, like ``FileNotFoundError`` in standard Python, that the
dataset the module was instructed to work on (e.g. get/set properties, destroy) was not present.
* :class:`~simplezfs.exceptions.PermissionError` is raised when the current user's permissions are not sufficient to
perform the requested operation. While some actions can be delegated using ``zfs allow``, Linux, for example, doesn't
allow non-root users to mount filesystems, which means that a non-root user may create filesets with a valid
mountpoint property, but they won't be mounted.
Examples
========
.. code-block:: pycon
>>> zfs.list_datasets(parent=':pool/name/invalid')
simplezfs.exceptions.ValidationError: malformed name
.. code-block:: pycon
>>> zfs.list_datasets(parent='pool/not/existing')
simplezfs.exceptions.DatasetNotFound: Dataset "pool/not/existing" not found

docs/index.rst
@ -0,0 +1,31 @@
##############################
Python-SimpleZFS documentation
##############################
The module implements a simple and (hopefully) straightforward API for interacting with ZFS. It consists of two main
classes :class:`~simplezfs.zfs.ZFS` and :class:`~simplezfs.zpool.ZPool` that can be thought of as wrappers around the
ZFS command line utilities ``zfs(8)`` and ``zpool(8)``. This module provides two implementations:
* The ``cli``-API wraps the command line utilities
* And the ``native``-API uses ``libzfs_core``.
At the time of writing, the ``native``-API has not been implemented.
.. toctree::
:maxdepth: 2
:caption: Contents:
quickstart
security
guide
properties_metadata
testing
api
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

docs/properties_metadata.rst
@ -0,0 +1,7 @@
#######################
Properties and Metadata
#######################
ZFS features a property system using ``zfs(8) get/set``, which is used to read information about datasets and change their behaviour. Additionally, it allows users to attach arbitrary properties of their own to datasets. ZFS calls these "`user properties`", distinguishing them from its own "`native properties`" by requiring a ``:`` character in their names. This library calls the user properties **metadata** properties.
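For illustration, this is the distinction as seen from the plain ``zfs(8)`` tool (dataset and property names are made
up):

.. code-block:: shell-session

    $ zfs get compression tank/data              # native property
    $ zfs set com.example:backup=true tank/data  # user/metadata property
    $ zfs get com.example:backup tank/data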

docs/quickstart.rst
@ -0,0 +1,54 @@
##########
Quickstart
##########
Installation
************
From the repo
=============
You can pull the code from the repository at URL using pip:
.. code-block:: bash
pip install git+ssh://URL/python-simplezfs
Interfaces
**********
For both ``zfs(8)`` and ``zpool(8)`` there exist two interfaces:
* **cli** calls the ``zfs`` and ``zpool`` binaries in subprocesses
* **native** uses ``libzfs_core`` to achieve the same goals
There exist two functions, :func:`~simplezfs.zfs.get_zfs` and :func:`~simplezfs.zpool.get_zpool`, that take the implementation as
their first argument and return the appropriate implementation. The main documentation on how they work is in the API
documentation for the interface classes :class:`~simplezfs.zfs.ZFS` and :class:`~simplezfs.zpool.ZPool`.
This guide focuses on the ``cli`` variants.
It is not strictly necessary to get and pass around an instance every time, as the classes hold very little state.
They need to do some initialization, however, such as finding the binaries (and thus hitting the local filesystem).
For the rest of the guide, we'll assume that the following has been run and we have a :class:`~simplezfs.zfs.ZFS` instance
in ``zfs``:
.. code-block:: pycon
>>> from simplezfs import get_zfs
>>> zfs = get_zfs('cli')
>>> zfs
<simplezfs.zfs_cli.ZFSCli object at 0x7f9f00faa9b0>
As well as a :class:`~simplezfs.zpool.ZPool` instance in ``zpool`` after running the following:
.. code-block:: pycon
>>> from simplezfs import get_zpool
>>> zpool = get_zpool('cli')
>>> zpool
<simplezfs.zpool_cli.ZPoolCli object at 0x7f67d5254940>
To be continued

docs/requirements.txt
@ -0,0 +1,2 @@
Sphinx>=2.0
sphinx-autodoc-typehints

docs/security.rst
@ -0,0 +1,36 @@
########
Security
########
The authors of this library tried their best to make sure that nothing bad happens, but they are only human. Thus:
.. warning:: Use this library at your own risk.
Running as root
***************
The library goes to great lengths to make sure everything passed to the operating system is safe. Due to the nature of
operating systems, many tasks can only be carried out with elevated privileges, commonly running with the privileges
of the user ``root``. This means that care must be taken when using it with elevated privileges!
Thankfully, ZFS allows delegating permissions to some extent, allowing a user or a group on the local system to carry
out administrative tasks. One exception is mounting, which is handled in the next section.
It is suggested to take a look at the ``zfs(8)`` manpage, especially the part that covers ``zfs allow``.
.. _the_mount_problem:
The mount problem
*****************
On Linux, only root is allowed to manipulate the global namespace. This means that no amount of ``zfs allow`` will
allow any other user to mount a fileset. They can be created with the ``mountpoint`` property set, but can't be
mounted. One workaround is to set the ``mountpoint`` property to ``legacy`` and use ``/etc/fstab`` to mount it; the
other is to install and use a special privilege escalation helper.
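A minimal sketch of the ``legacy`` workaround (dataset and mountpoint are made up):

.. code-block:: none

    # performed once, with appropriate privileges
    zfs set mountpoint=legacy tank/data

    # /etc/fstab entry; root (or an init script) mounts it via mount(8)
    tank/data  /data  zfs  defaults  0  0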
There are two operations that require **root** privileges: setting and changing the ``mountpoint`` property of a
fileset. For these, a helper program is provided as an example. It is intended to be edited by the administrator
before use.
Installed **setuid root**, it allows the caller to mount a fileset if it is below a hardcoded (by the administrator
of the system) dataset and targets a hardcoded subtree of the local filesystem hierarchy. The path to that helper has
to be passed to the create function for filesets.
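A sketch of how the helper ties into the API (the helper path is made up):

.. code-block:: pycon

    >>> zfs.create_fileset('tank/data', mountpoint='/data', mount_helper='/usr/local/sbin/zfs_mount_helper')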

docs/testing.rst
@ -0,0 +1,194 @@
#######
Testing
#######
``python-simplezfs`` uses **pytest** for testing. The test code can be found in the ``tests`` subdirectory of the source
tree.
Preparation
===========
Usually, when running the test suite locally in your shell, it is advised to use a virtual environment. Which one
to use is up to the reader. The authors use the ``venv`` module. The requirements can be found in the file
``requirements_develop.txt``, or pulled in by installing the module with the ``tests`` extra:
.. code-block:: shell-session
$ python3 -m venv venv
$ source venv/bin/activate
(venv) $ pip install -e .[tests]
Running the tests
=================
Then run the tests using pytest: ``pytest -v --cov``. The test suite will be expanded in the future.
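The suite avoids touching a real pool by mocking the ``subprocess`` layer. A minimal sketch of that pattern (the
assumption that the CLI implementation calls ``subprocess.run`` is for illustration only; the real tests are more
thorough):

.. code-block:: python

    from unittest import mock

    from simplezfs import get_zfs

    def test_list_datasets_calls_zfs():
        zfs = get_zfs('cli')
        # patch subprocess.run so that no real zfs(8) binary is executed
        with mock.patch('subprocess.run') as run_mock:
            run_mock.return_value = mock.Mock(returncode=0, stdout=b'', stderr=b'')
            zfs.list_datasets()
        assert run_mock.called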
Creating local pools and datasets
=================================
Pool with most vdev types
-------------------------
To have a pool with most of the supported vdev types, create a pool from a set of files. The following sequence
creates a set of 64MB files (the minimum size ZFS accepts). The new pool is not really usable for storing data,
mostly because it is based on sparse files and ZFS does not like that. But it should be usable for most of the actions
that one wants to perform on it for testing purposes.
.. code-block:: shell
mkdir /tmp/z
for i in {1..12}; do truncate -s 64M /tmp/z/vol$i; done
sudo zpool create testpool \
raidz /tmp/z/vol1 /tmp/z/vol2 /tmp/z/vol3 \
raidz /tmp/z/vol4 /tmp/z/vol5 /tmp/z/vol6 \
log mirror /tmp/z/vol7 /tmp/z/vol8 \
cache /tmp/z/vol9 /tmp/z/vol10 \
spare /tmp/z/vol11 /tmp/z/vol12
The new pool should now look like this:
.. code-block:: none
pool: testpool
state: ONLINE
scan: none requested
config:
NAME STATE READ WRITE CKSUM
testpool ONLINE 0 0 0
raidz1-0 ONLINE 0 0 0
/tmp/z/vol1 ONLINE 0 0 0
/tmp/z/vol2 ONLINE 0 0 0
/tmp/z/vol3 ONLINE 0 0 0
raidz1-1 ONLINE 0 0 0
/tmp/z/vol4 ONLINE 0 0 0
/tmp/z/vol5 ONLINE 0 0 0
/tmp/z/vol6 ONLINE 0 0 0
logs
mirror-2 ONLINE 0 0 0
/tmp/z/vol7 ONLINE 0 0 0
/tmp/z/vol8 ONLINE 0 0 0
cache
/tmp/z/vol9 ONLINE 0 0 0
/tmp/z/vol10 ONLINE 0 0 0
spares
/tmp/z/vol11 AVAIL
/tmp/z/vol12 AVAIL
errors: No known data errors
For reference, when getting a listing of the pool's content, the output should look like this (converted to JSON using
``json.dumps()`` and pretty-printed for readability):
.. code-block:: json
{
"testpool": {
"drives": [
{
"type": "raidz1",
"health": "ONLINE",
"size": 184549376,
"alloc": 88064,
"free": 184461312,
"frag": 0,
"cap": 0,
"members": [
{
"name": "/tmp/z/vol1",
"health": "ONLINE"
},
{
"name": "/tmp/z/vol2",
"health": "ONLINE"
},
{
"name": "/tmp/z/vol3",
"health": "ONLINE"
}
]
},
{
"type": "raidz1",
"health": "ONLINE",
"size": 184549376,
"alloc": 152576,
"free": 184396800,
"frag": 0,
"cap": 0,
"members": [
{
"name": "/tmp/z/vol4",
"health": "ONLINE"
},
{
"name": "/tmp/z/vol5",
"health": "ONLINE"
},
{
"name": "/tmp/z/vol6",
"health": "ONLINE"
}
]
}
],
"log": [
{
"type": "mirror",
"size": 50331648,
"alloc": 0,
"free": 50331648,
"frag": 0,
"cap": 0,
"members": [
{
"name": "/tmp/z/vol7",
"health": "ONLINE"
},
{
"name": "/tmp/z/vol8",
"health": "ONLINE"
}
]
}
],
"cache": [
{
"type": "none",
"members": [
{
"name": "/tmp/z/vol9",
"health": "ONLINE"
},
{
"name": "/tmp/z/vol10",
"health": "ONLINE"
}
]
}
],
"spare": [
{
"type": "none",
"members": [
{
"name": "/tmp/z/vol11",
"health": "AVAIL"
},
{
"name": "/tmp/z/vol12",
"health": "AVAIL"
}
]
}
],
"size": 369098752,
"alloc": 240640,
"free": 368858112,
"chkpoint": null,
"expandsz": null,
"frag": 0,
"cap": 0,
"dedup": 1,
"health": "ONLINE",
"altroot": null
}
}

requirements.txt
@ -0,0 +1 @@
pydantic

requirements_develop.txt
@ -0,0 +1,4 @@
flake8
mypy
pytest
pytest-cov

setup.cfg
@ -0,0 +1,25 @@
[flake8]
ignore = E501,E402
max-line-length = 120
exclude = .git,.tox,build,_build,env,venv,__pycache__
[tool:pytest]
testpaths = tests
python_files =
test_*.py
*_test.py
tests.py
addopts =
-ra
--strict
--tb=short
# potentially dangerous!
# --doctest-modules
# --doctest-glob=\*.rst
[coverage:run]
omit =
venv/*
tests/*

setup.py
@ -0,0 +1,47 @@
from setuptools import setup
with open('README.rst', 'r') as fh:
long_description = fh.read()
setup(
name='simplezfs',
version='0.0.1',
author='Andreas Gonschorek, Stefan Valouch',
description='Simple, low-level ZFS API',
long_description=long_description,
packages=['simplezfs'],
package_data={'simplezfs': ['py.typed']},
package_dir={'': 'src'},
include_package_data=True,
zip_safe=False,
license='BSD-3-Clause',
url='https://github.com/svalouch/python-simplezfs',
platforms='any',
python_requires='>=3.6',
install_requires=[
'pydantic',
],
extras_require={
'tests': [
'mypy',
'pytest',
'pytest-cov',
],
'docs': [
'Sphinx>=2.0',
],
},
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'License :: OSI Approved :: BSD License',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
],
)

src/simplezfs/__init__.py
@ -0,0 +1,16 @@
'''
Python ZFS API.
'''
__version__ = '0.0.1'
from .zfs import ZFS, get_zfs
from .zpool import ZPool, get_zpool
__all__ = [
'ZFS',
'get_zfs',
'ZPool',
'get_zpool',
]

src/simplezfs/exceptions.py
@ -0,0 +1,48 @@
'''
Exceptions
'''
class ZFSException(Exception):
'''
Base for the exception tree used by this library.
:note: Some functions throw ordinary Python base exceptions as well.
'''
pass
class DatasetNotFound(ZFSException):
'''
A dataset was not found.
'''
pass
class PermissionError(ZFSException):
'''
Permissions are not sufficient to carry out the task.
'''
pass
class PoolNotFound(ZFSException):
'''
A pool was not found.
'''
pass
class PropertyNotFound(ZFSException):
'''
A property was not found.
'''
pass
class ValidationError(ZFSException):
'''
Indicates that a value failed validation.
'''
pass

src/simplezfs/types.py
@ -0,0 +1,181 @@
'''
Type declarations
'''
from enum import Enum, unique
from typing import NamedTuple, Optional
@unique
class DatasetType(str, Enum):
'''
Enumeration of dataset types that ZFS supports.
'''
#: Dataset is an ordinary fileset
FILESET = 'fileset'
#: Dataset is a ZVOL
VOLUME = 'volume'
#: Dataset is a snapshot
SNAPSHOT = 'snapshot'
#: Dataset is a bookmark
BOOKMARK = 'bookmark'
@staticmethod
def from_string(value: str) -> 'DatasetType':
'''
Helper to convert a string to an instance.
:param value: The string to convert.
:returns: The enum value
:raises ValueError: If the supplied value is not found in the enumeration.
'''
if not isinstance(value, str):
raise ValueError('only string types allowed')
val = value.lower()
if val == 'fileset':
return DatasetType.FILESET
elif val == 'volume':
return DatasetType.VOLUME
elif val == 'snapshot':
return DatasetType.SNAPSHOT
elif val == 'bookmark':
return DatasetType.BOOKMARK
else:
raise ValueError(f'Value {value} is not a valid DatasetType')
class Dataset(NamedTuple):
'''
Container describing a single dataset.
'''
#: Name of the dataset (excluding the path)
name: str
#: Full path to and including the dataset itself
full_path: str
#: Pool name
pool: str
#: Parent dataset, or None for the topmost dataset (pool)
parent: Optional[str]
#: Dataset type
type: DatasetType
@unique
class PropertySource(str, Enum):
'''
Enumeration of the valid property sources in ZFS.
'''
#: Property is at default
DEFAULT = 'default'
#: Property was inherited
INHERITED = 'inherited'
#: Property is temporary
TEMPORARY = 'temporary'
#: Property value was received via a ``zfs send/receive`` operation
RECEIVED = 'received'
#: Property is set on the dataset in question
NONE = 'none'
@staticmethod
def from_string(value: str) -> 'PropertySource':
'''
Helper to convert a string to an instance.
:param value: The string to convert.
:returns: The enum value.
:raises ValueError: If the supplied value is not found in the enumeration.
'''
if not isinstance(value, str):
raise ValueError('only string types allowed')
val = value.lower()
if val == 'default':
return PropertySource.DEFAULT
elif val == 'inherited':
return PropertySource.INHERITED
elif val == 'temporary':
return PropertySource.TEMPORARY
elif val == 'received':
return PropertySource.RECEIVED
elif val == 'none':
return PropertySource.NONE
else:
raise ValueError(f'Value {value} is not a valid PropertySource')
class Property(NamedTuple):
'''
Container for a single ZFS property.
'''
#: Key or name of the property (excluding namespace for non-native properties)
key: str
#: Value of the property
value: str
#: Source of the property value
source: PropertySource = PropertySource.NONE
#: Namespace name of the property, None for native properties
namespace: Optional[str] = None
class VDevType(str, Enum):
'''
Type of a vdev. vdevs are either one of the two storage types (disk or file) or one of the special types (mirror,
raidz*, spare, log, dedup, special and cache).
When reading zpool output, it is not always clear whether a vdev is of type "disk" or "file", so the special type
"DISK_OR_FILE" is used. This type is invalid when setting values.
'''
DISK = 'disk'
FILE = 'file'
DISK_OR_FILE = 'disk_or_file'
MIRROR = 'mirror'
RAIDZ1 = 'raidz1'
RAIDZ2 = 'raidz2'
RAIDZ3 = 'raidz3'
SPARE = 'spare'
LOG = 'log'
DEDUP = 'dedup'
SPECIAL = 'special'
CACHE = 'cache'
class ZPoolHealth(str, Enum):
'''
Enumeration of the health states of a zpool or one of its vdevs.
'''
ONLINE = 'ONLINE'
DEGRADED = 'DEGRADED'
FAULTED = 'FAULTED'
OFFLINE = 'OFFLINE'
UNAVAIL = 'UNAVAIL'
REMOVED = 'REMOVED'
#: This one is used for spares
AVAIL = 'AVAIL'
@staticmethod
def from_string(value: str) -> 'ZPoolHealth':
'''
Helper to convert a string to an instance.
:param value: The string to convert.
:returns: The enum value.
:raises ValueError: If the supplied value is not found in the enumeration.
'''
if not isinstance(value, str):
raise ValueError('only string types are allowed')
val = value.lower()
if val == 'online':
return ZPoolHealth.ONLINE
elif val == 'degraded':
return ZPoolHealth.DEGRADED
elif val == 'faulted':
return ZPoolHealth.FAULTED
elif val == 'offline':
return ZPoolHealth.OFFLINE
elif val == 'unavail':
return ZPoolHealth.UNAVAIL
elif val == 'removed':
return ZPoolHealth.REMOVED
elif val == 'avail':
return ZPoolHealth.AVAIL
else:
raise ValueError(f'Value {value} is not a valid ZPoolHealth')

src/simplezfs/validation.py
@ -0,0 +1,174 @@
'''
Functions for validating data.
'''
import re
from .exceptions import ValidationError
#: Maximum length of a dataset (from zfs(8)) in bytes
MAXNAMELEN: int = 256
#: Maximum length of a native property name (assumed, TODO check in source)
NATIVE_PROPERTY_NAME_LEN_MAX: int = MAXNAMELEN
#: Maximum length of a metadata property name (assumed, TODO check in source)
METADATA_PROPERTY_NAME_LEN_MAX: int = MAXNAMELEN
#: Maximum length of a metadata property value in bytes
METADATA_PROPERTY_VALUE_LEN_MAX: int = 8192
#: Regular expression for validating dataset names, handling both the name itself as well as snapshot or bookmark names.
DATASET_NAME_RE = re.compile(r'^(?P<dataset>[a-zA-Z0-9_\-.:]+)(?P<detail>(@|#)[a-zA-Z0-9_\-.:]+)?$')
#: Regular expression for validating a native property name
NATIVE_PROPERTY_NAME_RE = re.compile(r'^[a-z]([a-z0-9]+)?$')
#: Regular expression for validating the syntax of a user property (metadata property in python-zfs)
METADATA_PROPERTY_NAME_RE = re.compile(r'^([a-z0-9_.]([a-z0-9_.\-]+)?)?:([a-z0-9:_.\-]+)$')
def validate_pool_name(name: str) -> None:
'''
Validates a pool name.
:raises ValidationError: Indicates validation failed
'''
try:
b_name = name.encode('utf-8')
except UnicodeEncodeError as err:
raise ValidationError(f'unicode encoding error: {err}') from err
if len(b_name) < 1:
raise ValidationError('too short')
# TODO find maximum pool name length
# The pool name must begin with a letter, and can only contain alphanumeric characters as well as underscore
# ("_"), dash ("-"), colon (":"), space (" "), and period (".").
if not re.match(r'^[a-z]([a-z0-9\-_: .]+)?$', name):
raise ValidationError('malformed name')
# The pool names mirror, raidz, spare and log are reserved,
if name in ['mirror', 'raidz', 'spare', 'log']:
raise ValidationError('reserved name')
# as are names beginning with mirror, raidz, spare,
for word in ['mirror', 'raidz', 'spare']:
if name.startswith(word):
raise ValidationError(f'starts with invalid token {word}')
# and the pattern c[0-9].
if re.match(r'^c[0-9]', name):
raise ValidationError('begins with reserved sequence c[0-9]')
def validate_dataset_name(name: str, *, strict: bool = False) -> None:
'''
Validates a dataset name. By default (``strict`` set to **False**) a snapshot (``name@snapshotname``) or bookmark
(``name#bookmarkname``) postfix are allowed. Otherwise, these names are rejected. To validate a complete path, see
:func:`validate_dataset_path`.
Example:
>>> from simplezfs.validation import validate_dataset_name
>>> validate_dataset_name('swap')
>>> validate_dataset_name('backup', strict=True)
>>> validate_dataset_name('backup@20190525', strict=True)
simplezfs.exceptions.ValidationError: snapshot or bookmark identifier are not allowed in strict mode
>>> validate_dataset_name('')
simplezfs.exceptions.ValidationError: name is too short
>>> validate_dataset_name('pool/system')
simplezfs.exceptions.ValidationError: name contains disallowed characters
>>> validate_dataset_name('a' * 1024)  # really long name
simplezfs.exceptions.ValidationError: length > 255
:param name: The name to validate
:param strict: Whether to allow (``False``) or disallow (``True``) snapshot and bookmark names.
:raises ValidationError: Indicates validation failed
'''
try:
b_name = name.encode('utf-8')
except UnicodeEncodeError as err:
raise ValidationError(f'unicode encoding error: {err}') from err
if len(b_name) < 1:
raise ValidationError('name is too short')
if len(b_name) > MAXNAMELEN - 1:
raise ValidationError(f'length > {MAXNAMELEN - 1}')
match = DATASET_NAME_RE.match(name)
if not match:
raise ValidationError('name contains disallowed characters')
elif strict and match.group('detail') is not None:
raise ValidationError('snapshot or bookmark identifier are not allowed in strict mode')
def validate_dataset_path(path: str) -> None:
'''
Validates a path of datasets. While :func:`validate_dataset_name` validates only a single entry in a path to a
dataset, this function validates the whole path beginning with the pool.
:raises ValidationError: Indicates validation failed
'''
try:
b_name = path.encode('utf-8')
except UnicodeEncodeError as err:
raise ValidationError(f'unicode encoding error: {err}') from err
if len(b_name) < 3: # a/a is the smallest path
raise ValidationError('path is too short')
if '/' not in path:
raise ValidationError('Not a path')
if path.startswith('/'):
raise ValidationError('zfs dataset paths are never absolute')
tokens = path.split('/')
# the first token is the pool name
validate_pool_name(tokens[0])
# second token to second-to-last token are normal datasets that must not be snapshots or bookmarks
for dataset in tokens[1:-1]:
validate_dataset_name(dataset, strict=True)
# last token is the actual dataset, this may be a snapshot or bookmark
validate_dataset_name(tokens[-1])
def validate_native_property_name(name: str) -> None:
'''
Validates the name of a native property. Length and syntax is checked.
:note: No check is performed to match the name against the actual names the target ZFS version supports.
:raises ValidationError: Indicates that the validation failed.
'''
try:
b_name = name.encode('utf-8')
except UnicodeEncodeError as err:
raise ValidationError(f'unicode encoding error: {err}') from err
if len(b_name) < 1:
raise ValidationError('name is too short')
if len(b_name) > NATIVE_PROPERTY_NAME_LEN_MAX - 1:
raise ValidationError(f'length > {NATIVE_PROPERTY_NAME_LEN_MAX - 1}')
if not NATIVE_PROPERTY_NAME_RE.match(name):
raise ValidationError('property name does not match')
def validate_metadata_property_name(name: str) -> None:
'''
Validate the name of a metadata property (user property in ZFS manual).
:raises ValidationError: Indicates that the validation failed.
'''
try:
b_name = name.encode('utf-8')
except UnicodeEncodeError as err:
raise ValidationError(f'unicode encoding error: {err}') from err
if len(b_name) < 1:
raise ValidationError('name is too short')
if len(b_name) > METADATA_PROPERTY_NAME_LEN_MAX - 1:
raise ValidationError(f'length > {METADATA_PROPERTY_NAME_LEN_MAX - 1}')
if not METADATA_PROPERTY_NAME_RE.match(name):
raise ValidationError('property name does not match')
def validate_property_value(value: str) -> None:
'''
Validates the value of a property. This works both for native properties, where the driver will tell us if the
value was good or not, and for metadata (or user) properties, where the only limit is the length.
:param value: The value to validate.
:raises ValidationError: Indicates that the validation failed.
'''
try:
b_value = value.encode('utf-8')
except UnicodeEncodeError as err:
raise ValidationError(f'unicode encoding error: {err}') from err
if len(b_value) > METADATA_PROPERTY_VALUE_LEN_MAX - 1:
raise ValidationError(f'length > {METADATA_PROPERTY_VALUE_LEN_MAX - 1}')

src/simplezfs/zfs.py
@ -0,0 +1,550 @@
'''
ZFS frontend API
'''
from typing import Dict, List, Optional, Union
from .exceptions import (
DatasetNotFound,
PermissionError,
PoolNotFound,
PropertyNotFound,
ValidationError,
)
from .types import Dataset, DatasetType, Property
from .validation import (
validate_dataset_path,
validate_metadata_property_name,
validate_native_property_name,
validate_pool_name,
validate_property_value,
)
class ZFS:
'''
ZFS interface class. This API generally covers only the zfs(8) tool; for zpool(8), please see :class:`~ZPool`.
**ZFS implementation**
There are two ways how the API actually communicates with the ZFS filesystem:
* Using the CLI tools (:class:`~simplezfs.zfs_cli.ZFSCli`)
* Using the native API (:class:`~simplezfs.zfs_native.ZFSNative`)
You can select the API by either creating an instance of one of them or using :func:`~simplezfs.zfs.get_zfs`.
**Properties and Metadata**
The functions :func:`set_property`, :func:`get_property` and :func:`get_properties` wrap the ZFS get/set
functionality. To support so-called `user properties`, which are called `metadata` in this API, a default namespace
can be stored using `metadata_namespace` when instantiating the interface or by calling :func:`set_metadata_namespace`
at any time.
:note: Not setting a metadata namespace means that one can't set or get metadata properties, unless the overwrite
parameter for the get/set functions is used.
:param api: API to use, either ``cli`` for zfs(8) or ``native`` for the `libzfs_core` api.
:param metadata_namespace: Default namespace
:param kwargs: Extra arguments TODO
'''
def __init__(self, *, metadata_namespace: Optional[str] = None, **kwargs) -> None:
self.metadata_namespace = metadata_namespace
@property
def metadata_namespace(self) -> Optional[str]:
'''
Returns the metadata namespace, which may be None if not set.
'''
return self._metadata_namespace
@metadata_namespace.setter
def metadata_namespace(self, namespace: str) -> None:
'''
Sets a new metadata namespace
:todo: validate!
'''
self._metadata_namespace = namespace
def dataset_exists(self, name: str) -> bool:
'''
Checks if a dataset exists. This is done by querying for its `type` property.
:param name: Name of the dataset to check for.
:return: Whether the dataset exists.
'''
try:
return self.get_property(name, 'type') is not None
except (DatasetNotFound, PermissionError, PoolNotFound):
return False
except PropertyNotFound:
return True
def list_datasets(self, *, parent: Optional[Union[str, Dataset]] = None) -> List[Dataset]:
'''
Lists all datasets known to the system. If ``parent`` is set to a pool or dataset name (or a
:class:`~simplezfs.types.Dataset`), lists all the children of that dataset.
:param parent: If set, list all child datasets.
:return: The list of datasets.
'''
raise NotImplementedError(f'{self} has not implemented this function')
def set_property(self, dataset: str, key: str, value: str, *, metadata: bool = False, overwrite_metadata_namespace: Optional[str] = None) -> None:
'''
Sets the ``value`` of the native property ``key``. By default, only native properties can be set. If
``metadata`` is set to **True**, the default metadata namespace is prepended or the one in
``overwrite_metadata_namespace`` is used if set to a valid string.
Example:
>>> z = ZFSCli(metadata_namespace='foo')
>>> z.set_property('tank/test', 'testprop', 'testval', metadata=True, overwrite_metadata_namespace='bar')
>>> z.get_property('tank/test', 'testprop')
Exception
>>> z.get_property('tank/test', 'testprop', metadata=True)
Exception
>>> z.get_property('tank/test', 'testprop', metadata=True, overwrite_metadata_namespace='bar')
Property(key='testprop', value='testval', source='local', namespace='bar')
:param dataset: Name of the dataset to set the property. Expects the full path beginning with the pool name.
:param key: Name of the property to set. For non-native properties, set ``metadata`` to **True** and overwrite
the default namespace using ``overwrite_metadata_namespace`` if required.
:param value: The new value to set.
:param metadata: If **True**, prepend the namespace to set a user (non-native) property.
:param overwrite_metadata_namespace: Overwrite the default metadata namespace for user (non-native) properties
:raises DatasetNotFound: If the dataset could not be found.
:raises ValidationError: If validating the parameters failed.
'''
if key.strip() == 'all' and not metadata:
raise ValidationError('"all" is not a valid property name')
if '/' not in dataset:
validate_pool_name(dataset)
else:
validate_dataset_path(dataset)
if metadata:
if overwrite_metadata_namespace:
prop_name = f'{overwrite_metadata_namespace}:{key}'
elif self.metadata_namespace:
prop_name = f'{self.metadata_namespace}:{key}'
else:
raise ValidationError('no metadata namespace set')
validate_metadata_property_name(prop_name)
else:
validate_native_property_name(key)
prop_name = key
validate_property_value(value)
self._set_property(dataset, prop_name, value, metadata)
def _set_property(self, dataset: str, key: str, value: str, is_metadata: bool) -> None:
'''
Actual implementation of the set_property function. This is to be implemented by the specific APIs. It is
called by set_property, which has done all the validation and thus the parameters can be trusted to be valid.
:param dataset: Name of the dataset to set the property. Expects the full path beginning with the pool name.
:param key: Name of the property to set. For non-native properties, set ``metadata`` to **True** and overwrite
the default namespace using ``overwrite_metadata_namespace`` if required.
:param value: The new value to set.
:param is_metadata: Indicates we're dealing with a metadata property.
:raises DatasetNotFound: If the dataset could not be found.
'''
raise NotImplementedError(f'{self} has not implemented this function')
def get_property(self, dataset: str, key: str, *, metadata: bool = False, overwrite_metadata_namespace: Optional[str] = None) -> Property:
'''
Gets a specific property named ``key`` from the ``dataset``. By default, only native properties are returned.
This behaviour can be changed by setting ``metadata`` to **True**, which uses the default `metadata_namespace` to
select the namespace. The namespace can be overwritten using ``overwrite_metadata_namespace`` for the duration of the
method invocation.
:param dataset: Name of the dataset to get the property. Expects the full path beginning with the pool name.
:param key: Name of the property to set. For non-native properties, set ``metadata`` to **True** and overwrite
the default namespace using ``overwrite_metadata_namespace`` if required.
:param metadata: If **True**, prepend the namespace to set a user (non-native) property.
:param overwrite_metadata_namespace: Overwrite the default namespace for user (non-native) properties.
:raises DatasetNotFound: If the dataset does not exist.
:raises ValidationError: If validating the parameters failed.
'''
if key.strip() == 'all' and not metadata:
raise ValidationError('"all" is not a valid property, use get_properties instead')
if '/' not in dataset:
# got a pool here
validate_pool_name(dataset)
else:
validate_dataset_path(dataset)
if metadata:
if overwrite_metadata_namespace:
prop_name = f'{overwrite_metadata_namespace}:{key}'
elif self.metadata_namespace:
prop_name = f'{self.metadata_namespace}:{key}'
else:
raise ValidationError('no metadata namespace set')
validate_metadata_property_name(prop_name)
else:
validate_native_property_name(key)
prop_name = key
return self._get_property(dataset, prop_name, metadata)
def _get_property(self, dataset: str, key: str, is_metadata: bool) -> Property:
'''
Actual implementation of the get_property function. This is to be implemented by the specific APIs. It is
called by get_property, which has done all the validation and thus the parameters can be trusted to be valid.
:param dataset: Name of the dataset to get the property. Expects the full path beginning with the pool name.
:param key: Name of the property to set. For non-native properties, set ``metadata`` to **True** and overwrite
the default namespace using ``overwrite_metadata_namespace`` if required.
:param is_metadata: Indicates we're dealing with a metadata property.
:raises DatasetNotFound: If the dataset does not exist.
'''
raise NotImplementedError(f'{self} has not implemented this function')
def get_properties(self, dataset: str, *, include_metadata: bool = False) -> List[Property]:
'''
Gets all properties from the ``dataset``. By default, only native properties are returned. To include metadata
properties, set ``include_metadata`` to **True**. In this mode, all properties are included, regardless of
``metadata_namespace``; it is up to the user to filter the metadata.
:param dataset: Name of the dataset to get properties from. Expects the full path beginning with the pool name.
:param include_metadata: If **True**, returns metadata (user) properties in addition to native properties.
:raises DatasetNotFound: If the dataset does not exist.
:raises ValidationError: If validating the parameters failed.
'''
if '/' not in dataset:
validate_pool_name(dataset)
else:
validate_dataset_path(dataset)
return self._get_properties(dataset, include_metadata)
def _get_properties(self, dataset: str, include_metadata: bool):
'''
Actual implementation of the get_properties function. This is to be implemented by the specific APIs. It is
called by get_properties, which has done all the validation and thus the parameters can be trusted to be valid.
:param dataset: Name of the dataset to get properties from. Expects the full path beginning with the pool name.
:param include_metadata: If **True**, returns metadata (user) properties in addition to native properties.
:raises DatasetNotFound: If the dataset does not exist.
:return: A list of properties.
'''
raise NotImplementedError(f'{self} has not implemented this function')
def create_snapshot(
self,
dataset: str,
name: str,
*,
properties: Dict[str, str] = None,
metadata_properties: Dict[str, str] = None
) -> Dataset:
'''
Create a new snapshot from an existing dataset.
:param dataset: The dataset to snapshot.
:param name: Name of the snapshot (the part after the ``@``)
:param properties: Dict of native properties to set.
:param metadata_properties: Dict of metadata properties to set. For namespaces other than the default (or when
no default has been set), format the key using ``namespace:key``.
:return: Info about the newly created dataset.
:raises ValidationError: If validating the parameters failed.
'''
return self.create_dataset(
f'{dataset}@{name}',
dataset_type=DatasetType.SNAPSHOT,
properties=properties,
metadata_properties=metadata_properties
)
def create_bookmark(
self,
dataset: str,
name: str,
*,
properties: Dict[str, str] = None,
metadata_properties: Dict[str, str] = None
) -> Dataset:
'''
Create a new bookmark from an existing dataset.
:param dataset: The dataset to attach a bookmark to.
:param name: Name of the bookmark (the part after the ``#``)
:param properties: Dict of native properties to set.
:param metadata_properties: Dict of metadata properties to set. For namespaces other than the default (or when
no default has been set), format the key using ``namespace:key``.
:return: Info about the newly created dataset.
:raises ValidationError: If validating the parameters failed.
'''
return self.create_dataset(
f'{dataset}#{name}',
dataset_type=DatasetType.BOOKMARK,
properties=properties,
metadata_properties=metadata_properties
)
def create_fileset(
self,
name: str,
*,
mountpoint: str = None,
properties: Dict[str, str] = None,
metadata_properties: Dict[str, str] = None,
mount_helper: str = None,
recursive: bool = True
) -> Dataset:
'''
Create a new fileset. For convenience, a ``mountpoint`` parameter can be given. If not **None**, it will
overwrite any ``mountpoint`` value in the ``properties`` dict.
:param name: Name of the new fileset (complete path in the ZFS hierarchy).
:param mountpoint: Convenience parameter for setting/overwriting the ``mountpoint`` property.
:param properties: Dict of native properties to set.
:param metadata_properties: Dict of metadata (user) properties to set. For namespaces other than the default (or
when no default has been set), format the key using ``namespace:key``.
:param mount_helper: Mount helper for Linux when not running as root. See :ref:`the_mount_problem` for details.
:param recursive: Recursively create the parent fileset. Refer to the ZFS documentation on the ``-p``
parameter of ``zfs create``.
:return: Info about the newly created dataset.
:raises ValidationError: If validating the parameters failed.
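Example (illustrative; assumes a pool named ``tank`` exists):

>>> zfs = get_zfs('cli')
>>> zfs.create_fileset('tank/home/alice', mountpoint='/home/alice')  # doctest: +SKIP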
'''
if mountpoint is not None:
if properties is None:
properties = dict()
# TODO validate path
properties['mountpoint'] = mountpoint
return self.create_dataset(
name,
dataset_type=DatasetType.FILESET,
properties=properties,
metadata_properties=metadata_properties,
mount_helper=mount_helper
)
def create_volume(
self,
name: str,
*,
size: int,
sparse: bool = False,
blocksize: Optional[int] = None,
properties: Dict[str, str] = None,
metadata_properties: Dict[str, str] = None,
recursive: bool = False
) -> Dataset:
'''
Create a new volume of the given ``size`` (in bytes). If ``sparse`` is **True**, a sparse volume (also known
as thin provisioned) will be created. If ``blocksize`` is given, it overwrites the ``blocksize`` property.
:param name: Name of the new volume (complete path in the ZFS hierarchy).
:param size: The size (in `bytes`) for the new volume.
:param sparse: Whether to create a sparse volume. Requires ``size`` to be set.
:param blocksize: If set, overwrites the `blocksize` property. Provided for convenience.
:param properties: Dict of native properties to set.
:param metadata_properties: Dict of metadata (user) properties to set. For namespaces other than the default (or
when no default has been set), format the key using ``namespace:key``.
:param recursive: Recursively create the parent fileset. Refer to the ZFS documentation on the ``-p``
parameter of ``zfs create``.
:return: Info about the newly created dataset.
:raises ValidationError: If validating the parameters failed.
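Example (illustrative; assumes a pool named ``tank`` exists; creates a sparse 10 GiB volume):

>>> zfs = get_zfs('cli')
>>> zfs.create_volume('tank/vm/disk0', size=10 * 1024 ** 3, sparse=True)  # doctest: +SKIP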
'''
if blocksize is not None:
if properties is None:
properties = dict()
properties['blocksize'] = f'{blocksize}'
return self.create_dataset(
name,
dataset_type=DatasetType.VOLUME,
properties=properties,
metadata_properties=metadata_properties,
sparse=sparse,
size=size,
recursive=recursive
)
def create_dataset(
self,
name: str,
*,
dataset_type: DatasetType = DatasetType.FILESET,
properties: Dict[str, str] = None,
metadata_properties: Dict[str, str] = None,
mount_helper: str = None,
sparse: bool = False,
size: Optional[int] = None,
recursive: bool = False
) -> Dataset:
'''
Create a new dataset. The ``dataset_type`` parameter contains the type of dataset to create. This is a generic
function to create datasets; you may want to take a look at the more specific convenience functions (which
essentially call this one):
* :func:`~ZFS.create_bookmark`
* :func:`~ZFS.create_fileset`
* :func:`~ZFS.create_snapshot`
* :func:`~ZFS.create_volume`
Properties specified with this call will be included in the create operation and are thus atomic. An exception
applies for `filesets` with mountpoints that are neither ``none`` nor ``legacy`` on `Linux`.
.. note::
On Linux, only root is allowed to manipulate the namespace (aka `mount`). Refer to :ref:`the_mount_problem` in
the documentation.
:param name: The name of the new dataset. This includes the full path, e.g. ``tank/data/newdataset``.
:param dataset_type: Indicates the type of the dataset to be created.
:param properties: A dict containing the properties for this new dataset. These are the native properties.
:param metadata_properties: The metadata properties to set. To use a different namespace than the default (or
when no default is set), use the ``namespace:key`` format for the dict keys.
:param mount_helper: An executable that has the permission to manipulate the namespace, aka mount the fileset.
:param sparse: For volumes, specifies whether a sparse (thin provisioned) or normal (thick provisioned) volume
should be created.
:param size: For volumes, specifies the size in bytes.
:param recursive: Recursively create the parent fileset. Refer to the ZFS documentation on the ``-p``
parameter of ``zfs create``. This does not apply to types other than volumes or filesets.
:return: Info about the newly created dataset.
:raises ValidationError: If validating the parameters failed.
:raises DatasetNotFound: If the parent dataset of a snapshot or bookmark does not exist.
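Example (illustrative; assumes a pool named ``tank`` exists):

>>> zfs = get_zfs('cli')
>>> zfs.create_dataset('tank/data/newdataset', properties={'compression': 'lz4'})  # doctest: +SKIP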
'''
if '/' in name:
validate_dataset_path(name)
else:
validate_pool_name(name)
# we can't create a toplevel element
if '/' not in name and dataset_type in (DatasetType.FILESET, DatasetType.VOLUME):
raise ValidationError('Can\'t create a toplevel fileset or volume, use ZPool instead.')
# check the syntax of the properties
if properties is not None:
for k, v in properties.items():
validate_native_property_name(k)
validate_property_value(v)
_metadata_properties = dict() # type: Dict[str, str]
if metadata_properties is not None:
for k, v in metadata_properties.items():
# if the name has no namespace, add the default one if set
if ':' not in k:
if not self._metadata_namespace:
raise ValidationError(f'Metadata property {k} has no namespace and none is set globally')
else:
meta_name = f'{self._metadata_namespace}:{k}'
else:
meta_name = k
validate_metadata_property_name(meta_name)
_metadata_properties[meta_name] = v if isinstance(v, str) else f'{v}'
validate_property_value(_metadata_properties[meta_name])
# validate type specifics
if dataset_type == DatasetType.VOLUME:
if not size:
raise ValidationError('Size must be specified for volumes')
try:
size = int(size)
except ValueError as e:
raise ValidationError('Size is not an integer') from e
if size < 1:
raise ValidationError('Size is too low')
if properties and 'blocksize' in properties:
try:
blocksize = int(properties['blocksize'])
except ValueError as e:
raise ValidationError('blocksize must be an integer') from e
if blocksize < 2 or blocksize > 128 * 1024:  # zfs(8) version 0.8.1 lists 128KiB as the maximum
raise ValidationError('blocksize must be between 2 and 128KiB (inclusive)')
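# power-of-two check: a power of two has exactly one bit set, so n & (n - 1) == 0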
if not ((blocksize & (blocksize-1) == 0) and blocksize != 0):
raise ValidationError('blocksize must be a power of two')
# TODO recursive
elif dataset_type == DatasetType.FILESET:
if '@' in name or '#' in name:
raise ValidationError('Filesets can\'t contain @ or #')
elif dataset_type in (DatasetType.SNAPSHOT, DatasetType.BOOKMARK):
symbol = '@' if dataset_type == DatasetType.SNAPSHOT else '#'
if symbol not in name:
raise ValidationError(f'Name must include {symbol}name')
# check if the parent exists
ds_name, ss_name = name.split(symbol)
if not self.dataset_exists(ds_name):
raise DatasetNotFound(f'The parent dataset "{ds_name}" could not be found')
# TODO
return self._create_dataset(
name,
dataset_type=dataset_type,
properties=properties,
metadata_properties=_metadata_properties,
mount_helper=mount_helper,
sparse=sparse,
size=size,
recursive=recursive
)
def _create_dataset(
self,
name: str,
*,
dataset_type: DatasetType,
properties: Dict[str, str] = None,
metadata_properties: Dict[str, str] = None,
mount_helper: str = None,
sparse: bool = False,
size: Optional[int] = None,
recursive: bool = False,
) -> Dataset:
raise NotImplementedError(f'{self} has not implemented this function')
def destroy_dataset(self, name: str, *, recursive: bool = False) -> None:
'''
Destroy a dataset. This function tries to remove a dataset, optionally removing all children recursively if
``recursive`` is **True**. This function works on all types of datasets, ``fileset``, ``volume``, ``snapshot``
and ``bookmark``.
This function can't be used to destroy pools; please use :class:`~zfs.ZPool` instead.
Example:
>>> zfs = ZFSCli()
>>> zfs.destroy_dataset('pool/system/root@pre-distupgrade')
:note: This is a destructive process that can't be undone.
:param name: Name of the dataset to remove.
:param recursive: If **True**, recursively destroy all children of the dataset as well.
'''
raise NotImplementedError(f'{self} has not implemented this function')
def get_zfs(api: str = 'cli', metadata_namespace: Optional[str] = None, **kwargs) -> ZFS:
'''
Returns an instance of the desired ZFS API. Default is ``cli``.
Using this function is an alternative to instantiating one of the implementations yourself.
The parameters ``metadata_namespace`` and all of the ``kwargs`` are passed to the implementation's constructor.
Example:
>>> from zfs import get_zfs, ZFS
>>> type(get_zfs('cli'))
<class 'zfs.zfs_cli.ZFSCli'>
>>> type(get_zfs('native'))
<class 'zfs.zfs_native.ZFSNative'>
>>> isinstance(get_zfs(), ZFS)
True
:param api: API to use, either ``cli`` for zfs(8) or ``native`` for the `libzfs_core` api.
:param metadata_namespace: Default namespace.
:param kwargs: Extra parameters to pass to the implementation's constructor.
:return: An API instance.
:raises NotImplementedError: If an unknown API was requested.
'''
if api == 'cli':
from .zfs_cli import ZFSCli
return ZFSCli(metadata_namespace=metadata_namespace, **kwargs)
elif api == 'native':
from .zfs_native import ZFSNative
return ZFSNative(metadata_namespace=metadata_namespace, **kwargs)
raise NotImplementedError(f'The api "{api}" has not been implemented.')

@ -0,0 +1,225 @@
'''
CLI-based implementation.
'''
from typing import List, Optional, Union
import logging
import os
import shutil
import subprocess
from .exceptions import DatasetNotFound, PropertyNotFound, ValidationError
from .types import Property, Dataset, DatasetType
from .validation import (
validate_dataset_path,
validate_metadata_property_name,
validate_native_property_name,
validate_pool_name,
)
from .zfs import ZFS
log = logging.getLogger('zfs.zfs_cli')
class ZFSCli(ZFS):
'''
ZFS interface implementation using the zfs(8) command line utility. For documentation, please see the interface
:class:`~zfs.zfs.ZFS`. It is recommended to use :func:`~zfs.zfs.get_zfs` to obtain an instance using ``cli`` as
api.
If ``zfs_exe`` is supplied, it is assumed that it points to the path of the ``zfs(8)`` executable.
'''
def __init__(self, *, metadata_namespace: Optional[str] = None, zfs_exe: Optional[str] = None, **kwargs) -> None:
super().__init__(metadata_namespace=metadata_namespace)
self.find_executable(path=zfs_exe)
def find_executable(self, path: str = None):
'''
Tries to find the executable ``zfs(8)``. If ``path`` points to an executable, it is used instead of relying on
the PATH to find it. It does not fall back to searching in $PATH if ``path`` does not point to an executable.
An exception is raised if no executable could be found.
:param path: Path to the executable, used blindly if supplied.
:raises OSError: If the executable could not be found.
'''
exe_path = path
if not exe_path:
exe_path = shutil.which('zfs')
if not exe_path:
raise OSError('Could not find executable')
self.__exe = exe_path
@property
def executable(self) -> str:
'''
Returns the path to the ``zfs`` executable that was found by :func:`find_executable`.
'''
return self.__exe
@staticmethod
def parse_dataset_identifier(name: str) -> Dataset:
'''
Parses a dataset identifier like ``pool/system/root@initial`` to a :class:`~simplezfs.types.Dataset`.
:param name: The name to parse.
:return: The dataset.
:raises ValidationError: If the argument is not valid or the argument was a pool.
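Example (illustrative):

>>> ZFSCli.parse_dataset_identifier('pool/system/root@initial').type == DatasetType.SNAPSHOT
True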
'''
if '/' in name:
validate_dataset_path(name)
tokens = name.split('/')
ds_name = tokens[-1]
ds_parent = '/'.join(tokens[:-1]) # type: Optional[str]
ds_pool = tokens[0]
else:
validate_pool_name(name)
ds_name = name
ds_parent = None
ds_pool = name
if '@' in ds_name:
ds_type = DatasetType.SNAPSHOT
elif '#' in ds_name:
ds_type = DatasetType.BOOKMARK
elif ZFSCli.is_zvol(name):
ds_type = DatasetType.VOLUME
else:
ds_type = DatasetType.FILESET
return Dataset(name=ds_name, parent=ds_parent, type=ds_type, full_path=name, pool=ds_pool)
@staticmethod
def is_zvol(name: str) -> bool:
'''
Resolves the given name in the dev filesystem. If it is found beneath ``/dev/zvol``, **True** is returned.
:param name: The name of the suspected volume
:return: Whether the name represents a volume rather than a fileset.
:raises ValidationError: If validation fails.
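Example (illustrative; the result depends on the local ``/dev`` tree):

>>> ZFSCli.is_zvol('tank/swap')  # doctest: +SKIP
True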
'''
if '/' in name:
validate_dataset_path(name)
else:
validate_pool_name(name)
return os.path.exists(os.path.join('/dev/zvol', name))
def list_datasets(self, *, parent: Union[str, Dataset] = None) -> List[Dataset]:
'''
:todo: ability to limit to a pool (path validator discards pool-only arguments)
:todo: find a way to tell the user to use ZPool for pools if only a pool is given
'''
# zfs list -H -r -t all
args = [self.__exe, 'list', '-H', '-r', '-t', 'all']
if parent:
# zfs list -H -r -t all $parent
if isinstance(parent, Dataset):
parent_path = parent.full_path
else:
parent_path = parent
# as the topmost parent is a dataset as well, but not a path, we need to handle this case
if '/' not in parent_path:
validate_pool_name(parent_path)
else:
validate_dataset_path(parent_path)
args.append(parent_path)
# python 3.7 can use capture_output=True
proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
if proc.returncode != 0 or len(proc.stderr) > 0:
if parent:
self.handle_command_error(proc, dataset=args[-1])
else:
self.handle_command_error(proc)
res = list()
for line in proc.stdout.strip().split('\n'):
# format is NAME, USED, AVAIL, REFER, MOUNTPOINT, we only care for the name here
name = line.split('\t')[0]
res.append(ZFSCli.parse_dataset_identifier(name.strip()))
return res
def handle_command_error(self, proc: subprocess.CompletedProcess, dataset: str = None) -> None:
'''
Handles errors that occurred while running a command.
:param proc: The result of subprocess.run
:param dataset: If the error was caused by working with a dataset, specify it to enhance the error message.
:todo: proper exception!
:raises DatasetNotFound: If zfs could not find the dataset it was requested to work with.
:raises PropertyNotFound: If zfs could not find the property it was asked to work with.
:raises Exception: Temporary catch-all for errors that could not be mapped to a more specific exception.
'''
if 'dataset does not exist' in proc.stderr:
if dataset:
raise DatasetNotFound(f'Dataset "{dataset}" not found')
raise DatasetNotFound('Dataset not found')
elif 'bad property list: invalid property' in proc.stderr:
if dataset:
raise PropertyNotFound(f'invalid property on dataset {dataset}')
else:
raise PropertyNotFound('invalid property')
raise Exception(f'Command execution "{" ".join(proc.args)}" failed: {proc.stderr}')
def _set_property(self, dataset: str, key: str, value: str, is_metadata: bool) -> None:
'''
Sets a property, basically using ``zfs set {key}={value} {dataset}``.
:raises DatasetNotFound: If the dataset does not exist.
'''
args = [self.__exe, 'set', f'{key}={value}', dataset]
log.debug(f'_set_property: about to run command: {args}')
proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
if proc.returncode != 0 or len(proc.stderr) > 0:
log.debug(f'_set_property: command failed, code={proc.returncode}, stderr="{proc.stderr}"')
self.handle_command_error(proc, dataset=dataset)
def _get_property(self, dataset: str, key: str, is_metadata: bool) -> Property:
'''
Gets a property, basically using ``zfs get -H -p {key} {dataset}``.
:raises DatasetNotFound: If the dataset does not exist.
:raises PropertyNotFound: If the property does not exist or is invalid (for native ones).
'''
args = [self.__exe, 'get', '-H', '-p', key, dataset]
log.debug(f'_get_property: about to run command: {args}')
proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
if proc.returncode != 0 or len(proc.stderr) > 0:
log.debug(f'_get_property: command failed, code={proc.returncode}, stderr="{proc.stderr}"')
self.handle_command_error(proc, dataset=dataset)
name, prop_name, prop_value, prop_source = proc.stdout.strip().split('\t')
if name != dataset:
raise Exception(f'expected name "{dataset}", but got {name}')
if is_metadata and prop_value == '-' and prop_source == '-':
raise PropertyNotFound(f'Property {key} was not found')
namespace = None
if is_metadata:
namespace = prop_name.split(':')[0]
return Property(key=prop_name, value=prop_value, source=prop_source, namespace=namespace)
def _get_properties(self, dataset: str, include_metadata: bool = False) -> List[Property]:
'''
Gets all properties from a dataset, basically running ``zfs get -H -p all {dataset}``.
:raises DatasetNotFound: If the dataset does not exist.
'''
args = [self.__exe, 'get', '-H', '-p', 'all', dataset]
log.debug(f'_get_properties: about to run command: {args}')
proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
if proc.returncode != 0 or len(proc.stderr) > 0:
log.debug(f'_get_properties: command failed, code={proc.returncode}, stderr="{proc.stderr}"')
self.handle_command_error(proc, dataset=dataset)
res = list()
for line in proc.stdout.split('\n'):
if line:
_, prop_name, prop_value, prop_source = line.strip().split('\t')
if ':' in prop_name:
if include_metadata:
namespace = prop_name.split(':')[0]
# strip the namespace prefix; str.lstrip() would remove matching characters, not the prefix
prop_name = prop_name[len(namespace) + 1:]
res.append(Property(key=prop_name, value=prop_value, source=prop_source, namespace=namespace))
else:
res.append(Property(key=prop_name, value=prop_value, source=prop_source, namespace=None))
return res

@ -0,0 +1,31 @@
'''
Native, ``libzfs_core``-based implementation.
'''
from typing import List, Optional
import logging
from .types import Property
from .zfs import ZFS
log = logging.getLogger('zfs.zfs_native')
class ZFSNative(ZFS):
'''
ZFS interface implementation using the libzfs_core python bindings. For documentation, please see the interface
:class:`~zfs.zfs.ZFS`. It is recommended to use :func:`~zfs.zfs.get_zfs` to obtain an instance, using ``native``
as api.
'''
def __init__(self, *, metadata_namespace: Optional[str] = None, **kwargs) -> None:
super().__init__(metadata_namespace=metadata_namespace)
def set_property(self, dataset: str, key: str, value: str, *, metadata: bool = False, overwrite_metadata_namespace: Optional[str] = None) -> None:
raise NotImplementedError
def get_property(self, dataset: str, key: str, *, metadata: bool = False, overwrite_metadata_namespace: Optional[str] = None) -> Property:
raise NotImplementedError
def get_properties(self, dataset: str, *, include_metadata: bool = False) -> List[Property]:
raise NotImplementedError

@ -0,0 +1,62 @@
from typing import Optional
class ZPool:
'''
ZPool interface class. This API generally covers only the zpool(8) tool; for zfs(8) please see class :class:`~ZFS`.
**ZFS implementation**
There are two ways how the API actually communicates with the ZFS filesystem:
* Using the CLI tools
* Using the native API
When creating an instance of this class, select one or the other as the ``api`` argument.
'''
def __init__(self, *, metadata_namespace: Optional[str] = None, **kwargs) -> None:
self.metadata_namespace = metadata_namespace
@property
def metadata_namespace(self) -> Optional[str]:
'''
Returns the metadata namespace, which may be None if not set.
'''
return self._metadata_namespace
@metadata_namespace.setter
def metadata_namespace(self, namespace: str) -> None:
'''
Sets a new metadata namespace
:todo: validate!
'''
self._metadata_namespace = namespace
def get_zpool(api: str = 'cli', metadata_namespace: Optional[str] = None, **kwargs) -> ZPool:
'''
Returns an instance of the desired ZPool API. Default is ``cli``.
Using this function is an alternative to instantiating one of the implementations yourself and is the recommended
way to get an instance.
Example:
>>> from zfs import get_zpool, ZPool
>>> type(get_zpool('cli'))
<class 'zfs.zpool_cli.ZPoolCli'>
>>> type(get_zpool('native'))
<class 'zfs.zpool_native.ZPoolNative'>
>>> isinstance(get_zpool(), ZPool)
True
'''
if api == 'cli':
from .zpool_cli import ZPoolCli
return ZPoolCli(metadata_namespace=metadata_namespace, **kwargs)
elif api == 'native':
from .zpool_native import ZPoolNative
return ZPoolNative(metadata_namespace=metadata_namespace, **kwargs)
raise NotImplementedError(f'The api "{api}" has not been implemented.')

@ -0,0 +1,161 @@
'''
CLI-based implementation of ZPOOL.
'''
import logging
import shutil
from typing import Any, Dict, NamedTuple, Optional
from .types import ZPoolHealth
from .zpool import ZPool
log = logging.getLogger('zfs.zpool_cli')
class ZPoolCli(ZPool):
def __init__(self, *, metadata_namespace: Optional[str] = None, zpool_exe: Optional[str] = None, **kwargs) -> None:
super().__init__(metadata_namespace=metadata_namespace)
self.find_executable(path=zpool_exe)
def find_executable(self, path: str = None) -> None:
'''
Tries to find the executable ``zpool``. If ``path`` points to an executable, it is used instead of relying on
the PATH to find it. It does not fall back to searching in PATH if ``path`` does not point to an executable.
An exception is raised if no executable could be found.
:param path: Path to an executable to use instead of searching through $PATH.
:raises OSError: If the executable could not be found.
'''
exe_path = path
if not exe_path:
exe_path = shutil.which('zpool')
if not exe_path:
raise OSError('Could not find the executable')
self.__exe = exe_path
@property
def executable(self) -> str:
'''
Returns the executable found by find_executable.
'''
return self.__exe
def parse_pool_structure(self, zpool_list_output: str) -> Dict:
'''
Parses the output of ``zpool list -vPHp`` and returns a dict mapping pool names to their structure.
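The expected input is tab-separated, one line per pool or vdev member. Illustrative sketch of the field
layout as consumed below (the CKPOINT field only exists on ZoL 0.8):

NAME SIZE ALLOC FREE [CKPOINT] EXPANDSZ FRAG CAP DEDUP HEALTH ALTROOT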
'''
plog = logging.getLogger('zfs.zpool_cli.zpool_list_parser')
output = dict() # type: Dict[str, Dict]
# holds the current pool name
pool_name = ''
# section we're parsing
state = ''
vdev_drives = list()
vdevs = dict() # type: Dict[str, Any]
# offset is 0 for zfs 0.7 and 1 for 0.8, due to the added "CKPOINT" field
# This gets set when we encounter a pool in the output, i.e. the very first line. We don't care that we set it
# for every pool we encounter; the value does not change within one output.
offset = 0
for line in [x.split('\t') for x in zpool_list_output.split('\n')]:
plog.debug(line)
if len(line) == 1 and not line[0]:
# caught the last line ending
plog.debug('ignoring empty line')
continue
if not line[0]:
plog.debug(f'token 0 not set, token 1: {line[1]}')
# empty first token: parse the members. $state defines what part of the pool we're parsing
if line[1].startswith('/'):
# paths always define either disk or file vdevs
plog.debug(f'+ drive {line[1]}')
vdev_drives.append(dict(name=line[1], health=ZPoolHealth.from_string(line[9 + offset].strip())))
else:
# everything else defines a combination of disks (aka raidz, mirror etc)
if vdev_drives:
# if we have pending drives in the list, append them to previous segment as we're starting a
# new one, then clear the states
plog.debug('end section, save data')
vdevs['members'] = vdev_drives
output[pool_name][state].append(vdevs)
vdevs = dict(type='none')
vdev_drives = list()
vdevs['type'] = line[1]
if state not in ('log', 'cache', 'spare'):
vdevs['health'] = ZPoolHealth.from_string(line[9 + offset].strip())
vdevs['size'] = int(line[2])
vdevs['alloc'] = int(line[3])
vdevs['free'] = int(line[4])
vdevs['frag'] = int(line[6 + offset])
vdevs['cap'] = float(line[7 + offset])
plog.debug(f'new type: {line[1]}')
else:
plog.debug(f'token 0: {line[0]}')
# A token in the first place defines a new pool or section (log, cache, spare) in the current pool.
# Append the pending elements to the current pool and state and clear them.
if vdev_drives:
plog.debug(f'have {len(vdev_drives)} vdev_drives, save data')
vdevs['members'] = vdev_drives
output[pool_name][state].append(vdevs)
vdevs = dict(type='none')
vdev_drives = list()
# The first element is either a pool name or a section within the pool;
# these break the format and are not tab-separated.
if line[0].startswith('cache'):
plog.debug('new section: cache')
state = 'cache'
elif line[0].startswith('log'):
plog.debug('new section: log')
state = 'log'
elif line[0].startswith('spare'):
plog.debug('new section: spare')
state = 'spare'
else:
# new pool name
plog.debug(f'new section: drives. new pool: {line[0]}')
state = 'drives'
pool_name = line[0]
# NOTE ZoL v0.7 has 10 fields, v0.8 has 11 (chkpoint)
if len(line) == 10:
offset = 0
else:
offset = 1
output[pool_name] = {
'drives': [],
'log': [],
'cache': [],
'spare': [],
'size': int(line[1]),
'alloc': int(line[2]),
'free': int(line[3]),
'chkpoint': ZPoolCli.dash_to_none(line[4]) if len(line) == 11 else None,
'expandsz': ZPoolCli.dash_to_none(line[4 + offset].strip()),
'frag': int(line[5 + offset]),
'cap': float(line[6 + offset]),
'dedup': float(line[7 + offset]),
'health': ZPoolHealth.from_string(line[8 + offset].strip()),
'altroot': ZPoolCli.dash_to_none(line[9 + offset]),
}
if vdev_drives:
vdevs['members'] = vdev_drives
output[pool_name][state].append(vdevs)
return output
@staticmethod
def dash_to_none(data: str) -> Optional[str]:
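'''
Maps the placeholder value ``-`` used by zpool(8) to **None**, passing any other value through.

>>> ZPoolCli.dash_to_none('-') is None
True
>>> ZPoolCli.dash_to_none('local')
'local'
'''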
if data and data != '-':
return data
return None

@ -0,0 +1,13 @@
'''
Native, ``libzfs_core``-based implementation.
'''
from typing import Optional
from .zpool import ZPool
class ZPoolNative(ZPool):
def __init__(self, *, metadata_namespace: Optional[str] = None, **kwargs) -> None:
super().__init__(metadata_namespace=metadata_namespace)

@ -0,0 +1,61 @@
import pytest
from simplezfs.types import DatasetType, PropertySource
class TestDatasetType:
@pytest.mark.parametrize('string,value', [('FiLeSet', DatasetType.FILESET), ('fileset', DatasetType.FILESET), ('vOlUMe', DatasetType.VOLUME), ('volume', DatasetType.VOLUME), ('SnapSHOT', DatasetType.SNAPSHOT), ('snapshot', DatasetType.SNAPSHOT), ('BOOKmark', DatasetType.BOOKMARK), ('bookmark', DatasetType.BOOKMARK)])
def test_from_string_valid(self, string, value):
'''
Tests that the from_string helper works.
'''
v = DatasetType.from_string(string)
assert isinstance(v, DatasetType)
assert v.value == string.lower()
@pytest.mark.parametrize('string', [' fileset', 'dateiset', 'file set', 'data\0set', '', ' '])
def test_from_string_invalid(self, string):
'''
Tests if it raises an exception if the value is invalid.
'''
with pytest.raises(ValueError) as excinfo:
DatasetType.from_string(string)
assert 'not a valid DatasetType' in str(excinfo.value)
def test_from_string_None(self):
'''
Tests that it properly fails with None as input.
'''
with pytest.raises(ValueError) as excinfo:
DatasetType.from_string(None)
assert 'only string' in str(excinfo.value)
class TestPropertySource:
@pytest.mark.parametrize('string,value', [('default', PropertySource.DEFAULT), ('DeFAULT', PropertySource.DEFAULT), ('inheriteD', PropertySource.INHERITED), ('inherited', PropertySource.INHERITED), ('TEMPORARY', PropertySource.TEMPORARY), ('temporary', PropertySource.TEMPORARY), ('rEcEiVeD', PropertySource.RECEIVED), ('received', PropertySource.RECEIVED), ('None', PropertySource.NONE), ('none', PropertySource.NONE)])
def test_from_string_valid(self, string, value):
'''
Test that the from_string helper works.
'''
v = PropertySource.from_string(string)
assert isinstance(v, PropertySource)
assert v.value == string.lower()
@pytest.mark.parametrize('string', ['asd', '', ' ', 'default\0', 'defaultdefault', 'Normal'])
def test_from_string_invalid(self, string):
'''
Tests that it raises an exception if the value is invalid.
'''
with pytest.raises(ValueError) as excinfo:
PropertySource.from_string(string)
assert 'not a valid PropertySource' in str(excinfo.value)
def test_from_string_None(self):
'''
Tests that it properly fails with None as input.
'''
with pytest.raises(ValueError) as excinfo:
PropertySource.from_string(None)
assert 'only string' in str(excinfo.value)

@ -0,0 +1,264 @@
'''
Tests the validation functions.
'''
import pytest
from simplezfs.exceptions import ValidationError
from simplezfs.validation import (
validate_dataset_name,
validate_dataset_path,
validate_metadata_property_name,
validate_native_property_name,
validate_pool_name,
validate_property_value,
)
class TestPoolName:
'''
Tests the function ``validate_pool_name``.
'''
@pytest.mark.parametrize('name', ['a', 'aa', 'aaa', 'a' * 20, 'a123', 'a ', 'a 1 2 _ : .. ::', 'a:a'])
def test_valid_name(self, name):
'''
Tests a set of known good combinations.
'''
validate_pool_name(name)
@pytest.mark.parametrize('name', [' ', ' a', 'ä', '', 'aaaaaa→', '\0', '\n', '\t'])
def test_invalid_name(self, name):
'''
Tests a set of known bad combinations.
'''
with pytest.raises(ValidationError) as excinfo:
validate_pool_name(name)
assert 'malformed name' in str(excinfo.value)
@pytest.mark.parametrize('name', ['mirror', 'raidz', 'spare', 'log'])
def test_invalid_reserved_keyword(self, name):
'''
Tests with reserved keywords.
'''
with pytest.raises(ValidationError) as excinfo:
validate_pool_name(name)
assert 'reserved name' in str(excinfo.value)
@pytest.mark.parametrize('name', ['mirrored', 'mirror:ed', 'spared', 'spare:d', 'raidzfun', 'raidz fun'])
def test_invalid_begins_with_reserved_keyword(self, name):
'''
Tests with strings that are known to be reserved starts of the name.
'''
with pytest.raises(ValidationError) as excinfo:
validate_pool_name(name)
assert 'starts with invalid token' in str(excinfo.value)
def test_valid_keyword_begin_extra(self):
'''
Of the reserved keywords, 'log' is allowed as beginning of the pool name, check that it is allowed.
'''
validate_pool_name('logger')
@pytest.mark.parametrize('name', ['c0', 'c1', 'c0 ', 'c0d0', 'c0t0', 'c999', 'c9 9asd .-:'])
def test_invalid_solaris_disk_names_begin(self, name):
'''
Test with solaris disk names and similar names.
'''
with pytest.raises(ValidationError) as excinfo:
validate_pool_name(name)
assert 'begins with reserved sequence' in str(excinfo.value)
def test_too_short(self):
'''
Tests with a name that we know is too short.
'''
with pytest.raises(ValidationError) as excinfo:
validate_pool_name('')
assert 'too short' in str(excinfo.value)
# TODO test too long
class TestDatasetName:
'''
Tests the function ``validate_dataset_name``.
'''
@pytest.mark.parametrize('name', ['a', 'aa', '0', '0a', 'A', 'A0', 'qwertzuiop', 'a-', '-a', 'a.a', '.', 'a:', ':a', 'a:a', 'a::a', '842bf5a29bd55c12c20a8d1e73bdb5790e8ab804d857885e35e55be025acb6b2', '842bf5a29bd55c12c20a8d1e73bdb5790e8ab804d857885e35e55be025acb6b2-init', 'towel@20190525', 'page#42'])
def test_valid_name(self, name):
'''
Tests a set of known good combinations.
'''
validate_dataset_name(name)
@pytest.mark.parametrize('name', ['/a', '/', 'a/a', 'a/', 'a+', 'ä', '', '\0', '\n', 'towel@@20190525', 'towel@#42', 'page##42', 'page#@20190525'])
def test_invalid_name(self, name):
'''
Tests a set of known invalid combinations.
'''
with pytest.raises(ValidationError) as excinfo:
validate_dataset_name(name)
assert 'disallowed characters' in str(excinfo.value)
@pytest.mark.parametrize('name', ['a@a', 'aasdf@1234', 'a#a', 'a#123'])
def test_invalid_name_strict(self, name):
'''
Tests with strict=True, which disallows snapshot or bookmark identifiers.
'''
with pytest.raises(ValidationError) as excinfo:
validate_dataset_name(name, strict=True)
assert 'not allowed in strict' in str(excinfo.value)
# TODO trigger UnicodeEncodeError!
def test_invalid_name_length_short(self):
'''
Tests the behaviour if the name is too short
'''
with pytest.raises(ValidationError) as excinfo:
validate_dataset_name('')
assert 'too short' in str(excinfo.value)
def test_invalid_length_long(self):
'''
Providing a very long name, it should bail out.
'''
with pytest.raises(ValidationError) as excinfo:
validate_dataset_name('a' * 1024)
assert 'length >' in str(excinfo.value)
class TestDatasetPath:
'''
Tests the function ``validate_dataset_path``.
'''
@pytest.mark.parametrize('path', ['a/a', 'a/a/a', 'a/b/c', 'asdf/qwer/yxcv', 'a/a@a', 'a/a#a'])
def test_valid_path(self, path):
'''
Tests a set of known good combinations.
'''
validate_dataset_path(path)
@pytest.mark.parametrize('path', ['a', '/a', 'a/', '/a/', '/aaaaa/', 'a/a a', 'a@a/a', 'a/a#a/a', 'a/a@a/a#a'])
def test_invalid_path(self, path):
'''
Tests a set of known bad combinations.
'''
with pytest.raises(ValidationError):
validate_dataset_path(path)
@pytest.mark.parametrize('path', ['asdf', 'asdf@yesterday', 'asdf#tomorrow'])
def test_invalid_path_no_slash(self, path):
'''
Tests the behaviour if no slash is found, making it a dataset name.
'''
with pytest.raises(ValidationError) as excinfo:
validate_dataset_path(path)
assert 'Not a path' in str(excinfo.value)
# TODO tests for specific errors passed from the validation functions for pool and dataset name
class TestNativePropertyName:
'''
Tests the function ``validate_native_property_name``.
'''
@pytest.mark.parametrize('name', ['a', 'aa', 'a0', 'a0a', 'asdfghjkl'])
def test_valid_name(self, name):
'''
Tests a set of known good combinations.
'''
validate_native_property_name(name)
@pytest.mark.parametrize('name', ['0', '0a', 'A', 'AA', '-', 'a-', 'a-a', '-a', '_', 'a_', 'a_a', '_a', ':', 'a:', 'a:a', ':a', '\0'])
def test_invalid_name(self, name):
'''
Tests a set of known invalid combinations.
'''
with pytest.raises(ValidationError) as excinfo:
validate_native_property_name(name)
assert 'does not match' in str(excinfo.value)
# TODO trigger UnicodeEncodeError!
def test_invalid_length_short(self):
'''
Tests the behaviour if the name is too short.
'''
with pytest.raises(ValidationError) as excinfo:
validate_native_property_name('')
assert 'too short' in str(excinfo.value)
def test_invalid_length_long(self):
'''
Provided a very long name, it should bail out.
'''
with pytest.raises(ValidationError) as excinfo:
validate_native_property_name('a' * 1024)
assert 'length >' in str(excinfo.value)
class TestMetadataPropertyName:
'''
Tests the function ``validate_metadata_property_name``.
'''
@pytest.mark.parametrize('name', [':a', 'a:a', 'a:0', 'a:0:a', ':a:s:d:f:g:h:j:k::l', ':-', 'a-:-a'])
def test_valid_name(self, name):
'''
Tests a set of known good combinations.
'''
validate_metadata_property_name(name)
@pytest.mark.parametrize('name', ['0', '0a', 'A', 'AA', '-', 'a-', 'a-a', '-a', '_', 'a_', 'a_a', '_a', ':', 'a:', '\0', 'a+:a'])
def test_invalid_name(self, name):
'''
Tests a set of known invalid combinations.
'''
with pytest.raises(ValidationError) as excinfo:
validate_metadata_property_name(name)
assert 'does not match' in str(excinfo.value)
# TODO trigger UnicodeEncodeError!
def test_invalid_length_short(self):
'''
Tests the behaviour if the name is too short.
'''
with pytest.raises(ValidationError) as excinfo:
validate_metadata_property_name('')
assert 'too short' in str(excinfo.value)
def test_invalid_length_long(self):
'''
Provided a very long name, it should bail out.
'''
with pytest.raises(ValidationError) as excinfo:
validate_metadata_property_name('a' * 1024)
assert 'length >' in str(excinfo.value)
class TestPropertyValue:
'''
Tests the function ``validate_property_value``.
'''
@pytest.mark.parametrize('value', ['a', '1', '1TB', '1ZB', '99 red baloons', 'asd 123'])
def test_value_valid(self, value):
'''
Tests a set of known good combinations.
'''
validate_property_value(value)
def test_value_valid_long(self):
'''
Tests with a value that is long, but still valid.
'''
validate_property_value('x' * 8191)
def test_value_too_long(self):
'''
Tests with a value that is too long
'''
with pytest.raises(ValidationError) as excinfo:
validate_property_value('x' * 8192)
assert 'length >' in str(excinfo.value)

@ -0,0 +1,378 @@
'''
Tests the main ZFS class, non-destructive version.
'''
from unittest.mock import patch
import pytest # type: ignore
from simplezfs.exceptions import ValidationError
from simplezfs.zfs import ZFS, get_zfs
from simplezfs.zfs_cli import ZFSCli
from simplezfs.zfs_native import ZFSNative
class TestZFS:
def test_init_noparam(self):
instance = ZFS() # noqa: F841
def test_get_metadata_namespace(self):
zfs = ZFS(metadata_namespace='pytest')
assert zfs.metadata_namespace == 'pytest'
def test_set_metadata_namespace(self):
zfs = ZFS(metadata_namespace='prod')
zfs.metadata_namespace = 'pytest'
assert zfs.metadata_namespace == 'pytest'
##########################################################################
def test_get_property_notimplemented(self):
zfs = ZFS()
with pytest.raises(NotImplementedError):
zfs.get_property('tank/test1', 'compression')
def test_get_property_all(self):
'''
Uses "all" and expects a ValidationError.
'''
zfs = ZFS()
with pytest.raises(ValidationError) as excinfo:
zfs.get_property('tank/test', 'all')
assert 'get_properties' in str(excinfo.value)
def test_get_property_all_meta(self):
'''
Tests that <namespace>:all is allowed.
'''
def mock_get_property(myself, dataset, key, is_metadata):
assert dataset == 'tank/test'
assert key == 'testns:all'
assert is_metadata == True
with patch.object(ZFS, '_get_property', new=mock_get_property):
zfs = ZFS(metadata_namespace='testns')
zfs.get_property('tank/test', 'all', metadata=True)
def test_property_dataset_validation_unhappy(self):
def mock_get_property(myself, dataset, key, is_metadata):
assert False, 'this should not have been called'
with patch.object(ZFS, '_get_property', new=mock_get_property):
zfs = ZFS()
with pytest.raises(ValidationError) as excinfo:
zfs.get_property('tan#k/test', 'compression')
def test_get_property_meta_nooverwrite_invalidns_unhappy(self):
'''
Tests the validation of the metadata namespace, coming from the ctor.
'''
def mock_get_property(myself, dataset, key, is_metadata):
assert False, 'This should not have been called'
with patch.object(ZFS, '_get_property', new=mock_get_property):
zfs = ZFS(metadata_namespace=' ')
with pytest.raises(ValidationError) as excinfo:
zfs.get_property('tank/test', 'test', metadata=True)
assert 'not match' in str(excinfo.value)
def test_get_property_meta_overwrite_invalidns_unhappy(self):
'''
Tests the validation of the metadata namespace, coming from overwrite parameter.
'''
def mock_get_property(myself, dataset, key, is_metadata):
assert False, 'This should not have been called'
with patch.object(ZFS, '_get_property', new=mock_get_property):
zfs = ZFS(metadata_namespace='pytest')
with pytest.raises(ValidationError) as excinfo:
zfs.get_property('tank/test', 'test', metadata=True, overwrite_metadata_namespace=' ')
assert 'not match' in str(excinfo.value)
def test_get_property_poolname_nometa_happy(self):
'''
Tests that the name of the pool (first or root dataset) can be queried.
'''
def mock_get_property(myself, dataset, key, is_metadata):
assert dataset == 'tank'
assert key == 'compression'
assert is_metadata == False
with patch.object(ZFS, '_get_property', new=mock_get_property):
zfs = ZFS()
zfs.get_property('tank', 'compression')
def test_get_property_poolname_invalid_nometa_unhappy(self):
'''
Tests the pool name validation.
'''
def mock_get_property(myself, dataset, key, is_meta):
assert False, 'This should not have been called'
with patch.object(ZFS, '_get_property', new=mock_get_property):
zfs = ZFS()
with pytest.raises(ValidationError) as excinfo:
zfs.get_property(' ', 'compression')
assert 'malformed' in str(excinfo.value)
def test_get_property_nometa_happy(self):
def mock_get_property(myself, dataset: str, key: str, is_metadata: bool):
assert dataset == 'tank/test'
assert key == 'compression'
assert is_metadata == False
with patch.object(ZFS, '_get_property', new=mock_get_property):
zfs = ZFS()
zfs.get_property('tank/test', 'compression')
def test_get_property_meta_ns_happy(self):
def mock_get_property(myself, dataset, key, is_metadata):
assert dataset == 'tank/test'
assert key == 'testns:testprop'
assert is_metadata == True
with patch.object(ZFS, '_get_property', new=mock_get_property):
zfs = ZFS(metadata_namespace='testns')
zfs.get_property('tank/test', 'testprop', metadata=True)
def test_get_property_meta_nsoverwrite_happy(self):
def mock_get_property(myself, dataset, key, is_metadata):
assert dataset == 'tank/test'
assert key == 'testns:testprop'
assert is_metadata == True
with patch.object(ZFS, '_get_property', new=mock_get_property):
zfs = ZFS(metadata_namespace='pytest')
zfs.get_property('tank/test', 'testprop', metadata=True, overwrite_metadata_namespace='testns')
def test_get_property_nometa_nons_unhappy(self):
'''
Tests that it bails out if no metadata namespace has been set and none is given.
'''
def mock_get_property(myself, dataset, key, is_metadata):
assert False, 'This should not have been called'
with patch.object(ZFS, '_get_property', new=mock_get_property):
zfs = ZFS()
assert zfs.metadata_namespace is None
with pytest.raises(ValidationError) as excinfo:
zfs.get_property('tank/test', 'testprop', metadata=True)
assert 'no metadata namespace set' == str(excinfo.value)
def test_get_property_meta_ns_noflag_invalid(self):
'''
Tests that it bails out if the syntax indicates a metadata property is requested but metadata flag is false.
'''
def mock_get_property(myself, dataset, key, is_metadata):
assert False, 'This should not have been called'
with patch.object(ZFS, '_get_property', new=mock_get_property):
zfs = ZFS(metadata_namespace='testns')
with pytest.raises(ValidationError) as excinfo:
zfs.get_property('tank/test', 'testns:testprop', metadata=False)
##########################################################################
def test_get_properties_notimplemented(self):
zfs = ZFS()
with pytest.raises(NotImplementedError):
zfs.get_properties('tank/test')
def test_get_properties_poolname_nometa_happy(self):
def mock_get_properties(myself, dataset, include_metadata):
assert dataset == 'tank'
assert include_metadata == False
with patch.object(ZFS, '_get_properties', new=mock_get_properties):
zfs = ZFS()
zfs.get_properties('tank')
def test_get_properties_dataset_unhappy(self):
'''
Tests that it validates the dataset name
'''
def mock_get_properties(myself, dataset, include_metadata):
assert False, 'this should not have been called'
with patch.object(ZFS, '_get_properties', new=mock_get_properties):
zfs = ZFS()
with pytest.raises(ValidationError) as excinfo:
zfs.get_properties('as#df/tank')
##########################################################################
def test_set_property_notimplemented(self):
zfs = ZFS()
with pytest.raises(NotImplementedError):
zfs.set_property('tank/test1', 'compression', 'lz4')
def test_set_property_all_nometa_unhappy(self):
zfs = ZFS()
with pytest.raises(ValidationError) as excinfo:
zfs.set_property('tank/test', 'all', 'test')
assert 'valid property name' in str(excinfo.value)
def test_set_property_all_meta_happy(self):
def mock_set_property(myself, dataset, key, value, is_metadata):
assert 'tank/test' == dataset
assert 'testns:all' == key
assert 'test' == value
assert is_metadata
with patch.object(ZFS, '_set_property', new=mock_set_property):
zfs = ZFS(metadata_namespace='testns')
zfs.set_property('tank/test', 'all', 'test', metadata=True)
def test_set_property_dataset_validation_unhappy(self):
def mock_set_property(myself, dataset, key, value, is_metadata):
assert False, 'This should not have been called'
with patch.object(ZFS, '_set_property', new=mock_set_property):
zfs = ZFS()
with pytest.raises(ValidationError) as excinfo:
zfs.set_property('ta#nk/test', 'compression', 'lz4')
def test_set_property_meta_nooverwrite_invalidns_unhappy(self):
'''
Tests the validation of the metadata namespace, coming from the ctor.
'''
def mock_set_property(myself, dataset, key, value, is_metadata):
assert False, 'This should not have been called'
with patch.object(ZFS, '_set_property', new=mock_set_property):
zfs = ZFS(metadata_namespace=' ')
with pytest.raises(ValidationError) as excinfo:
zfs.set_property('tank/test', 'test', 'test', metadata=True)
assert 'not match' in str(excinfo.value)
def test_set_property_meta_overwrite_invalidns_unhappy(self):
def mock_set_property(myself, dataset, key, value, is_metadata):
assert False, 'This should not have been called'
with patch.object(ZFS, '_set_property', new=mock_set_property):
zfs = ZFS(metadata_namespace='pytest')
with pytest.raises(ValidationError) as excinfo:
zfs.set_property('tank/test', 'test', 'test', metadata=True, overwrite_metadata_namespace=' ')
assert 'not match' in str(excinfo.value)
def test_set_property_poolname_nometa_happy(self):
def mock_set_property(myself, dataset, key, value, is_metadata):
assert dataset == 'tank'
assert key == 'compression'
assert value == 'lz4'
assert not is_metadata
with patch.object(ZFS, '_set_property', new=mock_set_property):
zfs = ZFS()
zfs.set_property('tank', 'compression', 'lz4')
def test_set_property_poolname_invalid_nometa_unhappy(self):
def mock_set_property(myself, dataset, key, value, is_metadata):
assert False, 'This should not have been called'
with patch.object(ZFS, '_set_property', new=mock_set_property):
zfs = ZFS()
with pytest.raises(ValidationError) as excinfo:
zfs.set_property(' ', 'compression', 'lz4')
assert 'malformed' in str(excinfo.value)
def test_set_property_nometa_happy(self):
def mock_set_property(myself, dataset, key, value, is_metadata):
assert dataset == 'tank/test'
assert key == 'compression'
assert value == 'lz4'
assert not is_metadata
with patch.object(ZFS, '_set_property', new=mock_set_property):
zfs = ZFS()
zfs.set_property('tank/test', 'compression', 'lz4')
def test_set_property_meta_ns_happy(self):
def mock_set_property(myself, dataset, key, value, is_metadata):
assert dataset == 'tank/test'
assert key == 'testns:testprop'
assert value == 'testval'
assert is_metadata
with patch.object(ZFS, '_set_property', new=mock_set_property):
zfs = ZFS(metadata_namespace='testns')
zfs.set_property('tank/test', 'testprop', 'testval', metadata=True)
def test_set_property_meta_nsoverwrite_happy(self):
def mock_set_property(myself, dataset, key, value, is_metadata):
assert dataset == 'tank/test'
assert key == 'testns:testprop'
assert value == 'testval'
assert is_metadata
with patch.object(ZFS, '_set_property', new=mock_set_property):
zfs = ZFS(metadata_namespace='pytest')
zfs.set_property('tank/test', 'testprop', 'testval', metadata=True, overwrite_metadata_namespace='testns')
def test_set_property_nometa_nons_unhappy(self):
def mock_set_property(myself, dataset, key, value, is_metadata):
assert False, 'This should not have been called'
with patch.object(ZFS, '_set_property', new=mock_set_property):
zfs = ZFS()
assert zfs.metadata_namespace is None
with pytest.raises(ValidationError) as excinfo:
zfs.set_property('tank/test', 'testprop', 'testval', metadata=True)
assert 'no metadata namespace set' in str(excinfo.value)
def test_set_property_meta_ns_noflag_invalid(self):
def mock_set_property(myself, dataset, key, value, is_metadata):
assert False, 'This should not have been called'
with patch.object(ZFS, '_set_property', new=mock_set_property):
zfs = ZFS(metadata_namespace='testns')
with pytest.raises(ValidationError) as excinfo:
zfs.set_property('tank/test', 'testns:testprop', 'testval', metadata=False)
##########################################################################
def test_notimplemented(self):
zfs = ZFS()
with pytest.raises(NotImplementedError):
zfs.list_datasets()
with pytest.raises(NotImplementedError):
zfs.create_snapshot('tank', 'test1')
with pytest.raises(NotImplementedError):
zfs.create_bookmark('tank', 'test2')
with pytest.raises(NotImplementedError):
zfs.create_fileset('tank/test3')
with pytest.raises(NotImplementedError):
zfs.create_volume('tank/test4', size=100)
with pytest.raises(NotImplementedError):
zfs.create_dataset('tank/test5')
with pytest.raises(NotImplementedError):
zfs.destroy_dataset('tank/test6')
##########################################################################
class TestZFSGetZFS:
def test_get_zfs_default(self):
# TODO we might need to mock shutil.which
zfs = get_zfs()
assert type(zfs) == ZFSCli
assert zfs.metadata_namespace == None
def test_get_zfs_cli(self):
zfs = get_zfs('cli')
assert type(zfs) == ZFSCli
def test_get_zfs_cli_args(self):
zfs = get_zfs('cli', metadata_namespace='asdf', zfs_exe='/bin/true')
assert type(zfs) == ZFSCli
assert zfs.metadata_namespace == 'asdf'
assert zfs.executable == '/bin/true'
def test_get_zfs_native(self):
zfs = get_zfs('native')
assert type(zfs) == ZFSNative
def test_get_zfs_invalid(self):
with pytest.raises(NotImplementedError):
get_zfs('testing')

@ -0,0 +1,247 @@
'''
Tests the ZFSCli class, non-destructive version.
'''
from unittest.mock import patch, PropertyMock
import pytest
import subprocess
from simplezfs.exceptions import ValidationError
from simplezfs.types import Dataset, DatasetType
from simplezfs.validation import validate_dataset_path
from simplezfs.zfs_cli import ZFSCli
class TestZFSCli:
def test_init_noparam(self):
instance = ZFSCli() # noqa: F841
########################
@patch('simplezfs.zfs_cli.ZFSCli.is_zvol')
@pytest.mark.parametrize('identifier,name,parent,dstype,pool', [
('pool/test', 'test', 'pool', DatasetType.FILESET, 'pool'),
('pool/test@st', 'test@st', 'pool', DatasetType.SNAPSHOT, 'pool'),
('pool/test1/test@snap-12', 'test@snap-12', 'pool/test1', DatasetType.SNAPSHOT, 'pool'),
('tank/test#bm1', 'test#bm1', 'tank', DatasetType.BOOKMARK, 'tank'),
('tank/test1/test#bmark-12', 'test#bmark-12', 'tank/test1', DatasetType.BOOKMARK, 'tank'),
('pool/test2', 'test2', 'pool', DatasetType.VOLUME, 'pool'),
('pool/test2/test', 'test', 'pool/test2', DatasetType.VOLUME, 'pool'),
])
def test_parse_dataset_identifier_valid(self, is_zvol, identifier, name, parent, dstype, pool):
'''
Tests the happy path.
'''
validate_dataset_path(identifier)
is_zvol.return_value = dstype == DatasetType.VOLUME
ds = ZFSCli.parse_dataset_identifier(identifier)
assert isinstance(ds, Dataset)
assert ds.name == name
assert ds.parent == parent
assert ds.type == dstype
assert ds.full_path == identifier
assert ds.pool == pool
@pytest.mark.parametrize('identifier', [' /asd', ' /asd', '\0/asd', 'mirrored/asd', 'raidz fun/asd'])
def test_parse_dataset_identifier_invalid(self, identifier):
with pytest.raises(ValidationError):
ZFSCli.parse_dataset_identifier(identifier)
######################
@patch('os.path.exists')
def test_is_zvol_ok_exists(self, exists):
exists.return_value = True
assert ZFSCli.is_zvol('newpool/newvol')
@patch('os.path.exists')
def test_is_zvol_ok_not_exists(self, exists):
exists.return_value = False
assert not ZFSCli.is_zvol('newpool/newfileset')
@patch('os.path.exists')
def test_is_zvol_ok_not_exists_pool(self, exists):
'''
Tests that is_zvol can cope with pool-level filesets
'''
exists.return_value = False
assert not ZFSCli.is_zvol('newpool')
##########################################################################
@patch('shutil.which')
def test_find_executable_parameter(self, which):
which.return_value = None
zfs = ZFSCli(zfs_exe='asdf')
assert zfs.executable == 'asdf'
@patch('shutil.which')
def test_find_executable_path(self, which):
which.return_value = 'test_return'
zfs = ZFSCli()
assert zfs.executable == 'test_return'
@patch('shutil.which')
def test_find_executable_path_fail(self, which):
which.return_value = None
with pytest.raises(OSError) as excinfo:
zfs = ZFSCli()
assert 'not find executable' in str(excinfo.value)
##########################################################################
@patch('subprocess.run')
def test_list_dataset_noparent_happy(self, subproc):
test_stdout = '''tank 213G 13.3G 96K none
tank/system 128G 13.3G 96K none
tank/system/home 86.6G 13.3G 86.6G /home
tank/system/root 14.9G 13.3G 14.9G /'''
subproc.return_value = subprocess.CompletedProcess(args=[], returncode=0, stdout=test_stdout, stderr='')
zfs = ZFSCli(zfs_exe='/bin/true')
lst = zfs.list_datasets()
subproc.assert_called_once()
assert ['/bin/true', 'list', '-H', '-r', '-t', 'all'] == subproc.call_args[0][0]
assert len(lst) == 4
assert lst[0].pool == 'tank'
assert lst[0].parent is None
assert lst[1].name == 'system'
assert lst[1].parent == 'tank'
assert lst[1].type == DatasetType.FILESET
assert lst[3].name == 'root'
assert lst[3].full_path == 'tank/system/root'
@patch('subprocess.run')
def test_list_dataset_parent_pool_str_happy(self, subproc):
test_stdout = '''tank 213G 13.3G 96K none
tank/system 128G 13.3G 96K none
tank/system/home 86.6G 13.3G 86.6G /home
tank/system/root 14.9G 13.3G 14.9G /'''
subproc.return_value = subprocess.CompletedProcess(args=[], returncode=0, stdout=test_stdout, stderr='')
zfs = ZFSCli(zfs_exe='/bin/true')
lst = zfs.list_datasets(parent='tank')
subproc.assert_called_once()
assert ['/bin/true', 'list', '-H', '-r', '-t', 'all', 'tank'] == subproc.call_args[0][0]
assert len(lst) == 4
assert lst[0].pool == 'tank'
assert lst[0].parent is None
assert lst[1].name == 'system'
assert lst[1].parent == 'tank'
assert lst[1].type == DatasetType.FILESET
assert lst[3].name == 'root'
assert lst[3].full_path == 'tank/system/root'
@patch('subprocess.run')
def test_list_dataset_parent_pool_dataset_happy(self, subproc):
'''
Supplies a dataset as parent.
'''
test_stdout = '''tank 213G 13.3G 96K none
tank/system 128G 13.3G 96K none
tank/system/home 86.6G 13.3G 86.6G /home
tank/system/root 14.9G 13.3G 14.9G /'''
subproc.return_value = subprocess.CompletedProcess(args=[], returncode=0, stdout=test_stdout, stderr='')
zfs = ZFSCli(zfs_exe='/bin/true')
lst = zfs.list_datasets(parent=Dataset(pool='tank', name='system', full_path='tank', parent='tank', type=DatasetType.FILESET))
subproc.assert_called_once()
assert ['/bin/true', 'list', '-H', '-r', '-t', 'all', 'tank'] == subproc.call_args[0][0]
assert len(lst) == 4
assert lst[0].pool == 'tank'
assert lst[0].parent is None
assert lst[1].name == 'system'
assert lst[1].parent == 'tank'
assert lst[1].type == DatasetType.FILESET
assert lst[3].name == 'root'
assert lst[3].full_path == 'tank/system/root'
@patch('subprocess.run')
def test_list_dataset_parent_fileset_str_happy(self, subproc):
'''
Specifies a parent as a string.
'''
test_stdout = '''tank/system 128G 13.3G 96K none
tank/system/home 86.6G 13.3G 86.6G /home
tank/system/root 14.9G 13.3G 14.9G /'''
subproc.return_value = subprocess.CompletedProcess(args=[], returncode=0, stdout=test_stdout, stderr='')
zfs = ZFSCli(zfs_exe='/bin/true')
lst = zfs.list_datasets(parent='tank/system')
subproc.assert_called_once()
assert ['/bin/true', 'list', '-H', '-r', '-t', 'all', 'tank/system'] == subproc.call_args[0][0]
assert len(lst) == 3
assert lst[0].name == 'system'
assert lst[0].parent == 'tank'
assert lst[0].type == DatasetType.FILESET
assert lst[2].name == 'root'
assert lst[2].full_path == 'tank/system/root'
@patch('subprocess.run')
def test_list_dataset_parent_fileset_dataset_happy(self, subproc):
'''
Specifies a parent as a dataset.
'''
test_stdout = '''tank/system 128G 13.3G 96K none
tank/system/home 86.6G 13.3G 86.6G /home
tank/system/root 14.9G 13.3G 14.9G /'''
subproc.return_value = subprocess.CompletedProcess(args=[], returncode=0, stdout=test_stdout, stderr='')
zfs = ZFSCli(zfs_exe='/bin/true')
lst = zfs.list_datasets(parent=Dataset(pool='tank', full_path='tank/system', name='system', parent='tank', type=DatasetType.FILESET))
subproc.assert_called_once()
assert ['/bin/true', 'list', '-H', '-r', '-t', 'all', 'tank/system'] == subproc.call_args[0][0]
assert len(lst) == 3
assert lst[0].name == 'system'
assert lst[0].parent == 'tank'
assert lst[0].type == DatasetType.FILESET
assert lst[2].name == 'root'
assert lst[2].full_path == 'tank/system/root'
@patch('subprocess.run')
def test_list_dataset_cmd_error_noparent(self, subproc):
def mock_handle_command_error(myself, proc, dataset = None):
assert type(proc) == subprocess.CompletedProcess
assert proc.returncode == 42
assert proc.stderr == 'test'
assert dataset is None
raise Exception('test')
subproc.return_value = subprocess.CompletedProcess(args=[], returncode=42, stdout='', stderr='test')
with patch.object(ZFSCli, 'handle_command_error', new=mock_handle_command_error):
zfs = ZFSCli(zfs_exe='/bin/true')
with pytest.raises(Exception) as excinfo:
zfs.list_datasets()
assert 'test' == str(excinfo.value)
@patch('subprocess.run')
def test_list_dataset_cmd_error_parent(self, subproc):
def mock_handle_command_error(myself, proc, dataset):
assert type(proc) == subprocess.CompletedProcess
assert proc.returncode == 42
assert proc.stderr == 'test'
assert dataset == 'tank/test'
raise Exception('test')
subproc.return_value = subprocess.CompletedProcess(args=[], returncode=42, stdout='', stderr='test')
with patch.object(ZFSCli, 'handle_command_error', new=mock_handle_command_error):
zfs = ZFSCli(zfs_exe='/bin/true')
with pytest.raises(Exception) as excinfo:
zfs.list_datasets(parent='tank/test')
assert 'test' == str(excinfo.value)
##########################################################################
##########################################################################

@ -0,0 +1,52 @@
'''
Tests the ZPoolCli class, non-destructive version.
'''
from unittest.mock import patch
import pytest
from simplezfs.zpool_cli import ZPoolCli
class TestZPoolCli:
@patch('shutil.which')
def test_init_noparam(self, which):
which.return_value = '/bin/true'
ZPoolCli()
##########################################################################
@patch('shutil.which')
def test_find_executable_parameter(self, which):
which.return_value = None
zpool = ZPoolCli(zpool_exe='asdf')
assert zpool.executable == 'asdf'
@patch('shutil.which')
def test_find_executable_path(self, which):
which.return_value = 'test_return'
zpool = ZPoolCli()
assert zpool.executable == 'test_return'
@patch('shutil.which')
def test_find_executable_path_fail(self, which):
which.return_value = None
with pytest.raises(OSError) as excinfo:
ZPoolCli()
assert 'not find the executable' in str(excinfo.value)
##########################################################################
@pytest.mark.parametrize('data,expected', [('-', None), (None, None), ('asdf', 'asdf')])
def test_dash_to_none(self, data, expected):
assert ZPoolCli.dash_to_none(data) == expected
##########################################################################
##########################################################################
##########################################################################