Skip to content

Commit edaa06a

Browse files
committed
SQLAlchemy: let database do work where it can
provide subclasses for salt.utils.minions.CkMinions and salt.key.Key that use sqlalchemy to make things go fast that would otherwise require a full fetch of the entire minion data cache. the larger your installation the bigger the improvement will be. The CkMinions optimization heavily relies on GIN indexing with @> contains queries, so is postgresql speciific. mysql/sqlite unfortunately don't offer anything equivalent to translate to.
1 parent d7c9fe6 commit edaa06a

File tree

4 files changed

+418
-52
lines changed

4 files changed

+418
-52
lines changed

changelog/68108.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
SQLAlchemy: implement salt.utils.minions.CkMinions/salt.key.Keys subclasses to leverage database

salt/key.py

Lines changed: 146 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,27 @@
1414
import salt.crypt
1515
import salt.exceptions
1616
import salt.payload
17-
import salt.transport
17+
import salt.sqlalchemy
1818
import salt.utils.args
1919
import salt.utils.crypt
2020
import salt.utils.data
2121
import salt.utils.event
2222
import salt.utils.files
23-
import salt.utils.json
2423
import salt.utils.kinds
25-
import salt.utils.minions
2624
import salt.utils.sdb
27-
import salt.utils.stringutils
28-
import salt.utils.user
2925
from salt.utils.decorators import cached_property
3026

3127
log = logging.getLogger(__name__)
3228

3329

3430
def get_key(opts):
35-
return Key(opts)
31+
if opts.get("keys.cache_driver") == "sqlalchemy":
32+
if not salt.sqlalchemy.orm_configured():
33+
salt.sqlalchemy.configure_orm(opts)
34+
35+
return SqlAlchemyKey(opts)
36+
else:
37+
return Key(opts)
3638

3739

3840
class KeyCLI:
@@ -562,14 +564,12 @@ def dict_match(self, match_dict):
562564
Accept a dictionary of keys and return the current state of the
563565
specified keys
564566
"""
565-
ret = {}
566-
cur_keys = self.list_keys()
567-
for status, keys in match_dict.items():
568-
for key in salt.utils.data.sorted_ignorecase(keys):
569-
for keydir in (self.ACC, self.PEND, self.REJ, self.DEN):
570-
if keydir and fnmatch.filter(cur_keys.get(keydir, []), key):
571-
ret.setdefault(keydir, []).append(key)
572-
return ret
567+
matches = []
568+
# not sure why this interface was ever added. just toss to glob match
569+
for match_list in match_dict.values():
570+
matches.extend(match_list)
571+
572+
return self.glob_match(matches)
573573

574574
def list_keys(self):
575575
"""
@@ -797,14 +797,21 @@ def delete_key(
797797
"master AES key is rotated or auth is revoked "
798798
"with 'saltutil.revoke_auth'.".format(key)
799799
)
800+
deleted = False
800801
if status == "minions_denied":
801802
self.cache.flush("denied_keys", key)
803+
deleted = True
802804
else:
803-
self.cache.flush("keys", key)
804-
eload = {"result": True, "act": "delete", "id": key}
805-
self.event.fire_event(
806-
eload, salt.utils.event.tagify(prefix="key")
807-
)
805+
val = self.cache.fetch("keys", key)
806+
if val and self.STATE_MAP[val["state"]] == status:
807+
self.cache.flush("keys", key)
808+
deleted = True
809+
810+
if deleted:
811+
eload = {"result": True, "act": "delete", "id": key}
812+
self.event.fire_event(
813+
eload, salt.utils.event.tagify(prefix="key")
814+
)
808815
except OSError:
809816
pass
810817
if self.opts.get("preserve_minions") is True:
@@ -925,3 +932,123 @@ def __enter__(self):
925932

