[ie/niconicochennelplus] Fix login; refresh token before extraction

pull/8645/head
Mozi 2 months ago
parent 68a966f203
commit 197994fc61

@ -11,6 +11,7 @@ from ..utils import (
clean_html,
filter_dict,
int_or_none,
get_domain,
parse_qs,
str_or_none,
time_seconds,
@ -28,7 +29,7 @@ class NiconicoChannelPlusBaseIE(InfoExtractor):
def _call_api(self, path, item_id, *args, **kwargs):
return self._download_json(
f'https://nfc-api.nicochannel.jp/fc/{path}', video_id=item_id, *args, **kwargs)
f'https://api.nicochannel.jp/fc/{path}', video_id=item_id, *args, **kwargs)
def _find_fanclub_site_id(self, channel_name):
fanclub_list_json = self._call_api(
@ -105,50 +106,83 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
},
}]
_LOGIN_API = 'https://auth.sheeta.com/auth/realms/FCS00001/protocol/openid-connect/auth?client_id=FCS00056&response_type=code&scope=openid&kc_idp_hint=niconico&redirect_uri=https%3A%2F%2Fnicochannel.jp%2Ftestman%2Flogin'
_LOGIN_API = 'https://auth.sheeta.com/auth/realms/FCS00001/protocol/openid-connect/auth?client_id=FCS00001&response_type=code&scope=openid&kc_idp_hint=niconico&redirect_uri=https%3A%2F%2Fnicochannel.jp%2Flogin'
_AUTH_BASE_URL = 'https://account.nicovideo.jp/'
_AUTH_TOKEN = None
_TOKEN_REFRESH_API = 'https://api.nicochannel.jp/fc/fanclub_groups/1/auth/refresh'
_AUTH_INFO = None
def _get_bearer_token_by_cookies(self):
def _get_token_with_cookies(self):
urlh = self._request_webpage(
self._LOGIN_API, None, note='Getting auth status',
expected_status=404, errnote='Unable to get auth status')
if not urlh.url.startswith('https://nicochannel.jp/testman/login'):
if not urlh.url.startswith('https://nicochannel.jp/login'):
return None
if not (sns_login_code := traverse_obj(parse_qs(urlh.url), ('code', 0))):
self.report_warning('Unable to get sns login code')
return None
token = traverse_obj(self._call_api(
'fanclub_groups/1/sns_login', item_id=None,
note='Fetching sns login info', errnote='Unable to fetch sns login info', fatal=False,
data=json.dumps({
'key_cloak_user': {
'code': sns_login_code,
'redirect_uri': 'https://nicochannel.jp/testman/login'
},
'fanclub_site': {'id': 56},
}).encode('ascii'), headers={
'Content-Type': 'application/json',
'fc_use_device': 'null',
'Referer': 'https://nicochannel.jp/',
}), ('data', 'access_token', {str_or_none}))
return f'Bearer {token}' if token else None
if token := traverse_obj(self._call_api(
'fanclub_groups/1/sns_login', item_id=None, fatal=False,
note='Fetching sns login info', errnote='Unable to fetch sns login info',
data=json.dumps({
'key_cloak_user': {
'code': sns_login_code,
'redirect_uri': 'https://nicochannel.jp/login',
},
'fanclub_site': {'id': 1},
}).encode('ascii'), headers={
'Content-Type': 'application/json',
'fc_use_device': 'null',
'Referer': 'https://nicochannel.jp/',
}), ('data', 'access_token', {str_or_none})):
return f'Bearer {token}'
self.report_warning('Unable to get token from sns login info')
return None
def _refresh_token(self, content_code):
if auth_token := traverse_obj(self._AUTH_INFO, ('auth_token', {str})):
response, urlh = self._download_json_handle(
self._TOKEN_REFRESH_API, content_code, expected_status=(400, 404),
headers={'Authorization': auth_token}, data=b'',
note='Getting new token', errnote='Unable to get new token')
if urlh.status == 404:
self.report_warning('Unable to get new token due to missing cookies', content_code)
elif error := traverse_obj(
response, ('error', 'message', {lambda msg: base64.b64decode(msg).decode()}), ('error', 'message')):
self.report_warning(f'Unable to get new token: {error!r}', content_code)
elif token := traverse_obj(response, ('data', 'access_token', {str_or_none})):
self._AUTH_INFO['auth_token'] = f'Bearer {token}'
else:
self.report_warning('Unable to get new token', content_code)
def _real_initialize(self):
if auth_cookies := traverse_obj(self._AUTH_INFO, ('auth_cookies', {dict})):
# For programming login
for name, value in auth_cookies.items():
self._set_cookie(get_domain(self._TOKEN_REFRESH_API), name, value)
elif auth_token := self._get_token_with_cookies():
# For valid cookies
self._AUTH_INFO = {'auth_token': auth_token}
def _perform_login(self, mail_tel, password):
mail_tel_b64 = base64.b64encode(mail_tel.encode('ascii')).decode('ascii')
cached_auth_data = self.cache.load(self._NETRC_MACHINE, mail_tel_b64)
if isinstance(cached_auth_data, dict) and time_seconds() - cached_auth_data.get('timestamp', 0) < 300:
self._AUTH_TOKEN = {
'mail_tel_b64': mail_tel_b64,
'bearer': cached_auth_data['bearer'],
}
return
# Cookies may be also specified
if bearer := self._get_bearer_token_by_cookies():
self._AUTH_TOKEN = {'mail_tel_b64': None, 'bearer': bearer}
if cached_auth_data and (cache_timestamp := traverse_obj(cached_auth_data, ('timestamp', {int})) or 0):
# Cookies expire in 30 days
if cache_timestamp + 30 * 86400 > time_seconds():
self._AUTH_INFO = {
'mail_tel_b64': mail_tel_b64,
'auth_token': traverse_obj(cached_auth_data, ('auth_token', {str})),
'auth_cookies': traverse_obj(cached_auth_data, ('auth_cookies', {dict})),
}
return
if auth_token := self._get_token_with_cookies():
# For valid cookies
self._AUTH_INFO = {'auth_token': auth_token}
return
login_url = urljoin(
@ -191,17 +225,22 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
self.report_warning(f'Unable to log in: MFA challenge failed, "{err_msg}"')
return
if bearer := self._get_bearer_token_by_cookies():
self.cache.store(self._NETRC_MACHINE, mail_tel_b64, {
'bearer': bearer,
'timestamp': int(time_seconds()),
})
self._AUTH_TOKEN = {'mail_tel_b64': mail_tel_b64, 'bearer': bearer}
if not (auth_token := self._get_token_with_cookies()):
self.report_warning('Unable to get token after login')
return
def _real_extract(self, url):
if (not self._AUTH_TOKEN) and (bearer := self._get_bearer_token_by_cookies()):
self._AUTH_TOKEN = {'mail_tel_b64': None, 'bearer': bearer}
common_info = {
'auth_token': auth_token,
'auth_cookies': dict(traverse_obj(
self.cookiejar.get_cookies_for_url(self._TOKEN_REFRESH_API), (..., {lambda item: (item.name, item.value)}))),
}
self.cache.store(self._NETRC_MACHINE, mail_tel_b64, {
**common_info,
'timestamp': int(time_seconds()),
})
self._AUTH_INFO = {'mail_tel_b64': mail_tel_b64, **common_info}
def _real_extract(self, url):
content_code, channel_id = self._match_valid_url(url).group('code', 'channel')
fanclub_site_id = self._find_fanclub_site_id(channel_id)
@ -210,6 +249,9 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
note='Fetching video page info', errnote='Unable to fetch video page info',
)['data']['video_page']
# The token for getting sessions expires in 5 minutes
self._refresh_token(content_code)
live_status, session_id = self._get_live_status_and_session_id(content_code, data_json)
release_timestamp_str = data_json.get('live_scheduled_start_at')
@ -326,19 +368,18 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
'Content-Type': 'application/json',
'fc_use_device': 'null',
'origin': 'https://nicochannel.jp',
'Authorization': traverse_obj(self._AUTH_INFO, ('auth_token', {str})),
}
if self._AUTH_TOKEN:
headers['Authorization'] = self._AUTH_TOKEN['bearer']
session_id = self._call_api(
f'video_pages/{content_code}/session_ids', item_id=f'{content_code}/session',
data=json.dumps(payload).encode('ascii'), headers=headers,
data=json.dumps(payload).encode('ascii'), headers=filter_dict(headers),
note='Getting session id', errnote='Unable to get session id',
)['data']['session_id']
except ExtractorError as e:
if not isinstance(e.cause, HTTPError) or e.cause.status not in (401, 403, 408):
raise e
if mail_tel_b64 := traverse_obj(self._AUTH_TOKEN, ('mail_tel_b64', {str})):
if mail_tel_b64 := traverse_obj(self._AUTH_INFO, ('mail_tel_b64', {str})):
self.cache.store(self._NETRC_MACHINE, mail_tel_b64, None)
self.raise_login_required(
msg={

Loading…
Cancel
Save