361 lines
11 KiB
Python
361 lines
11 KiB
Python
"""Cliget - install cli tools in your user profile

Usage:
  cliget [-v] [-c URL] search PAT
  cliget [-v] [-c URL] info TOOL
  cliget [-v] [-c URL] list
  cliget [-v] [-c URL] versions TOOLS ...
  cliget [-v] [-c URL] allversions TOOL
  cliget [-v] [-c URL] install TOOLS ...
  cliget [-v] [-c URL] update [--all] [TOOLS ...]

  search              search catalog for tools with the given pattern
  info                show details of an entry in the catalog
  list                list all installed tools
  versions TOOLS      list current and latest versions of some tools
  allversions TOOL    list all versions of a tool
  install TOOLS       install some tools
  update              list all updatable tools
  update TOOLS        update tools
  update --all        update all updatable tools


Options:
  -h, --help
  -c, --catalog URL
  -v, --verbose
"""
|
|
|
|
import logging
|
|
from collections import namedtuple
|
|
import sys, os, shutil
|
|
from docopt import docopt
|
|
from yaml import load, SafeLoader
|
|
from semver import VersionInfo
|
|
import re
|
|
from subprocess import run, CalledProcessError, TimeoutExpired
|
|
from fuzzywuzzy import fuzz
|
|
|
|
|
|
class DotDict(dict):
    """Dictionary whose keys can also be read as attributes.

    Attribute names that are not real attributes of ``dict`` are looked up
    as keys; a missing key yields ``None`` instead of raising.
    """

    def __getattr__(self, name):
        # Only reached when regular attribute lookup fails, so genuine dict
        # attributes (``items``, ``update``, ...) always take precedence.
        try:
            return self[name]
        except KeyError:
            return None
