from __future__ import annotations
__all__ = [
'ATTR_TYPES',
'Cache',
'TYPES',
]
from typing import Any
import os
import re
import shutil
import sqlite3
import datetime
import functools as ft
import collections
from pypath_common import _misc
import platformdirs
from cache_manager._item import CacheItem
from cache_manager._status import Status
from cache_manager._session import _log
import cache_manager.utils as _utils
from . import _data
from ._lock import Lock
# Suffixes of the per-type attribute tables (`attr_<suffix>`). The upper-cased
# suffix is also used as the SQL type of each table's `value` column.
ATTR_TYPES = ['varchar', 'int', 'datetime', 'float']
# Maps a Python type name (as given by `type(obj).__name__`) to the SQL type
# used when storing or quoting values of that type.
TYPES = {
'str': 'VARCHAR',
'int': 'INT',
'float': 'FLOAT',
'datetime': 'DATETIME',
}
[docs]class Cache:
"""
Cache manager class, stores and manages the information in the registry
database as well as the files in the cache directory.
Args:
path:
Explicit path to set the cache in. Overrides the `pkg` keyword
argument. Optional, defaults to `None`.
pkg:
Package/module name the cache is used on. This sets the cache
directory in a folder located in the OS default cache directory.
Attrs:
con:
Current connection to the SQL database, an instance of
`sqlite3.Connection`.
cur:
Current cursor of the SQL database, an instance of `sqlite3.Cursor`.
path:
Path to the current cache registry.
dir:
The directory of the cache.
free_space:
Amount of free space available in the cache (in bytes).
"""
def __init__(self, path: str | None = None, pkg: str | None = None):
    """
    Resolves the cache location and opens the registry database.

    Args:
        path:
            Explicit location of the cache. Takes precedence over `pkg`.
            Optional, defaults to `None`.
        pkg:
            Package/module name used to derive a cache directory when no
            `path` is given. Optional, defaults to `None`.
    """

    # Connection and cursor remain `None` until the database is opened
    self.con = None
    self.cur = None
    # Per-table cache of field definitions, filled by `_table_fields`
    self._fields = {}

    self._set_path(path=path, pkg=pkg)
    self._ensure_sqlite()
def __del__(self):
if hasattr(self, 'con'):
_log(f'Closing SQLite database path: {self.path}')
self.con.close()
def __len__(self):
self._ensure_sqlite()
return self.cur.execute('SELECT COUNT(*) FROM main').fetchone()[0]
@property
def free_space(self) -> int:
"""
Calculates the available free space in the cache directory.
Returns:
The available space in bytes.
"""
total, used, free = shutil.disk_usage(self.dir)
return free
def autoclean(self):
    """
    Prunes the registry down to one usable version per item.

    For every item key, only the highest version whose status is READY or
    WRITE is kept; all other records of that key are deleted from the
    registry. Afterwards `clean_disk` removes the files left without a
    record.
    """

    _log('Auto cleaning cache.')
    keep_status = {Status.READY.value, Status.WRITE.value}

    # Group the registered items by key
    items = collections.defaultdict(set)

    for entry in self.contents().values():

        # Entries that exist only as a file on disk carry no 'item' key
        item = entry.get('item')

        if item:
            items[item.key].add(item)

    to_remove = []

    for versions in items.values():

        usable = [it for it in versions if it._status in keep_status]
        # `None` when no version is usable: then every version is removed
        best = max(usable, key=lambda it: it.version, default=None)
        to_remove.extend(it for it in versions if it is not best)

    _log(f'Deleting {len(to_remove)} records.')
    self._delete_records(to_remove)
    self.clean_disk()
    _log('Auto clean complete.')
def best(
        self,
        uri: str,
        params: dict | None = None,
        status: int | set[int] | None = Status.READY.value,
        newer_than: str | datetime.datetime | None = None,
        older_than: str | datetime.datetime | None = None,
) -> CacheItem | None:
    """
    Finds the highest version of an item among those matching the criteria.

    Args:
        uri:
            Uniform Resource Identifier.
        params:
            Parameters of the item as a dict of name-value pairs. Optional,
            defaults to `None`.
        status:
            Status (or set of statuses) the item is allowed to have.
            Optional, defaults to the READY status.
        newer_than:
            Only consider items dated after this. Optional, defaults to
            `None`.
        older_than:
            Only consider items dated before this. Optional, defaults to
            `None`.

    Returns:
        The matching `CacheItem` with the highest version number, or `None`
        if nothing matches.

    Example:
        >>> cache = cm.Cache('./')
        >>> cache.create('foo')
        CacheItem[foo V:1 UNINITIALIZED]
        >>> cache.create('foo')
        CacheItem[foo V:2 UNINITIALIZED]
        >>> cache.best('foo', status=0)
        CacheItem[foo V:2 UNINITIALIZED]
    """

    candidates = self.search(
        uri=uri,
        params=params,
        status=_misc.to_set(status),
        newer_than=newer_than,
        older_than=older_than,
    )

    if not candidates:

        _log('No version found matching criteria')
        return None

    # TODO: Consider also date
    ranked = sorted(candidates, key=lambda it: it.version)
    latest = ranked[-1]
    _log(f'Best matching version: {latest.version}')

    return latest
