Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
update mapping selection flow
  • Loading branch information
alexvolha committed Jul 30, 2024
commit 818cc735015ce4193dcafd1852221856a62d9c37
29 changes: 24 additions & 5 deletions uncoder-core/app/translator/core/mapping.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional, TypeVar
from typing import TYPE_CHECKING, Optional, TypeVar, Union

from app.translator.core.exceptions.core import StrictPlatformException
from app.translator.core.models.platform_details import PlatformDetails
Expand All @@ -19,9 +19,14 @@ class LogSourceSignature(ABC):
wildcard_symbol = "*"

@abstractmethod
def is_suitable(self, *args, **kwargs) -> bool:
def is_suitable(self, **kwargs) -> bool:
raise NotImplementedError("Abstract method")

@staticmethod
def _check_conditions(conditions: list[Union[bool, None]]) -> bool:
conditions = [condition for condition in conditions if condition is not None]
return bool(conditions) and all(conditions)

@abstractmethod
def __str__(self) -> str:
raise NotImplementedError("Abstract method")
Expand Down Expand Up @@ -147,9 +152,23 @@ def prepare_fields_mapping(field_mapping: dict) -> FieldsMapping:
def prepare_log_source_signature(self, mapping: dict) -> LogSourceSignature:
raise NotImplementedError("Abstract method")

@abstractmethod
def get_suitable_source_mappings(self, *args, **kwargs) -> list[SourceMapping]:
raise NotImplementedError("Abstract method")
def get_suitable_source_mappings(
self, field_names: list[str], log_sources: dict[str, list[Union[int, str]]]
) -> list[SourceMapping]:
by_log_sources_and_fields = []
by_fields = []
for source_mapping in self._source_mappings.values():
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
continue

if source_mapping.fields_mapping.is_suitable(field_names):
by_fields.append(source_mapping)

log_source_signature: LogSourceSignature = source_mapping.log_source_signature
if log_source_signature.is_suitable(**log_sources):
by_log_sources_and_fields.append(source_mapping)

return by_log_sources_and_fields or by_fields or [self._source_mappings[DEFAULT_MAPPING_NAME]]

def get_source_mapping(self, source_id: str) -> Optional[SourceMapping]:
return self._source_mappings.get(source_id)
Expand Down
20 changes: 7 additions & 13 deletions uncoder-core/app/translator/core/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,30 +62,24 @@ def get_query_tokens(self, query: str) -> list[QUERY_TOKEN_TYPE]:
raise TokenizerGeneralException("Can't translate empty query. Please provide more details")
return self.tokenizer.tokenize(query=query)

@staticmethod
def get_field_tokens(
self, query_tokens: list[QUERY_TOKEN_TYPE], functions: Optional[list[Function]] = None
query_tokens: list[QUERY_TOKEN_TYPE], functions: Optional[list[Function]] = None
) -> list[Field]:
field_tokens = []
for token in query_tokens:
if isinstance(token, FieldValue):
field_tokens.append(token.field)
elif isinstance(token, FieldField):
if token.field_left:
field_tokens.append(token.field_left)
if token.field_right:
field_tokens.append(token.field_right)
elif isinstance(token, FunctionValue):
field_tokens.extend(self.tokenizer.get_field_tokens_from_func_args([token.function]))
if isinstance(token, (FieldField, FieldValue, FunctionValue)):
field_tokens.extend(token.fields)

if functions:
field_tokens.extend(self.tokenizer.get_field_tokens_from_func_args(functions))
field_tokens.extend([field for func in functions for field in func.fields])

return field_tokens

def get_source_mappings(
self, field_tokens: list[Field], log_sources: dict[str, Union[str, list[str]]]
self, field_tokens: list[Field], log_sources: dict[str, list[Union[int, str]]]
) -> list[SourceMapping]:
field_names = [field.source_name for field in field_tokens]
source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, **log_sources)
source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, log_sources=log_sources)
self.tokenizer.set_field_tokens_generic_names_map(field_tokens, source_mappings, self.mappings.default_mapping)
return source_mappings
20 changes: 1 addition & 19 deletions uncoder-core/app/translator/platforms/athena/mapping.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional

