from __future__ import division, print_function
import logging
import pyvips
from pyvips import ffi, vips_lib, Error, _to_bytes, _to_string, GValue, \
type_map, type_from_name, nickname_find, at_least_libvips
# module-level logger, named after this module
logger = logging.getLogger(__name__)

# bit values for VipsArgumentFlags, written as shifts so that the
# one-bit-per-flag layout is explicit
_REQUIRED = 1 << 0
_CONSTRUCT = 1 << 1
_SET_ONCE = 1 << 2
_SET_ALWAYS = 1 << 3
_INPUT = 1 << 4
_OUTPUT = 1 << 5
_DEPRECATED = 1 << 6
_MODIFY = 1 << 7

# bit value for VipsOperationFlags: the operation is deprecated
_OPERATION_DEPRECATED = 1 << 3
class Introspect(object):
    """Build introspection data for operations.

    Make an operation, introspect it, and build a structure representing
    everything we know about it.

    Attributes:
        description: result of ``get_description()`` on the operation.
        flags: result of ``vips_operation_get_flags()`` for the operation.
        details: dict from argument name to a dict with the keys
            ``name``, ``flags``, ``blurb`` and ``type``.
        required_input: names of required, non-deprecated input arguments.
        optional_input: names of all optional input arguments, deprecated
            ones included.
        required_output: names of required, non-deprecated output
            arguments, plus required inputs flagged MODIFY.
        optional_output: names of all optional output arguments,
            deprecated ones included.
        doc_optional_input: ``optional_input`` without deprecated names.
        doc_optional_output: ``optional_output`` without deprecated names.
        member_x: name of the first required input whose type is
            ``GValue.image_type``, or ``None`` if there is none.
        method_args: ``required_input`` without ``member_x``.

    """

    __slots__ = ('description', 'flags', 'details',
                 'required_input', 'optional_input',
                 'required_output', 'optional_output',
                 'doc_optional_input', 'doc_optional_output',
                 'member_x', 'method_args')

    # a hash mapping operation names to introspection data; it lives on the
    # class, so it is shared by every caller of get()
    _introspect_cache = {}

    def __init__(self, operation_name):
        """Introspect the operation called ``operation_name``.

        Raises:
            Error: if the argument list cannot be fetched from libvips.
                ``Operation.new_from_name`` is called first and may raise
                as well.

        """
        op = Operation.new_from_name(operation_name)

        self.description = op.get_description()
        self.flags = vips_lib.vips_operation_get_flags(op.pointer)

        # build a list of constructor arg [name, flags] pairs in arg order
        arguments = []

        def add_args(name, flags):
            # only arguments flagged CONSTRUCT are recorded
            if (flags & _CONSTRUCT) != 0:
                # libvips uses '-' to separate parts of arg names, but we
                # need '_' for Python
                name = name.replace('-', '_')
                arguments.append([name, flags])

        if at_least_libvips(8, 7):
            # one call fills in the name array, the flag array and the
            # argument count
            p_names = ffi.new('char**[1]')
            p_flags = ffi.new('int*[1]')
            p_n_args = ffi.new('int[1]')
            result = vips_lib.vips_object_get_args(op.vobject,
                                                   p_names, p_flags, p_n_args)
            if result != 0:
                raise Error('unable to get arguments from operation')

            names = p_names[0]
            arg_flags = p_flags[0]
            n_args = p_n_args[0]

            for i in range(n_args):
                add_args(_to_string(names[i]), arg_flags[i])
        else:
            # older libvips: visit each argument through a callback instead
            def add_construct(self, pspec, argument_class,
                              argument_instance, a, b):
                add_args(_to_string(pspec.name), argument_class.flags)
                return ffi.NULL

            cb = ffi.callback('VipsArgumentMapFn', add_construct)
            vips_lib.vips_argument_map(op.vobject, cb, ffi.NULL, ffi.NULL)

        # build a hash from arg name to detailed arg information
        self.details = {}
        for name, flags in arguments:
            self.details[name] = {
                "name": name,
                "flags": flags,
                "blurb": op.get_blurb(name),
                "type": op.get_typeof(name)
            }

        # lists of arg names by category
        self.required_input = []
        self.optional_input = []
        self.required_output = []
        self.optional_output = []

        # same, but with deprecated args filtered out ... this is the set we
        # show in documentation
        self.doc_optional_input = []
        self.doc_optional_output = []

        for name, flags in arguments:
            is_input = (flags & _INPUT) != 0
            is_output = (flags & _OUTPUT) != 0
            is_required = (flags & _REQUIRED) != 0
            is_deprecated = (flags & _DEPRECATED) != 0

            if is_input and is_required and not is_deprecated:
                self.required_input.append(name)

                # required inputs which we MODIFY are also required outputs
                if (flags & _MODIFY) != 0:
                    self.required_output.append(name)

            if is_output and is_required and not is_deprecated:
                self.required_output.append(name)

            # deprecated optional args get on to the main arg lists, but are
            # filtered from the documented set
            if is_input and not is_required:
                self.optional_input.append(name)
                if not is_deprecated:
                    self.doc_optional_input.append(name)

            if is_output and not is_required:
                self.optional_output.append(name)
                if not is_deprecated:
                    self.doc_optional_output.append(name)

        # find the first required input image arg, if any ... that will be self
        self.member_x = None
        for name in self.required_input:
            if self.details[name]['type'] == GValue.image_type:
                self.member_x = name
                break

        # method args are required args, but without the image they are a
        # method on ... always take a copy, so that method_args never
        # shares a list object with required_input
        self.method_args = list(self.required_input)
        if self.member_x is not None:
            self.method_args.remove(self.member_x)

    @classmethod
    def get(cls, operation_name):
        """Return the cached introspection data for ``operation_name``.

        The data is built on first request, using ``cls`` so that a
        subclass gets instances of itself, and reused after that.

        """
        cache = cls._introspect_cache
        if operation_name not in cache:
            cache[operation_name] = cls(operation_name)
        return cache[operation_name]