def best_or_new(
        self,
        uri: str,
        params: dict | None = None,
        status: int | set[int] | None = Status.READY.value,
        newer_than: str | datetime.datetime | None = None,
        older_than: str | datetime.datetime | None = None,
        attrs: dict | None = None,
        ext: str | None = None,
        label: str | None = None,
        new_status: int = Status.WRITE.value,
        filename: str | None = None,
) -> CacheItem:
    """
    Returns the best (highest) matching version of an item, creating a new
    item when none is found.

    Args:
        uri:
            Uniform Resource Identifier.
        params:
            Parameters of the item as a dict of name-value pairs. Optional,
            defaults to `None`.
        status:
            Status (or set of statuses) accepted when looking for an
            existing item. Optional, defaults to the READY status.
        newer_than:
            Only consider existing items dated after this. Optional,
            defaults to `None`.
        older_than:
            Only consider existing items dated before this. Optional,
            defaults to `None`.
        attrs:
            Attributes given to the item if it has to be created; they are
            not used for the lookup. Optional, defaults to `None`.
        ext:
            File extension given to a newly created item. Optional,
            defaults to `None`.
        label:
            Label given to a newly created item. Optional, defaults to
            `None`.
        new_status:
            Status assigned to the item if it has to be created. Optional,
            defaults to the WRITE status.
        filename:
            File name passed on to `create` for a new item. Optional,
            defaults to `None`.

    Returns:
        The existing best `CacheItem`, or the newly created one.

    Examples:
        >>> cache = cm.Cache('./')
        >>> cache.create('foo')
        CacheItem[foo V:1 UNINITIALIZED]
        >>> cache.create('foo')
        CacheItem[foo V:2 UNINITIALIZED]
        >>> cache.best_or_new('foo', status=0)
        CacheItem[foo V:2 UNINITIALIZED]
        >>> cache.best_or_new('bar')
        CacheItem[bar V:1 WRITE]
    """

    # Arguments for `create`; the lookup-only ones are deliberately left out
    # and `new_status` takes the place of `status`.
    creation = {
        'uri': uri,
        'params': params,
        'attrs': attrs,
        'ext': ext,
        'label': label,
        'filename': filename,
        'status': new_status,
    }

    with Lock(self.con):

        found = self.best(
            uri=uri,
            params=params,
            status=status,
            newer_than=newer_than,
            older_than=older_than,
        )
        item = found if found else self.create(**creation)

    return item
def by_attrs(self, attrs: dict) -> set[int]:
    """
    Looks up registry IDs by attribute values stored in the type-specific
    attribute tables.

    Args:
        attrs:
            Attribute names and the values to search for. By default an
            item has to satisfy every term; include `'__and': False` to
            accept items satisfying at least one term instead. Note that
            the `'__and'` key is removed from the passed dict in place.

    Returns:
        Set of IDs of the records matching the attribute search; an empty
        set when there are no search terms.

    Examples:
        >>> cache = cm.Cache('./')
        >>> cache.create('foo1', attrs={'bar': 1, 'baz': 2})
        CacheItem[foo1 V:1 UNINITIALIZED]
        >>> cache.create('foo2', attrs={'bar': 1, 'baz': 5})
        CacheItem[foo2 V:1 UNINITIALIZED]
        >>> cache.by_attrs({'bar': 1, 'baz': 5})
        {2}
        >>> cache.by_attrs({'bar': 1, 'baz': 5, '__and': False})
        {1, 2}
    """

    _log(f'Searching by attributes: {attrs}')

    # NOTE(review): this mutates the caller's dict; `search` currently
    # relies on `__and` being gone afterwards.
    require_all = attrs.pop('__and', True)
    combine = set.intersection if require_all else set.union

    matches = []

    for atype, queries in _utils.parse_attr_search(attrs).items():

        for query in queries:

            self._execute(f'SELECT id FROM attr_{atype} WHERE {query}')
            matches.append({row[0] for row in self.cur.fetchall()})

    if not matches:
        return set()

    return combine(*matches)
def by_key(self, key: str, version: int) -> CacheItem:
    """
    Fetches a single item identified by its key and version number.

    Args:
        key:
            Key of the item to fetch.
        version:
            Version of the item to fetch.

    Returns:
        The first `CacheItem` returned by the search for this key and
        version (if any).

    Example:
        >>> cache = cm.Cache('./')
        >>> it = cache.create('foo')
        >>> it.key
        '31d0e534960b07c0bde745c17b05eaba'
        >>> cache.by_key('31d0e534960b07c0bde745c17b05eaba', 1)
        CacheItem[foo V:1 UNINITIALIZED]
    """

    _log(f'Looking up key: {key}')
    matches = self.search(key=key, version=version)

    return _misc.first(matches)
def clean_db(self):
    """
    Deletes the registry records whose file does not exist on disk.
    """

    _log('Cleaning cache database: removing records without file on disk.')

    orphans = set()

    for entry in self.contents().values():

        # Entries found only on disk have no registry record and hence no
        # 'item' key; there is nothing to delete from the database for them.
        item = entry.get('item')

        if item and not os.path.exists(item.path):
            orphans.add(item)

    _log(f'Deleting {len(orphans)} records.')
    self._delete_records(orphans)
    _log('Cleaning cache database complete.')