926933
def __exit__(self, *args):
927934
self.event.destroy()
935+
936+
937+
try:
938+
from sqlalchemy import or_, select
939+
940+
from salt.sqlalchemy.models import model_for
941+
except ImportError:
942+
# stubs to appease pyright
943+
model_for = select = or_ = print
944+
945+
946+
class SqlAlchemyKey(Key):
947+
def __init__(self, opts, *args, **kwargs):
948+
# unfortunately real null can't be used as a pk in certain backends so
949+
# it must be mapped to a string null
950+
self.cluster_id = opts["cluster_id"] or "null"
951+
super().__init__(opts, *args, **kwargs)
952+
953+
def list_keys(self):
954+
"""
955+
Return a dict of managed keys and what the key status are
956+
"""
957+
ret = {
958+
"minions_pre": [],
959+
"minions_rejected": [],
960+
"minions": [],
961+
"minions_denied": [],
962+
}
963+
964+
Cache = model_for("Cache")
965+
with salt.sqlalchemy.ROSession() as session:
966+
stmt = select(Cache.key, Cache.data["state"]).where(
967+
or_(Cache.bank == "keys", Cache.bank == "denied_keys"),
968+
Cache.cluster == self.cluster_id,
969+
)
970+
results = session.execute(stmt).all()
971+
session.commit()
972+
973+
for id_, state in results:
974+
if state == "accepted":
975+
ret["minions"].append(id_)
976+
elif state == "pending":
977+
ret["minions_pre"].append(id_)
978+
elif state == "rejected":
979+
ret["minions_rejected"].append(id_)
980+
981+
for id_ in self.cache.list("denied_keys"):
982+
ret["minions_denied"].append(id_)
983+
984+
return ret
985+
986+
def list_match(self, match):
987+
"""
988+
Accept a glob which to match the of a key and return the key's location
989+
"""
990+
ret = {}
991+
if isinstance(match, str):
992+
match = match.split(",")
993+
994+
ret = {}
995+
996+
Cache = model_for("Cache")
997+
with salt.sqlalchemy.ROSession() as session:
998+
stmt = select(Cache.key, Cache.bank, Cache.data["state"]).where(
999+
Cache.cluster == self.cluster_id,
1000+
Cache.key.in_(match),
1001+
or_(Cache.bank == "keys", Cache.bank == "denied_keys"),
1002+
)
1003+
1004+
results = session.execute(stmt).all()
1005+
for _id, bank, state in results:
1006+
# backward compatibility stuff requires denied_keys be handled differently
1007+
if bank == "denied_keys":
1008+
state = "denied"
1009+
1010+
ret.setdefault(self.STATE_MAP[state], [])
1011+
ret[self.STATE_MAP[state]].append(_id)
1012+
session.commit()
1013+
1014+
return ret
1015+
1016+
def glob_match(self, match, full=False): # pylint: disable=unused-argument
1017+
"""
1018+
Return the minions found by looking via globs
1019+
"""
1020+
ret = {}
1021+
1022+
# optimization:
1023+
# if there is no glob in the expression, we can just treat it as a single element list
1024+
if isinstance(match, str) and "*" not in match:
1025+
return self.list_match(match)
1026+
1027+
if isinstance(match, list):
1028+
match = ",".join(match)
1029+
1030+
# we want to translate shell globs to an equivalent ILIKE query
1031+
match = match.replace("_", "\\_")
1032+
match = match.replace("%", "\\%")
1033+
match = match.replace("*", "%")
1034+
match = match.split(",")
1035+
1036+
Cache = model_for("Cache")
1037+
with salt.sqlalchemy.ROSession() as session:
1038+
stmt = select(Cache.key, Cache.bank, Cache.data["state"]).where(
1039+
Cache.cluster == self.cluster_id,
1040+
or_(Cache.bank == "keys", Cache.bank == "denied_keys"),
1041+
or_(Cache.key.ilike(ex) for ex in match),
1042+
)
1043+
1044+
results = session.execute(stmt).all()
1045+
for _id, bank, state in results:
1046+
# backward compatibility stuff requires denied_keys be handled differently
1047+
if bank == "denied_keys":
1048+
state = "denied"
1049+
1050+
ret.setdefault(self.STATE_MAP[state], [])
1051+
ret[self.STATE_MAP[state]].append(_id)
1052+
session.commit()
1053+
1054+
return ret

salt/sqlalchemy/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ def configure_orm(opts):
253253

254254
ENGINE_REGISTRY[name] = {}
255255
ENGINE_REGISTRY[name]["engine"] = _engine
256+
ENGINE_REGISTRY[name]["dialect"] = _engine.dialect.name
256257
ENGINE_REGISTRY[name]["session"] = _Session
257258
ENGINE_REGISTRY[name]["ro_engine"] = _ro_engine
258259
ENGINE_REGISTRY[name]["ro_session"] = _ROSession
@@ -310,6 +311,20 @@ def reconfigure_orm(opts):
310311
configure_orm(opts)
311312

312313

314+
def get_engine(name=None):
315+
"""
316+
Get a SQLAlchemy engine by name
317+
"""
318+
if not name:
319+
name = "default"
320+
try:
321+
return ENGINE_REGISTRY[name]["engine"]
322+
except KeyError:
323+
raise salt.exceptions.SaltInvocationError(
324+
f"ORM not configured for '{name}' yet. Did you forget to call salt.sqlalchemy.configure_orm?"
325+
)
326+
327+
313328
def Session(name=None):
314329
"""
315330
Get a SQLAlchemy session for database operations.

0 commit comments

Comments
 (0)