# -*- coding: utf-8 -*-
"""
libs.hook
~~~~~~~~~
As a proxy to dynamically load and unload hook methods.
:copyright: (c) 2019 by staugur.
:license: BSD 3-Clause, see LICENSE for more details.
"""
import warnings
from time import time
from sys import modules
from os import listdir, getpid
from os.path import (
join,
dirname,
abspath,
isdir,
isfile,
splitext,
basename,
getmtime,
)
from jinja2 import ChoiceLoader, FileSystemLoader, PackageLoader
from flask import (
render_template,
render_template_string,
Markup,
abort,
send_from_directory,
url_for,
)
from utils.tool import (
Attribution,
is_valid_verion,
is_match_appversion,
logger,
parse_author_mail,
)
from utils._compat import string_types, integer_types, iteritems, text_type
from config import GLOBAL
from .storage import get_storage
[文档]class HookManager(object):
def __init__(
self,
app=None,
hooks_dir="hooks",
reload_time=600,
third_hooks=None,
):
"""Receive initialization parameters and
pass options to :meth:`init_app` method.
"""
self.__storage = get_storage()
self.__hf = hooks_dir
self.__hook_dir = join(dirname(dirname(abspath(__file__))), self.__hf)
self.__MAX_RELOAD_TIME = int(GLOBAL["HookReloadTime"] or reload_time)
self.__third_hooks = third_hooks
self.__last_load_time = time()
#: hook static endpoint and url_path
self.__static_endpoint = "assets"
self.__static_url_path = "/{}".format(self.__static_endpoint)
#: local and thirds hooks data
self.__hooks = {}
#: Initialize app via a factory
if app is not None:
self.init_app(app)
[文档] def init_app(self, app):
self.__init_load_hooks()
#: Register template variable
app.jinja_env.globals.update(
intpl=self.call_intpl,
get_call_list=self.get_call_list,
emit_assets=self.emit_assets,
es=self.emit_assets,
)
#: Custom add multiple template folders.
app.jinja_loader = ChoiceLoader(
[
app.jinja_loader,
FileSystemLoader(self.__get_valid_tpl),
]
)
#: Add a static rule for plugins
app.add_url_rule(
"{}/<hook_name>/<path:filename>".format(self.__static_url_path),
endpoint=self.__static_endpoint,
view_func=self._send_static_file,
)
#: register extension with app
app.extensions = getattr(app, "extensions", None) or {}
app.extensions["hookmanager"] = self
self.app = app
@property
def _pid(self):
return str(getpid())
@property
def __last_load_time(self):
hlt = self.__storage.get("hookloadtime") or {}
return hlt.get(self._pid)
@__last_load_time.setter
def __last_load_time(self, timestamp):
if not isinstance(timestamp, (integer_types, float)):
raise TypeError("The value of last_load_time type error")
hlt = self.__storage.get("hookloadtime") or {}
if timestamp == 0:
hlt = {k: 0 for k, v in iteritems(hlt)}
else:
hlt[self._pid] = timestamp
self.__storage.set("hookloadtime", hlt)
@__last_load_time.deleter
def __last_load_time(self):
if "hookloadtime" in self.__storage.list:
del self.__storage["hookloadtime"]
def __del__(self):
del self.__last_load_time
def __ensure_reloaded(self):
hlt = self.__storage.get("hookloadtime") or {}
myself = hlt.get(self._pid)
if not myself or (time() - myself) > self.__MAX_RELOAD_TIME:
self.__hooks = {}
self.__last_load_time = time()
@property
def __third_hooks(self):
return self.__storage.get("hookthirds") or []
@__third_hooks.setter
def __third_hooks(self, third_hook_module_name):
"""添加/删除第三方钩子
:param str,list third_hook_module_name: 模块名
"""
if not third_hook_module_name:
return
hooks = set(self.__storage.get("hookthirds") or [])
if isinstance(third_hook_module_name, string_types):
if third_hook_module_name.endswith(":delete"):
delete_name = third_hook_module_name.split(":")[0]
if delete_name in hooks:
hooks.remove(delete_name)
else:
hooks.add(third_hook_module_name)
elif isinstance(third_hook_module_name, (list, tuple)):
hooks.update(third_hook_module_name)
self.__storage.set("hookthirds", list(set(hooks)))
def __get_state_storage(self, name):
s = set(self.__storage.get("hookstate") or [])
d = "DISABLED.%s" % name
e = "ENABLED.%s" % name
if e in s:
return "enabled"
elif d in s:
return "disabled"
else:
return None
def __get_state(self, h):
s = self.__get_state_storage(h.name)
if s is None:
s = h.state
return s
def __set_state_storage(self, name, state):
s = set(self.__storage.get("hookstate") or [])
d = "DISABLED.%s" % name
e = "ENABLED.%s" % name
if state == "disabled":
if e in s:
s.remove(e)
s.add(d)
elif state == "enabled":
if d in s:
s.remove(d)
s.add(e)
self.__storage.set("hookstate", list(s))
def __get_fileorparent(self, obj, ask_dir=False):
py = abspath(obj.__file__.replace(".pyc", ".py"))
return dirname(py) if ask_dir else py
def __init_load_hooks(self):
self.__scan_local()
self.__scan_third()
def __scan_local(self):
if isdir(self.__hook_dir):
for f in listdir(self.__hook_dir):
fn, fs = splitext(basename(f))
fa, fm = join(self.__hook_dir, f), "%s.%s" % (self.__hf, fn)
if isfile(fa) and fn != "__init__" and fs == ".py":
if fm in modules:
#: The mtime timestamp of the file when the module
#: was first imported.
if getattr(modules[fm], "__mtime__", 0) < getmtime(fa):
del modules[fm]
try:
fo = __import__(fm, fromlist=[self.__hook_dir])
except ImportError as e:
logger.error(e, exc_info=True)
continue
if hasattr(fo, "__version__") and hasattr(
fo, "__author__"
):
fo.__mtime__ = getmtime(fa)
fo.__family__ = "local"
self.__hooks[fm] = self.__get_meta(fo)
def __scan_third(self):
if self.__third_hooks and isinstance(self.__third_hooks, list):
for hn in self.__third_hooks:
#: hn: the name of the hook module that can be imported
if hn in modules:
hm = modules[hn]
if getattr(hm, "__mtime__", 0) < getmtime(
self.__get_fileorparent(hm)
):
del modules[hn]
try:
ho = __import__(hn)
except ImportError as e:
logger.error(e, exc_info=True)
continue
if hasattr(ho, "__version__") and hasattr(ho, "__author__"):
#: 语义化版本号
if not is_valid_verion(ho.__version__):
warnings.warn("%s: irregular version number" % hn)
continue
#: 匹配扩展要求的应用版本
appversion = getattr(ho, "__appversion__", None)
if not is_match_appversion(appversion):
warnings.warn(
"%s: app version number does not match for %s"
% (hn, appversion)
)
continue
ho.__mtime__ = getmtime(self.__get_fileorparent(ho))
ho.__family__ = "third"
ho.__module_name__ = hn
self.__hooks[hn] = self.__get_meta(ho)
def __get_meta(self, f_obj):
#: 钩子友好的可见名,非模块名
name = getattr(
f_obj,
"__hookname__",
f_obj.__name__.split(".")[-1],
)
state = self.__get_state_storage(name)
if state is None:
state = getattr(f_obj, "__state__", "enabled")
(author, mail) = parse_author_mail(f_obj.__author__)
return Attribution(
{
"author": author,
"email": mail,
"version": f_obj.__version__,
"appversion": getattr(f_obj, "__appversion__", None),
"description": getattr(f_obj, "__description__", None),
"state": state,
"name": name,
"proxy": f_obj,
"time": time(),
"catalog": getattr(f_obj, "__catalog__", None),
"tplpath": join(
self.__get_fileorparent(f_obj, True), "templates"
),
"atspath": join(
self.__get_fileorparent(f_obj, True), "static"
),
}
)
@property
def __get_valid_tpl(self):
return [h.tplpath for h in self.get_all_hooks if isdir(h.tplpath)]
@property
def get_all_hooks(self):
"""Get all hooks, enabled and disabled, returns list"""
self.__ensure_reloaded()
if not self.__hooks:
self.__init_load_hooks()
try:
hooks = self.__hooks.values()
data = []
for h in hooks:
h["state"] = self.__get_state(h)
data.append(h)
except RuntimeError:
return self.get_all_hooks
else:
return data
@property
def get_all_hooks_for_api(self):
"""query hook for admin api"""
hooks = self.get_all_hooks
return [
dict(
name=h.name,
description=h.description,
version=h.version,
appversion=h.appversion,
author=h.author,
email=h.email,
catalog=h.catalog,
state=h.state,
ltime=h.time,
mtime=h.proxy.__mtime__,
family=h.proxy.__family__,
)
for h in hooks
]
@property
def get_map_hooks(self):
"""Get all hooks, enabled and disabled, returns dict"""
return {h.name: h for h in self.get_all_hooks}
@property
def get_enabled_hooks(self):
"""Get all enabled hooks, return list"""
return [h for h in self.get_all_hooks if h.state == "enabled"]
@property
def get_enabled_map_hooks(self):
"""Get map enabled hooks, return dict"""
return {
name: h
for name, h in iteritems(self.get_map_hooks)
if h.state == "enabled"
}
[文档] def disable(self, name):
"""禁用钩子"""
if name in self.get_map_hooks:
self.__set_state_storage(name, "disabled")
[文档] def enable(self, name):
"""启用钩子"""
if name in self.get_map_hooks:
self.__set_state_storage(name, "enabled")
[文档] def reload(self):
self.__hooks = {}
self.__last_load_time = 0
self.__init_load_hooks()
[文档] def add_third_hook(self, third_hook_module_name):
"""添加第三方钩子
:param str third_hook_module_name: 钩子可直接导入的模块名
"""
if third_hook_module_name:
self.__third_hooks = third_hook_module_name
if hasattr(self, "app"):
try:
self.app.jinja_loader.loaders.append(
PackageLoader(third_hook_module_name)
)
except ValueError:
self.app.jinja_loader.loaders.append(
PackageLoader(third_hook_module_name, "")
)
self.reload()
[文档] def remove_third_hook(self, third_hook_name):
"""移除第三方钩子
:param str third_hook_name: 钩子名(非模块名)
"""
if third_hook_name:
p = self.proxy(third_hook_name, is_enabled=False)
if p and p.__family__ == "third" and hasattr(p, "__module_name__"):
self.__third_hooks = "%s:delete" % p.__module_name__
self.reload()
[文档] def proxy(self, name, is_enabled=True):
"""代理到钩子中执行方法
:param str name: 钩子名称(__hookname__),非其模块名
:param bool is_enabled: True表示仅从已启用钩子中查找方法,否则查找所有
"""
if is_enabled:
if name in self.get_enabled_map_hooks:
return self.get_enabled_map_hooks[name]["proxy"]
else:
if name in self.get_map_hooks:
return self.get_map_hooks[name]["proxy"]
[文档] def get_call_list(
self, _callname, _include=None, _exclude=None, _type="all"
):
"""获取所有启用钩子的某个类型对应的方法/变量"""
hooks = []
for h in sorted(self.get_enabled_hooks, key=lambda h: h.name):
if _include and isinstance(_include, (tuple, list)):
if h.name not in _include:
continue
if _exclude and isinstance(_exclude, (tuple, list)):
if h.name in _exclude:
continue
hin = False
tpl = getattr(h.proxy, "intpl_%s" % _callname, None)
cn = getattr(h.proxy, _callname, None)
if _type == "func":
if callable(cn):
hin = True
elif _type == "tpl":
if tpl:
hin = True
elif _type == "bool":
if cn is True:
hin = True
else:
if callable(cn) or tpl:
hin = True
if hin:
hooks.append(dict(name=h.name, description=h.description))
return hooks
[文档] def call(
self,
_funcname,
_include=None,
_exclude=None,
_every=None,
_mode=None,
_args=None,
_kwargs=None,
):
"""Try to execute the func method in all enabled hooks.
.. versionchanged:: 1.7.0
add param `_mode` and `_every`
.. versionchanged:: 1.9.0
_mode add any_false
.. deprecated:: 1.8.0
_callback replaced by `_every`;
args replaced by `_args`;
kwargs replaced by `_kwargs`
"""
response = []
for h in sorted(self.get_enabled_hooks, key=lambda h: h.name):
if _include and isinstance(_include, (tuple, list)):
if h.name not in _include:
continue
if _exclude and isinstance(_exclude, (tuple, list)):
if h.name in _exclude:
continue
func = getattr(h.proxy, _funcname, None)
if callable(func):
try:
if isinstance(_args, (list, tuple)) and isinstance(
_kwargs, dict
):
result = func(*_args, **_kwargs)
elif isinstance(_kwargs, dict):
result = func(**_kwargs)
elif isinstance(_args, (list, tuple)):
result = func(*_args)
else:
result = func()
except (ValueError, TypeError, Exception) as e:
result = dict(code=1, msg=str(e))
else:
if isinstance(result, dict):
if "code" not in result:
result["code"] = 0
else:
result = dict(code=0, data=result)
result["sender"] = h.name
#: Use `_every` to change the hook execution result
if callable(_every):
r = _every(result)
if isinstance(r, dict) and "code" in r:
if "sender" not in r:
r["sender"] = h.name
result = r
response.append(result)
if _mode == "any_true":
#: 任意钩子处理成功时则中止后续
if result.get("code") == 0:
break
elif _mode == "any_false":
#: 任意钩子处理失败时则中止后续
if result.get("code") != 0:
break
return response
[文档] def call_intpl(self, _tplname, _include=None, _exclude=None, **context):
"""在模板中渲染
:param _tplname: 扩展点名称
:param list _include: 仅查找哪些钩子
:param list _exclude: 排除哪些钩子
:kerword context: 渲染模板时传递的变量
:type _tplname: str or function
:returns: Markup HTML
"""
result = []
for h in sorted(self.get_enabled_hooks, key=lambda h: h.name):
if _include and isinstance(_include, (tuple, list)):
if h.name not in _include:
continue
if _exclude and isinstance(_exclude, (tuple, list)):
if h.name in _exclude:
continue
#: tpl is a file or html code or a func
tpl = getattr(h.proxy, "intpl_%s" % _tplname, None)
if not tpl:
continue
if callable(tpl):
tpl = tpl()
if tpl.split(".")[-1] in ("html", "htm", "xhtml"):
content = render_template(tpl, **context)
else:
content = render_template_string(tpl, **context)
if content:
result.append(content)
return Markup("".join(result))
def _send_static_file(self, hook_name, filename):
try:
h = self.get_enabled_map_hooks[hook_name]
except KeyError:
return abort(404)
else:
return send_from_directory(h.atspath, filename)
[文档] def emit_assets(self, hook_name, filename, _raw=False, _external=False):
"""在模板中快速构建出扩展中静态文件的地址。
当然,可以用 :func:`flask.url_for` 代替。
如果文件以 `.css` 结尾,那么将返回 `<link>` ,例如::
<link rel="stylesheet" href="/assets/hook/hi.css">
如果文件以 `.js` 结尾,那么将返回 `<script>` ,例如::
<script type="text/javascript" src="/assets/hook/hi.js"></script>
其他类型文件,仅仅返回文件地址,例如::
/assets/hook/img/logo.png
/assets/hook/attachment/test.zip
以下是一个完整的使用示例:
.. code-block:: html
<!DOCTYPE html>
<html>
<head>
<title>Hello World</title>
{{ emit_assets('demo','css/demo.css') }}
</head>
<body>
<div class="logo">
<img src="{{ emit_assets('demo', 'img/logo.png') }}">
</div>
<div class="showJsPath">
<scan>
{{ emit_assets('demo', 'js/demo.js', _raw=True) }}
</scan>
</div>
</body>
</html>
:param hook_name: 钩子名
:param path filename: 钩子包下static目录中的文件
:param bool _raw: True则只生成文件地址,不解析css、js,默认False
:param bool _external: 转发到url_for的_external
:returns: html code with :class:`~flask.Markup`
.. versionadded:: 1.9.0
"""
uri = url_for(
self.__static_endpoint,
hook_name=hook_name,
filename=filename,
_external=_external,
)
if _raw is not True:
if filename.endswith(".css"):
uri = '<link rel="stylesheet" href="%s">' % uri
elif filename.endswith(".js"):
uri = '<script type="text/javascript" src="%s"></script>' % uri
return Markup(uri)