def clean_disk(self):
    """
    Deletes the files in the cache directory that have no record in the
    registry.

    Only files recognised by `contents` (i.e. named after a version ID) are
    considered.
    """

    _log('Cleaning disk: removing items without DB record.')

    # `contents` adds the 'status' key only for entries that have a registry
    # record. Test for the key itself: a record with status `0` is still a
    # record and its file must not be deleted.
    fnames = {
        os.path.join(self.dir, entry['disk_fname'])
        for entry in self.contents().values()
        if entry['disk_fname'] and 'status' not in entry
    }

    _log(f'Deleting {len(fnames)} files.')

    for file in fnames:

        _log(f'Deleting from disk: `{file}`.')
        os.remove(file)

    _log('Cleaning disk complete.')
def contents(self) -> dict[str, dict[str, int | str | CacheItem]]:
    """
    Collects everything known to the cache: records in the registry and
    files in the cache directory.

    Returns:
        Dictionary keyed by version ID. Each value always has the key
        `disk_fname` (name of the matching file in the cache directory, or
        `None` if there is none). Entries that have a registry record
        additionally carry `status`, `fname` (file name according to the
        registry), `last_read`, `read_count` and `item` (the `CacheItem`
        instance). Records marked as removed are included.
    """

    # Files whose name contains a version ID: 32 hex characters, a dash and
    # the version number
    on_disk = {}

    for fname in os.listdir(self.dir):

        match = re.search(r'[\dabcdef]{32}-\d+', fname)

        if match:
            on_disk[match.group()] = fname

    in_db = {}

    for it in self.search(include_removed = True):

        in_db[it.version_id] = {
            'status': it._status,
            'fname': it.cache_fname,
            'last_read': it.last_read,
            'read_count': it.read_count,
            'item': it,
        }

    merged = {}

    for vid in on_disk.keys() | in_db.keys():

        record = dict(in_db.get(vid, {}))
        record['disk_fname'] = on_disk.get(vid, None)
        merged[vid] = record

    return merged
def create(
        self,
        uri: str,
        params: dict | None = None,
        attrs: dict | None = None,
        status: int = Status.UNINITIALIZED.value,
        ext: str | None = None,
        label: str | None = None,
        filename: str | None = None,
) -> CacheItem:
    """
    Creates a new entry (a new version of the item) in the registry.

    Args:
        uri:
            Uniform Resource Identifier.
        params:
            Parameters of the item as a dict of name-value pairs. Optional,
            defaults to `None`.
        attrs:
            Extra attributes of the item as name-value pairs. Each one is
            stored in the attribute table matching its data type;
            attributes named like a field of the main table are skipped.
            Optional, defaults to `None`.
        status:
            Status of the new item. Optional, defaults to the
            UNINITIALIZED status.
        ext:
            Extension of the file associated to the item. Optional,
            defaults to `None`.
        label:
            Label for the item (e.g. type, group, category...). Optional,
            defaults to `None`.
        filename:
            Name of the file associated to the item. Optional, defaults to
            `None`.

    Returns:
        The newly created `CacheItem` instance.

    Example:
        >>> cache = cm.Cache('./')
        >>> cache.create('foo')
        CacheItem[31d0e534960b07c0bde745c17b05eaba V:1 UNINITIALIZED]
    """

    self._ensure_sqlite()
    _log(f'CREATE {uri}')

    # Only used for the log message below
    args = {
        'uri': uri,
        'params': params,
        'attrs': attrs,
        'status': status,
        'ext': ext,
        'label': label,
        'filename': filename,
    }
    param_str = _utils.serialize(args)
    _log(f'Creating new version for item {param_str}')

    main_fields = self._table_fields()

    with Lock(self.con):

        _log(f'Looking up existing versions of item `{uri}`')

        items = self.search(
            uri=uri,
            params=params,
        )

        last_version = max((i.version for i in items), default = 0)

        if last_version == 0:
            _log('No existing version found.')
        else:
            _log(f'Latest version: `{last_version}`')

        # NOTE(review): `filename` is accepted by this method but not
        # forwarded to `CacheItem.new` -- confirm whether that is intended.
        new = CacheItem.new(
            uri,
            params,
            attrs=attrs,
            version=last_version + 1,
            date=_utils.parse_time(),
            status=status,
            ext=ext,
            label=label,
            cache=self,
        )

        _log(f'Next version: {new.key}-{new.version}')

        self._execute(f'''
            INSERT INTO
            main (
                item_id,
                version_id,
                version,
                status,
                file_name,
                label,
                date,
                ext,
                last_read,
                last_search,
                read_count,
                search_count
            )
            VALUES (
                {self._quotes(new.key)},
                "{new.key}-{new.version}",
                {new.version},
                {new._status},
                {self._quotes(new.filename)},
                {self._quotes(new.label)},
                {self._quotes(new.date)},
                {self._quotes(new.ext)},
                NULL,
                {self._quotes(new.date)},
                0,
                0
            )
        ''')

        # Retrieve the row ID assigned by the database to the new record
        q = f'SELECT id FROM main WHERE version_id = "{new.version_id}"'
        self._execute(q)
        key = self.cur.fetchone()[0]
        new._id = key

        for actual_typ in ATTR_TYPES:

            _log(f'Creating attributes in attr_{actual_typ}')
            sql_type = actual_typ.upper()

            # Filter on both type and name *before* deciding whether there
            # is anything to insert: an empty VALUES list is invalid SQL.
            rows = [
                f'({key}, "{k}", {self._quotes(v, actual_typ)})'
                for k, v in new.attrs.items()
                if k not in main_fields
                and self._sqlite_type(v) == sql_type
            ]

            if not rows:
                continue

            q = (
                f'INSERT INTO attr_{actual_typ} ( id, name, value ) '
                f'VALUES {", ".join(rows)}'
            )
            self._execute(q)

    _log(f'Successfully created: {new.version_id}')
    _log('END CREATE')

    return new
