import asyncio
import json
import string
import time # for perf_counter in ping
from itertools import chain
from yarl import URL
from gd.typing import (
Any,
Client,
Dict,
Iterable,
List,
Optional,
Sequence,
Tuple,
Type,
Union,
)
from gd.errors import (
MissingAccess,
SongRestrictedForUsage,
NothingFound,
LoginFailure,
)
from gd.utils.converter import Converter
from gd.utils.decorators import check_logged_obj
from gd.utils.enums import (
CommentPolicyType,
CommentStrategy,
CommentType,
DemonDifficulty,
FriendRequestPolicyType,
LeaderboardStrategy,
LevelLeaderboardStrategy,
LevelLength,
MessageOrRequestType,
MessagePolicyType,
RewardType,
ShardType,
QuestType,
SearchStrategy,
)
from gd.utils.filters import Filters
from gd.utils.http_request import HTTPClient
from gd.utils.indexer import Index
from gd.utils.ng_parser import (
find_song_info,
search_song_data,
extract_info_from_endpoint,
extract_user_songs,
extract_users,
)
from gd.utils.params import Parameters as Params
from gd.utils.parser import ExtDict, Parser
from gd.utils.routes import Route
from gd.utils.text_tools import make_repr
from gd.utils.crypto.coders import Coder
from gd import api
REWARD_CHALLENGE_CHK_LENGTH = 13
REWARD_CHALLENGE_SLICE_LENGTH = 5
QUEST_AMOUNT = 3
CHEST_AMOUNT = 2
CHEST_INNER_PARTS = 3
[docs]class Session:
"""Implements all requests-related functionality.
No docstrings here yet...
"""
def __init__(self, **http_args) -> None:
self.http = HTTPClient(**http_args)
def __repr__(self) -> str:
info = {"http": self.http}
return make_repr(self, info)
async def ping_server(self, link: str) -> float:
start = time.perf_counter()
await self.http.normal_request(link)
end = time.perf_counter()
return round((end - start) * 1000, 2)
async def get_song(self, song_id: int = 0) -> ExtDict:
payload = Params().create_new().put_definer("song", song_id).finish()
codes = {
-1: MissingAccess(f"No songs were found with ID: {song_id}."),
-2: SongRestrictedForUsage(song_id),
}
resp = await self.http.request(Route.GET_SONG_INFO, payload, error_codes=codes)
return Parser().with_split("~|~").should_map().parse(resp)
async def test_song(self, song_id: int = 0) -> ExtDict:
codes = {-1: MissingAccess(f"Failed to fetch artist info for ID: {song_id}")}
payload = Params().create_new("web").put_definer("song", song_id).close()
resp = await self.http.request(
Route.TEST_SONG, params=payload, method="get", error_codes=codes
)
data = ExtDict(id=song_id)
try:
data.update(extract_info_from_endpoint(resp))
except ValueError:
raise MissingAccess(f"Failed to load data. Response: {resp!r}.") from None
return data
async def get_ng_song(self, song_id: int = 0) -> ExtDict:
# just like get_song(), but gets anything available on NG.
link = Route.NEWGROUNDS_SONG_LISTEN + str(song_id)
content = await self.http.normal_request(link)
html = content.decode().replace("\\", "")
try:
info = find_song_info(html)
except ValueError:
raise MissingAccess(f"Song was not found by ID: {song_id}") from None
return ExtDict(
name=info.name,
author=info.author,
id=song_id,
size=round(info.size / 1024 / 1024, 2),
links=dict(normal=link, download=info.link),
custom=True,
)
async def search_page_songs(self, query: str, page: int = 0) -> List[ExtDict]:
payload = {"terms": query, "page": page + 1}
data = await self.http.normal_request(
Route.NEWGROUNDS_SEARCH.format(type="audio"), params=payload
)
return search_song_data(data.decode())
async def search_songs(self, query: str, pages: Iterable[int]) -> List[ExtDict]:
to_run = [self.search_page_songs(query=query, page=page) for page in pages]
return await self.run_many(to_run)
async def search_page_users(self, query: str, page: int = 0) -> List[ExtDict]:
payload = {"terms": query, "page": page + 1}
data = await self.http.normal_request(
Route.NEWGROUNDS_SEARCH.format(type="users"), params=payload
)
return extract_users(data.decode())
async def search_users(self, query: str, pages: Iterable[int]) -> List[ExtDict]:
to_run = [self.search_page_users(query=query, page=page) for page in pages]
return await self.run_many(to_run)
async def get_page_user_songs(self, user_name: str, page: int = 0) -> List[ExtDict]:
link = URL("https://%s.newgrounds.com/" % user_name) / f"audio/page/{page + 1}"
data = await self.http.normal_request(link, headers={"X-Requested-With": "XMLHttpRequest"})
info = extract_user_songs(json.loads(data.decode(errors="replace")))
for part in info:
part.update(author=user_name)
return info
async def get_user_songs(self, user_name: str, pages: Iterable[int]) -> List[ExtDict]:
to_run = [self.get_page_user_songs(user_name=user_name, page=page) for page in pages]
return await self.run_many(to_run)
async def get_user(self, account_id: int = 0, return_only_stats: bool = False) -> ExtDict:
payload = Params().create_new().put_definer("user", account_id).finish()
codes = {-1: MissingAccess(f"No users were found with ID: {account_id}.")}
resp = await self.http.request(Route.GET_USER_INFO, payload, error_codes=codes)
mapped = Parser().with_split(":").should_map().parse(resp)
if return_only_stats:
return mapped
another = (
Params()
.create_new()
.put_definer("search", mapped.getcast(Index.USER_PLAYER_ID, 0, int))
.put_total(0)
.put_page(0)
.finish()
)
some_resp = await self.http.request(Route.USER_SEARCH, another)
new_resp = (
Parser().split("#").take(0).check_empty().split(":").should_map().parse(some_resp)
)
if new_resp is None:
raise codes.get(-1)
mapped.update(
{k: new_resp.get(k) for k in (Index.USER_NAME, Index.USER_ICON, Index.USER_ICON_TYPE)}
)
return mapped
async def search_user(self, query: Union[int, str], return_abstract: bool = False) -> ExtDict:
payload = (
Params().create_new().put_definer("search", query).put_total(0).put_page(0).finish()
)
codes = {-1: MissingAccess(f"Searching for {query!r} failed.")}
resp = await self.http.request(Route.USER_SEARCH, payload, error_codes=codes)
mapped = Parser().split("#").take(0).check_empty().split(":").should_map().parse(resp)
if mapped is None:
raise codes.get(-1)
account_id = mapped.getcast(Index.USER_ACCOUNT_ID, 0, int)
if return_abstract or not account_id:
return mapped
# ok; if we should not return abstract, let's find all other parameters
payload = Params().create_new().put_definer("user", account_id).finish()
resp = await self.http.request(Route.GET_USER_INFO, payload, error_codes=codes)
mapped.update(Parser().with_split(":").should_map().parse(resp))
return mapped
async def get_level_info(self, level_id: int = 0) -> Tuple[ExtDict, ExtDict, ExtDict]:
# level data, creator, song
assert level_id >= -2, "Invalid Level ID provided."
if level_id < 0:
type, number, cooldown = await self.get_timely_info(level_id)
else:
type, number, cooldown = 0, -1, -1
ext = {"101": type, "102": number, "103": cooldown}
codes = {-1: MissingAccess(f"Failed to get a level. Given ID: {level_id}")}
payload = Params().create_new().put_definer("levelid", level_id).finish()
resp = await self.http.request(Route.DOWNLOAD_LEVEL, payload, error_codes=codes)
level_data = Parser().split("#").take(0).split(":").add_ext(ext).should_map().parse(resp)
real_id = level_data.getcast(Index.LEVEL_ID, 0, int)
payload = (
Params()
.create_new()
.put_definer("search", real_id)
.put_filters(Filters.setup_empty())
.finish()
)
resp = await self.http.request(Route.LEVEL_SEARCH, payload, error_codes=codes)
if not resp or resp.count("#") < 2:
raise codes.get(-1)
data = resp.split("#")
# getting song
song_data = data[2]
if not song_data:
song = Converter.to_normal_song(level_data.getcast(Index.LEVEL_AUDIO_TRACK, 0, int))
else:
song = Parser().with_split("~|~").should_map().parse(song_data)
# getting creator
creator_data = data[1]
if not creator_data:
id, name, account_id = (0, "unknown", 0)
else:
id, name, account_id = creator_data.split(":")
creator = ExtDict(id=id, name=name, account_id=account_id)
return level_data, creator, song
async def get_timely_info(self, type_id: int = -1) -> Tuple[int, int, int]:
# Daily: -1, Weekly: -2
weekly = ~type_id
payload = Params().create_new().put_weekly(weekly).finish()
codes = {-1: MissingAccess(f"Failed to fetch a {type!r} level.")}
resp = await self.http.request(Route.GET_TIMELY, payload, error_codes=codes)
try:
number, cooldown, *_ = map(int, resp.split("|"))
except ValueError: # unpacking failed or something else
raise MissingAccess(
"Failed to fetch a timely level. Most likely it is being refreshed."
) from None
number %= 100000
weekly += 1
return (weekly, number, cooldown)
async def upload_level(
self,
data: str,
name: str,
level_id: int,
version: int,
length: LevelLength,
audio_track: int,
desc: str,
song_id: int,
is_auto: bool,
original: int,
two_player: bool,
objects: int,
coins: int,
stars: int,
unlisted: bool,
friends_only: bool,
ldm: bool,
password: Optional[Union[int, str]],
copyable: bool,
*,
client: Client,
) -> int:
data = Coder.zip(data)
extra_string = "_".join(map(str, (0 for _ in range(55))))
desc = Coder.do_base64(desc)
upload_seed = Coder.gen_level_upload_seed(data)
seed2 = Coder.gen_chk(type="level", values=[upload_seed])
seed = Coder.gen_rs()
pwd = 0
if copyable and password is None:
pwd = 1
check, add = str(password), 1000000
if check.isdigit() and int(check) < add:
pwd = add + int(password)
if friends_only:
unlisted, unlisted2 = (1, 1)
elif unlisted:
unlisted, unlisted2 = (1, 0)
else:
unlisted, unlisted2 = (0, 0)
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_definer("levelid", level_id)
.put_definer("song", song_id)
.put_seed(seed)
.put_seed(seed2, suffix=2)
.put_seed(0, prefix="wt")
.put_seed(0, prefix="wt", suffix=2)
.put_password(client.encodedpass)
.put_username(client.name)
.finish()
)
options = {
"level_name": name,
"level_desc": desc,
"level_version": version,
"level_length": length.value,
"audio_track": audio_track,
"auto": int(is_auto),
"original": int(original),
"two_player": int(two_player),
"objects": objects,
"coins": coins,
"requested_stars": stars,
"unlisted": unlisted,
"unlisted2": unlisted2,
"ldm": int(ldm),
"password": pwd,
"level_string": data,
"extra_string": extra_string,
"level_info": "H4sIAAAAAAAAC_NIrVQoyUgtStVRCMpPSi0qUbDStwYAsgpl1RUAAAA=",
}
payload_cased = {
Converter.snake_to_camel(key): str(value) for key, value in options.items()
}
payload.update(payload_cased)
level_id = await self.http.request(Route.UPLOAD_LEVEL, payload)
if level_id == -1:
raise MissingAccess("Failed to upload a level.")
return level_id
async def get_user_list(
self, type: int = 0, *, exclude: Tuple[Type[BaseException]] = (), client: Client
) -> List[ExtDict]:
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_type(type)
.finish()
)
codes = {
-1: MissingAccess("Failed to fetch a user list."),
-2: NothingFound("gd.AbstractUser"),
}
resp = await self.http.request(
Route.GET_USER_LIST, payload, error_codes=codes, exclude=exclude
)
if resp is None:
return []
resp, parser = resp.split("|"), Parser().with_split(":").should_map()
return list(map(parser.parse, resp))
async def get_leaderboard(
self, level_id: int, strategy: LevelLeaderboardStrategy, *, client: Client
) -> List[ExtDict]:
# timely_type: TimelyType, played: bool = False, timely_index: int = 0, percentage: int = 0,
# jumps: int = 0, attempts: int = 0, seconds: int = 0, coins: int = 0
# rs = Coder.gen_rs()
# seed = Coder.gen_level_lb_seed(jumps, percentage, seconds, played)
# if str(timely_type) == 'weekly':
# timely_index += 100000
# values = [
# client.account_id, level_id, percentage, seconds, jumps, attempts,
# percentage, 100 - percentage, 1, coins, timely_index, rs
# ]
# chk = Coder.gen_chk(type='levelscore', values=values)
params = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_definer("levelid", level_id)
.put_password(client.encodedpass)
.put_type(strategy.value)
)
# params.put_percent(percentage).put_chk(chk)
# for index, value in enumerate((
# attempts + 8354, jumps + 3991, seconds + 4085, seed, random.randint(100, 10000),
# "", rs, attempts, coins + 5819, timely_index
# ), 1):
# params.put_seed(value, prefix='s', suffix=index)
payload = params.finish()
codes = {-1: MissingAccess(f"Failed to get leaderboard of the level by ID: {level_id!r}.")}
resp = await self.http.request(Route.GET_LEVEL_SCORES, payload, error_codes=codes)
if not resp:
return []
resp, parser = (
resp.split("|"),
Parser().with_split(":").add_ext({"101": level_id}).should_map(),
)
return list(map(parser.parse, filter(is_not_empty, resp)))
async def get_top(
self, strategy: LeaderboardStrategy, count: int, *, client: Client
) -> List[ExtDict]:
needs_login = strategy.value in (1, 2)
# special case: map 'players' -> 'top'
strategy = strategy.name.lower() if strategy.value else "top"
params = Params().create_new().put_type(strategy).put_count(count)
codes = {-1: MissingAccess(f"Failed to fetch leaderboard for strategy: {strategy!r}.")}
if needs_login:
check_logged_obj(client, "get_top")
params.put_definer("accountid", client.account_id).put_password(client.encodedpass)
payload = params.finish()
resp = await self.http.request(Route.GET_USER_TOP, payload, error_codes=codes)
resp, parser = resp.split("|"), Parser().with_split(":").should_map()
return list(map(parser.parse, filter(is_not_empty, resp)))
async def login(self, user: str, password: str) -> Tuple[int, int]:
# account_id, id
payload = (
Params().create_new().put_login_definer(username=user, password=password).finish_login()
)
codes = {
-1: LoginFailure(login=user, password=password),
-12: MissingAccess(f"Account {user!r} (password {password!r}) is disabled."),
}
resp = await self.http.request(Route.LOGIN, payload, error_codes=codes)
account_id, id, *junk = resp.split(",")
return int(account_id), int(id)
async def load_save(self, client: Client) -> Optional[api.Database]:
link = Route.GD_URL
payload = (
Params()
.create_new()
.put_username(client.name)
.put_definer("password", client.password)
.finish_login()
)
codes = {-11: MissingAccess(f"Failed to load data for client: {client!r}.")}
resp = await self.http.request(
Route.LOAD_DATA, payload, error_codes=codes, custom_base=link
)
try:
main, levels, *_ = resp.split(";")
db = await api.save.from_string_async(main, levels, xor=False, follow_os=False)
return db
except Exception:
return
async def do_save(self, data: str, client: Client) -> None:
link = Route.GD_URL
codes = {
-4: MissingAccess("Data is too large."),
-5: MissingAccess("Invalid login credentials."),
-6: MissingAccess("Something wrong happened."),
}
payload = (
Params()
.create_new()
.put_username(client.name)
.put_definer("password", client.password)
.put_save_data(data)
.finish_login()
)
resp = await self.http.request(
Route.SAVE_DATA, payload, custom_base=link, error_codes=codes
)
if resp != 1:
raise MissingAccess(f"Failed to do backup for client: {client!r}")
async def search_levels_on_page(
self,
page: int = 0,
query: str = "",
filters: Optional[Filters] = None,
user_id: Optional[int] = None,
gauntlet: Optional[int] = None,
*,
exclude: Tuple[Type[BaseException]] = (),
client: Client,
) -> Tuple[List[ExtDict], List[ExtDict], List[ExtDict]]:
# levels, creators, songs
if filters is None:
filters = Filters.setup_empty()
codes = {-1: MissingAccess("No levels were found.")}
params = Params().create_new()
if gauntlet is not None:
params.put_definer("gauntlet", gauntlet)
else:
params.put_definer("search", query).put_page(page).put_total(0).put_filters(filters)
if filters.strategy == SearchStrategy.BY_USER:
if user_id is None:
check_logged_obj(client, "search_levels_on_page(...)")
user_id = client.id
params.put_definer("accountid", client.account_id).put_password(
client.encodedpass
)
params.put_local(1)
params.put_definer("search", user_id) # override the 'str' parameter in request
elif filters.strategy == SearchStrategy.FRIENDS:
check_logged_obj(client, "search_levels_on_page(..., client=client)")
params.put_definer("accountid", client.account_id).put_password(client.encodedpass)
payload = params.finish()
resp = await self.http.request(
Route.LEVEL_SEARCH, payload, exclude=exclude, error_codes=codes
)
if not resp:
return [], [], []
resp, parser = resp.split("#"), Parser().with_split("~|~").should_map()
try:
lvdata, cdata, sdata = resp[:3]
except ValueError:
return [], [], []
songs = list(map(parser.parse, filter(is_not_empty, sdata.split("~:~"))))
creators = [
ExtDict(zip(("id", "name", "account_id"), creator.split(":")))
for creator in filter(is_not_empty, cdata.split("|"))
]
parser.with_split(":").add_ext({"101": 0, "102": -1, "103": -1})
levels = list(map(parser.parse, filter(is_not_empty, lvdata.split("|"))))
return levels, creators, songs
async def search_levels(
self,
query: str = "",
filters: Optional[Filters] = None,
user_id: Optional[int] = None,
pages: Optional[Sequence[int]] = None,
gauntlet: Optional[int] = None,
*,
client: Client,
) -> List[ExtDict]:
to_run = [
self.search_levels_on_page(
query=query,
filters=filters,
user_id=user_id,
page=page,
gauntlet=gauntlet,
exclude=excluding(Exception),
client=client,
)
for page in pages
]
levels, creators, songs = [], [], []
for (level_part, creator_part, song_part) in await asyncio.gather(*to_run):
levels.extend(level_part)
creators.extend(creator_part)
songs.extend(song_part)
return levels, creators, songs
async def report_level(self, level_id: int) -> None:
payload = Params().create_new("web").put_definer("levelid", level_id).finish()
codes = {-1: MissingAccess(f"Failed to report a level by ID: {level_id!r}.")}
await self.http.request(Route.REPORT_LEVEL, payload, error_codes=codes)
async def delete_level(self, level_id: int, *, client: Client) -> None:
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_definer("levelid", level_id)
.put_password(client.encodedpass)
.finish_level()
)
resp = await self.http.request(Route.DELETE_LEVEL, payload)
if resp != 1:
raise MissingAccess(f"Failed to delete a level by ID: {level_id}.")
async def update_level_desc(self, level_id: int, content: str, *, client: Client) -> None:
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_definer("levelid", level_id)
.put_level_desc(content)
.finish()
)
resp = await self.http.request(Route.UPDATE_LEVEL_DESC, payload)
if resp != 1:
raise MissingAccess(f"Failed to update description of the level by ID: {level_id}.")
async def rate_level(self, level_id: int, rating: int, *, client: Client) -> None:
assert 0 < rating <= 10, "Invalid star value given."
rs, udid, uuid = Coder.gen_rs(), Params.gen_udid(), Params.gen_uuid()
values = [level_id, rating, rs, client.account_id, udid, uuid]
chk = Coder.gen_chk(type="like_rate", values=values)
payload = (
Params()
.create_new()
.put_definer("levelid", level_id)
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_udid(udid)
.put_uuid(uuid)
.put_definer("stars", rating)
.put_rs(rs)
.put_chk(chk)
.finish()
)
resp = await self.http.request(Route.RATE_LEVEL_STARS, payload)
if resp != 1:
raise MissingAccess(f"Failed to rate level by ID: {level_id}.")
async def rate_demon(
self, level_id: int, demon_rating: DemonDifficulty, mod: bool, *, client: Client
) -> bool:
rating_level = demon_rating.value
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_definer("levelid", level_id)
.put_definer("rating", rating_level)
.put_mode(int(mod))
.finish_mod()
)
codes = {-2: MissingAccess("Attempt to rate as mod without mod permissions.")}
resp = await self.http.request(Route.RATE_LEVEL_DEMON, payload, error_codes=codes)
if not resp:
return False
elif isinstance(resp, int) and resp > 0:
return True
async def send_level(
self, level_id: int, rating: int, featured: bool, *, client: Client
) -> None:
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_definer("levelid", level_id)
.put_definer("stars", rating)
.put_feature(int(featured))
.finish_mod()
)
codes = {
-2: MissingAccess(f"Missing moderator permissions to send a level by ID: {level_id!r}.")
}
resp = await self.http.request(Route.SUGGEST_LEVEL_STARS, payload, error_codes=codes)
if resp != 1:
raise MissingAccess(f"Failed to send a level by ID: {level_id!r}.")
async def like(
self, item_id: int, typeid: int, special: int, dislike: bool = False, *, client: Client
) -> None:
like = dislike ^ 1
rs, udid, uuid = Coder.gen_rs(), Params.gen_udid(), Params.gen_uuid()
values = [special, item_id, like, typeid, rs, client.account_id, udid, uuid]
chk = Coder.gen_chk(type="like_rate", values=values)
payload = (
Params()
.create_new(game_version=20)
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_udid(udid)
.put_uuid(uuid)
.put_definer("itemid", item_id)
.put_like(like)
.put_type(typeid)
.put_special(special)
.put_rs(rs)
.put_chk(chk)
.finish()
)
resp = await self.http.request(Route.LIKE_ITEM, payload)
if resp != 1:
raise MissingAccess(f"Failed to like an item by ID: {item_id}.")
async def get_page_messages(
self,
sent_or_inbox: str,
page: int,
*,
exclude: Tuple[Type[BaseException]] = (),
client: Client,
) -> List[ExtDict]:
assert sent_or_inbox in ("inbox", "sent")
inbox = 0 if sent_or_inbox != "sent" else 1
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_page(page)
.put_total(0)
.get_sent(inbox)
.finish()
)
codes = {-1: MissingAccess("Failed to get messages."), -2: NothingFound("gd.Message")}
resp = await self.http.request(
Route.GET_PRIVATE_MESSAGES, payload, error_codes=codes, exclude=exclude
)
resp = Parser().split("#").take(0).check_empty().split("|").parse(resp)
if resp is None:
return []
parser = Parser().with_split(":").should_map()
return list(map(parser.parse, resp))
async def get_messages(
self, sent_or_inbox: str, pages: Optional[Sequence[int]] = None, *, client: Client
) -> List[ExtDict]:
assert sent_or_inbox in ("inbox", "sent")
to_run = [
self.get_page_messages(
sent_or_inbox=sent_or_inbox, page=page, exclude=excluding(Exception), client=client
)
for page in pages
]
return await self.run_many(to_run)
async def post_comment(self, content: str, *, client: Client) -> None:
to_gen = [client.name, 0, 0, 1]
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_username(client.name)
.put_password(client.encodedpass)
.put_comment(content, to_gen)
.comment_for("profile")
.finish()
)
codes = {-1: MissingAccess("Failed to post a comment.")}
await self.http.request(Route.UPLOAD_ACC_COMMENT, payload, error_codes=codes)
async def comment_level(
self, level_id: int, content: str, percentage: int, *, client: Client
) -> None:
if percentage > 100:
raise ValueError(f"{percentage}% > 100% percentage arg was received.")
percentage = round(percentage) # just in case
to_gen = [client.name, level_id, percentage, 0]
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_username(client.name)
.put_password(client.encodedpass)
.put_comment(content, to_gen)
.comment_for("level", level_id)
.put_percent(percentage)
.finish()
)
codes = {-1: MissingAccess(f"Failed to post a comment on a level by ID: {level_id!r}.")}
await self.http.request(Route.UPLOAD_COMMENT, payload, error_codes=codes)
async def delete_comment(
self, typeof: CommentType, comment_id: int, level_id: int, *, client: Client
) -> None:
cases = {0: Route.DELETE_LEVEL_COMMENT, 1: Route.DELETE_ACC_COMMENT}
route = cases.get(typeof.value)
payload = (
Params()
.create_new()
.put_definer("commentid", comment_id)
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.comment_for(typeof.name.lower(), level_id)
.finish()
)
resp = await self.http.request(route, payload)
if resp != 1:
raise MissingAccess(f"Failed to delete a comment by ID: {comment_id!r}.")
async def send_friend_request(
self, target_id: int, message: str = "", *, client: Client
) -> None:
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_recipient(target_id)
.put_fr_comment(message)
.put_password(client.encodedpass)
.finish()
)
resp = await self.http.request(Route.SEND_REQUEST, payload)
if not resp: # if request is already sent
return
elif resp != 1:
raise MissingAccess(f"Failed to send a friend request to user by ID: {target_id!r}.")
async def delete_friend_request(
self, typeof: MessageOrRequestType, user_id: int, client: Client
) -> None:
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_definer("user", user_id)
.put_password(client.encodedpass)
.put_is_sender(typeof.name.lower())
.finish()
)
resp = await self.http.request(Route.DELETE_REQUEST, payload)
if resp != 1:
raise MissingAccess(
f"Failed to delete a friend request by User (with ID): {user_id!r}."
)
async def accept_friend_request(
self, typeof: MessageOrRequestType, request_id: int, user_id: int, client: Client
) -> None:
if typeof.value: # is gd.MessageOrRequestType.SENT
raise MissingAccess(
"Failed to accept a friend request. Reason: request is sent, not received one."
)
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_definer("user", user_id)
.put_definer("requestid", request_id)
.finish()
)
resp = await self.http.request(Route.ACCEPT_REQUEST, payload)
if resp != 1:
raise MissingAccess(f"Failed to accept a friend request by ID: {request_id!r}.")
async def read_friend_request(self, request_id: int, client: Client) -> None:
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_definer("requestid", request_id)
.finish()
)
resp = await self.http.request(Route.READ_REQUEST, payload)
if resp != 1:
raise MissingAccess(f"Failed to read a friend request by ID: {request_id!r}.")
async def read_message(
self, typeof: MessageOrRequestType, message_id: int, client: Client
) -> str:
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_definer("messageid", message_id)
.put_is_sender(typeof.name.lower())
.put_password(client.encodedpass)
.finish()
)
codes = {-1: MissingAccess(f"Failed to read a message by ID: {message_id!r}.")}
resp = await self.http.request(Route.READ_PRIVATE_MESSAGE, payload, error_codes=codes,)
mapped = Parser().with_split(":").should_map().parse(resp)
return Coder.decode(type="message", string=mapped.get(Index.MESSAGE_BODY, ""))
async def delete_message(
self, typeof: MessageOrRequestType, message_id: int, client: Client
) -> None:
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_definer("messageid", message_id)
.put_password(client.encodedpass)
.put_is_sender(typeof.name.lower())
.finish()
)
resp = await self.http.request(Route.DELETE_PRIVATE_MESSAGE, payload)
if resp != 1:
raise MissingAccess(f"Failed to delete a message by ID: {message_id!r}.")
async def get_gauntlets(self) -> List[ExtDict]:
payload = Params().create_new().finish()
resp = await self.http.request(Route.GET_GAUNTLETS, payload)
splitted = Parser().split("#").take(0).split("|").parse(resp)
parser = Parser().with_split(":").should_map()
return list(map(parser.parse, filter(is_not_empty, splitted)))
async def get_page_map_packs(
self, page: int = 0, *, exclude: Tuple[Type[BaseException]] = (),
) -> List[ExtDict]:
payload = Params().create_new().put_page(page).finish()
resp = await self.http.request(Route.GET_MAP_PACKS, payload)
splitted = Parser().split("#").take(0).split("|").check_empty().should_map().parse(resp)
if not splitted:
if issubclass(NothingFound, exclude):
return []
raise NothingFound("gd.MapPack")
parser = Parser().with_split(":").should_map()
return list(map(parser.parse, splitted))
async def get_map_packs(self, pages: Sequence[int]) -> List[ExtDict]:
to_run = [
self.get_page_map_packs(page=page, exclude=excluding(Exception)) for page in pages
]
return await self.run_many(to_run)
async def get_page_friend_requests(
self,
sent_or_inbox: str = "inbox",
page: int = 0,
*,
exclude: Tuple[Type[BaseException]] = (),
client: Client,
) -> List[ExtDict]:
inbox = int(sent_or_inbox == "sent")
payload = (
Params()
.create_new()
.put_definer("accountid", str(client.account_id))
.put_password(client.encodedpass)
.put_page(page)
.put_total(0)
.get_sent(inbox)
.finish()
)
codes = {
-1: MissingAccess(f"Failed to get friend requests on page {page}."),
-2: NothingFound("gd.FriendRequest"),
}
resp = await self.http.request(
Route.GET_FRIEND_REQUESTS, payload, error_codes=codes, exclude=exclude
)
splitted = Parser().split("#").take(0).split("|").check_empty().parse(resp)
if resp is None:
return []
parser = Parser().split(":").add_ext({"101": inbox}).should_map()
return list(map(parser.parse, splitted))
async def get_friend_requests(
self, pages: Sequence[int], sent_or_inbox: str = "inbox", *, client: Client
) -> List[ExtDict]:
assert sent_or_inbox in ("sent", "inbox")
to_run = [
self.get_page_friend_requests(
sent_or_inbox=sent_or_inbox, page=page, exclude=excluding(Exception), client=client
)
for page in pages
]
return await self.run_many(to_run)
async def retrieve_page_comments(
self,
account_id: int,
id: int,
type: str = "profile",
page: int = 0,
*,
strategy: CommentStrategy,
exclude: Tuple[Type[BaseException]] = (),
) -> List[ExtDict]:
assert isinstance(page, int) and page >= 0
assert type in ("profile", "level")
is_level = type == "level"
typeid = is_level ^ 1
definer = "userid" if is_level else "accountid"
selfid = id if is_level else account_id
route = Route.GET_COMMENT_HISTORY if is_level else Route.GET_ACC_COMMENTS
parser = Parser().add_ext({"101": typeid}).should_map()
if is_level:
parser.split(":").take(0).split("~")
else:
parser.with_split("~")
param_obj = Params().create_new().put_definer(definer, selfid).put_page(page).put_total(0)
if is_level:
param_obj.put_mode(strategy.value)
payload = param_obj.finish()
codes = {
-1: MissingAccess(f"Failed to retrieve comment for user by Account ID: {account_id!r}.")
}
resp = await self.http.request(route, payload, error_codes=codes, exclude=exclude)
if not resp:
return []
splitted = resp.split("#").pop(0)
if not splitted:
if issubclass(NothingFound, exclude):
return []
raise NothingFound("gd.Comment")
return list(map(parser.parse, filter(is_not_empty, splitted.split("|"))))
async def retrieve_comments(
self,
account_id: int,
id: int,
pages: Sequence[int],
type: str = "profile",
*,
strategy: CommentStrategy,
) -> List[ExtDict]:
assert type in ("profile", "level")
to_run = [
self.retrieve_page_comments(
type=type,
account_id=account_id,
id=id,
page=page,
exclude=excluding(Exception),
strategy=strategy,
)
for page in pages
]
return await self.run_many(to_run)
async def get_level_comments(
self,
level_id: int,
strategy: CommentStrategy,
amount: int,
exclude: Tuple[Type[BaseException]] = (),
) -> List[Tuple[ExtDict, ExtDict]]:
# comment, user
payload = (
Params()
.create_new()
.put_definer("levelid", level_id)
.put_page(0)
.put_total(0)
.put_mode(strategy.value)
.put_count(amount)
.finish()
)
codes = {
-1: MissingAccess(f"Failed to get comments of a level by ID: {level_id!r}."),
-2: NothingFound("gd.Comment"),
}
resp = await self.http.request(
Route.GET_COMMENTS, payload, error_codes=codes, exclude=exclude
)
if resp is None:
return []
splitted = Parser().split("#").take(0).split("|").parse(resp)
parser = Parser().with_split("~").should_map()
res = []
for elem in filter(is_not_empty, splitted):
com_data, user_data, *_ = map(parser.parse, elem.split(":"))
com_data.update({"1": level_id, "101": 0, "102": 0})
user_data = ExtDict(
account_id=user_data.getcast(Index.USER_ACCOUNT_ID, 0, int),
id=com_data.getcast(Index.COMMENT_AUTHOR_ID, 0, int),
name=user_data.get(Index.USER_NAME, "unknown"),
)
res.append((com_data, user_data))
return res
async def block_user(self, account_id: int, unblock: bool = False, *, client: Client) -> None:
route = Route.UNBLOCK_USER if unblock else Route.BLOCK_USER
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_definer("user", account_id)
.finish()
)
resp = await self.http.request(route, payload)
if resp != 1:
raise MissingAccess(
f"Failed to {'un' if unblock else ''}block a user by Account ID: {account_id!r}."
)
async def unfriend_user(self, account_id: int, *, client: Client) -> None:
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_definer("user", account_id)
.finish()
)
resp = await self.http.request(Route.REMOVE_FRIEND, payload)
if resp != 1:
raise MissingAccess(f"Failed to unfriend a user by Account ID: {account_id!r}.")
async def send_message(
self, account_id: int, subject: str, body: str, *, client: Client
) -> None:
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_message(subject, body)
.put_recipient(account_id)
.put_password(client.encodedpass)
.finish()
)
resp = await self.http.request(Route.SEND_PRIVATE_MESSAGE, payload)
if resp != 1:
raise MissingAccess(
f"Failed to send a message to a user by Account ID: {account_id!r}."
)
async def update_profile(self, settings: Dict[str, int], *, client: Client) -> None:
settings_cased = {Converter.snake_to_camel(name): value for name, value in settings.items()}
rs = Coder.gen_rs()
req_chk_params = [client.account_id]
req_chk_params.extend(
settings.get(param, 0)
for param in (
"user_coins",
"demons",
"stars",
"coins",
"icon_type",
"icon",
"diamonds",
"acc_icon",
"acc_ship",
"acc_ball",
"acc_bird",
"acc_dart",
"acc_robot",
"acc_glow",
"acc_spider",
"acc_explosion",
)
)
chk = Coder.gen_chk(type="userscore", values=req_chk_params)
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_username(client.name)
.put_seed(rs)
.put_seed(chk, suffix=str(2))
.finish()
)
payload.update(settings_cased)
resp = await self.http.request(Route.UPDATE_USER_SCORE, payload)
if not resp > 0:
raise MissingAccess(f"Failed to update profile of a client: {client!r}")
async def update_settings(
self,
message_policy: MessagePolicyType,
friend_request_policy: FriendRequestPolicyType,
comment_policy: CommentPolicyType,
youtube: str,
twitter: str,
twitch: str,
*,
client: Client,
) -> None:
payload = (
Params()
.create_new("web")
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_profile_upd(
message_policy.value,
friend_request_policy.value,
comment_policy.value,
youtube,
twitter,
twitch,
)
.finish_login()
)
resp = await self.http.request(Route.UPDATE_ACC_SETTINGS, payload)
if resp != 1:
raise MissingAccess(f"Failed to update profile settings of a client: {client!r}.")
async def get_quests(self, *, client: Client) -> List[ExtDict]:
udid, uuid = Params.gen_udid(), Params.gen_uuid()
rn = Coder.gen_rs(5, charset=string.digits)
chk = Coder.gen_chk(type="challenges", values=[rn])
codes = {-1: MissingAccess("Failed to get quests.")}
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_udid(udid)
.put_uuid(uuid)
.put_chk(chk[:REWARD_CHALLENGE_CHK_LENGTH])
.finish()
)
resp = await self.http.request(Route.GET_CHALLENGES, payload, error_codes=codes)
resp = Parser().split("|").take(0).check_empty().parse(resp)
if resp is None:
return []
data = Coder.decode(
type="challenges", string=resp[REWARD_CHALLENGE_SLICE_LENGTH:], use_bytes=True
)
time_left, *quests = data.split(":")[-QUEST_AMOUNT - 1 :]
time_left = int(time_left)
result = []
for quest in quests:
try:
id, type, amount, reward, name, *_ = quest.split(",")
except ValueError:
continue
result.append(
ExtDict(
id=int(id),
type=QuestType.from_value(int(type), "unknown"),
amount=int(amount),
reward=int(reward),
name=name,
seconds=time_left,
)
)
return result
async def get_chests(self, reward_type: RewardType, *, client: Client) -> List[ExtDict]:
udid, uuid = Params.gen_udid(), Params.gen_uuid()
rn = Coder.gen_rs(5, charset=string.digits)
chk = Coder.gen_chk(type="rewards", values=[rn])
codes = {-1: MissingAccess("Failed to get chests.")}
payload = (
Params()
.create_new()
.put_definer("accountid", client.account_id)
.put_password(client.encodedpass)
.put_udid(udid)
.put_uuid(uuid)
.put_definer("reward", reward_type.value)
.put_chk(chk[:REWARD_CHALLENGE_CHK_LENGTH])
.put_seed(0, prefix="r", suffix=1)
.put_seed(0, prefix="r", suffix=2)
.finish()
)
resp = await self.http.request(Route.GET_REWARDS, payload, error_codes=codes)
resp = Parser().split("|").take(0).check_empty().parse(resp)
if resp is None:
return []
data = Coder.decode(
type="rewards", string=resp[REWARD_CHALLENGE_SLICE_LENGTH:], use_bytes=True
)
chest_parts = data.split(":")[-CHEST_AMOUNT * CHEST_INNER_PARTS - 1 : -1]
result = []
for chest_id, (time_left, chest_info, chest_count) in enumerate(
group(chest_parts, CHEST_INNER_PARTS), 1
):
try:
orbs, diamonds, shard_id, keys, *_ = chest_info.split(",")
except ValueError:
continue
result.append(
ExtDict(
id=chest_id,
seconds=int(time_left),
count=int(chest_count),
orbs=int(orbs),
diamonds=int(diamonds),
shard_id=int(shard_id),
shard_type=ShardType.from_value(int(shard_id), "unknown"),
keys=int(keys),
)
)
return result
async def run_many(self, tasks: List[asyncio.Task]) -> Any:
res = await asyncio.gather(*tasks)
res = [elem for elem in res if elem]
if all(iterable(elem) for elem in res):
res = list(chain.from_iterable(res))
return res
def excluding(*args: Tuple[Type[BaseException]]) -> Tuple[Type[BaseException]]:
return args
def group(some_iterable: Iterable[Any], group_size: int = 2) -> Iterable[Tuple[Any]]:
return zip(*(iter(some_iterable),) * group_size)
def iterable(maybe_iterable: Iterable) -> bool:
try:
iter(maybe_iterable)
return True
except Exception:
return False
def is_not_empty(sequence: Sequence) -> bool:
return len(sequence) != 0