Source code for gd.http

import asyncio
import atexit
import random
import time
import types
import uuid

from pathlib import Path

import aiohttp
import tqdm  # type: ignore
from yarl import URL

from gd.api.recording import Recording, RecordingEntry
from gd.async_utils import get_running_loop, maybe_coroutine, shutdown_loop
from gd.converters import GameVersion, Password, Version
from gd.crypto import (  # generate_leaderboard_seed,
    Key,
    Salt,
    encode_base64_str,
    encode_robtop_str,
    generate_chk,
    generate_level_seed,
    generate_rs,
    generate_rs_and_encode_number,
    zip_level_str,
)
from gd.decorators import synchronize
from gd.enums import (
    AccountURLType,
    CommentState,
    CommentStrategy,
    CommentType,
    DemonDifficulty,
    FriendRequestState,
    FriendRequestType,
    IconType,
    LeaderboardStrategy,
    LevelLeaderboardStrategy,
    LevelLength,
    LikeType,
    MessageState,
    MessageType,
    RewardType,
    SearchStrategy,
    Secret,
    SimpleRelationshipType,
)
from gd.errors import (
    CommentBanned,
    HTTPError,
    HTTPStatusError,
    LoginFailure,
    LoginRequired,
    MissingAccess,
    NothingFound,
    SongRestricted,
)
from gd.filters import Filters
from gd.logging import get_logger
from gd.model import CommentBannedModel  # type: ignore
from gd.text_utils import is_level_probably_decoded, make_repr, object_count, snake_to_camel
from gd.typing import (
    JSON,
    Any,
    Awaitable,
    Callable,
    Dict,
    IO,
    Iterable,
    Mapping,
    Optional,
    Set,
    Type,
    TypeVar,
    Union,
    cast,
)
from gd.version import python_version_info, version_info

__all__ = ("Route", "HTTPClient")

log = get_logger(__name__)

DATABASE = "database"
ROOT = "/"

BASE = "http://www.boomlings.com/database"
GD_BASE = "http://geometrydash.com/database"

NEWGROUNDS_SONG_LISTEN = "https://www.newgrounds.com/audio/listen/{song_id}"
NEWGROUNDS_SONG_PAGE = "https://{name}.newgrounds.com/audio/page/{page}"
NEWGROUNDS_SEARCH = "https://www.newgrounds.com/search/conduct/{type}"

# I might sound stupid but I really like "XMLHTTPRequest", and so I wrote this ~ nekit
XML_HTTP_REQUEST = "XML" + "HTTP".title() + "Request"

CLIENTS: Set["HTTPClient"] = set()

VALID_ERRORS = (OSError, aiohttp.ClientError)

HEAD = "HEAD"
GET = "GET"

POST = "POST"
PUT = "PUT"
PATCH = "PATCH"

DELETE = "DELETE"

CONNECT = "CONNECT"
OPTIONS = "OPTIONS"
TRACE = "TRACE"

CHUNK_SIZE = 64 * 1024

T = TypeVar("T")

RequestHook = Callable[["HTTPClient"], Union[T, Awaitable[T]]]
ResponseData = Union[bytes, str, JSON]

COMMENT_TO_ADD = 1 << 31


async def read_data(
    response: aiohttp.ClientResponse,
    raw: bool = False,
    json: bool = False,
    encoding: str = "utf-8",
) -> ResponseData:
    if raw:
        return await response.read()

    elif json:
        return await response.json(encoding=encoding, content_type=None)

    else:
        return await response.text(encoding=encoding)


def is_error_code(data: Union[bytes, str]) -> bool:
    if isinstance(data, bytes):
        return bool(data) and data[0] == b"-" and data[1:].isdigit()

    else:
        return bool(data) and data[0] == "-" and data[1:].isdigit()


def int_or(data: Union[bytes, str], default: int = 0) -> int:
    try:
        return int(data)

    except ValueError:
        return default


def unexpected_error_code(error_code: int) -> MissingAccess:
    return MissingAccess(f"Got unexpected error code: {error_code}.")


def snake_to_camel_with_id(string: str) -> str:
    return snake_to_camel(string).replace("Id", "ID")


class Route:
    def __init__(
        self,
        method: str,
        path: str,
        *,
        to_camel: bool = False,
        are_params: bool = False,
        **parameters,
    ) -> None:
        self.method = method
        self.path = path.strip("/")
        self.are_params = are_params

        self.parameters: Dict[str, Any] = {}

        self.update(parameters, to_camel=to_camel)

    def update(
        self, mapping: Mapping[str, Any] = None, *, to_camel: bool = False, **parameters,
    ) -> None:
        if mapping is not None:
            parameters.update(mapping)

        if to_camel:
            self.parameters.update(
                {snake_to_camel_with_id(name): value for name, value in parameters.items()}
            )

        else:
            self.parameters.update(parameters)

    def __str__(self) -> str:
        return f"{self.method} {self.path}"

    def __repr__(self) -> str:
        info = {"method": self.method, "path": self.path, "parameters": self.parameters}

        return make_repr(self, info)