[docs] def does_it_fit(self, size: int) -> bool:
"""
Checks whether a given size is lower than the current available space.
Args:
size:
Integer corresponding to the size to be checked (in bytes).
Returns:
Whether the requested space is available.
"""
return size <= self.free_space
# TODO: Should method below include a call to `does_it_fit`?
def move_in(
        self,
        path: str,
        uri: str | None = None,
        params: dict | None = None,
        attrs: dict | None = None,
        status: int = Status.WRITE.value,
        ext: str | None = None,
        label: str | None = None,
        filename: str | None = None,
) -> CacheItem:
    """
    Copies a file into the cache directory and registers it as a new cache
    item.

    Args:
        path:
            Current location of the file to be copied into the cache.
        uri:
            Uniform Resource Identifier. Optional, defaults to the base
            name of `path`.
        params:
            Parameters of the item as a dict of name-value pairs. Optional,
            defaults to `None`.
        attrs:
            Extra attributes of the item as name-value pairs. Optional,
            defaults to `None`.
        status:
            Status of the new item. Optional, defaults to the WRITE status.
        ext:
            Extension of the file associated to the item. Optional,
            defaults to `None`.
        label:
            Label for the item (e.g. type, group, category...). Optional,
            defaults to `None`.
        filename:
            Name of the file associated to the item. Optional, defaults to
            `None`.

    Returns:
        The newly created `CacheItem` instance.
    """

    # The fallback for `uri` has to be resolved *before* the arguments are
    # handed to `create`; a `locals()` snapshot taken earlier would still
    # hold `None`.
    item = self.create(
        uri=uri or os.path.basename(path),
        params=params,
        attrs=attrs,
        status=status,
        ext=ext,
        label=label,
        filename=filename,
    )

    _log(f'Copying `{path}` to `{item.path}`.')
    shutil.copy(path, item.path)

    return item
def reload(self):
    """
    Reloads the module defining this class and rebinds the current
    instance to the freshly loaded class.
    """

    import importlib

    cls = self.__class__
    module_name = cls.__module__
    # A non-empty `fromlist` makes `__import__` return the submodule itself
    # rather than the top-level package.
    module = __import__(module_name, fromlist=[module_name.split('.')[0]])
    importlib.reload(module)

    self.__class__ = getattr(module, cls.__name__)
# FIXME: attrs, ext and label are not used
# TODO: Make it safer later (avoid deleting everything accidentally)
def remove(
        self,
        uri: str | None = None,
        params: dict | None = None,
        version: int | set[int] | None = None,
        attrs: dict | None = None,
        status: int | None = None,
        ext: str | None = None,
        label: str | None = None,
        newer_than: str | datetime.datetime | None = None,
        older_than: str | datetime.datetime | None = None,
        key: str | None = None,
        disk: bool = False,
        keep_record: bool = True,
) -> None:
    """
    Removes item(s) from the cache. How far the removal goes is controlled
    by `disk` and `keep_record`; all other arguments select the items.

    Args:
        uri:
            Uniform Resource Identifier. Optional, defaults to `None`.
        params:
            Parameters of the item as a dict of name-value pairs. Optional,
            defaults to `None`.
        version:
            Version(s) of the item to remove. Optional, defaults to `None`.
        attrs:
            Currently not used for selecting items.
        status:
            Status of the items to remove. Optional, defaults to `None`.
        ext:
            Currently not used for selecting items.
        label:
            Currently not used for selecting items.
        newer_than:
            Only remove items dated after this. Optional, defaults to
            `None`.
        older_than:
            Only remove items dated before this. Optional, defaults to
            `None`.
        key:
            Unique key of the item. Optional, defaults to `None`.
        disk:
            Whether to delete the files of the selected items from disk as
            well; the records are then marked as DELETED instead of TRASH.
            Optional, defaults to `False`.
        keep_record:
            Whether to keep the (status-marked) records in the registry.
            If `False`, the records are permanently deleted. Optional,
            defaults to `True`.

    Example:
        >>> cache = cm.Cache('./')
        >>> cache.create('foo')
        CacheItem[foo V:1 UNINITIALIZED]
        >>> cache.remove(uri='foo')
        >>> cache.search(uri='foo')
        []
    """

    with Lock(self.con):

        items = self.search(
            uri=uri,
            params=params,
            status=status,
            version=version,
            newer_than=newer_than,
            older_than=older_than,
            key=key,
        )

        if not items:
            return

        id_list = ','.join(str(item._id) for item in items)
        where = f' WHERE id IN ({id_list})'

        if disk:
            new_status = Status.DELETED.value
        else:
            new_status = Status.TRASH.value

        self._execute(f'UPDATE main SET status = {new_status} {where};')

        if disk:
            self._delete_files(items)

        if not keep_record:
            self._delete_records(items)