from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
from app.translator.platforms.athena.const import athena_query_details


Expand All @@ -22,23 +22,5 @@ def prepare_log_source_signature(self, mapping: dict) -> AthenaLogSourceSignatur
default_log_source = mapping["default_log_source"]
return AthenaLogSourceSignature(tables=tables, default_source=default_log_source)

def get_suitable_source_mappings(self, field_names: list[str], table: Optional[str]) -> list[SourceMapping]:
suitable_source_mappings = []
for source_mapping in self._source_mappings.values():
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
continue

log_source_signature: AthenaLogSourceSignature = source_mapping.log_source_signature
if table and log_source_signature.is_suitable(table=table):
if source_mapping.fields_mapping.is_suitable(field_names):
suitable_source_mappings.append(source_mapping)
elif source_mapping.fields_mapping.is_suitable(field_names):
suitable_source_mappings.append(source_mapping)

if not suitable_source_mappings:
suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]

return suitable_source_mappings


athena_query_mappings = AthenaMappings(platform_dir="athena", platform_details=athena_query_details)
57 changes: 11 additions & 46 deletions uncoder-core/app/translator/platforms/base/aql/mapping.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional

from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature


class AQLLogSourceSignature(LogSourceSignature):
Expand All @@ -20,23 +20,18 @@ def __init__(

def is_suitable(
self,
devicetype: Optional[list[int]],
category: Optional[list[int]],
qid: Optional[list[int]],
qideventcategory: Optional[list[int]],
devicetype: Optional[list[int]] = None,
category: Optional[list[int]] = None,
qid: Optional[list[int]] = None,
qideventcategory: Optional[list[int]] = None,
) -> bool:
device_type_match = set(devicetype).issubset(self.device_types) if devicetype else None
category_match = set(category).issubset(self.categories) if category else None
qid_match = set(qid).issubset(self.qids) if qid else None
qid_event_category_match = (
set(qideventcategory).issubset(self.qid_event_categories) if qideventcategory else None
)
all_conditions = [
condition
for condition in (device_type_match, category_match, qid_match, qid_event_category_match)
if condition is not None
conditions = [
set(devicetype).issubset(self.device_types) if devicetype else None,
set(category).issubset(self.categories) if category else None,
set(qid).issubset(self.qids) if qid else None,
set(qideventcategory).issubset(self.qid_event_categories) if qideventcategory else None,
]
return bool(all_conditions) and all(all_conditions)
return self._check_conditions(conditions)

def __str__(self) -> str:
return self._default_source.get("table", "events")
Expand All @@ -61,33 +56,3 @@ def prepare_log_source_signature(self, mapping: dict) -> AQLLogSourceSignature:
qid_event_categories=log_source.get("qideventcategory"),
default_source=default_log_source,
)

def get_suitable_source_mappings(
self,
field_names: list[str],
devicetype: Optional[list[int]] = None,
category: Optional[list[int]] = None,
qid: Optional[list[int]] = None,
qideventcategory: Optional[list[int]] = None,
) -> list[SourceMapping]:
suitable_source_mappings = []
for source_mapping in self._source_mappings.values():
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
continue

log_source_signature: AQLLogSourceSignature = source_mapping.log_source_signature
if log_source_signature.is_suitable(devicetype, category, qid, qideventcategory): # noqa: SIM102
if source_mapping.fields_mapping.is_suitable(field_names):
suitable_source_mappings.append(source_mapping)

if not suitable_source_mappings:
for source_mapping in self._source_mappings.values():
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
continue
if source_mapping.fields_mapping.is_suitable(field_names):
suitable_source_mappings.append(source_mapping)

if not suitable_source_mappings:
suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]

