Skip to content

Commit 17ec2c8

Browse files
authored
Merge pull request #41 from UncoderIO/refactoring
linter issues fix
2 parents 28851af + b5243fd commit 17ec2c8

File tree

173 files changed

+1731
-1777
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

173 files changed

+1731
-1777
lines changed

translator/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ WORKDIR /siem_converter
44
COPY . .
55
RUN pip install --upgrade pip && \
66
python -m pip install --upgrade setuptools && \
7-
pip install --trusted-host=pypi.python.org --trusted-host=pypi.org --trusted-host=files.pythonhosted.org --no-cache-dir -Ur requirements.txt
7+
pip install --trusted-host=pypi.python.org --trusted-host=pypi.org --trusted-host=files.pythonhosted.org --no-cache-dir -Ur requirements/requirements_prod.txt
88
EXPOSE 8000
99
CMD ["python", "server.py"]
Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,37 @@
1-
import datetime
21
import json
3-
from datetime import datetime
4-
from typing import List, Dict
52
import os
6-
3+
from collections.abc import Generator
74
from contextlib import asynccontextmanager
5+
from datetime import datetime
86

97
from fastapi import APIRouter, FastAPI
108

11-
from const import ROOT_PROJECT_PATH
12-
139
from app.translator.core.mitre import MitreConfig
10+
from const import ROOT_PROJECT_PATH
1411

1512
assistance_router = APIRouter()
1613

1714
suggestions = {}
1815

1916

2017
@asynccontextmanager
21-
async def lifespan(app: FastAPI):
18+
async def lifespan(app: FastAPI) -> Generator[None, None, None]: # noqa: ARG001
2219
MitreConfig().update_mitre_config()
23-
with open(os.path.join(ROOT_PROJECT_PATH, 'app/dictionaries/uncoder_meta_info_roota.json'), 'r') as file:
24-
json_f = json.load(file)
25-
suggestions['roota'] = json_f
26-
with open(os.path.join(ROOT_PROJECT_PATH, 'app/dictionaries/uncoder_meta_info_sigma.json'), 'r') as file:
27-
json_f = json.load(file)
28-
suggestions['sigma'] = json_f
20+
with open(os.path.join(ROOT_PROJECT_PATH, "app/dictionaries/uncoder_meta_info_roota.json")) as file:
21+
suggestions["roota"] = json.load(file)
22+
with open(os.path.join(ROOT_PROJECT_PATH, "app/dictionaries/uncoder_meta_info_sigma.json")) as file:
23+
suggestions["sigma"] = json.load(file)
2924
yield
3025

3126

32-
@assistance_router.get(
33-
'/suggestions/{parser_id}',
34-
tags=["assistance"],
35-
description="Get suggestions"
36-
)
37-
async def get_suggestions(parser_id: str) -> List[Dict]:
27+
@assistance_router.get("/suggestions/{parser_id}", tags=["assistance"], description="Get suggestions")
28+
async def get_suggestions(parser_id: str) -> list[dict]:
3829
parser_dict = suggestions.get(parser_id, [])
39-
if parser_id == 'roota':
40-
today = datetime.today().strftime('%Y-%m-%d')
30+
if parser_id == "roota":
31+
today = datetime.today().strftime("%Y-%m-%d")
4132
for i in parser_dict:
42-
if i['title'] == 'Date':
43-
for v in i['dictionary']:
44-
v['name'] = today
33+
if i["title"] == "Date":
34+
for v in i["dictionary"]:
35+
v["name"] = today
4536
return parser_dict
4637
return parser_dict
Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,48 @@
1-
from typing import Optional, List
1+
from typing import Optional
2+
23
from fastapi import APIRouter, Body
34

4-
from app.translator.tools.const import IOCType, HashType, IocParsingRule
5-
from app.translator.cti_translator import CTIConverter
65
from app.models.ioc_translation import CTIPlatform, OneTranslationCTIData
76
from app.models.translation import InfoMessage
7+
from app.translator.cti_translator import CTIConverter
8+
from app.translator.tools.const import HashType, IocParsingRule, IOCType
89

910
iocs_router = APIRouter()
1011
converter = CTIConverter()
1112

1213