def search(
        self,
        uri: str | None = None,
        params: dict | None = None,
        status: int | set[int] | None = None,
        version: int | set[int] | None = None,
        newer_than: str | datetime.datetime | None = None,
        older_than: str | datetime.datetime | None = None,
        ext: str | None = None,
        label: str | None = None,
        filename: str | None = None,
        key: str | None = None,
        attrs: dict | None = None,
        include_removed: bool = False,
) -> list[CacheItem]:
    """
    Looks up items in the cache based on the passed parameter(s). Every
    item found gets its `last_search` and `search_count` fields updated.

    Args:
        uri:
            Uniform Resource Identifier. Optional, defaults to `None`.
        params:
            Parameters of the item as a dict of name-value pairs. Optional,
            defaults to `None`.
        status:
            Status(es) of the items to search for. Optional, defaults to
            `None`.
        version:
            Version(s) of the items to search for. Optional, defaults to
            `None`.
        newer_than:
            Only return items dated after this. Optional, defaults to
            `None`.
        older_than:
            Only return items dated before this. Optional, defaults to
            `None`.
        ext:
            Extension of the file associated to the item. Optional,
            defaults to `None`.
        label:
            Label for the item (e.g. type, group, category...). Optional,
            defaults to `None`.
        filename:
            Name of the file associated to the item. Optional, defaults to
            `None`.
        key:
            Unique key of the item. Optional, defaults to `None`.
        attrs:
            Search by attributes. A dict of attribute names and values.
            Operators can be included at the end of the names or in front
            of the values, forming a tuple of length 2 in the latter case.
            Multiple values can be provided as lists. By default the
            attribute search parameters are joined by AND, this can be
            overridden by including `"__and": False` in `attrs`. The types
            of the attributes will be inferred from the values, except if
            the values are provided as their correct type, such as numeric
            types or `datetime`. Strings will be converted to dates only if
            prefixed with `"DATE:"`. The passed dict is not modified.
            Optional, defaults to `None`.
        include_removed:
            Whether to include items marked for removal in the search.
            Optional, defaults to `False`.

    Returns:
        List of `CacheItem` instances fulfilling the search terms.

    Example:
        >>> cache = cm.Cache('./')
        >>> it = cache.create('foo')
        >>> cache.search(uri='foo')
        [CacheItem[foo V:1 UNINITIALIZED]]
    """

    _log('SEARCH')
    # Snapshot of the call arguments; must be taken before any other local
    # variable is created.
    args = locals()
    args.pop('self')
    param_str = _utils.serialize(args)
    _log(f'Searching cache: {param_str}')

    # Work on a copy: `by_attrs` removes the `__and` key in place and the
    # caller's dict must stay intact for later calls.
    attrs = dict(args.pop('attrs') or {})
    ids = self.by_attrs(attrs)
    where = self._where(**args)

    if attrs:

        id_filter = f'main.id IN ({",".join(str(i) for i in ids)})'
        # The filter has to become part of the WHERE conditions: it must go
        # before a trailing ORDER BY ... LIMIT and needs its own WHERE
        # keyword when there are no other conditions.
        conditions, order_kw, ordering = where.partition(' ORDER BY ')
        conditions = (
            f'{conditions} AND {id_filter}'
            if conditions else
            f' WHERE {id_filter}'
        )
        where = f'{conditions}{order_kw}{ordering}'

    # Column names of the joined rows: all fields of the main table followed
    # by the three columns of the attribute table.
    columns = (
        tuple(self._table_fields().keys()) +
        ('_id', 'name', 'value')
    )

    results = {}

    with Lock(self.con):

        # One query per attribute table, so that attributes of every type
        # get attached to the items found.
        for actual_typ in ATTR_TYPES:

            q = (
                'SELECT * FROM main '
                f'LEFT JOIN attr_{actual_typ} attr ON main.id = attr.id '
                f'{where}'
            )

            self._execute(q)

            _log(f'Fetching results from attr_{actual_typ}')

            for row in self.cur.fetchall():

                row = dict(zip(columns, row))
                verid = row['version_id']

                if verid not in results:

                    _log(f'Found version: `{verid}`')

                    results[verid] = CacheItem(
                        key=row['item_id'],
                        version=row['version'],
                        status=row['status'],
                        date=row['date'],
                        filename=row['file_name'],
                        ext=row['ext'],
                        label=row['label'],
                        _id=row['id'],
                        last_read=row['last_read'],
                        last_search=row['last_search'],
                        read_count=row['read_count'],
                        search_count=row['search_count'],
                        cache=self,
                    )

                # Rows of items without attributes of this type have NULLs
                # in the attribute columns
                if row['name']:
                    results[verid].attrs[row['name']] = row['value']

        if results:

            ids = ','.join(str(item._id) for item in results.values())

            update_q = (
                'UPDATE main SET '
                'last_search = DATETIME("now"), '
                'search_count = search_count + 1 '
                f'WHERE id IN ({ids});'
            )

            self._execute(update_q)

    _log(f'Retrieved {len(results)} results')
    _log('END SEARCH')

    return list(results.values())