# search an array with a predicate, recursing into subarrays as we see them
# used to find the match_image for an operation
def _find_inside(pred, thing):
    """Return the first item inside ``thing`` for which ``pred`` is true.

    ``pred`` is tried on ``thing`` itself first. If that fails and ``thing``
    is a list or tuple, its elements are searched depth-first, in order.
    Returns ``None`` when nothing matches.

    """
    if pred(thing):
        return thing

    if isinstance(thing, (list, tuple)):
        for x in thing:
            result = _find_inside(pred, x)
            if result is not None:
                return result

    return None
class Operation(pyvips.VipsObject):
    """Call libvips operations.

    This class wraps the libvips VipsOperation class.

    """

    __slots__ = ()

    # cache nickname -> docstring here
    _docstring_cache = {}

    def __init__(self, pointer):
        """Wrap an existing operation pointer."""
        super(Operation, self).__init__(pointer)

    @staticmethod
    def new_from_name(operation_name):
        """Make a new operation from its name.

        Raises:
            Error: if ``vips_operation_new`` returns NULL for this name.

        """
        vop = vips_lib.vips_operation_new(_to_bytes(operation_name))
        if vop == ffi.NULL:
            raise Error('no such operation {0}'.format(operation_name))
        return Operation(vop)

    def set(self, name, flags, match_image, value):
        """Set argument ``name`` to ``value``, converting it where needed.

        Args:
            name: argument name.
            flags: the argument flags for ``name``.
            match_image: image that constants are expanded to match; no
                expansion happens when this is false.
            value: the value to set.

        """
        # if the object wants an image and we have a constant, _imageize it
        #
        # if the object wants an image array, _imageize any constants in the
        # array
        if match_image:
            gtype = self.get_typeof(name)

            if gtype == pyvips.GValue.image_type:
                value = pyvips.Image._imageize(match_image, value)
            elif gtype == pyvips.GValue.array_image_type:
                value = [pyvips.Image._imageize(match_image, x)
                         for x in value]

        # MODIFY args need to be copied before they are set
        if (flags & _MODIFY) != 0:
            # make sure we have a unique copy
            value = value.copy().copy_memory()

        super(Operation, self).set(name, value)

    @staticmethod
    def call(operation_name, *args, **kwargs):
        """Call a libvips operation.

        Use this method to call any libvips operation. For example::

            black_image = pyvips.Operation.call('black', 10, 10)

        See the Introduction for notes on how this works.

        Args:
            operation_name: name of the operation to call.
            *args: one value per required input argument, in order.
            **kwargs: optional arguments by name. The special key
                ``string_options`` is removed and applied to the operation
                before any other argument.

        Returns:
            ``None`` if there is nothing to return, the single result if
            there is exactly one, otherwise a list. Requested optional
            outputs are returned as a dict in the last position.

        Raises:
            Error: on a wrong number of positional arguments, an
                unsupported optional argument, or if libvips fails to set
                the string options or build the operation.

        """
        logger.debug('VipsOperation.call: operation_name = %s', operation_name)

        intro = Introspect.get(operation_name)

        if len(intro.required_input) != len(args):
            raise Error('{0} needs {1} arguments, but {2} given'
                        .format(operation_name,
                                len(intro.required_input),
                                len(args)))

        op = Operation.new_from_name(operation_name)

        # set any string options before any args so they can't be
        # overridden
        string_options = kwargs.pop('string_options', '')
        if not op.set_string(string_options):
            raise Error('unable to call {0}'.format(operation_name))

        # the first image argument is the thing we expand constants to
        # match ... look inside tables for images, since we may be passing
        # an array of images as a single param
        match_image = _find_inside(lambda x: isinstance(x, pyvips.Image),
                                   args)

        logger.debug('VipsOperation.call: match_image = %s', match_image)

        # collect a list of all input references here
        # we can't use a set because set elements are unique under "==", and
        # Python checks memoryview equality with hash functions, not pointer
        # equality
        references = []

        def add_reference(x):
            if isinstance(x, pyvips.Image):
                for ref in x._references:
                    # compare with "is": "in" would use "==", and we need
                    # pointer equality here
                    if not any(ref is held for held in references):
                        references.append(ref)

            # always false, so that _find_inside visits every element
            return False

        # set required input args
        for name, value in zip(intro.required_input, args):
            _find_inside(add_reference, value)
            op.set(name, intro.details[name]['flags'], match_image, value)

        # set any optional args
        for name in kwargs:
            if (name not in intro.optional_input and
                    name not in intro.optional_output):
                raise Error('{0} does not support optional argument {1}'
                            .format(operation_name, name))

            value = kwargs[name]
            details = intro.details[name]

            if (details['flags'] & _DEPRECATED) != 0:
                logger.info('%s argument %s is deprecated',
                            operation_name, name)

            _find_inside(add_reference, value)
            op.set(name, details['flags'], match_image, value)

        # build operation
        vop = vips_lib.vips_cache_operation_build(op.pointer)
        if vop == ffi.NULL:
            vips_lib.vips_object_unref_outputs(op.vobject)
            raise Error('unable to call {0}'.format(operation_name))
        op = Operation(vop)

        # attach all input refs to output x
        def set_reference(x):
            if isinstance(x, pyvips.Image):
                x._references.extend(references)

            # always false, so that _find_inside visits every element
            return False

        # fetch required output args (plus modified input images)
        result = []
        for name in intro.required_output:
            value = op.get(name)
            _find_inside(set_reference, value)
            result.append(value)

        # fetch optional output args
        opts = {}
        for name in intro.optional_output:
            if name in kwargs:
                value = op.get(name)
                _find_inside(set_reference, value)
                opts[name] = value

        if len(opts) > 0:
            result.append(opts)

        vips_lib.vips_object_unref_outputs(op.vobject)

        if len(result) == 0:
            result = None
        elif len(result) == 1:
            result = result[0]

        logger.debug('VipsOperation.call: result = %s', result)

        return result

    @staticmethod
    def _sentence(description):
        # Capitalise the first character and end with a full stop and a blank
        # line. Slicing rather than indexing means an empty description
        # cannot raise IndexError.
        return description[:1].upper() + description[1:] + '.\n\n'

    @staticmethod
    def generate_docstring(operation_name):
        """Make a google-style docstring.

        This is used to generate help() output. Results are cached by
        operation name.

        Raises:
            Error: if the operation is flagged as deprecated.

        """
        # we cache these to save regeneration
        if operation_name in Operation._docstring_cache:
            return Operation._docstring_cache[operation_name]

        intro = Introspect.get(operation_name)
        if (intro.flags & _OPERATION_DEPRECATED) != 0:
            raise Error('No such operator.',
                        'operator "{0}" is deprecated'.format(operation_name))

        parts = [Operation._sentence(intro.description), 'Example:\n', ' ']

        # only write "x = " when there is something to assign, as
        # generate_sphinx does
        if intro.required_output:
            parts.append(', '.join(intro.required_output) + ' = ')

        if intro.member_x is not None:
            parts.append(intro.member_x + '.' + operation_name + '(')
        else:
            parts.append('pyvips.Image.' + operation_name + '(')

        args = list(intro.method_args)
        args += [x + '=' + GValue.gtype_to_python(intro.details[x]['type'])
                 for x in intro.doc_optional_input]
        args += [x + '=bool'
                 for x in intro.doc_optional_output]
        parts.append(", ".join(args) + ')\n')

        def argstr(name):
            details = intro.details[name]
            return (u' {0} ({1}): {2}\n'.
                    format(name,
                           GValue.gtype_to_python(details['type']),
                           details['blurb']))

        parts.append('\nReturns:\n')
        parts.extend(argstr(name) for name in intro.required_output)

        parts.append('\nArgs:\n')
        if intro.member_x is not None:
            parts.append(argstr(intro.member_x))
        parts.extend(argstr(name) for name in intro.method_args)

        if intro.doc_optional_input:
            parts.append('\nKeyword args:\n')
            parts.extend(argstr(name) for name in intro.doc_optional_input)

        if intro.doc_optional_output:
            parts.append('\nOther Parameters:\n')
            parts.extend(argstr(name) for name in intro.doc_optional_output)

        parts.append('\nRaises:\n :class:`.Error`\n')

        result = ''.join(parts)

        # add to cache to save building again
        Operation._docstring_cache[operation_name] = result

        return result

    @staticmethod
    def generate_sphinx(operation_name):
        """Make a sphinx-style docstring.

        This is used to generate the off-line docs.

        Raises:
            Error: if the operation is flagged as deprecated.

        """
        intro = Introspect.get(operation_name)
        if (intro.flags & _OPERATION_DEPRECATED) != 0:
            raise Error('No such operator.',
                        'operator "{0}" is deprecated'.format(operation_name))

        if intro.member_x is not None:
            parts = ['.. method:: ']
        else:
            parts = ['.. staticmethod:: ']

        # the signature line shows optional outputs as "name=bool"
        args = list(intro.method_args)
        args += [x + '=' + GValue.gtype_to_python(intro.details[x]['type'])
                 for x in intro.doc_optional_input]
        args += [x + '=bool'
                 for x in intro.doc_optional_output]
        parts.append(operation_name + '(' + ", ".join(args) + ')\n\n')

        parts.append(Operation._sentence(intro.description))

        parts.append('Example:\n')
        parts.append(' ')
        if intro.required_output:
            parts.append(', '.join(intro.required_output) + ' = ')
        if intro.member_x is not None:
            parts.append(intro.member_x + "." + operation_name + '(')
        else:
            parts.append('pyvips.Image.' + operation_name + '(')

        # the example call leaves the optional outputs out
        args = list(intro.method_args)
        args += [x + '=' + GValue.gtype_to_python(intro.details[x]['type'])
                 for x in intro.doc_optional_input]
        parts.append(', '.join(args))
        parts.append(')\n\n')

        for name in intro.method_args + intro.doc_optional_input:
            details = intro.details[name]
            parts.append(':param {0}: {1}\n'.
                         format(name, details['blurb']))
            parts.append(':type {0}: {1}\n'.
                         format(name,
                                GValue.gtype_to_python(details['type'])))
        for name in intro.doc_optional_output:
            parts.append(':param {0}: enable output: {1}\n'.
                         format(name, intro.details[name]['blurb']))
            parts.append(':type {0}: bool\n'.format(name))

        output_types = [GValue.gtype_to_python(intro.details[name]['type'])
                        for name in intro.required_output]
        if len(output_types) == 1:
            output_type = output_types[0]
        else:
            output_type = 'list[' + ', '.join(output_types) + ']'

        # with optional outputs there is a second possible return shape,
        # ending in a dict
        if intro.doc_optional_output:
            output_types += ['Dict[str, mixed]']
            output_type += ' or list[' + ', '.join(output_types) + ']'

        parts.append(':rtype: ' + output_type + '\n')
        parts.append(':raises Error:\n')

        return ''.join(parts)

    @staticmethod
    def generate_sphinx_all():
        """Generate sphinx documentation.

        This generates a .rst file for all auto-generated image methods. Use it
        to regenerate the docs with something like::

            $ python -c \
"import pyvips; pyvips.Operation.generate_sphinx_all()" > x

        And copy-paste the file contents into doc/vimage.rst in the appropriate
        place.

        """
        # these names are aliased
        alias = ["crop"]
        alias_gtypes = {}
        for name in alias:
            gtype = pyvips.type_find("VipsOperation", name)
            alias_gtypes[gtype] = name

        # all names we can generate docstrings for
        all_names = []

        def add_name(gtype, a, b):
            if gtype in alias_gtypes:
                name = alias_gtypes[gtype]
            else:
                name = nickname_find(gtype)

            # anything generate_sphinx rejects is left out of the docs on
            # purpose
            try:
                Operation.generate_sphinx(name)
                all_names.append(name)
            except Error:
                pass

            # recurse into the types below this one
            type_map(gtype, add_name)

            return ffi.NULL

        type_map(type_from_name('VipsOperation'), add_name)

        all_names.sort()

        # remove operations we have to wrap by hand
        exclude = ['scale', 'ifthenelse', 'bandjoin', 'bandrank']
        all_names = [x for x in all_names if x not in exclude]

        # Output summary table
        print('.. class:: pyvips.Image\n')
        print(' .. rubric:: Methods\n')
        print(' .. autosummary::')
        print(' :nosignatures:\n')
        for name in all_names:
            print(' ~{0}'.format(name))
        print()

        # Output docs
        print()
        for name in all_names:
            docstr = Operation.generate_sphinx(name)
            docstr = docstr.replace('\n', '\n ')
            print(' ' + docstr)
