Client/cache_handler/cache_handler.py

206 lines
6.6 KiB
Python
Raw Normal View History

import json
import random
import asyncio
import hashlib
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict
from argparse import Namespace, ArgumentParser
import httpx
import aiofiles
# hack to get pyinstaller 3.5 to work
if False:
import anyio._backends._asyncio
BUF_SIZE = 1 << 16
SizeDict = Dict[str, Dict[str, Dict[str, int]]]
size_dict: SizeDict = {}
@dataclass
class FileInfo:
version: str
mode: str
url_path: str
local_path: Path
sha256: str
async def send_message(writer: asyncio.StreamWriter, obj: SizeDict) -> None:
message = (json.dumps(obj) + '\n').encode('utf-8')
writer.write(message)
await writer.drain()
async def check_file_hash(file_info: FileInfo) -> bool:
size = 0
state = 'altered'
sha256 = hashlib.sha256()
try:
async with aiofiles.open(file_info.local_path, mode='rb') as rb:
while True:
data = await rb.read(BUF_SIZE)
if not data:
break
sha256.update(data)
size += len(data)
state = 'intact' if sha256.hexdigest() == file_info.sha256 else 'altered'
size_dict[file_info.version][file_info.mode][state] += size
except:
pass
return state == 'intact'
async def download_file(writer: asyncio.StreamWriter, client: httpx.AsyncClient, file_info: FileInfo, retries: int = 5) -> None:
if (await check_file_hash(file_info)):
await send_message(writer, size_dict)
return
for i in range(retries):
try:
async with client.stream('GET', file_info.url_path) as stream:
stream.raise_for_status()
async with aiofiles.open(file_info.local_path, mode='wb') as wb:
async for chunk in stream.aiter_bytes(chunk_size=BUF_SIZE):
await wb.write(chunk)
except:
await asyncio.sleep(i + 1)
if (await check_file_hash(file_info)):
break
await send_message(writer, size_dict)
async def hash_check(writer: asyncio.StreamWriter, file_info_list: List[FileInfo], update_freq: int = 50) -> None:
coroutines = [check_file_hash(file_info) for file_info in file_info_list]
random.shuffle(coroutines)
for i in range(0, len(coroutines), update_freq):
await asyncio.gather(*coroutines[i:i+update_freq])
await send_message(writer, size_dict)
async def download(writer: asyncio.StreamWriter, file_info_list: List[FileInfo], max_connections: int = 5) -> None:
for file_info in file_info_list:
file_info.local_path.parent.mkdir(parents=True, exist_ok=True)
async with httpx.AsyncClient(limits=httpx.Limits(max_connections=max_connections),
timeout=httpx.Timeout(None)) as client:
coroutines = [download_file(writer, client, file_info) for file_info in file_info_list]
random.shuffle(coroutines)
await asyncio.gather(*coroutines)
async def remove(writer: asyncio.StreamWriter, file_info_list: List[FileInfo]) -> None:
roots = set()
for file_info in file_info_list:
if file_info.local_path.parent.is_dir():
roots.add(file_info.local_path.parent)
if file_info.local_path.is_file():
file_info.local_path.unlink()
await send_message(writer, size_dict)
roots_list: List[Path] = sorted(roots, key=lambda p: len(p.parts), reverse=True)
for root_dir in roots_list:
if not any(root_dir.iterdir()):
root_dir.rmdir()
async def prep_and_run_coroutine(args: Namespace) -> None:
file_info_list = compile_file_list(args)
_, writer = await asyncio.open_connection('localhost', args.port)
coroutines = {
'hash-check': hash_check,
'download': download,
'delete': remove,
}
await coroutines[args.mode](writer, file_info_list)
writer.close()
await writer.wait_closed()
def swapped_path(local_root: str, user_dir: str, cache_version: str) -> Path:
current_cache = Path(local_root) / 'FusionFall'
named_cache = Path(local_root) / cache_version
record_path = Path(user_dir) / '.lastver'
if (not named_cache.is_dir() and
current_cache.is_dir() and
record_path.is_file() and
cache_version == record_path.read_text(encoding='utf-8')):
return current_cache
return named_cache
def compile_file_list(args: Namespace) -> List[FileInfo]:
with open(Path(args.user_dir) / 'hashes.json') as r:
hash_dict = json.load(r)
cache_modes = ['offline', 'playable'] if args.cache_mode == 'all' else [args.cache_mode]
cache_versions = list(hash_dict) if args.cache_version == 'all' else [args.cache_version]
file_info_list = []
for cache_version in cache_versions:
for cache_mode in cache_modes:
local_root = args.offline_root if cache_mode == 'offline' else args.playable_root
local_dir = swapped_path(local_root, args.user_dir, cache_version)
if cache_version not in size_dict:
size_dict[cache_version] = {}
size_dict[cache_version][cache_mode] = {
'intact': 0,
'altered': 0,
'total': hash_dict[cache_version][cache_mode + '_size'],
}
file_info_list.extend([
FileInfo(
version=cache_version,
mode=cache_mode,
url_path='/'.join([args.cdn_root, cache_version, rel_path]),
local_path=(local_dir / rel_path),
sha256=file_hash,
) for rel_path, file_hash in hash_dict[cache_version][cache_mode].items()
])
return file_info_list
def parse_args() -> Namespace:
parser = ArgumentParser('Python executable for tasks relating to OpenFusionClient.')
parser.add_argument('--mode', type=str, required=True, choices=['hash-check', 'download', 'delete'])
parser.add_argument('--playable-root', dest='playable_root', type=str)
parser.add_argument('--offline-root', dest='offline_root', type=str)
parser.add_argument('--user-dir', dest='user_dir', type=str, required=True)
parser.add_argument('--cdn-root', dest='cdn_root', type=str, default='http://cdn.dexlabs.systems/ff/big')
parser.add_argument('--cache-mode', dest='cache_mode', type=str, default='all', choices=['all', 'offline', 'playable'])
parser.add_argument('--cache-version', dest='cache_version', type=str, default='all')
parser.add_argument('--port', type=str, required=True)
return parser.parse_args()
def main() -> None:
asyncio.run(prep_and_run_coroutine(parse_args()))
if __name__ == '__main__':
main()