# FIXME: attrs, ext and label are not used
def update(
        self,
        uri: str | None = None,
        params: dict | None = None,
        attrs: dict | None = None,
        status: int | None = None,
        version: int | None = None,
        ext: str | None = None,
        label: str | None = None,
        newer_than: str | datetime.datetime | None = None,
        older_than: str | datetime.datetime | None = None,
        key: str | None = None,
        update: dict | None = None,
):
    """
    Updates one or more items. All arguments except `update` serve to
    select the items to be updated.

    Args:
        uri:
            Uniform Resource Identifier. Optional, defaults to `None`.
        params:
            Parameters of the item as a dict of name-value pairs. Optional,
            defaults to `None`.
        attrs:
            Currently not used for selecting items.
        status:
            Status of the items to update. Optional, defaults to `None`.
        version:
            Version of the items to update. Optional, defaults to `None`.
        ext:
            Currently not used for selecting items.
        label:
            Currently not used for selecting items.
        newer_than:
            Only update items dated after this. Optional, defaults to
            `None`.
        older_than:
            Only update items dated before this. Optional, defaults to
            `None`.
        key:
            Unique key of the item. Optional, defaults to `None`.
        update:
            Names and new values of the fields/attributes to change. Names
            of main table fields are updated in the main table, all others
            in the attribute table matching the type of the new value;
            only already existing attribute rows are changed. Optional,
            defaults to `None`.

    Example:
        >>> cache = cm.Cache('./')
        >>> it = cache.create('foo', attrs={'bar': 123, 'baz': 456})
        >>> it.attrs
        {'bar': 123, 'baz': 456, '_uri': 'foo'}
        >>> cache.update(uri='foo', update={'bar': 0})
        >>> it = cache.search('foo')[0]
        >>> it.attrs
        {'_uri': 'foo', 'bar': 0, 'baz': 456}
    """

    with Lock(self.con):

        items = self.search(
            uri=uri,
            params=params,
            status=status,
            version=version,
            newer_than=newer_than,
            older_than=older_than,
            key=key,
        )

        update = update or {}
        main_fields = self._table_fields()

        def _literal(value):
            # `None` has no entry in TYPES; it maps to SQL NULL
            if value is None:
                return 'NULL'
            return self._quotes(value, TYPES[type(value).__name__])

        main = ', '.join(
            f'{k} = {_literal(v)}'
            for k, v in update.items() if k in main_fields
        )

        # Updating elements in main table
        ids = [it._id for it in items]
        _log(f'Updating {len(ids)} items')
        where = f' WHERE id IN ({", ".join(map(str, ids))})'

        if main:

            q = f'UPDATE main SET {main}{where};'
            self._execute(q)

        # Updating elements in attribute tables
        for actual_typ in ATTR_TYPES:

            _log(f'Updating attributes in attr_{actual_typ}')
            sql_type = actual_typ.upper()

            for k, v in update.items():

                # Match on the SQL type (as `create` does): the table
                # suffix is e.g. `varchar` while the Python type name of
                # the value is `str`.
                if k in main_fields or self._sqlite_type(v) != sql_type:
                    continue

                val = f'value = {self._quotes(v, sql_type)}'
                name_where = where + f' AND name = {self._quotes(k)}'
                q = f'UPDATE attr_{actual_typ} SET {val} {name_where}'
                self._execute(q)

        _log(f'Finished updating attributes')
def update_status(
        self,
        uri: str | None = None,
        params: dict | None = None,
        version: int | None = -1,
        status: int = Status.READY.value,
        key: str | None = None,
):
    """
    Sets the status of the selected entry(ies) in the registry. All
    arguments other than `status` serve to select the entries.

    Args:
        uri:
            Uniform Resource Identifier. Optional, defaults to `None`.
        params:
            Parameters of the item as a dict of name-value pairs. Optional,
            defaults to `None`.
        version:
            Version number of the item(s). Optional, defaults to `-1`,
            which the WHERE clause builder turns into "highest version
            only".
        status:
            The new status to be set. Optional, defaults to the READY
            status.
        key:
            Unique identifier of the item (alphanumeric hash). Optional,
            defaults to `None`.

    Example:
        >>> cache = cm.Cache('./')
        >>> it = cache.create('foo')
        >>> it.status
        0
        >>> cache.update_status(uri='foo')
        >>> it.status
        3
    """

    selection = {
        'uri': uri,
        'params': params,
        'version': version,
        'key': key,
    }

    self.update(update={'status': status}, **selection)
# Shortcuts for `update_status` with the new status preset to READY and
# FAILED respectively; they accept the same selection arguments.
ready = ft.partialmethod(update_status, status=Status.READY.value)
failed = ft.partialmethod(update_status, status=Status.FAILED.value)
def _accessed(self, item_id: int):
    """
    Records a read access of an item: sets `last_read` to the current
    date/time and increments `read_count` by one.

    Args:
        item_id:
            The internal ID (`CacheItem._id`) of the item that has just
            been accessed.
    """

    assignments = 'last_read = DATETIME("now"), read_count = read_count + 1 '

    self._execute(f'UPDATE main SET {assignments}WHERE id = {item_id};')
def _create_schema(self):
    """
    Sets up the registry database: creates the main table and one attribute
    table per entry of `ATTR_TYPES`, unless they exist already.
    """

    self._ensure_sqlite()
    _log('Initializing database')

    column_defs = ', '.join(
        f'{name} {spec}'
        for name, spec in self._table_fields().items()
    )

    _log('Ensuring main table exists')
    self._execute(f'''
        CREATE TABLE IF NOT EXISTS
        main (
            {column_defs}
        )
    ''')

    for typ in ATTR_TYPES:

        _log(f'Ensuring attr_{typ} table exists')
        # The value column is typed after the table's own suffix
        sql_type = typ.upper()
        self._execute(
            f'''
            CREATE TABLE IF NOT EXISTS
            attr_{typ} (
                id INT,
                name VARCHAR,
                value {sql_type},
                FOREIGN KEY(id) REFERENCES main(id)
            )
            ''',
        )