async def close_all_clients() -> None:
    for client in CLIENTS:
        await client.close()

    CLIENTS.clear()


def close_all_clients_sync() -> None:
    loop = asyncio.new_event_loop()

    asyncio.set_event_loop(loop)

    loop.run_until_complete(close_all_clients())

    shutdown_loop(loop)


atexit.register(close_all_clients_sync)


[docs]@synchronize class HTTPClient: USER_AGENT = f"python/{python_version_info} gd.py/{version_info}" REQUEST_LOG = "{method} {url} has returned {status}" SUCCESS_LOG = "{method} {url} has received {data}" DEFAULT_SKIP_HEADERS = ["Accept-Encoding", "User-Agent"] def __init__( self, *, url: Union[str, URL] = BASE, proxy: Optional[str] = None, proxy_auth: Optional[aiohttp.BasicAuth] = None, timeout: Union[float, int] = 150, game_version: GameVersion = GameVersion(2, 1), binary_version: Version = Version(3, 5), gd_world: bool = False, forwarded_for: Optional[str] = None, use_user_agent: bool = False, ) -> None: self.session: Optional[aiohttp.ClientSession] = None self.set_url(url) self.proxy = proxy self.proxy_auth = proxy_auth self.timeout = timeout self.game_version = game_version self.binary_version = binary_version self.gd_world = gd_world self.forwarded_for = forwarded_for self.use_user_agent = use_user_agent self._before_request: Optional[RequestHook] = None self._after_request: Optional[RequestHook] = None CLIENTS.add(self) def __repr__(self) -> str: info = { "url": self.str_url, "timeout": self.timeout, "game_version": self.game_version, "binary_version": self.binary_version, "gd_world": self.gd_world, } return make_repr(self, info) def change(self, **attrs) -> "HTTPClientContextManager": return HTTPClientContextManager(self, **attrs) def get_url(self) -> URL: return URL(self.str_url) def set_url(self, url: Union[str, URL]) -> None: self.str_url = str(url) url = property(get_url, set_url) def create_timeout(self) -> aiohttp.ClientTimeout: return aiohttp.ClientTimeout(total=self.timeout) def before_request(self, request_hook: RequestHook) -> RequestHook: self._before_request = request_hook return request_hook def after_request(self, request_hook: RequestHook) -> RequestHook: self._after_request = request_hook return request_hook async def call_before_request_hook(self) -> Optional[T]: if self._before_request: return await maybe_coroutine(self._before_request, self) return None async def call_after_request_hook(self) -> Optional[T]: if self._after_request: return await maybe_coroutine(self._after_request, self) return None async def close(self) -> None: if self.session is not None: await self.session.close() self.session = None async def create_session(self) -> aiohttp.ClientSession: return aiohttp.ClientSession(skip_auto_headers=self.DEFAULT_SKIP_HEADERS) async def ensure_session(self) -> None: if self.session is None: self.session = await self.create_session() loop = get_running_loop() maybe_loop = getattr(self.session, "_loop", None) # XXX: keep up with aiohttp's internals if maybe_loop is not None and maybe_loop is not loop: await self.close() self.session = await self.create_session()
[docs] async def download( self, url: Union[URL, str], method: str = GET, chunk_size: int = CHUNK_SIZE, with_bar: bool = False, close: bool = False, file: Optional[Union[str, Path, IO]] = None, **kwargs, ) -> Optional[bytes]: r"""Download the file at ``url`` with ``method``. Parameters ---------- method: :class:`str` HTTP method to send request with. Default is ``GET``. url: Union[:class:`~yarl.URL`, :class:`str`] URL to request file from. chunk_size: :class:`int` Amount of data to read for one chunk. ``-1`` to read until EOF. with_bar: :class:`bool` Whether to show progress bar when downloading. ``False`` by default. close: :class:`bool` Whether to close the underlying ``file`` after finishing. file: Optional[Union[:class:`str`, :class:`~pathlib.Path`, IO]] File to write downloaded data to. If not given, this function returns all the data as the result. \*\*kwargs Keywoard arguments to pass to :meth:`aiohttp.ClientSession.request`. Returns ------- Optional[:class:`bytes`] Data downloaded, if ``file`` is not given or ``None``. Otherwise, returns ``None``. """ if isinstance(file, (str, Path)): file = open(file, "wb") close = True await self.ensure_session() async with self.session.request( # type: ignore url=url, method=method, **kwargs ) as response: if file is None: result = bytes() if with_bar: bar = tqdm.tqdm(total=response.content_length, unit="b", unit_scale=True) while True: chunk = await response.content.read(chunk_size) if not chunk: break if file is None: result += chunk else: file.write(chunk) if with_bar: bar.update(len(chunk)) if with_bar: bar.close() if close and file: file.close() if file is None: return result return None
async def request_route( self, route: Route, raw: bool = False, json: bool = False, error_codes: Optional[Mapping[int, BaseException]] = None, headers: Optional[Dict[str, Any]] = None, base_url: Optional[Union[str, URL]] = None, retries: int = 2, ) -> Optional[ResponseData]: url = URL(self.url if base_url is None else base_url) args = dict( method=route.method, url=url / route.path, raw=raw, json=json, error_codes=error_codes, ) args["params" if route.are_params else "data"] = route.parameters return await self.request(**args) # type: ignore async def request( self, method: str, url: Union[str, URL], data: Optional[Mapping[str, Any]] = None, params: Optional[Mapping[str, Any]] = None, raw: bool = False, json: bool = False, read: bool = True, error_codes: Optional[Mapping[int, BaseException]] = None, headers: Optional[Dict[str, Any]] = None, retries: int = 2, ) -> Optional[ResponseData]: await self.ensure_session() await self.call_before_request_hook() if retries < 0: attempt_left = -1 elif retries == 0: attempt_left = 1 else: attempt_left = retries + 1 if headers is None: headers = {} if self.use_user_agent: headers.setdefault("User-Agent", self.USER_AGENT) if self.forwarded_for: headers.setdefault("X-Forwarded-For", self.forwarded_for) lock = asyncio.Lock() error: Optional[BaseException] = None while attempt_left: try: async with lock, self.session.request( # type: ignore url=url, method=method, data=data, params=params, proxy=self.proxy, proxy_auth=self.proxy_auth, headers=headers, timeout=self.create_timeout(), ) as response: log.debug("%s %s has returned %d", method, url, response.status) if not read: return None response_data = await read_data(response, raw=raw, json=json) if 200 <= response.status < 300: # successful log.debug("%s %s has received %s", method, url, response_data) if isinstance(response_data, (bytes, str)): if error_codes and is_error_code(response_data): error_code = int(response_data) raise error_codes.get(error_code, unexpected_error_code(error_code)) return response_data if response.status >= 400: error = HTTPStatusError(response.status, response.reason) except VALID_ERRORS as valid_error: error = HTTPError(valid_error) finally: await asyncio.sleep(0) # let underlying connections close attempt_left -= 1 await self.call_after_request_hook() if error: raise error return None @staticmethod def generate_udid(id: Optional[int] = None, low: int = 1, high: int = 1_000_000_000) -> str: if id is None: id = random.randint(low, high) return f"S{id}" @staticmethod def generate_uuid() -> str: return f"{uuid.uuid4()}" @staticmethod def generate_extra_string(amount: int = 55) -> str: return "_".join(map(str, (0 for _ in range(amount)))) def get_game_version(self) -> int: return self.game_version.to_robtop_number() def get_binary_version(self) -> int: return self.binary_version.to_number() def get_gd_world(self) -> int: return int(self.gd_world) def get_secret(self, name: str) -> str: return Secret.from_name(name).value async def ping(self, url: Union[str, URL]) -> float: start = time.perf_counter() try: await self.request(GET, url, read=False) except Exception: # noqa pass end = time.perf_counter() return (end - start) * 1000 async def login(self, name: str, password: str) -> str: error_codes = { -1: LoginFailure(name=name, password=password), -12: MissingAccess(f"Account {name!r} (password {password!r}) is disabled."), } route = Route( POST, "/accounts/loginGJAccount.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), user_name=name, password=password, udid=self.generate_udid(), secret=self.get_secret("login"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def load(self, *, account_id: int, name: str, password: str) -> str: error_codes = {-11: MissingAccess("Failed to load save.")} route = Route( POST, "/accounts/syncGJAccountNew.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), user_name=name, password=password, secret=self.get_secret("login"), to_camel=True, ) base_url = await self.get_account_url(account_id, type=AccountURLType.LOAD) # type: ignore response = await self.request_route(route, error_codes=error_codes, base_url=base_url) return cast(str, response) async def save(self, data: str, *, account_id: int, name: str, password: str) -> int: error_codes = { -1: MissingAccess("Failed to save."), -4: MissingAccess("Data is too large."), -5: MissingAccess("Invalid login credentials."), -6: MissingAccess("Something went wrong."), } route = Route( POST, "/accounts/backupGJAccountNew.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), save_data=data, user_name=name, password=password, secret=self.get_secret("login"), to_camel=True, ) base_url = await self.get_account_url(account_id, type=AccountURLType.SAVE) # type: ignore response = await self.request_route(route, error_codes=error_codes, base_url=base_url) return int_or(cast(str, response), 0) async def get_account_url(self, account_id: int, type: AccountURLType) -> URL: error_codes = { -1: MissingAccess( f"Failed to find {type.name.lower()} URL for Account ID: {account_id}." ) } route = Route( POST, "/getAccountURL.php", account_id=account_id, type=type.value, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) url = URL(cast(str, response)) if url.path == ROOT: return url / DATABASE return url async def get_role_id(self, account_id: int, encoded_password: str) -> int: error_codes = {-1: MissingAccess("No role found.")} route = Route( POST, "/requestUserAccess.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), account_id=account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def update_settings( self, message_state: MessageState, friend_request_state: FriendRequestState, comment_state: CommentState, youtube: str, twitter: str, twitch: str, *, account_id: int, encoded_password: str, ) -> int: error_codes = {-1: MissingAccess("Failed to update settings.")} route = Route( POST, "/updateGJAccSettings20.php", account_id=account_id, gjp=encoded_password, secret=self.get_secret("login"), m_s=message_state.value, fr_s=friend_request_state.value, c_s=comment_state.value, yt=youtube, twitter=twitter, twitch=twitch, to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def update_profile( self, stars: int, diamonds: int, coins: int, user_coins: int, demons: int, icon_type: IconType, icon: int, color_1_id: int, color_2_id: int, has_glow: bool, cube: int, ship: int, ball: int, ufo: int, wave: int, robot: int, spider: int, death_effect: int, special: int = 0, *, account_id: int, name: str, encoded_password: str, ) -> int: error_codes = { -1: MissingAccess(f"Failed to update profile of a user by Account ID: {account_id}.") } rs = generate_rs() chk = generate_chk( values=[ account_id, user_coins, demons, stars, coins, icon_type.value, icon, diamonds, cube, ship, ball, ufo, wave, robot, int(has_glow), spider, death_effect, ], key=Key.USER_LEADERBOARD, # type: ignore salt=Salt.USER_LEADERBOARD, # type: ignore ) route = Route( POST, "/updateGJUserScore22.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), stars=stars, diamonds=diamonds, coins=coins, user_coins=user_coins, demons=demons, special=special, icon=icon, icon_type=icon_type.value, acc_icon=cube, acc_ship=ship, acc_ball=ball, acc_bird=ufo, acc_dart=wave, acc_robot=robot, acc_spider=spider, acc_explosion=death_effect, acc_glow=int(has_glow), color1=color_1_id, color2=color_2_id, user_name=name, account_id=account_id, gjp=encoded_password, seed=rs, seed2=chk, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def search_users_on_page(self, query: Union[int, str], page: int = 0) -> str: error_codes = {-1: MissingAccess(f"Can not find results for query: {query!r}.")} route = Route( POST, "/getGJUsers20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), str=query, total=0, page=page, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def get_user_profile( self, account_id: int, *, client_account_id: Optional[int] = None, encoded_password: Optional[str] = None, ) -> str: error_codes = { -1: MissingAccess(f"Can not find user with ID: {account_id}."), } route = Route( POST, "/getGJUserInfo20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), target_account_id=account_id, secret=self.get_secret("main"), to_camel=True, ) if client_account_id is not None and encoded_password is not None: route.update(account_id=client_account_id, gjp=encoded_password, to_camel=True) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def get_relationships( self, type: SimpleRelationshipType, *, account_id: int, encoded_password: str ) -> str: error_codes = { -1: MissingAccess(f"Failed to fetch {type.name.lower()} users."), -2: NothingFound("AbstractUser"), } route = Route( POST, "/getGJUserList20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), type=type.value, account_id=account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def get_top( self, strategy: LeaderboardStrategy, amount: int = 100, *, account_id: Optional[int] = None, encoded_password: Optional[str] = None, ) -> str: error_codes = { -1: MissingAccess(f"Failed to fetch leaderboard for strategy: {strategy!r}.") } route = Route( POST, "/getGJScores20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), type=strategy.name.lower(), count=amount, secret=self.get_secret("main"), to_camel=True, ) if strategy.requires_login(): if not account_id or not encoded_password: raise LoginRequired(f"{strategy!r} strategy requires logged in Client.") route.update(account_id=account_id, gjp=encoded_password, to_camel=True) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def search_levels_on_page( self, query: Optional[Union[int, str, Iterable[Any]]] = None, page: int = 0, filters: Optional[Filters] = None, user_id: Optional[int] = None, gauntlet: Optional[int] = None, *, client_account_id: Optional[int] = None, client_user_id: Optional[int] = None, encoded_password: Optional[str] = None, ) -> str: error_codes = {-1: NothingFound("Level")} if filters is None: filters = Filters() if query is None: query = "" if not isinstance(query, str) and isinstance(query, Iterable): query = ",".join(query) route = Route( POST, "/getGJLevels21.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), secret=self.get_secret("main"), to_camel=True, ) if gauntlet is not None: route.update(gauntlet=gauntlet) else: route.update(filters.to_parameters(), str=query, page=page, total=0, to_camel=True) if filters.strategy is SearchStrategy.BY_USER: if user_id is None: if ( client_account_id is None or client_user_id is None or encoded_password is None ): raise MissingAccess( "Can not use by-user strategy with no User ID or Client." ) route.update( account_id=client_account_id, str=client_user_id, gjp=encoded_password, local=1, ) else: route.update(str=user_id) elif filters.strategy is SearchStrategy.FRIENDS: if client_account_id is None or encoded_password is None: raise MissingAccess("Friends strategy requires logged in Client.") route.update(account_id=client_account_id, gjp=encoded_password) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def get_timely_info(self, weekly: bool) -> str: error_codes = {-1: MissingAccess(f"Can not find {'weekly' if weekly else 'daily'} info.")} route = Route( POST, "/getGJDailyLevel.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), weekly=int(weekly), secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def download_level( self, level_id: int, *, account_id: Optional[int] = None, encoded_password: Optional[str] = None, ) -> str: error_codes = {-1: MissingAccess(f"Can not download level with ID: {level_id}.")} route = Route( POST, "/downloadGJLevel22.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), level_id=level_id, secret=self.get_secret("main"), to_camel=True, ) if account_id is not None and encoded_password is not None: inc = 1 rs = generate_rs() udid = self.generate_udid() uuid = self.generate_uuid() chk = generate_chk( values=[level_id, inc, rs, account_id, udid, uuid], key=Key.LEVEL, # type: ignore salt=Salt.LEVEL, # type: ignore ) route.update( account_id=account_id, gjp=encoded_password, udid=udid, uuid=uuid, rs=rs, chk=chk, to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def report_level(self, level_id: int) -> int: error_codes = {-1: MissingAccess(f"Failed to report level with ID: {level_id}.")} route = Route(POST, "/reportGJLevel.php", level_id=level_id, to_camel=True) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def delete_level(self, level_id: int, *, account_id: int, encoded_password: str) -> int: error_codes = {-1: MissingAccess(f"Failed to delete level with ID: {level_id}.")} route = Route( POST, "/deleteGJLevelUser20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), level_id=level_id, account_id=account_id, gjp=encoded_password, secret=self.get_secret("level"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def update_level_description( self, level_id: int, description: str, *, account_id: int, encoded_password: str ) -> int: error_codes = {-1: MissingAccess(f"Can not update description of the level: {level_id}.")} route = Route( POST, "/updateGJDesc20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), level_id=level_id, account_id=account_id, gjp=encoded_password, level_desc=encode_base64_str(description), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def upload_level( self, name: str = "Unnamed", id: int = 0, version: int = 1, length: LevelLength = LevelLength.TINY, # type: ignore track_id: int = 0, description: str = "", song_id: int = 0, is_auto: bool = False, original: int = 0, two_player: bool = False, objects: int = 0, coins: int = 0, stars: int = 0, unlisted: bool = False, friends_only: bool = False, low_detail_mode: bool = False, password: Optional[Union[int, str]] = None, copyable: bool = False, recording: Iterable[RecordingEntry] = (), editor_seconds: int = 0, copies_seconds: int = 0, data: str = "", *, account_id: int, account_name: str, encoded_password: str, ) -> int: error_codes = {-1: MissingAccess("Failed to upload level.")} if is_level_probably_decoded(data): if not objects: objects = object_count(data) data = zip_level_str(data) extra_string = self.generate_extra_string() description = encode_base64_str(description) level_seed = generate_level_seed(data) seed = generate_rs() other_seed = generate_chk( values=[level_seed], key=Key.LEVEL, salt=Salt.LEVEL # type: ignore ) level_password = Password(password, copyable) just_unlisted = 0 listed_for_friends = 0 if friends_only: just_unlisted = 1 listed_for_friends = 1 elif unlisted: just_unlisted = 1 recording_str = zip_level_str(Recording.collect_string(recording)) route = Route( POST, "/uploadGJLevel21.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), level_id=id, level_name=name, level_desc=description, level_version=version, level_length=length.value, audio_track=track_id, song_id=song_id, auto=int(is_auto), original=original, two_player=int(two_player), objects=objects, coins=coins, requested_stars=stars, unlisted=just_unlisted, unlisted2=listed_for_friends, ldm=int(low_detail_mode), password=level_password.to_robtop_number(), level_string=data, extra_string=extra_string, level_info=recording_str, seed=seed, seed2=other_seed, wt=editor_seconds, wt2=copies_seconds, account_id=account_id, user_name=account_name, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def rate_level( self, level_id: int, stars: int, *, account_id: int, encoded_password: str ) -> int: error_codes = {-1: MissingAccess(f"Failed to rate level by ID: {level_id}.")} udid = self.generate_udid() uuid = self.generate_uuid() rs = generate_rs() chk = generate_chk( values=[level_id, stars, rs, account_id, udid, uuid], key=Key.LIKE_RATE, # type: ignore salt=Salt.LIKE_RATE, # type: ignore ) route = Route( POST, "/rateGJStars211.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), level_id=level_id, stars=stars, udid=udid, uuid=uuid, rs=rs, chk=chk, account_id=account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def rate_demon( self, level_id: int, rating: DemonDifficulty, as_mod: bool = False, *, account_id: int, encoded_password: str, ) -> int: error_codes = { -1: MissingAccess(f"Failed to demon-rate level by ID: {level_id}."), -2: MissingAccess( f"Missing moderator permissions to demon-rate level by ID: {level_id}." ), } route = Route( POST, "/rateGJDemon21.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), level_id=level_id, rating=rating.value, mode=int(as_mod), account_id=account_id, gjp=encoded_password, secret=self.get_secret("mod"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def send_level( self, level_id: int, stars: int, feature: bool, *, account_id: int, encoded_password: str, ) -> int: error_codes = { -1: MissingAccess(f"Failed to send a level by ID: {level_id}."), -2: MissingAccess(f"Missing moderator permissions to send level by ID: {level_id}."), } route = Route( POST, "/suggestGJStars20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), level_id=level_id, stars=stars, feature=int(feature), account_id=account_id, gjp=encoded_password, secret=self.get_secret("mod"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def get_level_top( self, level_id: int, strategy: LevelLeaderboardStrategy, *, account_id: int, encoded_password: str, ) -> str: # timely_type: TimelyType = TimelyType.NOT_TIMELY, # timely_id: int = 0, # played: bool = False, # percent: int = 0, # jumps: int = 0, # attempts: int = 0, # seconds: int = 0, # coins: int = 0, error_codes = { -1: MissingAccess(f"Failed to get leaderboard of the level by ID: {level_id}.") } # seed = generate_leaderboard_seed(jumps, percentage, seconds, played) # if timely_type is TimelyType.WEEKLY: # timely_id += 100_000 # chk = generate_chk( # values=[ # account_id, # level_id, # percentage, # seconds, # jumps, # attempts, # percent, # 100 - percent, # 1, # coins, # timely_id, # rs, # ], # key=Key.LEVEL_LEADERBOARD, # type: ignore # salt=Salt.LEVEL_LEADERBOARD, # type: ignore # ) route = Route( POST, "/getGJLevelScores211.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), level_id=level_id, account_id=account_id, gjp=encoded_password, type=strategy.value, secret=self.get_secret("main"), to_camel=True, ) # route.update( # percent=percent, # s1=attempts + 8354, # s2=jumps + 3991, # s3=seconds + 4085, # s4=seed, # s5=random.randint(100, 10_000), # not sure about this one # s6="", # this is progress string, we will add that later # s7=rs, # s8=attempts, # s9=coins + 5819, # s10=timely_id, # chk=chk, # ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def block_or_unblock( self, account_id: int, unblock: bool, *, client_account_id: int, encoded_password: str, ) -> int: if unblock: endpoint = "/unblockGJUser20.php" string = "unblock" else: endpoint = "/blockGJUser20.php" string = "block" error_codes = { -1: MissingAccess(f"Failed to {string} the user by Account ID: {account_id}.") } route = Route( POST, endpoint, game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), target_account_id=account_id, account_id=client_account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def unfriend_user( self, account_id: int, *, client_account_id: int, encoded_password: str ) -> int: error_codes = { -1: MissingAccess(f"Failed to unfriend the user by account ID: {account_id}.") } route = Route( POST, "/removeGJFriend20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), target_account_id=account_id, account_id=client_account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def send_message( self, account_id: int, subject: Optional[str] = None, content: Optional[str] = None, *, client_account_id: int, encoded_password: str, ) -> int: error_codes = { -1: MissingAccess(f"Failed to send a message to the user by account ID: {account_id}.") } if subject is None: subject = "" if content is None: content = "" route = Route( POST, "/uploadGJMessage20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), subject=encode_base64_str(subject), body=encode_robtop_str(content, Key.MESSAGE), # type: ignore to_account_id=account_id, account_id=client_account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def download_message( self, message_id: int, type: MessageType, *, account_id: int, encoded_password: str, ) -> str: error_codes = {-1: MissingAccess(f"Failed to read a message by ID: {message_id}.")} route = Route( POST, "/downloadGJMessage20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), account_id=account_id, message_id=message_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) if type is MessageType.OUTGOING: route.update(is_sender=1, to_camel=True) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def delete_message( self, message_id: int, type: MessageType, *, account_id: int, encoded_password: str, ) -> int: error_codes = {-1: MissingAccess(f"Failed to delete a message by ID: {message_id}.")} route = Route( POST, "/deleteGJMessages20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), account_id=account_id, message_id=message_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) if type is MessageType.OUTGOING: route.update(is_sender=1, to_camel=True) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def get_messages_on_page( self, type: MessageType, page: int, *, account_id: int, encoded_password: str ) -> str: error_codes = { -1: MissingAccess(f"Failed to get messages on page {page}."), -2: NothingFound("Message"), } route = Route( POST, "/getGJMessages20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), page=page, total=0, account_id=account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) if type is MessageType.OUTGOING: route.update(get_sent=1, to_camel=True) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def send_friend_request( self, account_id: int, message: Optional[str] = None, *, client_account_id: int, encoded_password: str, ) -> int: error_codes = { -1: MissingAccess(f"Failed to send a friend request to the user by ID: {account_id}.") } if message is None: message = "" route = Route( POST, "/uploadFriendRequest20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), comment=encode_base64_str(message), to_account_id=account_id, account_id=client_account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) if not response: return 1 return int_or(cast(str, response), 0) async def delete_friend_request( self, account_id: int, type: FriendRequestType, *, client_account_id: int, encoded_password: str, ) -> int: error_codes = { -1: MissingAccess( f"Failed to delete a friend request of the user with ID: {account_id}." ) } route = Route( POST, "/deleteGJFriendRequests20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), target_account_id=account_id, account_id=client_account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) if type is FriendRequestType.OUTGOING: route.update(is_sender=1, to_camel=True) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def accept_friend_request( self, account_id: int, request_id: int, type: FriendRequestType, *, client_account_id: int, encoded_password: str, ) -> int: error_codes = { -1: MissingAccess( f"Failed to accept a friend request of the user with ID: {account_id}." ) } route = Route( POST, "/acceptGJFriendRequest20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), target_account_id=account_id, request_id=request_id, account_id=client_account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) if type is FriendRequestType.OUTGOING: route.update(is_sender=1, to_camel=True) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def read_friend_request( self, request_id: int, *, account_id: int, encoded_password: str ) -> int: error_codes = {-1: MissingAccess(f"Failed to read a friend request with ID: {request_id}.")} route = Route( POST, "/readGJFriendRequest20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), request_id=request_id, account_id=account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def get_friend_requests_on_page( self, type: FriendRequestType, page: int, *, account_id: int, encoded_password: str, ) -> str: error_codes = { -1: MissingAccess(f"Failed to get friend requests on page {page}."), -2: NothingFound("FriendRequest"), } route = Route( POST, "/getGJFriendRequests20.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), page=page, total=0, account_id=account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) if type is FriendRequestType.OUTGOING: route.update(get_sent=1, to_camel=True) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def like_or_dislike( self, type: LikeType, item_id: int, special_id: int, dislike: bool = False, *, account_id: int, encoded_password: str, ) -> int: error_codes = { -1: MissingAccess(f"Failed to like an item by ID: {item_id} (special {special_id}).") } type_id = type.value like = not dislike udid = self.generate_udid() uuid = self.generate_uuid() rs = generate_rs() int_like = int(like) chk = generate_chk( values=[special_id, item_id, int_like, type_id, rs, account_id, udid, uuid], key=Key.LIKE_RATE, # type: ignore salt=Salt.LIKE_RATE, # type: ignore ) route = Route( POST, "/likeGJItem211.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), item_id=item_id, type=type_id, special=special_id, like=int_like, account_id=account_id, gjp=encoded_password, udid=udid, uuid=uuid, rs=rs, chk=chk, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def post_comment( self, type: CommentType, content: Optional[str] = None, level_id: int = 0, percent: int = 0, *, account_id: int, account_name: str, encoded_password: str, ) -> int: # XXX: We might want to use two separate functions in case of API update. type_name = type.name.casefold() error_codes = { -1: MissingAccess(f"Failed to post a {type_name} comment."), -10: CommentBanned(timeout=None), } if content is None: content = "" content = encode_base64_str(content) chk = generate_chk( values=[account_name, content, level_id, percent, type.value], key=Key.COMMENT, # type: ignore salt=Salt.COMMENT, # type: ignore ) if type is CommentType.LEVEL: endpoint = "/uploadGJComment21.php" else: endpoint = "/uploadGJAccComment20.php" route = Route( POST, endpoint, game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), comment=content, level_id=level_id, c_type=type.value, percent=percent, account_id=account_id, user_name=account_name, gjp=encoded_password, chk=chk, secret=self.get_secret("main"), to_camel=True, ) response = cast(str, await self.request_route(route, error_codes=error_codes)) if CommentBannedModel.maybe_in(response): ban = CommentBannedModel.from_string(response) raise CommentBanned(timeout=ban.timeout, reason=ban.reason) return int_or(response, 0) async def delete_comment( self, comment_id: int, type: CommentType, level_id: int = 0, *, account_id: int, encoded_password: str, ) -> int: error_codes = {-1: MissingAccess(f"Failed to delete a comment by ID: {comment_id}.")} if type is CommentType.LEVEL: endpoint = "/deleteGJComment20.php" else: endpoint = "/deleteGJAccComment20.php" route = Route( POST, endpoint, game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), comment_id=comment_id, c_type=type.value, level_id=level_id, account_id=account_id, gjp=encoded_password, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return int_or(cast(str, response), 0) async def get_user_comments_on_page( self, account_id: int, user_id: int, type: CommentType, page: int = 0, *, strategy: CommentStrategy, ) -> str: error_codes = { -1: MissingAccess(f"Failed to get comment for user by Account ID: {account_id}.") } if type is CommentType.LEVEL: endpoint = "/getGJCommentHistory.php" else: endpoint = "/getGJAccountComments20.php" route = Route( POST, endpoint, game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), page=page, total=0, mode=strategy.value, secret=self.get_secret("main"), to_camel=True, ) if type is CommentType.LEVEL: route.update(user_id=user_id, to_camel=True) else: route.update(account_id=account_id, to_camel=True) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def get_level_comments_on_page( self, level_id: int, amount: int, page: int = 0, *, strategy: CommentStrategy, ) -> str: error_codes = { -1: MissingAccess(f"Failed to get comments of a level by ID: {level_id}."), -2: NothingFound("Comment"), } if amount < 0: amount += COMMENT_TO_ADD route = Route( POST, "/getGJComments21.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), level_id=level_id, page=page, total=0, count=amount, mode=strategy.value, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def get_gauntlets(self) -> str: error_codes = {-1: MissingAccess("Failed to get gauntlets.")} route = Route( POST, "/getGJGauntlets21.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def get_map_packs_on_page(self, page: int = 0) -> str: error_codes = {-1: MissingAccess("Failed to get map packs.")} route = Route( POST, "/getGJMapPacks21.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), page=page, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def get_quests(self, account_id: int, encoded_password: str) -> str: error_codes = {-1: MissingAccess("Failed to get quests.")} udid = self.generate_udid() uuid = self.generate_uuid() chk = generate_rs_and_encode_number(length=5, key=Key.QUESTS) # type: ignore route = Route( POST, "/getGJChallenges.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), account_id=account_id, gjp=encoded_password, udid=udid, uuid=uuid, chk=chk, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def get_chests( self, reward_type: RewardType, chest_1_count: int = 0, chest_2_count: int = 0, *, account_id: int, encoded_password: str, ) -> str: error_codes = {-1: MissingAccess("Failed to get chests.")} udid = self.generate_udid() uuid = self.generate_uuid() chk = generate_rs_and_encode_number(length=5, key=Key.CHESTS) # type: ignore route = Route( POST, "/getGJRewards.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), reward_type=reward_type.value, account_id=account_id, gjp=encoded_password, udid=udid, uuid=uuid, chk=chk, r1=chest_1_count, r2=chest_2_count, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def get_featured_artists_on_page(self, page: int = 0) -> str: error_codes = {-1: MissingAccess("Failed to get featured artists.")} route = Route( POST, "/getGJTopArtists.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), page=page, total=0, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def get_song(self, song_id: int) -> str: error_codes = { -1: MissingAccess(f"Can not get song by ID: {song_id}."), -2: SongRestricted(song_id), } route = Route( POST, "/getGJSongInfo.php", game_version=self.get_game_version(), binary_version=self.get_binary_version(), gdw=self.get_gd_world(), song_id=song_id, secret=self.get_secret("main"), to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def get_ng_song(self, song_id: int) -> str: response = await self.request(GET, NEWGROUNDS_SONG_LISTEN.format(song_id=song_id)) return cast(str, response).replace("\\", "") async def get_artist_info(self, song_id: int) -> str: error_codes = {-1: MissingAccess(f"Failed to fetch artist info for ID: {song_id}")} route = Route( GET, "/testSong.php", song_id=song_id, are_params=True, to_camel=True, ) response = await self.request_route(route, error_codes=error_codes) return cast(str, response) async def search_ng_songs_on_page(self, query: str, page: int = 0) -> str: response = await self.request( GET, NEWGROUNDS_SEARCH.format(type="audio"), params=dict(terms=query, page=page + 1), ) return cast(str, response) async def search_ng_users_on_page(self, query: str, page: int = 0) -> str: response = await self.request( GET, NEWGROUNDS_SEARCH.format(type="users"), params=dict(terms=query, page=page + 1), ) return cast(str, response) async def get_ng_user_songs_on_page(self, name: str, page: int = 0) -> Mapping[str, Any]: response = await self.request( GET, NEWGROUNDS_SONG_PAGE.format(name=name, page=page + 1), json=True, headers={"X-Requested-With": XML_HTTP_REQUEST}, ) return cast(Mapping[str, Any], response)
class HTTPClientContextManager: def __init__(self, http_client: HTTPClient, **attrs) -> None: self.attrs = attrs self.saved_attrs: Dict[str, Any] = {} self.http_client = http_client def __repr__(self) -> str: info = {"http_client": self.http_client} info.update(self.attrs) return make_repr(self, info) def apply(self) -> None: http_client = self.http_client attrs = self.attrs saved_attrs = self.saved_attrs for attr, value in attrs.items(): saved_attrs[attr] = getattr(http_client, attr, None) # save attribute value setattr(http_client, attr, value) # update attribute value def discard(self) -> None: http_client = self.http_client saved_attrs = self.saved_attrs for saved_attr, saved_value in saved_attrs.items(): setattr(http_client, saved_attr, saved_value) # restore old attribute values def __enter__(self) -> HTTPClient: self.apply() return self.http_client def __exit__( self, error_type: Type[BaseException], error: BaseException, traceback: types.TracebackType ) -> None: self.discard()