Skip to content

Commit

Permalink
🏡 add filter devices via home (#862)
Browse files Browse the repository at this point in the history
  • Loading branch information
al-one committed Sep 17, 2024
1 parent efb179e commit f7988a8
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 138 deletions.
298 changes: 164 additions & 134 deletions custom_components/xiaomi_miot/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import requests
import voluptuous as vol

from typing import Optional
from homeassistant import config_entries
from homeassistant.const import (
CONF_HOST,
Expand Down Expand Up @@ -111,122 +112,141 @@ async def check_miio_device(hass, user_input, errors):
return user_input


async def check_xiaomi_account(hass, user_input, errors, renew_devices=False):
dvs = []
mic = None
try:
mic = await MiotCloud.from_token(hass, user_input, login=False)
mic.login_times = 0
await mic.async_login(captcha=user_input.get('captcha'))
if not await mic.async_check_auth(False):
raise MiCloudException('Login failed')
user_input['xiaomi_cloud'] = mic
dvs = await mic.async_get_devices(renew=renew_devices) or []
if renew_devices:
await MiotSpec.async_get_model_type(hass, 'xiaomi.miot.auto', use_remote=True)
except (MiCloudException, MiCloudAccessDenied, Exception) as exc:
err = f'{exc}'
errors['base'] = 'cannot_login'
if isinstance(exc, MiCloudAccessDenied) and mic:
if url := mic.attrs.pop('notificationUrl', None):
err = f'The login of Xiaomi account needs security verification. [Click here]({url}) to continue!\n' \
f'本次登录小米账号需要安全验证,[点击这里]({url})继续!你需要在与HA宿主机同局域网的设备下完成安全验证,' \
'如果你使用的是云服务器,将无法验证通过。'
persistent_notification.create(
hass,
err,
f'Login to Xiaomi: {mic.username}',
f'{DOMAIN}-login',
)
elif url := mic.attrs.pop('captchaImg', None):
err = f'Captcha:\n![captcha](data:image/jpeg;base64,{url})'
user_input['xiaomi_cloud'] = mic
user_input['captchaIck'] = mic.attrs.get('captchaIck')
if isinstance(exc, requests.exceptions.ConnectionError):
errors['base'] = 'cannot_reach'
elif 'ZoneInfoNotFoundError' in err:
errors['base'] = 'tzinfo_error'
hass.data[DOMAIN]['placeholders'] = {'tip': f'⚠️ {err}'}
unm = mic.username if mic else user_input.get(CONF_USERNAME)
_LOGGER.error('Setup xiaomi cloud for user: %s failed: %s', unm, exc)
if not errors:
user_input['devices'] = dvs
persistent_notification.dismiss(hass, f'{DOMAIN}-login')
return user_input


async def get_cloud_filter_schema(hass, user_input, errors, schema=None, via_did=False):
if not schema:
schema = vol.Schema({})
dvs = user_input.get('devices') or []
if not dvs:
errors['base'] = 'none_devices'
else:
grp = {}
vls = {}
fls = ['did'] if via_did else ['model', 'home_id', 'ssid', 'bssid']
for d in dvs:
class BaseFlowHandler(config_entries.ConfigEntryBaseFlow):
cloud: MiotCloud = None
devices: Optional[list] = None

async def get_cloud(self, user_input):
if not self.cloud:
self.cloud = await MiotCloud.from_token(self.hass, user_input, login=False)
self.cloud.login_times = 0
if not await self.cloud.async_check_auth(False):
raise MiCloudException('Login failed')
if captcha := user_input.get('captcha'):
await self.cloud.async_login(captcha=captcha)
return self.cloud

async def check_xiaomi_account(self, user_input, errors, renew_devices=False):
dvs = []
mic = None
try:
mic = await self.get_cloud(user_input)
dvs = await mic.async_get_devices(renew=renew_devices) or []
if renew_devices:
await MiotSpec.async_get_model_type(self.hass, 'xiaomi.miot.auto', use_remote=True)
self.context.pop('captchaIck', None)
except (MiCloudException, MiCloudAccessDenied, Exception) as exc:
err = f'{exc}'
errors['base'] = 'cannot_login'
if isinstance(exc, MiCloudAccessDenied) and mic:
if url := mic.attrs.pop('notificationUrl', None):
err = f'The login of Xiaomi account needs security verification. [Click here]({url}) to continue!\n' \
f'本次登录小米账号需要安全验证,[点击这里]({url})继续!你需要在与HA宿主机同局域网的设备下完成安全验证,' \
'如果你的HA部署在云服务器,可能将无法验证通过。'
persistent_notification.create(
self.hass,
err,
f'Login to Xiaomi: {mic.username}',
f'{DOMAIN}-login',
)
elif url := mic.attrs.pop('captchaImg', None):
err = f'Captcha:\n![captcha](data:image/jpeg;base64,{url})'
self.context['captchaIck'] = mic.attrs.get('captchaIck')
if isinstance(exc, requests.exceptions.ConnectionError):
errors['base'] = 'cannot_reach'
elif 'ZoneInfoNotFoundError' in err:
errors['base'] = 'tzinfo_error'
self.hass.data[DOMAIN]['placeholders'] = {'tip': f'⚠️ {err}'}
unm = mic.username if mic else user_input.get(CONF_USERNAME)
_LOGGER.error('Setup xiaomi cloud for user: %s failed: %s', unm, exc)
if not errors:
self.devices = dvs
persistent_notification.dismiss(self.hass, f'{DOMAIN}-login')
return user_input

async def get_cloud_filter_schema(self, user_input, errors, schema=None, via_did=False, home_ids=None):
if not schema:
schema = vol.Schema({})
dvs = self.devices or []
if not dvs:
errors['base'] = 'none_devices'
else:
grp = {}
vls = {}
homes = {}
fls = ['did'] if via_did else ['model', 'home_id', 'ssid', 'bssid']
for f in fls:
v = d.get(f)
if v is None:
continue
grp.setdefault(v, 0)
grp[v] += 1
vls.setdefault(f, {})
dnm = f'{d.get("name")}'
des = '<empty>' if v == '' else v
if f == 'home_id':
des = d.get('home_name') or des
if f in ['did']:
if MiotCloud.is_hide(d):
for d in dvs:
v = d.get(f)
if v is None:
continue
dip = d.get('localip')
if not dip or d.get('pid') not in [0, '0', '8', '', None]:
dip = d.get('model')
vls[f][v] = f'{dnm} ({dip})'
elif f in ['model']:
if grp[v] > 1:
dnm += f' * {grp[v]}'
vls[f][v] = f'{des} ({dnm})'
else:
vls[f][v] = f'{des} ({grp[v]})'
ies = {
'exclude': 'Exclude (排除)',
'include': 'Include (包含)',
}
for f in fls:
if not vls.get(f):
continue
fk = f'filter_{f}'
fl = f'{f}_list'
lst = vls.get(f, {})
lst = dict(sorted(lst.items()))
ols = [
v
for v in user_input.get(fl, [])
if v in lst
]
schema = schema.extend({
vol.Required(fk, default=user_input.get(fk, 'exclude')): vol.In(ies),
vol.Optional(fl, default=ols): cv.multi_select(lst),
})
hass.data[DOMAIN]['prev_input'] = user_input
tip = ''
if user_input.get(CONF_CONN_MODE) == 'local':
url = 'https://github.com/al-one/hass-xiaomi-miot/issues/100#issuecomment-855183156'
if user_input.get(CONF_SERVER_COUNTRY) == 'cn':
tip = '⚠️ 在本地模式下,所有包含的设备都将通过本地miot协议连接,如果包含了不支持本地miot协议的设备,其实体会不可用,' \
f'建议只选择[支持本地模式的设备]({url})。'
else:
tip = '⚠️ In the local mode, all included devices will be connected via the local miot protocol.' \
'If the devices that does not support the local miot protocol are included,' \
'they will be unavailable. It is recommended to include only ' \
f'[the devices that supports the local mode]({url}).'
hass.data[DOMAIN]['placeholders'] = {'tip': tip}
return schema
if home_id := d.get('home_id', 0):
homes.setdefault(home_id, d.get('home_name') or 'Default Home')
if f in ['did'] and v in user_input.get(f'{f}_list', []):
pass
elif home_ids and home_id not in home_ids:
continue
grp.setdefault(v, 0)
grp[v] += 1
vls.setdefault(f, {})
dnm = f'{d.get("name")}'
des = '<empty>' if v == '' else v
if f == 'home_id':
des = d.get('home_name') or des
if f in ['did']:
if MiotCloud.is_hide(d):
continue
dip = d.get('localip')
if not dip or d.get('pid') not in [0, '0', '8', '', None]:
dip = d.get('model')
vls[f][v] = f'{dnm} ({dip})'
elif f in ['model']:
if grp[v] > 1:
dnm += f' * {grp[v]}'
vls[f][v] = f'{des} ({dnm})'
else:
vls[f][v] = f'{des} ({grp[v]})'
ies = {
'exclude': 'Exclude (排除)',
'include': 'Include (包含)',
}
for f in fls:
if not vls.get(f):
continue
fk = f'filter_{f}'
fl = f'{f}_list'
lst = vls.get(f, {})
lst = dict(sorted(lst.items()))
ols = [
v
for v in user_input.get(fl, [])
if v in lst
]
schema = schema.extend({
vol.Required(fk, default=user_input.get(fk, 'exclude')): vol.In(ies),
vol.Optional(fl, default=ols): cv.multi_select(lst),
})
if via_did and homes:
schema = schema.extend({
vol.Optional('home_ids', default=[]): cv.multi_select(homes),
})
self.hass.data[DOMAIN]['prev_input'] = user_input
tip = ''
if user_input.get(CONF_CONN_MODE) == 'local':
url = 'https://github.com/al-one/hass-xiaomi-miot/issues/100#issuecomment-855183156'
if user_input.get(CONF_SERVER_COUNTRY) == 'cn':
tip = '⚠️ 在本地模式下,所有包含的设备都将通过本地miot协议连接,如果包含了不支持本地miot协议的设备,其实体会不可用,' \
f'建议只选择[支持本地模式的设备]({url})。'
else:
tip = '⚠️ In the local mode, all included devices will be connected via the local miot protocol.' \
'If the devices that does not support the local miot protocol are included,' \
'they will be unavailable. It is recommended to include only ' \
f'[the devices that supports the local mode]({url}).'
self.hass.data[DOMAIN]['placeholders'] = {'tip': tip}
return schema


class XiaomiMiotFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
class XiaomiMiotFlowHandler(config_entries.ConfigFlow, BaseFlowHandler, domain=DOMAIN):
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL

@staticmethod
Expand Down Expand Up @@ -305,11 +325,12 @@ async def async_step_cloud(self, user_input=None):
if user_input is None:
user_input = {}
else:
await check_xiaomi_account(self.hass, user_input, errors, renew_devices=True)
await self.check_xiaomi_account(user_input, errors, renew_devices=True)
if not errors:
user_input['filtering'] = True
return await self.async_step_cloud_filter(user_input)
schema = {}
if user_input.get('captchaIck'):
if self.context.get('captchaIck'):
schema.update({
vol.Required('captcha', default=''): str,
})
Expand All @@ -334,15 +355,16 @@ async def async_step_cloud_filter(self, user_input=None):
schema = vol.Schema({})
if user_input is None:
user_input = {}
if 'devices' in user_input:
via_did = not user_input.get('filter_models')
schema = await get_cloud_filter_schema(self.hass, user_input, errors, schema, via_did=via_did)
via_did = not user_input.get('filter_models')
home_ids = user_input.pop('home_ids', [])
if user_input.get('filtering') or home_ids:
schema = await self.get_cloud_filter_schema(user_input, errors, schema, via_did=via_did, home_ids=home_ids)
elif 'prev_input' in self.hass.data[DOMAIN]:
prev_input = self.hass.data[DOMAIN].pop('prev_input', None) or {}
cfg = prev_input['xiaomi_cloud'].to_config() or {}
cfg = self.cloud.to_config() or {}
cfg.update({
CONF_CONN_MODE: prev_input.get(CONF_CONN_MODE),
**(user_input or {}),
**user_input,
})
cfg[CONF_CONFIG_VERSION] = ENTRY_VERSION
_LOGGER.debug('Setup xiaomi cloud: %s', {**cfg, CONF_PASSWORD: '*', 'service_token': '*'})
Expand Down Expand Up @@ -548,10 +570,17 @@ async def async_step_customizing(self, user_input=None):
)


class OptionsFlowHandler(config_entries.OptionsFlow):
class OptionsFlowHandler(config_entries.OptionsFlow, BaseFlowHandler):
def __init__(self, config_entry: config_entries.ConfigEntry):
self.config_entry = config_entry

@property
def saved_config(self):
return {
**self.config_entry.data,
**self.config_entry.options,
}

async def async_step_init(self, user_input=None):
data = self.config_entry.data
if CONF_USERNAME in data:
Expand Down Expand Up @@ -599,27 +628,24 @@ async def async_step_user(self, user_input=None):

async def async_step_cloud(self, user_input=None):
errors = {}
prev_input = {
**self.config_entry.data,
**self.config_entry.options,
}
prev_input = self.saved_config
if isinstance(user_input, dict):
user_input = {
**self.config_entry.data,
**self.config_entry.options,
**prev_input,
**user_input,
}
renew = not not user_input.pop('renew_devices', False)
await check_xiaomi_account(self.hass, user_input, errors, renew_devices=renew)
await self.check_xiaomi_account(user_input, errors, renew_devices=renew)
if not errors:
user_input['filter_models'] = prev_input.get('filter_models') and True
if prev_input.get('filter_model'):
user_input['filter_models'] = True
user_input['filtering'] = True
return await self.async_step_cloud_filter(user_input)
else:
user_input = prev_input
schema = {}
if user_input.get('captchaIck'):
if self.context.get('captchaIck'):
schema.update({
vol.Required('captcha', default=''): str,
})
Expand All @@ -646,18 +672,22 @@ async def async_step_cloud_filter(self, user_input=None):
schema = vol.Schema({})
if user_input is None:
user_input = {}
if 'devices' in user_input:
user_input = {**self.config_entry.data, **self.config_entry.options, **user_input}
via_did = not user_input.get('filter_models')
schema = await get_cloud_filter_schema(self.hass, user_input, errors, schema, via_did=via_did)
via_did = not self.saved_config.get('filter_models')
home_ids = user_input.pop('home_ids', [])
if user_input.get('filtering') or home_ids:
user_input = {
**self.saved_config,
**user_input,
}
schema = await self.get_cloud_filter_schema(user_input, errors, schema, via_did=via_did, home_ids=home_ids)
elif 'prev_input' in self.hass.data[DOMAIN]:
prev_input = self.hass.data[DOMAIN].pop('prev_input', None) or {}
cfg = prev_input['xiaomi_cloud'].to_config() or {}
cfg = self.cloud.to_config() or {}
cfg.update({
CONF_CONN_MODE: prev_input.get(CONF_CONN_MODE),
'disable_message': prev_input.get('disable_message'),
'disable_scene_history': prev_input.get('disable_scene_history'),
**(user_input or {}),
**user_input,
})
self.hass.config_entries.async_update_entry(
self.config_entry, data={**self.config_entry.data, **cfg}
Expand Down
6 changes: 4 additions & 2 deletions custom_components/xiaomi_miot/translations/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
"filter_bssid": "Filter WiFi BSSID",
"bssid_list": "WiFi BSSID List",
"filter_did": "Filter Device",
"did_list": "Device List"
"did_list": "Device List",
"home_ids": "Filter Home (Leave blank to save)"
}
},
"customizing": {
Expand Down Expand Up @@ -171,7 +172,8 @@
"filter_bssid": "Filter WiFi BSSID",
"bssid_list": "WiFi BSSID List",
"filter_did": "Filter Device",
"did_list": "Device List"
"did_list": "Device List",
"home_ids": "Filter Home (Leave blank to save)"
}
}
},
Expand Down
Loading

1 comment on commit f7988a8

@al-one
Copy link
Owner Author

@al-one al-one commented on f7988a8 Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please # to comment.