def _delete_files(self, items: list[int, CacheItem]):
    """
    Permanently deletes from disk the files of the given cache items.
    Items whose file does not exist are skipped.

    Args:
        items:
            The items whose files are to be deleted. Each element must
            provide a `path` attribute (as `CacheItem` instances do); plain
            integer IDs are not resolved here.
    """

    for item in items:

        if not os.path.exists(item.path):
            continue

        _log(f'Deleting from disk: `{item.path}`.')
        os.remove(item.path)
def _delete_records(self, items: list[int, CacheItem]):
    """
    Permanently deletes the given items from the registry, i.e. from the
    main table and from every attribute table.

    Args:
        items:
            Sized collection of the items to delete; each element is either
            a `CacheItem` (its `_id` is used) or directly the integer ID.
    """

    with Lock(self.con):

        # Elements without an `_id` attribute are taken to be the ID itself
        id_list = ','.join(str(getattr(entry, '_id', entry)) for entry in items)
        where = f' WHERE id IN ({id_list})'
        _log(f'_delete_records: {len(items)} IDs to be deleted.')
        n_before = len(self)

        for suffix in ATTR_TYPES:

            attr_table = f'attr_{suffix}'
            _log(f'Deleting attributes from {attr_table}')
            self._execute(f'DELETE FROM {attr_table} {where}')

        self._execute('DELETE FROM main' + where)

        _log(f'Deleted {n_before - len(self)} records.')
def _ensure_sqlite(self):
"""
Ensures the connection to the SQL database is open.
"""
if self.con is None:
self._open_sqlite()
def _execute(self, query: str):
    """
    Runs an SQL query on the registry database and commits right away.

    Args:
        query:
            The SQL query string. Every run of whitespace in it is
            collapsed into a single space before logging and execution.
    """

    normalized = re.sub(r'\s+', ' ', query)
    _log(f'Executing query: {normalized}')

    self.cur.execute(normalized)
    self.con.commit()
def _open_sqlite(self):
    """
    Connects to the registry database at `self.path`, creates a cursor and
    makes sure the tables exist.
    """

    _log(f'Opening SQLite database: {self.path}')

    connection = sqlite3.connect(self.path)
    self.con = connection
    self.cur = connection.cursor()

    self._create_schema()
def _set_path(self, path: str | None, pkg: str | None = None):
    """
    Sets the location of the cache: `self.path` (the registry database
    file) and `self.dir` (the directory containing it). Missing directories
    are created.

    Args:
        path:
            Explicit location of the cache, either a directory (the
            registry is then `cache.sqlite` inside it) or the registry
            file itself. A non-existing path is taken to be a file if it
            has an extension, a directory otherwise. Takes precedence over
            `pkg`.
        pkg:
            Package/module name; used to derive the cache directory via
            `platformdirs.user_cache_dir` when no `path` is given.
            Optional, defaults to `None`.

    Raises:
        ValueError: If neither `path` nor `pkg` is provided.

    Example:
        >>> cache = cm.Cache('.')
        >>> cache._set_path(path='./test_cache')
        >>> cache.dir
        './test_cache'
        >>> cache.path
        './test_cache/cache.sqlite'
    """

    if not path and not pkg:
        raise ValueError('Please provide a valid path or package name')

    path = path or platformdirs.user_cache_dir(pkg)

    if not os.path.exists(path):

        # With an extension the path names the registry file, hence only
        # its parent directory has to exist (not a directory named after
        # the file stem); without one, the path is the directory itself.
        has_ext = bool(os.path.splitext(path)[1])
        target_dir = os.path.dirname(path) if has_ext else path

        # A bare file name has no directory part: nothing to create
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

    if os.path.isdir(path):
        path = os.path.join(path, 'cache.sqlite')

    _log(f'Setting SQLite database path: {path}')
    self.path = path
    # Fall back to the current directory for a bare file name, so that
    # directory operations on `self.dir` keep working
    self.dir = os.path.dirname(self.path) or os.curdir