def cache_set_max(mx):
    """Set the maximum number of operations libvips will cache.

    Args:
        mx: new limit, passed unchanged to ``vips_cache_set_max``.

    """
    vips_lib.vips_cache_set_max(mx)
def cache_set_max_mem(mx):
    """Limit the operation cache by memory use.

    Args:
        mx: new limit, passed unchanged to ``vips_cache_set_max_mem``.

    """
    vips_lib.vips_cache_set_max_mem(mx)
def cache_set_max_files(mx):
    """Limit the operation cache by number of open files.

    Args:
        mx: new limit, passed unchanged to ``vips_cache_set_max_files``.

    """
    vips_lib.vips_cache_set_max_files(mx)
def cache_set_trace(trace):
    """Turn on libvips cache tracing.

    Args:
        trace: passed unchanged to ``vips_cache_set_trace``.

    """
    vips_lib.vips_cache_set_trace(trace)
def cache_get_max():
    """Get the maximum number of operations libvips will cache.

    Returns:
        Whatever ``vips_cache_get_max`` returns.

    """
    return vips_lib.vips_cache_get_max()
def cache_get_size():
    """Get the current size of the operations cache.

    Returns:
        Whatever ``vips_cache_get_size`` returns.

    """
    return vips_lib.vips_cache_get_size()