13-
@iocs_router.post(
14-
"/iocs/translate",
15-
description="Parse IOCs from text.",
16-
)
14+
@iocs_router.post("/iocs/translate", description="Parse IOCs from text.")
1715
@iocs_router.post("/iocs/translate", include_in_schema=False)
1816
def parse_and_translate_iocs(
1917
text: str = Body(..., description="Text to parse IOCs from", embed=True),
2018
iocs_per_query: int = Body(25, description="Platforms to parse IOCs to", embed=True),
2119
platform: CTIPlatform = Body(..., description="Platforms to parse IOCs to", embed=True),
22-
include_ioc_types: Optional[List[IOCType]] = Body(
23-
None, description="List of IOC types to include. By default all types are enabled.", embed=True),
24-
include_hash_types: Optional[List[HashType]] = Body(
25-
None, description="List of hash types to include. By default all hash types are enabled.", embed=True),
26-
exceptions: Optional[List[str]] = Body(
27-
None, description="List of exceptions. IOC is ignored if it contains one of exception values.", embed=True),
28-
ioc_parsing_rules: Optional[List[IocParsingRule]] = Body(
29-
None, embed=True, description="Additional parsing parameters."),
30-
include_source_ip: Optional[bool] = Body(
31-
False, description="Include source IP in query. By default it is false."
32-
)
20+
include_ioc_types: Optional[list[IOCType]] = Body(
21+
None, description="List of IOC types to include. By default all types are enabled.", embed=True
22+
),
23+
include_hash_types: Optional[list[HashType]] = Body(
24+
None, description="List of hash types to include. By default all hash types are enabled.", embed=True
25+
),
26+
exceptions: Optional[list[str]] = Body(
27+
None, description="List of exceptions. IOC is ignored if it contains one of exception values.", embed=True
28+
),
29+
ioc_parsing_rules: Optional[list[IocParsingRule]] = Body(
30+
None, embed=True, description="Additional parsing parameters."
31+
),
32+
include_source_ip: Optional[bool] = Body(False, description="Include source IP in query. By default it is false."),
3333
) -> OneTranslationCTIData:
34-
status, translations = converter.convert(text=text,
35-
platform_data=platform,
36-
iocs_per_query=iocs_per_query,
37-
include_ioc_types=include_ioc_types,
38-
include_hash_types=include_hash_types,
39-
exceptions=exceptions,
40-
ioc_parsing_rules=ioc_parsing_rules,
41-
include_source_ip=include_source_ip)
34+
status, translations = converter.convert(
35+
text=text,
36+
platform_data=platform,
37+
iocs_per_query=iocs_per_query,
38+
include_ioc_types=include_ioc_types,
39+
include_hash_types=include_hash_types,
40+
exceptions=exceptions,
41+
ioc_parsing_rules=ioc_parsing_rules,
42+
include_source_ip=include_source_ip,
43+
)
4244
if status:
4345
return OneTranslationCTIData(status=status, translations=translations, target_siem_type=platform.name)
44-
else:
45-
info_message = InfoMessage(message=translations, severity="error")
46-
return OneTranslationCTIData(info=info_message, status=status, target_siem_type=platform.name)
46+
47+
info_message = InfoMessage(message=translations, severity="error")
48+
return OneTranslationCTIData(info=info_message, status=status, target_siem_type=platform.name)
Lines changed: 47 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,101 +1,90 @@
11
from fastapi import APIRouter, Body
22

3-
from app.translator.translator import SiemConverter
3+
from app.models.translation import ConvertorPlatforms, InfoMessage, OneTranslationData, Platform
44
from app.translator.cti_translator import CTIConverter
5-
from app.models.translation import OneTranslationData, ConvertorPlatforms, Platform, InfoMessage
5+
from app.translator.translator import SiemConverter
66

77
st_router = APIRouter()
88

99
converter = SiemConverter()
1010

1111

12-
@st_router.post(
13-
"/translate",
14-
tags=["siem_translate"],
15-
description="Generate target translation",
16-
)
12+
@st_router.post("/translate", tags=["siem_translate"], description="Generate target translation")
1713
@st_router.post("/translate/", include_in_schema=False)
1814
def generate_one_translation(
1915
source_siem: str = Body(..., embed=True),
20-
source_scheme: str = Body(None, embed=True),
16+
source_scheme: str = Body(None, embed=True), # noqa: ARG001
2117
target_siem: str = Body(..., embed=True),
22-
target_scheme: str = Body(None, embed=True),
18+
target_scheme: str = Body(None, embed=True), # noqa: ARG001
2319
text: str = Body(..., embed=True),
24-
2520
) -> OneTranslationData:
26-
status, data = converter.generate_translation(
27-
text=text,
28-
source=source_siem,
29-
target=target_siem
30-
)
21+
status, data = converter.generate_translation(text=text, source=source_siem, target=target_siem)
3122
if status:
32-
return OneTranslationData(
33-
status=status,
34-
translation=data,
35-
target_siem_type=target_siem)
36-
else:
37-
info_message = InfoMessage(message=data, severity="error")
38-
return OneTranslationData(
39-
info=info_message,
40-
status=status,
41-
target_siem_type=target_siem)
23+
return OneTranslationData(status=status, translation=data, target_siem_type=target_siem)
24+
25+
info_message = InfoMessage(message=data, severity="error")
26+
return OneTranslationData(info=info_message, status=status, target_siem_type=target_siem)
4227

4328