return suitable_source_mappings
29 changes: 3 additions & 26 deletions uncoder-core/app/translator/platforms/base/lucene/mapping.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from typing import Optional

from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature


class LuceneLogSourceSignature(LogSourceSignature):
def __init__(self, indices: Optional[list[str]], default_source: dict):
self.indices = set(indices or [])
self._default_source = default_source or {}

def is_suitable(self, index: Optional[list[str]]) -> bool:
return set(index or []).issubset(self.indices)
def is_suitable(self, index: Optional[list[str]] = None, **kwargs) -> bool: # noqa: ARG002
return self._check_conditions([set(index).issubset(self.indices) if index else None])

def __str__(self) -> str:
return self._default_source.get("index", "")
Expand All @@ -20,26 +20,3 @@ def prepare_log_source_signature(self, mapping: dict) -> LuceneLogSourceSignatur
indices = mapping.get("log_source", {}).get("index")
default_log_source = mapping.get("default_log_source", {})
return LuceneLogSourceSignature(indices=indices, default_source=default_log_source)

def get_suitable_source_mappings(
self,
field_names: list[str],
index: Optional[list[str]] = None,
**kwargs, # noqa: ARG002
) -> list[SourceMapping]:
suitable_source_mappings = []
for source_mapping in self._source_mappings.values():
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
continue

log_source_signature: LuceneLogSourceSignature = source_mapping.log_source_signature
if index and log_source_signature.is_suitable(index=index):
if source_mapping.fields_mapping.is_suitable(field_names):
suitable_source_mappings.append(source_mapping)
elif source_mapping.fields_mapping.is_suitable(field_names):
suitable_source_mappings.append(source_mapping)

if not suitable_source_mappings:
suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]

