Mozi 3 weeks ago committed by GitHub
commit bdcd7f2162
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,38 +1,46 @@
import base64
import functools
import json
import re
from .common import InfoExtractor
from ..networking.exceptions import HTTPError
from ..utils import (
ExtractorError,
OnDemandPagedList,
clean_html,
filter_dict,
int_or_none,
get_domain,
parse_qs,
str_or_none,
time_seconds,
traverse_obj,
unified_timestamp,
url_or_none,
urlencode_postdata,
urljoin,
)
class NiconicoChannelPlusBaseIE(InfoExtractor):
_WEBPAGE_BASE_URL = 'https://nicochannel.jp'
_NETRC_MACHINE = 'niconicochannelplus'
def _call_api(self, path, item_id, *args, **kwargs):
return self._download_json(
f'https://nfc-api.nicochannel.jp/fc/{path}', video_id=item_id, *args, **kwargs)
f'https://api.nicochannel.jp/fc/{path}', video_id=item_id, *args, **kwargs)
def _find_fanclub_site_id(self, channel_name):
fanclub_list_json = self._call_api(
'content_providers/channels', item_id=f'channels/{channel_name}',
note='Fetching channel list', errnote='Unable to fetch channel list',
)['data']['content_providers']
fanclub_id = traverse_obj(fanclub_list_json, (
lambda _, v: v['domain'] == f'{self._WEBPAGE_BASE_URL}/{channel_name}', 'id'),
get_all=False)
if not fanclub_id:
raise ExtractorError(f'Channel {channel_name} does not exist', expected=True)
return fanclub_id
if fanclub_id := traverse_obj(fanclub_list_json, (
lambda _, v: v['domain'] == f'{self._WEBPAGE_BASE_URL}/{channel_name}', 'id', {int_or_none}),
get_all=False):
return fanclub_id
raise ExtractorError(f'Channel {channel_name} does not exist', expected=True)
def _get_channel_base_info(self, fanclub_site_id):
return traverse_obj(self._call_api(
@ -98,6 +106,140 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
},
}]
_LOGIN_API = 'https://auth.sheeta.com/auth/realms/FCS00001/protocol/openid-connect/auth?client_id=FCS00001&response_type=code&scope=openid&kc_idp_hint=niconico&redirect_uri=https%3A%2F%2Fnicochannel.jp%2Flogin'
_AUTH_BASE_URL = 'https://account.nicovideo.jp/'
_TOKEN_REFRESH_API = 'https://api.nicochannel.jp/fc/fanclub_groups/1/auth/refresh'
_AUTH_INFO = None
def _get_token_with_cookies(self):
urlh = self._request_webpage(
self._LOGIN_API, None, note='Getting auth status',
expected_status=404, errnote='Unable to get auth status')
if not urlh.url.startswith('https://nicochannel.jp/login'):
return None
if not (sns_login_code := traverse_obj(parse_qs(urlh.url), ('code', 0))):
self.report_warning('Unable to get sns login code')
return None
if token := traverse_obj(self._call_api(
'fanclub_groups/1/sns_login', item_id=None, fatal=False,
note='Fetching sns login info', errnote='Unable to fetch sns login info',
data=json.dumps({
'key_cloak_user': {
'code': sns_login_code,
'redirect_uri': 'https://nicochannel.jp/login',
},
'fanclub_site': {'id': 1},
}).encode('ascii'), headers={
'Content-Type': 'application/json',
'fc_use_device': 'null',
'Referer': 'https://nicochannel.jp/',
}), ('data', 'access_token', {str_or_none})):
return f'Bearer {token}'
self.report_warning('Unable to get token from sns login info')
return None
def _refresh_token(self, content_code):
if auth_token := traverse_obj(self._AUTH_INFO, ('auth_token', {str})):
response, urlh = self._download_json_handle(
self._TOKEN_REFRESH_API, content_code, expected_status=(400, 404),
headers={'Authorization': auth_token}, data=b'',
note='Getting new token', errnote='Unable to get new token')
if urlh.status == 404:
self.report_warning('Unable to get new token due to missing cookies', content_code)
elif error := traverse_obj(
response, ('error', 'message', {lambda msg: base64.b64decode(msg).decode()}), ('error', 'message')):
self.report_warning(f'Unable to get new token: {error!r}', content_code)
elif token := traverse_obj(response, ('data', 'access_token', {str_or_none})):
self._AUTH_INFO['auth_token'] = f'Bearer {token}'
else:
self.report_warning('Unable to get new token', content_code)
def _real_initialize(self):
if auth_cookies := traverse_obj(self._AUTH_INFO, ('auth_cookies', {dict})):
# For programming login
for name, value in auth_cookies.items():
self._set_cookie(get_domain(self._TOKEN_REFRESH_API), name, value)
elif auth_token := self._get_token_with_cookies():
# For valid cookies
self._AUTH_INFO = {'auth_token': auth_token}
def _perform_login(self, mail_tel, password):
mail_tel_b64 = base64.b64encode(mail_tel.encode('ascii')).decode('ascii')
cached_auth_data = self.cache.load(self._NETRC_MACHINE, mail_tel_b64)
if cached_auth_data and (cache_timestamp := traverse_obj(cached_auth_data, ('timestamp', {int})) or 0):
# Cookies expire in 30 days
if cache_timestamp + 30 * 86400 > time_seconds():
self._AUTH_INFO = {
'mail_tel_b64': mail_tel_b64,
'auth_token': traverse_obj(cached_auth_data, ('auth_token', {str})),
'auth_cookies': traverse_obj(cached_auth_data, ('auth_cookies', {dict})),
}
return
if auth_token := self._get_token_with_cookies():
# For valid cookies
self._AUTH_INFO = {'auth_token': auth_token}
return
login_url = urljoin(
self._AUTH_BASE_URL, self._search_regex(
r'<form[^>]+action=(["\'])(?P<url>/login/redirector.+?)\1',
self._download_webpage(
self._LOGIN_API, None, note='Getting login url', errnote='Unable to get login url'),
name='login url', group='url'))
webpage, urlh = self._download_webpage_handle(
login_url, None, note='Logging in', errnote='Unable to log in', expected_status=404,
data=urlencode_postdata({
'mail_tel': mail_tel,
'password': password,
'auth_id': 42,
}), headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Referer': 'https://auth.sheeta.com/',
})
if urlh.url.startswith('https://account.nicovideo.jp/login'):
self.report_warning('Unable to log in: bad email address or password')
return
elif urlh.url.startswith('https://account.nicovideo.jp/mfa'):
webpage, urlh = self._download_webpage_handle(
urljoin(self._AUTH_BASE_URL, self._search_regex(
r'<form[^>]+action=(["\'])(?P<url>.+?)\1', webpage, 'mfa post url', group='url')),
None, expected_status=404, note='Performing MFA', errnote='Unable to complete MFA',
headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Referer': self._AUTH_BASE_URL,
},
data=urlencode_postdata({
'otp': self._get_tfa_info('6 digits code')
}))
if 'oneTimePw' in webpage or 'formError' in webpage:
err_msg = clean_html(self._html_search_regex(
r'formError"[^>]*>(.*?)</div>', webpage, 'form_error',
default='There\'s an error but the message can\'t be parsed.',
flags=re.DOTALL))
self.report_warning(f'Unable to log in: MFA challenge failed, "{err_msg}"')
return
if not (auth_token := self._get_token_with_cookies()):
self.report_warning('Unable to get token after login')
return
common_info = {
'auth_token': auth_token,
'auth_cookies': dict(traverse_obj(
self.cookiejar.get_cookies_for_url(self._TOKEN_REFRESH_API), (..., {lambda item: (item.name, item.value)}))),
}
self.cache.store(self._NETRC_MACHINE, mail_tel_b64, {
**common_info,
'timestamp': int(time_seconds()),
})
self._AUTH_INFO = {'mail_tel_b64': mail_tel_b64, **common_info}
def _real_extract(self, url):
content_code, channel_id = self._match_valid_url(url).group('code', 'channel')
fanclub_site_id = self._find_fanclub_site_id(channel_id)
@ -107,6 +249,9 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
note='Fetching video page info', errnote='Unable to fetch video page info',
)['data']['video_page']
# The token for getting sessions expires in 5 minutes
self._refresh_token(content_code)
live_status, session_id = self._get_live_status_and_session_id(content_code, data_json)
release_timestamp_str = data_json.get('live_scheduled_start_at')
@ -119,7 +264,7 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
else:
msg = 'This event has not started yet'
self.raise_no_formats(msg, expected=True, video_id=content_code)
else:
elif session_id:
formats = self._extract_m3u8_formats(
# "authenticated_url" is a format string that contains "{session_id}".
m3u8_url=data_json['video_stream']['authenticated_url'].format(session_id=session_id),
@ -132,7 +277,7 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
'channel': self._get_channel_base_info(fanclub_site_id).get('fanclub_site_name'),
'channel_id': channel_id,
'channel_url': f'{self._WEBPAGE_BASE_URL}/{channel_id}',
'age_limit': traverse_obj(self._get_channel_user_info(fanclub_site_id), ('content_provider', 'age_limit')),
'age_limit': traverse_obj(self._get_channel_user_info(fanclub_site_id), ('content_provider', 'age_limit', {int_or_none})),
'live_status': live_status,
'release_timestamp': unified_timestamp(release_timestamp_str),
**traverse_obj(data_json, {
@ -207,25 +352,41 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
video_allow_dvr_flg = traverse_obj(data_json, ('video', 'allow_dvr_flg'))
video_convert_to_vod_flg = traverse_obj(data_json, ('video', 'convert_to_vod_flg'))
self.write_debug(f'allow_dvr_flg = {video_allow_dvr_flg}, convert_to_vod_flg = {video_convert_to_vod_flg}.')
self.write_debug(f'{content_code}: allow_dvr_flg = {video_allow_dvr_flg}, convert_to_vod_flg = {video_convert_to_vod_flg}.')
if not (video_allow_dvr_flg and video_convert_to_vod_flg):
raise ExtractorError(
'Live was ended, there is no video for download.', video_id=content_code, expected=True)
else:
raise ExtractorError(f'Unknown type: {video_type}', video_id=content_code, expected=False)
raise ExtractorError(f'Unknown type: {video_type!r}', video_id=content_code)
self.write_debug(f'{content_code}: video_type={video_type}, live_status={live_status}')
session_id = self._call_api(
f'video_pages/{content_code}/session_ids', item_id=f'{content_code}/session',
data=json.dumps(payload).encode('ascii'), headers={
session_id = None
try:
headers = {
'Content-Type': 'application/json',
'fc_use_device': 'null',
'origin': 'https://nicochannel.jp',
},
note='Getting session id', errnote='Unable to get session id',
)['data']['session_id']
'Authorization': traverse_obj(self._AUTH_INFO, ('auth_token', {str})),
}
session_id = self._call_api(
f'video_pages/{content_code}/session_ids', item_id=f'{content_code}/session',
data=json.dumps(payload).encode('ascii'), headers=filter_dict(headers),
note='Getting session id', errnote='Unable to get session id',
)['data']['session_id']
except ExtractorError as e:
if not isinstance(e.cause, HTTPError) or e.cause.status not in (401, 403, 408):
raise e
if mail_tel_b64 := traverse_obj(self._AUTH_INFO, ('mail_tel_b64', {str})):
self.cache.store(self._NETRC_MACHINE, mail_tel_b64, None)
self.raise_login_required(
msg={
401: 'members only content',
403: 'login required',
408: 'outdated cached token',
}[e.cause.status], metadata_available=True)
return live_status, session_id
@ -244,7 +405,7 @@ class NiconicoChannelPlusChannelBaseIE(NiconicoChannelPlusBaseIE):
note=f'Getting channel info (page {page + 1})',
errnote=f'Unable to get channel info (page {page + 1})')
for content_code in traverse_obj(response, ('data', 'video_pages', 'list', ..., 'content_code')):
for content_code in traverse_obj(response, ('data', 'video_pages', 'list', ..., 'content_code', {str_or_none})):
# "video/{content_code}" works for both VOD and live, but "live/{content_code}" doesn't work for VOD
yield self.url_result(
f'{self._WEBPAGE_BASE_URL}/{channel_name}/video/{content_code}', NiconicoChannelPlusIE)
@ -418,9 +579,6 @@ class NiconicoChannelPlusChannelLivesIE(NiconicoChannelPlusChannelBaseIE):
OnDemandPagedList(
functools.partial(
self._fetch_paged_channel_video_list, f'fanclub_sites/{fanclub_site_id}/live_pages',
{
'live_type': 4,
},
channel_id, f'{channel_id}/lives'),
{'live_type': 4}, channel_id, f'{channel_id}/lives'),
self._PAGE_SIZE),
playlist_id=f'{channel_id}-lives', playlist_title=f'{channel_name}-lives')

Loading…
Cancel
Save