def _table_fields(self, name: str = 'main') -> dict[str, str]:
"""
Retrieves the available fields in the main table (i.e. column names).
Args:
name:
Name of the table from which to retreive the field names.
Optional, defaults to `'main'` (currently only option).
Returns:
Dictionary containing the field names as keys and values correspond
to the SQL data types and column specifications.
Example:
>>> cache = Cache('./')
>>> cache._table_fields()
OrderedDict([('id', 'INTEGER PRIMARY KEY AUTOINCREMENT'), \
('item_id', 'VARCHAR'), ('version_id', 'VARCHAR'), \
('version', 'INT'), ('status', 'INT'), \
('file_name', 'VARCHAR'), ('label', 'VARCHAR'), \
('date', 'DATETIME'), ('ext', 'VARCHAR'), \
('last_read', 'DATETIME'), ('last_search', 'DATETIME'), \
('read_count', 'INT'), ('search_count', 'INT')])
"""
# TODO: Make other tables available?
if name not in self._fields:
self._fields[name] = _data.load(f'{name}.yaml')
return self._fields[name]
@staticmethod
def _quotes(string: str | None, typ: str = 'VARCHAR') -> str:
"""
Double-quotes strings to convert them to literals in SQL.
Args:
string:
The string to be quoted.
typ:
Type of variable the string contains. Optional, defaults to
`'VARCHAR'`.
Returns:
The resulting quoted string.
Example:
>>> cache = Cache('./')
>>> cache._quotes('abc')
'"abc"'
"""
if string is None:
return 'NULL'
typ = typ.upper()
return f'"{string}"' if (
typ.startswith('VARCHAR') or
typ.startswith('DATETIME')
) else string
@staticmethod
def _sqlite_type(obj: Any) -> str:
    """
    Gives the SQL type corresponding to the Python type of a value.

    Args:
        obj:
            The value whose type is to be translated.

    Returns:
        The SQL type name registered in `TYPES` for the exact type of
        `obj`, or `None` if there is none.

    Examples:
        >>> cache = Cache('./')
        >>> cache._sqlite_type(123)
        'INT'
        >>> cache._sqlite_type(1.25)
        'FLOAT'
        >>> cache._sqlite_type('abc')
        'VARCHAR'
    """

    type_name = type(obj).__name__

    return TYPES.get(type_name)
@staticmethod
def _typeof(value: Any) -> str:
"""
Checks a given value for the numerical type.
Args:
value:
The variable to check for the type.
Returns:
The resulting type as a string in SQL format. `'INT'` if the value
is an integer or `'FLOAT'` if its a floating point number.
Examples:
>>> cache = Cache('./')
>>> cache._typeof(123)
'INT'
>>> cache._typeof(9.01)
'FLOAT'
>>> cache._typeof('123')
'INT'
"""
if (
isinstance(value, int)
or (isinstance(value, str) and _misc.is_int(value))
):
return 'INT'
elif (
isinstance(value, float)
or (isinstance(value, str) and _misc.is_float(value))
):
return 'FLOAT'
@staticmethod
def _where(
        uri: str | None = None,
        params: dict | None = None,
        status: Status | set[int] | None = None,
        version: int | set[int] | None = None,
        newer_than: str | datetime.datetime | None = None,
        older_than: str | datetime.datetime | None = None,
        ext: str | None = None,
        label: str | None = None,
        filename: str | None = None,
        key: str | None = None,
        include_removed: bool = False,
) -> str:
    """
    Builds the SQL `WHERE` clause (optionally followed by `ORDER BY` and
    `LIMIT`) selecting the items described by the arguments.

    Args:
        uri:
            Uniform Resource Identifier. Together with `params` it
            determines the item ID, unless `key` is given. Optional,
            defaults to `None`.
        params:
            Parameters of the item as a dict of name-value pairs. The
            passed dict is not modified. Optional, defaults to `None`.
        status:
            Status(es) of the item(s). Optional, defaults to `None`.
        version:
            Version(s) of the item(s). The special value `-1` selects only
            the highest version. Optional, defaults to `None`.
        newer_than:
            Only select items dated after this. Optional, defaults to
            `None`.
        older_than:
            Only select items dated before this. Optional, defaults to
            `None`.
        ext:
            Extension of the file associated to the item(s). Optional,
            defaults to `None`.
        label:
            Label for the item (e.g. type, group, category...). Optional,
            defaults to `None`.
        filename:
            Name of the file associated to the item. Optional, defaults to
            `None`.
        key:
            Unique key (item ID) of the item; takes precedence over `uri`
            and `params`. Optional, defaults to `None`.
        include_removed:
            Whether to include items with status `-1` or `-2`. Unless
            requested here or explicitly in `status`, these are excluded.
            Optional, defaults to `False`.

    Returns:
        The clause as a string starting with a space, or an empty string
        if there is nothing to filter or sort by.

    Example:
        >>> cache = Cache('./')
        >>> cache._where(key='abc', status=3)
        ' WHERE item_id = "abc" AND status != -1 AND status != -2 AND status IN (3)'
    """

    where = []
    item_id = key

    if not item_id and (uri or params):

        # Work on a copy: adding `_uri` must not leak into the dict owned
        # by the caller.
        params = dict(params or {})

        if uri:
            params['_uri'] = uri

        item_id = CacheItem.serialize(params)

    if item_id:
        where.append(f'item_id = "{item_id}"')

    if filename:
        where.append(f'file_name = "{filename}"')

    status = _misc.to_set(status)

    if -1 not in status and not include_removed:
        where.append('status != -1')

    if -2 not in status and not include_removed:
        where.append('status != -2')

    if status:

        # The string form of a set without the braces is a comma separated
        # list of its elements
        status = str(status).strip('{}')
        where.append(f'status IN ({status})')

    if version is not None and version != -1:

        version = str(_misc.to_set(version)).strip('{}')
        where.append(f'version IN ({version})')

    if newer_than:
        where.append(f'date > "{_utils.parse_time(newer_than)}"')

    if older_than:
        where.append(f'date < "{_utils.parse_time(older_than)}"')

    if ext:
        where.append(f'ext = "{ext}"')

    if label:
        where.append(f'label = "{label}"')

    where = f' WHERE {" AND ".join(where)}' if where else ''

    if version == -1:  # TODO: Address cases where multiple items
        where += ' ORDER BY version DESC LIMIT 1'

    return where