def cache_get_max_mem():
    """Get the operation cache limit by memory use.

    Returns:
        Whatever ``vips_cache_get_max_mem`` returns.

    """
    return vips_lib.vips_cache_get_max_mem()
def cache_get_max_files():
    """Get the operation cache limit by number of open files.

    Returns:
        Whatever ``vips_cache_get_max_files`` returns.

    """
    return vips_lib.vips_cache_get_max_files()
def block_untrusted_set(state):
    """Set the block state for all untrusted operations.

    This does nothing, silently, unless ``at_least_libvips(8, 13)`` is true.

    Args:
        state: passed unchanged to ``vips_block_untrusted_set``.

    """
    if at_least_libvips(8, 13):
        vips_lib.vips_block_untrusted_set(state)
def operation_block_set(name, state):
    """Set the block state for a named operation.

    This does nothing, silently, unless ``at_least_libvips(8, 13)`` is true.

    Args:
        name: operation name; converted with ``_to_bytes`` before the call.
        state: passed unchanged to ``vips_operation_block_set``.

    """
    if at_least_libvips(8, 13):
        vips_lib.vips_operation_block_set(_to_bytes(name), state)
# names exported by "from <this module> import *"
__all__ = [
    # classes
    'Introspect',
    'Operation',
    # operation cache setters
    'cache_set_max',
    'cache_set_max_mem',
    'cache_set_max_files',
    'cache_set_trace',
    # operation cache getters
    'cache_get_max',
    'cache_get_max_mem',
    'cache_get_max_files',
    'cache_get_size',
    # operation blocking
    'block_untrusted_set',
    'operation_block_set',
]