44-
@st_router.post(
45-
"/translate/all",
46-
tags=["siem_translate"],
47-
description="Generate all translations",
48-
)
29+
@st_router.post("/translate/all", tags=["siem_translate"], description="Generate all translations")
4930
@st_router.post("/translate/all/", include_in_schema=False)
5031
def generate_all_translations(
5132
source_siem: str = Body(..., embed=True),
52-
source_scheme: str = Body(None, embed=True),
33+
source_scheme: str = Body(None, embed=True), # noqa: ARG001
5334
text: str = Body(..., embed=True),
5435
) -> list[OneTranslationData]:
55-
result = converter.generate_all_translation(
56-
text=text,
57-
source=source_siem
58-
)
36+
result = converter.generate_all_translation(text=text, source=source_siem)
5937
translations = []
6038
for siem_result in result:
6139
if siem_result.get("status"):
62-
translations.append(OneTranslationData(
63-
status=siem_result.get("status", True),
64-
translation=siem_result.get("result"),
65-
target_siem_type=siem_result.get("siem_type"))
40+
translations.append(
41+
OneTranslationData(
42+
status=siem_result.get("status", True),
43+
translation=siem_result.get("result"),
44+
target_siem_type=siem_result.get("siem_type"),
45+
)
6646
)
6747
else:
68-
translations.append(OneTranslationData(
69-
status=siem_result.get("status", False),
70-
info=InfoMessage(message=siem_result.get("result"), severity="error"),
71-
target_siem_type=siem_result.get("siem_type"))
48+
translations.append(
49+
OneTranslationData(
50+
status=siem_result.get("status", False),
51+
info=InfoMessage(message=siem_result.get("result"), severity="error"),
52+
target_siem_type=siem_result.get("siem_type"),
53+
)
7254
)
7355
return translations
7456

7557

76-
@st_router.get(
77-
"/platforms",
78-
tags=["siem_translate"],
79-
description="Get translator platforms",
80-
)
58+
@st_router.get("/platforms", tags=["siem_translate"], description="Get translator platforms")
8159
@st_router.get("/platforms/", include_in_schema=False)
8260
def get_convertor_platforms() -> ConvertorPlatforms:
8361
renders, parsers = converter.get_all_platforms()
8462
return ConvertorPlatforms(renders=renders, parsers=parsers)
8563

8664

87-
@st_router.get(
88-
"/all_platforms",
89-
description="Get Sigma, RootA and iocs platforms",
90-
)
65+
@st_router.get("/all_platforms", description="Get Sigma, RootA and iocs platforms")
9166
@st_router.get("/all_platforms/", include_in_schema=False)
9267
def get_all_platforms() -> list:
9368
converter_renders, converter_platforms = converter.get_all_platforms()
9469
return [
95-
Platform(id="roota", name="RootA", code="roota", group_name="RootA", group_id="roota",
96-
renders=converter_renders, parsers=converter_platforms),
97-
Platform(id="sigma", name="Sigma", code="sigma", group_name="Sigma", group_id="sigma",
98-
renders=[render for render in converter_renders if render.code != "sigma"]),
99-
Platform(id="ioc", name="IOCs", code="ioc", group_name="IOCs", group_id="ioc",
100-
renders=CTIConverter().get_renders())
70+
Platform(
71+
id="roota",
72+
name="RootA",
73+
code="roota",
74+
group_name="RootA",
75+
group_id="roota",
76+
renders=converter_renders,
77+
parsers=converter_platforms,
78+
),
79+
Platform(
80+
id="sigma",
81+
name="Sigma",
82+
code="sigma",
83+
group_name="Sigma",
84+
group_id="sigma",
85+
renders=[render for render in converter_renders if render.code != "sigma"],
86+
),
87+
Platform(
88+
id="ioc", name="IOCs", code="ioc", group_name="IOCs", group_id="ioc", renders=CTIConverter().get_renders()
89+
),
10190
]

translator/app/translator/const.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from os.path import abspath, dirname
2-
from typing import Union, List
2+
from typing import Union
33

44
APP_PATH = dirname(abspath(__file__))
55

66
CTI_MIN_LIMIT_QUERY = 10000
77

88
CTI_IOCS_PER_QUERY_LIMIT = 25
99

10-
DEFAULT_VALUE_TYPE = Union[Union[int, str, List[int], List[str]]]
10+
DEFAULT_VALUE_TYPE = Union[int, str, list[int], list[str]]
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from app.translator.tools.custom_enum import CustomEnum
2+
3+
4+
class SeverityType(CustomEnum):
5+
critical = "critical"
6+
high = "high"
7+
medium = "medium"
8+
low = "low"
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import re
22
from abc import ABC
3-
from typing import Union
3+
from typing import ClassVar, Union
44

55
from app.translator.core.custom_types.values import ValueType
66
from app.translator.core.models.escape_details import EscapeDetails
77

88

99
class EscapeManager(ABC):
10-
escape_map: dict[str, EscapeDetails] = {}
10+
escape_map: ClassVar[dict[str, EscapeDetails]] = {}
1111

1212
def escape(self, value: Union[str, int], value_type: str = ValueType.value) -> Union[str, int]:
1313
if isinstance(value, int):
@@ -17,8 +17,8 @@ def escape(self, value: Union[str, int], value_type: str = ValueType.value) -> U
1717
value = symbols_pattern.sub(escape_details.escape_symbols, value)
1818
return value
1919

20-
def remove_escape(self, value: Union[str, int]) -> Union[str, int]:
20+
@staticmethod
21+
def remove_escape(value: Union[str, int]) -> Union[str, int]:
2122
if isinstance(value, int):
2223
return value
23-
value = value.encode().decode("unicode_escape")
24-
return value
24+
return value.encode().decode("unicode_escape")

0 commit comments

Comments
 (0)