return suitable_source_mappings
7 changes: 3 additions & 4 deletions uncoder-core/app/translator/platforms/base/sql/parsers/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
"""

import re
from typing import Optional

from app.translator.core.models.query_container import RawQueryContainer, TokenizedQueryContainer
from app.translator.core.parser import PlatformQueryParser
Expand All @@ -31,12 +30,12 @@ class SqlQueryParser(PlatformQueryParser):

wrapped_with_comment_pattern = r"^\s*--.*(?:\n|$)"

def _parse_query(self, query: str) -> tuple[str, dict[str, Optional[str]]]:
log_source = {"table": None}
def _parse_query(self, query: str) -> tuple[str, dict[str, list[str]]]:
log_source = {"table": []}
if re.search(self.query_delimiter_pattern, query, flags=re.IGNORECASE):
table_search = re.search(self.table_pattern, query)
table = table_search.group("table")
log_source["table"] = table
log_source["table"] = [table]
return re.split(self.query_delimiter_pattern, query, flags=re.IGNORECASE)[1], log_source

return query, log_source
Expand Down
18 changes: 2 additions & 16 deletions uncoder-core/app/translator/platforms/chronicle/mapping.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
from app.translator.platforms.chronicle.const import chronicle_query_details, chronicle_rule_details


class ChronicleLogSourceSignature(LogSourceSignature):
def is_suitable(self) -> bool:
raise NotImplementedError
raise True

def __str__(self) -> str:
return ""
Expand All @@ -16,20 +16,6 @@ class ChronicleMappings(BasePlatformMappings):
def prepare_log_source_signature(self, mapping: dict) -> ChronicleLogSourceSignature:
...

def get_suitable_source_mappings(self, field_names: list[str]) -> list[SourceMapping]:
suitable_source_mappings = []
for source_mapping in self._source_mappings.values():
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
continue

if source_mapping.fields_mapping.is_suitable(field_names):
suitable_source_mappings.append(source_mapping)

if not suitable_source_mappings:
suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]

return suitable_source_mappings


chronicle_query_mappings = ChronicleMappings(platform_dir="chronicle", platform_details=chronicle_query_details)
chronicle_rule_mappings = ChronicleMappings(platform_dir="chronicle", platform_details=chronicle_rule_details)
21 changes: 4 additions & 17 deletions uncoder-core/app/translator/platforms/crowdstrike/mapping.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional

from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
from app.translator.platforms.crowdstrike.const import crowdstrike_query_details


Expand All @@ -9,8 +9,9 @@ def __init__(self, event_simple_name: Optional[list[str]], default_source: dict)
self.event_simple_names = set(event_simple_name or [])
self._default_source = default_source or {}

def is_suitable(self, event_simple_name: list[str]) -> bool:
return set(event_simple_name).issubset(self.event_simple_names)
def is_suitable(self, event_simple_name: Optional[list[str]] = None) -> bool:
conditions = [set(event_simple_name).issubset(self.event_simple_names) if event_simple_name else None]
return self._check_conditions(conditions)

def __str__(self) -> str:
return f"event_simpleName={self._default_source['event_simpleName']}"
Expand All @@ -24,19 +25,5 @@ def prepare_log_source_signature(self, mapping: dict) -> CrowdStrikeLogSourceSig
event_simple_name=log_source.get("event_simpleName"), default_source=default_log_source
)

def get_suitable_source_mappings(self, field_names: list[str], event_simpleName: list[str]) -> list[SourceMapping]: # noqa: N803
suitable_source_mappings = []
for source_mapping in self._source_mappings.values():
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
continue

source_signature: CrowdStrikeLogSourceSignature = source_mapping.log_source_signature
if source_signature.is_suitable(
event_simple_name=event_simpleName
) and source_mapping.fields_mapping.is_suitable(field_names):
suitable_source_mappings.append(source_mapping)

return suitable_source_mappings or [self._source_mappings[DEFAULT_MAPPING_NAME]]


crowdstrike_query_mappings = CrowdstrikeMappings(platform_dir="crowdstrike", platform_details=crowdstrike_query_details)
29 changes: 3 additions & 26 deletions uncoder-core/app/translator/platforms/forti_siem/mapping.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
from typing import Optional

from app.translator.core.mapping import (
DEFAULT_MAPPING_NAME,
BaseCommonPlatformMappings,
LogSourceSignature,
SourceMapping,
)
from app.translator.core.mapping import BaseCommonPlatformMappings, LogSourceSignature
from app.translator.platforms.forti_siem.const import forti_siem_rule_details


Expand All @@ -14,8 +9,8 @@ def __init__(self, event_types: Optional[list[str]], default_source: dict):
self.event_types = set(event_types or [])
self._default_source = default_source or {}

def is_suitable(self, event_type: str) -> bool:
return event_type in self.event_types
def is_suitable(self, event_type: Optional[list[str]] = None) -> bool:
return self._check_conditions([set(event_type).issubset(self.event_types) if event_type else None])

def __str__(self) -> str:
event_type = self._default_source.get("eventType", "")
Expand All @@ -39,23 +34,5 @@ def prepare_log_source_signature(self, mapping: dict) -> FortiSiemLogSourceSigna
default_log_source = mapping["default_log_source"]
return FortiSiemLogSourceSignature(event_types=event_types, default_source=default_log_source)

def get_suitable_source_mappings(self, field_names: list[str], event_type: Optional[str]) -> list[SourceMapping]:
suitable_source_mappings = []
for source_mapping in self._source_mappings.values():
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
continue

log_source_signature: FortiSiemLogSourceSignature = source_mapping.log_source_signature
if event_type and log_source_signature.is_suitable(event_type=event_type):
if source_mapping.fields_mapping.is_suitable(field_names):
suitable_source_mappings.append(source_mapping)
elif source_mapping.fields_mapping.is_suitable(field_names):
suitable_source_mappings.append(source_mapping)

if not suitable_source_mappings:
suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]

return suitable_source_mappings


forti_siem_rule_mappings = FortiSiemMappings(platform_dir="forti_siem", platform_details=forti_siem_rule_details)
Loading