|
|
|
|
|
|
# Record describing one tool: command name, catalog properties and two
# versions (presumably local and remote, going by the lver/rver names used
# elsewhere in this file).
Tool = namedtuple("Tool", ["cli", "props", "lver", "rver"])
Tool.__annotations__ = dict(
    cli=str,
    props=DotDict,
    lver=VersionInfo,
    rver=VersionInfo,
)
|
|
|
|
import requests
|
|
from requests_cache import CachedSession
|
|
from requests.adapters import HTTPAdapter
|
|
from requests.packages.urllib3.util.retry import Retry
|
|
|
|
# Retry failed requests a few times with a growing delay.  403 is in the
# list because the GitHub API answers 403 when too many requests are sent.
retry_strategy = Retry(
    total=5,
    backoff_factor=4,
    status_forcelist=[403, 429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)

# Shared HTTP session: responses are cached on disk and the retry policy
# above applies to both http and https URLs.
http = CachedSession(
    "cliget/http_cache",
    use_cache_dir=True,  # keep the cache in the default user cache dir
    expire_after=3600,  # cached responses expire after one hour
    allowable_codes=[200, 400, 404],  # client errors are cached as well
    stale_if_error=True,  # on request errors, serve stale cache data if any
)
http.mount("https://", adapter)
http.mount("http://", adapter)
|
|
|
|
|
|
def trace(*mess):
    """Print a ``TRACE``-prefixed message when the TRACE env variable is set."""
    if os.environ.get("TRACE") is None:
        return
    print("TRACE", *mess)
|
|
|
|
|
|
def info(*mess):
    """Print an informational message made of the space-joined arguments."""
    text = " ".join(str(part) for part in mess)
    print(text)
|
|
|
|
|
|
def warn(*mess):
    """Print a message prefixed with ``WARNING:`` on standard output."""
    parts = ["WARNING:"]
    parts.extend(str(part) for part in mess)
    print(" ".join(parts))
|
|
|
|
|
|
def _load_catalog(options) -> dict:
    """Load the tool catalog and wrap every entry in a ``DotDict``.

    The catalog location is read from ``options["__catalog"]`` and defaults
    to the catalog published in the cliget repository.  Locations starting
    with ``http://`` or ``https://`` are downloaded; anything else is opened
    as a local YAML file.

    Returns:
        A dict mapping each catalog key to a ``DotDict`` of its properties
        (empty when the YAML document is empty).

    Raises:
        requests.HTTPError: if the remote catalog answers with an error status.
        OSError: if the local catalog file cannot be read.
    """
    catalog = options.get(
        "__catalog", "https://codeberg.org/setop/cliget/raw/branch/main/catalog.yaml"
    )
    if catalog.startswith(("http://", "https://")):
        # IMPROVE cache catalog for some time
        response = requests.get(catalog, timeout=30)
        # Fail loudly rather than feeding an error page to the YAML parser.
        response.raise_for_status()
        entries = load(response.content, SafeLoader)
    else:
        # Context manager so the handle is closed; bytes let the YAML parser
        # detect the encoding itself, like for the downloaded content.
        with open(catalog, "rb") as stream:
            entries = load(stream, SafeLoader)
    trace(entries)
    # An empty YAML document parses to None.
    return {key: DotDict(props) for key, props in (entries or {}).items()}
|
|
|
|
|
|
def _find_semver(s: str) -> VersionInfo:
    """Extract a semantic version from free-form text.

    A strict semver parse is tried first; when it fails, the first three
    runs of digits found in ``s`` are used as major, minor and patch.

    Returns:
        The parsed version, or ``0.0.0`` when ``s`` is not text or holds no
        usable version.
    """
    trace(s)
    try:
        return VersionInfo.parse(s)
    except (TypeError, ValueError):
        # Not strict semver, or not text at all (e.g. None for a missing tag).
        pass
    try:
        numbers = re.findall(r"\d+", s)[:3]
        # With no digits at all this raises TypeError (major is required).
        return VersionInfo(*numbers)
    except (TypeError, ValueError) as e:
        trace("parse error", e)
        return VersionInfo(0, 0, 0)
|
|
|
|
|
|
def _local_version(cmd):
    """Return the version reported by ``cmd --version``.

    Only the first line of standard output is inspected.  Any failure
    (command missing, non-zero exit status, timeout, ...) is traced and
    reported as version ``0.0.0``.
    """
    try:
        completed = run(
            [cmd, "--version"],
            input="",
            text=True,
            capture_output=True,
            check=True,
            timeout=0.1,
        )
        first_line, _, _ = completed.stdout.partition("\n")
        trace(cmd, "=>", first_line)
        return _find_semver(first_line)
    except Exception as e:
        # Deliberate best effort: whatever went wrong, fall through to 0.0.0.
        trace("run error", e)
    return VersionInfo(0)
|
|
|
|
|
|
def _internal_list(options):
    """Yield every catalog tool that is installed, with its local version.

    A tool counts as installed when its command is found on ``PATH``.
    ``_local_version`` cannot be used for that test: it reports ``0.0.0``
    for every failure, including a missing command.

    Yields:
        ``(cli, props, version)`` tuples, where ``props`` is the catalog
        entry and ``version`` is what ``_local_version`` reports.
    """
    for cli, props in _load_catalog(options).items():
        # search in path
        if shutil.which(cli) is None:
            trace(cli, "not found")
            continue
        vers = _local_version(cli)
        trace(cli, vers)
        yield cli, props, vers
|
|
|
|
|
|
def dolist(options):
    """Print each tool yielded by ``_internal_list`` with its version."""
    for entry in _internal_list(options):
        # entry is (cli, props, version); the properties are not shown
        name, version = entry[0], entry[2]
        print(name, version)
|
|
|
|
|
|
def doinfo(options):
    """Print the catalog entry of ``options.TOOL``, one property per line.

    A warning is printed instead when the tool is not in the catalog.
    """
    tool = options.TOOL
    catalog = _load_catalog(options)
    if tool not in catalog:
        warn(f"{tool} not in catalog")
        return
    print(tool)
    for key, value in catalog[tool].items():
        print(f" {key}: {value}")
|
|
|
|
|
|
def dosearch(options):
    """Print the ten catalog entries that best match ``options.PAT``.

    Each entry is scored by fuzzy-matching the pattern against its
    description and its name; the match against the command itself is only
    added when it is strong enough (ratio above 60).  The result is shown as
    a table of command, description (first 50 characters) and score.
    """
    from terminaltables import SingleTable

    pat = options.PAT
    ctl = _load_catalog(options)
    rows = []
    for cli, props in ctl.items():
        trace(cli, props)
        # An entry without description must not break the whole search.
        desc = props.desc or ""
        rtitle = fuzz.ratio(cli, pat)
        rname = fuzz.ratio(props.name, pat) if "name" in props else 0
        rdesc = fuzz.partial_token_set_ratio(desc, pat)
        score = rdesc + rname
        if rtitle > 60:
            score += rtitle
        rows.append((cli, desc[:50], score))
    # Best score first; the sort is stable, so ties keep catalog order.
    rows.sort(key=lambda row: -row[-1])

    table = SingleTable([["cli", "desc", "rel"]] + rows[:10])
    table.inner_row_border = False
    print(table.table)
|
|
|
|
|
|
def dolistupdate(options):
    """Print the installed tools for which GitHub has a newer release.

    Tools without a ``github`` catalog property are skipped, and so are
    tools whose local version is ``0.0.0`` (the value ``_local_version``
    reports when it cannot determine a version).  Each updatable tool is
    printed as ``cli | local version | remote version``.
    """
    print("look for updatables")
    unknown = VersionInfo(0)
    for cli, props, lver in _internal_list(options):
        if not props.github or lver == unknown:
            continue
        # get last version online
        rver = _gh_version(props.github)
        if rver > lver:
            print(f"{cli} | {lver} | {rver}")
|
|
|
|
|
|
def _gh_versions(repo: str) -> list[VersionInfo]:
    """Return the versions of the releases listed for a GitHub repository.

    Only what a single, un-paginated request to the releases endpoint
    returns is considered.

    Args:
        repo: repository in ``owner/name`` form.

    Returns:
        One version per release, in the order of the response; an empty
        list when the request fails.

    Raises:
        ValueError: if ``repo`` is not of the form ``owner/name``.
    """
    owner, name = repo.split("/")
    url = f"https://api.github.com/repos/{owner}/{name}/releases"
    # GH API raise 403 when too many requests are sent
    # TODO implement retry with threshold
    response = http.get(url)
    trace(response)
    if not response.ok:
        # An error payload is a JSON object, not a list of releases.
        return []
    return [_find_semver(release.get("tag_name")) for release in response.json()]
|
|
|
|
|
|
def _gh_version(repo: str) -> VersionInfo:
    """Return the version of the latest release of a GitHub repository.

    Args:
        repo: repository in ``owner/name`` form.

    Returns:
        The version found in the release tag, or ``0.0.0`` when the request
        fails or the release carries no tag name.

    Raises:
        ValueError: if ``repo`` is not of the form ``owner/name``.
    """
    owner, name = repo.split("/")
    url = f"https://api.github.com/repos/{owner}/{name}/releases/latest"
    trace(url)
    response = http.get(url)
    if not response.ok:
        # Check the status first: an error body is not guaranteed to be JSON.
        trace(response)
        return VersionInfo(0)
    res_body = response.json()
    trace(response, type(res_body), res_body)
    tag = res_body.get("tag_name")
    if not tag:
        return VersionInfo(0)
    return _find_semver(tag)
|
|
|
|
|
|
def doversions(options):
    """Print local and latest remote versions of each tool in ``options.TOOLS``.

    One ``cli | local | remote`` line is printed per known tool; the remote
    version is ``0.0.0`` for tools without a ``github`` catalog property.
    Tools missing from the catalog produce a warning.
    """
    ctl = _load_catalog(options)
    for cli in options.TOOLS:
        if cli not in ctl:
            # The loop variable is ``cli``; no name ``tool`` exists here.
            warn(f"{cli} not in catalog")
            continue
        props = ctl[cli]
        rver = _gh_version(props.github) if props.github else VersionInfo(0)
        lver = _local_version(cli)
        trace(lver)
        trace(rver)
        print(f"{cli} | {lver} | {rver}")
|
|
|
|
|
|
def doallversions(options):
    """Print every release version known for ``options.TOOL``, one per line.

    A warning is printed when the tool is not in the catalog or has no
    ``github`` property to query versions from.
    """
    tool = options.TOOL
    ctl = _load_catalog(options)
    if tool not in ctl:
        warn(f"{tool} not in catalog")
        return
    props = ctl[tool]
    if not props.github:
        warn(f"{tool} has no known version source")
        return
    vers = _gh_versions(props.github)
    trace(vers)
    # str.join only accepts strings, so render each version first.
    print("\n".join(str(ver) for ver in vers))
|
|
|
|
|
|
def doinstall(options):
    """Install or update each tool named in ``options.TOOLS``.

    A tool is installed from GitHub when its latest release is newer than
    the local version (``0.0.0`` when the tool is absent).  Warnings are
    printed for tools missing from the catalog, tools without a ``github``
    property, and tools whose latest version cannot be determined.
    """
    ctl = _load_catalog(options)
    for tool in options.TOOLS:
        if tool not in ctl:
            warn(f"{tool} not in catalog")
            continue
        props = ctl[tool]
        if not props.github:
            warn(f"{tool} has no known install strategy")
            continue
        rver = _gh_version(props.github)
        lver = _local_version(tool)
        trace(lver, rver)
        if rver > lver:
            _perform_gh_install(tool, props.github)
        elif rver == VersionInfo(0):
            # 0.0.0 means the remote lookup failed; claiming "up to date"
            # would be misleading.
            warn(f"{tool}: could not determine latest version")
        else:
            info(f"{tool} is already up to date ({lver})")
|
|
|
|
|
|
def _match_arch_machine(name: str) -> bool:
    """Tell whether a release asset name targets the current OS and CPU.

    The comparison is case-insensitive.  The name must contain the OS name
    reported by ``os.uname()`` and the machine name or one of its usual
    synonyms (x86_64/amd64, aarch64/arm64).  On Linux x86_64, a name
    containing ``linux64`` is accepted as well.

    Args:
        name: file name of a release asset.
    """
    uname = os.uname()
    sysname = uname.sysname.lower()  # os
    machine = uname.machine.lower()  # arch
    # we don't consider libc - glibc or musl - as musl is usually statically embedded
    lname = name.lower()
    # Substring tests use ``in``: ``str.find(...) > 0`` would reject a name
    # that starts with the searched text (index 0).
    if sysname == "linux" and machine == "x86_64" and "linux64" in lname:
        return True
    if sysname not in lname:
        return False
    synonyms = {
        "x86_64": ("x86_64", "amd64"),
        "amd64": ("amd64", "x86_64"),
        "aarch64": ("aarch64", "arm64"),
        "arm64": ("arm64", "aarch64"),
    }
    return any(arch in lname for arch in synonyms.get(machine, (machine,)))
|
|
|
|
|
|
def _get_gh_matching_release(repo):
    """Return the asset of the latest GitHub release that fits this machine.

    Args:
        repo: repository in ``owner/name`` form.

    Returns:
        A ``DotDict`` wrapping the first asset whose name is accepted by
        ``_match_arch_machine``.

    Raises:
        ValueError: if the latest release cannot be fetched or none of its
            assets matches this platform.
    """
    # get asset list from last release
    url = f"https://api.github.com/repos/{repo}/releases/latest"
    r = http.get(url)
    if not r.ok:
        raise ValueError(f"cannot fetch latest release of {repo}: HTTP {r.status_code}")
    assets = r.json().get("assets", [])
    trace(assets)
    # select right asset
    match = next((a for a in assets if _match_arch_machine(a["name"])), None)
    if match is None:
        # A bare StopIteration would tell the user nothing.
        raise ValueError(f"no asset of {repo} matches this platform")
    asset = DotDict(match)
    trace(asset.name, asset.url)
    return asset
|
|
|
|
|
|
def _perform_gh_install(cli, repo, version=None):
    """Download the latest release of ``repo`` and install it as ``cli``.

    The matching ``.tar.gz`` asset is downloaded into
    ``~/.cache/cliget/assets`` (unless a file of that name is already
    there), unpacked into ``~/.local/programs/<cli>`` and exposed through
    the symlink ``~/.local/bin/<cli>``.

    Args:
        cli: command name; the symlink points at a file of that name at the
            root of the unpacked archive.
        repo: GitHub repository in ``owner/name`` form.
        version: currently unused; the latest release is always installed.

    Raises:
        ValueError: if the asset is not a ``.tar.gz`` archive.
        requests.HTTPError: if the download answers with an error status.
        CalledProcessError: if ``tar`` fails to unpack the archive.
        FileExistsError: if ``~/.local/bin/<cli>`` exists and is not a symlink.
    """
    asset = _get_gh_matching_release(repo)
    # Reject unsupported packages before spending time on the download.
    if not asset.name.endswith(".tar.gz"):
        raise ValueError("package type not handled")
    # mkdirs
    p = os.path
    home = os.environ["HOME"]
    assetshome = p.join(home, ".cache/cliget/assets")
    os.makedirs(assetshome, exist_ok=True)
    location = p.join(assetshome, asset.name)
    trace(f"will dl {location}")
    # dl asset if not already there
    if not p.exists(location):
        # The asset description normally carries the download URL already;
        # only ask the API again when it does not.
        dlurl = asset.browser_download_url
        if not dlurl:
            dlurl = http.get(asset.url).json()["browser_download_url"]
        trace(f"{dlurl=}")
        r = http.get(dlurl, allow_redirects=True)
        trace(f"{r=}")
        r.raise_for_status()
        # Write to a temporary name first so that an interrupted download is
        # never mistaken for a complete one on the next run.
        partial = location + ".part"
        with open(partial, "wb") as fd:
            fd.write(r.content)
        os.replace(partial, location)
        trace("downloaded")
    # unpack asset
    progloc = p.join(home, ".local/programs", cli)
    trace("process tgz")
    os.makedirs(progloc, exist_ok=True)
    run(["tar", "xfz", location, "-C", progloc], check=True)
    # TODO look for exe : ./cli, ./<tar root folder>/cli, exe property
    # symlink
    bindir = p.join(home, ".local/bin")
    os.makedirs(bindir, exist_ok=True)
    link = p.join(bindir, cli)
    if p.islink(link):
        # A link left by a previous install would make os.symlink fail.
        os.remove(link)
    os.symlink(p.join("../programs", cli, cli), link)
|
|
|
|
|
|
def main(argv=None):
    """Parse the command line and run the requested sub-command.

    Args:
        argv: argument list handed to docopt; ``None`` lets docopt read
            the process arguments.
    """
    if "DEBUG" in os.environ:
        logging.basicConfig(level=logging.DEBUG)
        logging.debug("debug is on")
    else:
        logging.info("not in debug")
    parsed = docopt(__doc__, argv=argv, version="Cliget 0.1.0")
    # "--catalog" becomes "__catalog", "--all" becomes "__all", and so on.
    options = DotDict(
        {k.replace("-", "_"): v for (k, v) in parsed.items() if v is not None}
    )
    trace(options)
    # Command flags are read by key: attribute access would resolve a name
    # such as ``update`` to the dict method of that name, which is always
    # truthy.
    simple_commands = {
        "info": doinfo,
        "list": dolist,
        "search": dosearch,
        "versions": doversions,
        "allversions": doallversions,
    }
    for command, handler in simple_commands.items():
        if options.get(command):
            handler(options)
            return
    if options.get("install") or options.get("update"):
        tools = options.get("TOOLS") or []
        if not options.get("__all") and not tools:
            dolistupdate(options)
        elif tools:
            doinstall(options)
        else:
            print("update all not implemented")


if __name__ == "__main__":
    main()
|