initial commit
This commit is contained in:
155
app/igdb_helpers.py
Normal file
155
app/igdb_helpers.py
Normal file
@@ -0,0 +1,155 @@
|
||||
import os
|
||||
import requests
|
||||
import time
|
||||
|
||||
class IGDBHelper:
|
||||
def __init__(self):
|
||||
self.client_id = os.environ.get('IGDB_CLIENT_ID')
|
||||
self.client_secret = os.environ.get('IGDB_CLIENT_SECRET')
|
||||
self.access_token = None
|
||||
self.token_expires = 0
|
||||
self.base_url = "https://api.igdb.com/v4"
|
||||
|
||||
def get_access_token(self):
|
||||
"""Get a new access token from Twitch API for IGDB"""
|
||||
if not self.client_id or not self.client_secret:
|
||||
raise ValueError("IGDB API credentials not found in environment variables")
|
||||
|
||||
current_time = time.time()
|
||||
|
||||
# If token exists and not expired, return it
|
||||
if self.access_token and current_time < self.token_expires:
|
||||
return self.access_token
|
||||
|
||||
# Otherwise get a new token
|
||||
auth_url = "https://id.twitch.tv/oauth2/token"
|
||||
auth_params = {
|
||||
"client_id": self.client_id,
|
||||
"client_secret": self.client_secret,
|
||||
"grant_type": "client_credentials"
|
||||
}
|
||||
|
||||
response = requests.post(auth_url, params=auth_params)
|
||||
data = response.json()
|
||||
|
||||
if response.status_code == 200 and "access_token" in data:
|
||||
self.access_token = data["access_token"]
|
||||
# Set expiry time (subtract 60 seconds to be safe)
|
||||
self.token_expires = current_time + data["expires_in"] - 60
|
||||
return self.access_token
|
||||
else:
|
||||
raise Exception(f"Failed to get access token: {data}")
|
||||
|
||||
def search_games(self, query, limit=10):
|
||||
"""Search for games by name"""
|
||||
token = self.get_access_token()
|
||||
|
||||
headers = {
|
||||
'Client-ID': self.client_id,
|
||||
'Authorization': f'Bearer {token}'
|
||||
}
|
||||
|
||||
# IGDB API endpoint for games
|
||||
endpoint = f"{self.base_url}/games"
|
||||
|
||||
# Search for games with the given query, include relevant fields
|
||||
body = f"""
|
||||
search "{query}";
|
||||
fields name, cover.url, summary, first_release_date, platforms.name, involved_companies.company.name, involved_companies.developer, involved_companies.publisher;
|
||||
limit {limit};
|
||||
"""
|
||||
|
||||
response = requests.post(endpoint, headers=headers, data=body)
|
||||
|
||||
if response.status_code == 200:
|
||||
games = response.json()
|
||||
return self._process_game_results(games)
|
||||
else:
|
||||
raise Exception(f"IGDB API error: {response.status_code} - {response.text}")
|
||||
|
||||
def get_game_by_id(self, game_id):
|
||||
"""Get a game by its IGDB ID"""
|
||||
token = self.get_access_token()
|
||||
|
||||
headers = {
|
||||
'Client-ID': self.client_id,
|
||||
'Authorization': f'Bearer {token}'
|
||||
}
|
||||
|
||||
# IGDB API endpoint for games
|
||||
endpoint = f"{self.base_url}/games"
|
||||
|
||||
# Get game with the given ID, include relevant fields
|
||||
body = f"""
|
||||
where id = {game_id};
|
||||
fields name, cover.url, summary, first_release_date, platforms.name, involved_companies.company.name, involved_companies.developer, involved_companies.publisher;
|
||||
limit 1;
|
||||
"""
|
||||
|
||||
response = requests.post(endpoint, headers=headers, data=body)
|
||||
|
||||
if response.status_code == 200 and response.json():
|
||||
game = response.json()[0]
|
||||
return self._process_game_results([game])[0]
|
||||
else:
|
||||
raise Exception(f"IGDB API error: {response.status_code} - {response.text}")
|
||||
|
||||
def _process_game_results(self, games):
|
||||
"""Process the raw API results into a more usable format"""
|
||||
processed_games = []
|
||||
|
||||
for game in games:
|
||||
# Extract developer and publisher
|
||||
developer = None
|
||||
publisher = None
|
||||
|
||||
if 'involved_companies' in game:
|
||||
for company in game['involved_companies']:
|
||||
if company.get('developer', False):
|
||||
try:
|
||||
developer = company['company']['name']
|
||||
except (KeyError, TypeError):
|
||||
pass
|
||||
|
||||
if company.get('publisher', False):
|
||||
try:
|
||||
publisher = company['company']['name']
|
||||
except (KeyError, TypeError):
|
||||
pass
|
||||
|
||||
# Format cover URL
|
||||
cover_url = None
|
||||
if 'cover' in game and 'url' in game['cover']:
|
||||
# IGDB returns image URLs without https:, and in thumbnail size
|
||||
# Replace to get larger images: t_thumb -> t_cover_big
|
||||
cover_url = game['cover']['url']
|
||||
if cover_url.startswith('//'):
|
||||
cover_url = f"https:{cover_url}"
|
||||
cover_url = cover_url.replace('t_thumb', 't_cover_big')
|
||||
|
||||
# Format platforms
|
||||
platforms = []
|
||||
if 'platforms' in game:
|
||||
platforms = [platform['name'] for platform in game['platforms']]
|
||||
|
||||
# Format release date
|
||||
release_date = None
|
||||
if 'first_release_date' in game:
|
||||
release_date = game['first_release_date']
|
||||
# Convert Unix timestamp to readable format
|
||||
release_date = time.strftime('%Y-%m-%d', time.gmtime(release_date))
|
||||
|
||||
processed_game = {
|
||||
'id': game['id'],
|
||||
'name': game['name'],
|
||||
'cover_url': cover_url,
|
||||
'description': game.get('summary'),
|
||||
'release_date': release_date,
|
||||
'platforms': platforms,
|
||||
'developer': developer,
|
||||
'publisher': publisher
|
||||
}
|
||||
|
||||
processed_games.append(processed_game)
|
||||
|
||||
return processed_games
|
||||
Reference in New Issue
Block a user