Identifier rejected on 55U7KQ #14
Comments
I believe our issues may be linked; however, I cannot use the Vidaa app, only the RemoteNOW app. It is also only able to control volume now and won't do anything with a payload (changesource, launchapp, etc.). I am, however, able to connect with MQTT Explorer; just check that you have entered the certificates correctly for the client cert and client key in MQTT Explorer. Other than that, my knowledge is limited lol |
Having the same issue on a 55U8KQ. I'm pretty sure Hisense changed the way MQTT auth works with newer models (those supporting only the Vidaa App). I think this might be beyond the scope of what @Krazy998 is doing here, but here's what I could find so far:
This seems to indicate that the authentication process is much more complex now and I have no idea where to start with this. Anyways, I'll attach the database so you can have a look at them: |
Thank you for sharing, @LeoKlaus. I was going to suggest Wireshark; however, now that Hisense is using TLS, it would have been difficult. But it's worth an initial try. I think it's important to establish whether the new Vidaa app actually uses a local connection to the TV or some broker or service on the internet. Wireshark should at least tell us that, hopefully. The only other suggestion is to create a serial cable which can TTL directly to the TV and pull the config / app files for analysis. I'm not sure how the new 55U8KQ TVs are, but my older 75" has a headphone jack (there is a service jack on the back of the TV) which effectively can connect to a USB TTL device. I haven't done this in a while, however. When the TV is booting, you can interrupt its boot and get local access. I found that only a small portion of the filesystem is set to read-only. There is a script that is run as root that kicks off the update check, which you can modify to also launch a telnet server with root access. This will give you the ability to pull the code off the device (if it hasn't been patched already on newer devices). |
Just want to chime in... I battled with this for quite some time before giving up. I own a recently purchased (Oct 2023) 65E7KQ PRO which only connects via the VIDAA app. I can pretty much repeat everything listed above. I ended up buying a broadlink IR blaster as all I cared about was ON / OFF. I'll add another small (annoying) nugget : With this set, wifi / network connection is not established until after the TV has been powered on at least once, regardless of what you set in the menus etc. Pretty annoying for those of us who power socket the set OFF for the night (linked to my home alarm state etc...)... |
From what I recall when I was trying to overcome all of this after the initial handshake (eg: the TV was listed in the app), I was able to control it even when I firewalled it off from WAN access. 99% sure, but someone should verify, as I overcame most of what I wanted to do with a broadlink IR blaster and gave up...
|
The whole setup process and control does work without the TV having internet access. I've tried capturing and decrypting the traffic from the app to the TV, but haven't been able to do so for lack of a rootable Android device. On both Android and iOS, the app ignores the system proxy configuration and uses certificate pinning, so a simple mitm doesn't work. On Android, I've been able to at least capture the MQTT traffic (still on port 36669) using PCAPdroid. The app itself uses Baidu protect. This (among other things) prevents you from running with USB debugging enabled. If someone has a rooted Android device and a recent Hisense TV and wants to try getting this to work, the following might help: If that works and the TV can still be controlled, you should be able to capture and decrypt the traffic using PCAPdroid with the MITM add-on (as far as I could tell, all the commands are still sent on port 36669). It would be really cool to get this to work, but decompilation (and to some extent reverse engineering) is not exactly legal here in Germany, so even if I managed to break into the app, I couldn't post my findings. |
I have a brand new A6K 2023 model with the latest version of the Vidaa protocol. If you try a MITM (unpinning doesn't seem to work), the TV drops the connection. However, I was able to see the initial connection message, including the username, password and client ID, and with this information I was able to connect for a limited amount of time using MQTT Explorer. The "multiscreen123" is the password for the keystore, so this is why I think this might be the case. However, it could be a key hardcoded into the Java, which we can't see as it won't decompile. The "passNewword ===[C@d0eef3f" I guess is the "10121AEA780F5D61B6BD93C202F0A6AE", but they didn't convert the byte array to hex for debugging. Shame, because it would make testing a lot easier than having to use a rooted phone with PCAPdroid |
Very interesting findings. I'm really surprised you were able to use Logcat, as I remember seeing something along the lines of
(don't remember the exact implementation but it was very clear what it was supposed to do) in the decompiled code. I didn't quite understand what information let you connect, though. The password you used to connect was The key takeaway (for me) here is that, even if somebody managed to reverse engineer or find the algorithm/key they use to determine the password, the whole timestamp thing would make it a giant pain in the ass to integrate this into a smart home system. |
Yes, that's the plain text password, hex values. vs the older way |
The timestamp has been put in to stop it being just hardcoded and used on any TV. Hisense obviously don't want people to be able to control their own TV! |
Looks like they work for about 4 hours in each direction. I just captured these details with my clock set to 4 hours time. I can confirm these are working with my Hisense. If anybody wants to try them in the next 8 hours or so just to prove that the TV details aren't involved. You'll notice that even once it's expired that the error in MQTT explorer is no longer "invalid client ID" but "Not authorised" |
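The roughly four-hour window described above can be written down as a simple check. This is only a sketch: the four-hour figure is the observation from this comment, not anything documented by Hisense, and `within_window` is a hypothetical helper name.

```python
import time

# Assumption: credentials work for about 4 hours either side of the
# timestamp they were generated with (observed in this thread, not documented).
VALIDITY_WINDOW_S = 4 * 60 * 60


def within_window(credential_ts, now=None, window=VALIDITY_WINDOW_S):
    """Return True if `now` is within `window` seconds of `credential_ts`."""
    if now is None:
        now = time.time()
    return abs(now - credential_ts) <= window


# Example with a timestamp quoted later in this thread.
captured_at = 1701415028
print(within_window(captured_at, now=captured_at + 3 * 60 * 60))
print(within_window(captured_at, now=captured_at + 5 * 60 * 60))
```

A check like this only tells you when to expect "Not authorised"; the TV remains the final judge of whether a login is accepted.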
I keep getting "Disconnected from server" within MQTTExplorer. Tried your MAC, and both the "private" one and the real MAC of my iPhone. This is really interesting though. If what you assume is true, this implementation does not only suck ass for us users but also from a security-standpoint. |
What certs do you have set up? I get that error message if the certificate is added, |
None, actually. I just got a new Mac and didn't carry over any of the MQTTExplorer settings. I just installed it. |
Okay, I've used the certs from d3nd3/Hisense-mqtt-keyfiles.
|
Excellent. Saves me having to get out of bed, go to my local TV shop and loiter around in the carpark with my laptop! We now know that there's no "extra" info put into the password or the client ID from the TV itself. If we can correctly generate the password to match the clientID and username we can connect to any TV/ |
Hey @chimpzilla, I also own the same model (A6K) and I would be happy to help as much as I can. |
No luck, sorry. It's pretty hardened against trying to work out what's going on. |
I think the only way would be to get root access to the TV via serial console (if that is still possible) and examine the code. Clearly Hisense has gone out of its way to limit the ability to manage their latest TVs. On a side note, I have root access to my old 3-year-old Hisense and I have examined its logs and various other things. I can tell you they send logs to unified-ter-na.hismarttv.com, which include things like what is playing on the TV (using data from the TV guide) and what source is selected. |
I have dug up a little bit more, and by using logcat I found a lot of interesting details.
There is much more in the logcat, just not sure it is useful. i'll keep digging |
I've found out a lot more information, including how to generate the password for pairing. However, I am stuck on the last step of pairing; hopefully somebody can help with this, as I've run out of ideas.
So this is how the password and client ID are generated: they are a series of MD5 hashes based on some fixed data, the current Unix timestamp and the connecting device's MAC address. You can use any MAC address, as the TV does not verify this. You can use this online tool to generate this info yourself:
These are the series of hashes you need to use:
These are the results
How you get the hashes:
username his$1701415028 (unix timestamp)
Once you connect using these login details, you publish and subscribe to the following topics. I could get most of it to work by either sending the raw TCP data or using the Android Paho MQTT client (this is what the Vidaa app uses). The topic contains the MQTT client ID.
publish topic: /remoteapp/tv/ui_service/C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001/actions/vidaa_app_connect
subscribe: /remoteapp/mobile/C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001/ui_service/data/authentication
publish topic: /remoteapp/tv/ui_service/C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001/actions/authenticationcode
subscribe: /remoteapp/mobile/C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001/ui_service/data/authenticationcode
or if incorrect pin
publish topic: /remoteapp/tv/platform_service/C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001/data/gettoken
publish topic: /remoteapp/tv/ui_service/C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001/actions/authenticationcodeclose
subscribe: /remoteapp/mobile/C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001/platform_service/data/tokenissuance
I can get all of this to work, including the {"result":1,"info":""} message back from the TV, which confirms the pin is correct. But....... the TV never replies with the access token. I can see from the logcat from the Vidaa app that this should happen. |
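The hash chain described above can be sketched as one pure function. The fixed strings below are copied from the generate_token.py script posted later in this thread, not from any official source; `md5_upper`, `digit_sum` and `derive_credentials` are names chosen here for illustration.

```python
import hashlib


def md5_upper(text):
    """MD5 of a UTF-8 string as an upper-case hex digest."""
    return hashlib.md5(text.encode("utf-8")).hexdigest().upper()


def digit_sum(n):
    """Sum of the decimal digits of a non-negative integer."""
    return sum(int(d) for d in str(n))


def derive_credentials(mac, timestamp):
    # Constants as they appear in generate_token.py in this thread (assumed correct).
    client_hash = md5_upper("38D65DC30F45109A369A86FCE866A85B$" + mac)
    salt_digit = digit_sum(timestamp) % 10
    key_hash = md5_upper("his" + str(salt_digit) + "h*i&s%e!r^v0i1c9")
    password = md5_upper(str(timestamp) + "$" + key_hash[:6])
    return {
        "client_id": mac + "$his$" + client_hash[:6] + "_vidaacommon_001",
        "username": "his$" + str(timestamp),
        "password": password,
    }


# MAC address and timestamp are the ones quoted in this comment.
creds = derive_credentials("C0:BD:D1:3D:6E:3E", 1701415028)
for key, value in creds.items():
    print(key, value)
```

Keeping the derivation free of network code makes it easy to compare its output against values captured from the app before involving the TV at all.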
You're an absolute madlad! I'll later try to reproduce this with my unit and see how it goes.
It's insane. Hisense could've literally spent less time just providing some official MQTT integration/documentation and turned this utter shitshow into a great unique selling point for their hardware (be it for a very small target audience). |
Update
Here's a walkthrough of what was sent and received via the Windows MQTTX client. Received messages in bold
Topic: /remoteapp/mobile/C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001/ui_service/data/authentication
Topic: /remoteapp/tv/ui_service/C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001/actions/authenticationcode
Topic: /remoteapp/mobile/C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001/ui_service/data/authenticationcode
Topic: /remoteapp/mobile/broadcast/ui_service/data/hotelmodechange
Topic: /remoteapp/tv/platform_service/C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001/data/gettoken
Topic: /remoteapp/mobile/C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001/platform_service/data/tokenissuance
|
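Since every pairing topic embeds the client ID, the topic names can be built from it rather than typed by hand. A small sketch, using only the topic paths quoted in this thread; `pairing_topics` is a hypothetical helper name.

```python
def pairing_topics(client_id):
    """Build the publish/subscribe topics used during pairing for one client ID."""
    tv_ui = f"/remoteapp/tv/ui_service/{client_id}/"
    tv_platform = f"/remoteapp/tv/platform_service/{client_id}/"
    mobile = f"/remoteapp/mobile/{client_id}/"
    return {
        "publish": [
            tv_ui + "actions/vidaa_app_connect",
            tv_ui + "actions/authenticationcode",
            tv_platform + "data/gettoken",
            tv_ui + "actions/authenticationcodeclose",
        ],
        "subscribe": [
            mobile + "ui_service/data/authentication",
            mobile + "ui_service/data/authenticationcode",
            "/remoteapp/mobile/broadcast/ui_service/data/hotelmodechange",
            mobile + "platform_service/data/tokenissuance",
        ],
    }


# Client ID as quoted in this thread.
topics = pairing_topics("C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001")
for direction, names in topics.items():
    for name in names:
        print(direction, name)
```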
I just tried again in MQTTX and can't get it to respond. I've tried using fresh login details, but nothing comes back from the TV (the pin is displayed on the TV, so it's connected and the messages are sent correctly). |
you are amazing @chimpzilla! |
Did MQTT Explorer connect? |
It did, yes, but that was the only thing it was doing: no publishing, no subscribing. |
Mine won't connect unless it's TLS enabled. If you post your connection details I can check they are valid? |
I tried connecting now with your credentials and the connection keeps dropping in mqttx (although it connects). |
Hisense sent another firmware file. I'm now running V0007.06.12R.N0508. MQTT Explorer works as does the RemoteNOW app. I am now having trouble getting the https://github.com/sehaas/ha_hisense_tv integration to work. I just get the spinning wheel and the tv does not bring up a code. This is so frustrating. |
Could you share the firmware file? |
The link will be valid for 7 days: |
Has anybody had any luck getting the picture setting info or channel list from the new firmware? and you would get a list of the current TV channels the TV has. Also older versions would have the following: Publish But I can't get this to work or figure out what the subscribe would be for it to work? |
EDIT: RESOLVED Hey there, I encountered this thread two weeks ago, and managed to control the TV through Home Assistant sensors, switches and number entities using some rudimentary manual YAML configuration. But, as of a few days ago the sendkey topic seems to have stopped working, can anyone confirm that? The topic is this one (excluding my client id): Tested with payloads Note that every other topic in the spreadsheet posted by LeoKlaus still works, the sendkey topic used to work until a few days ago and that I'm using two different clients (with different client ids) to control the TV (which also worked fine). |
Did your TV perform a firmware update by chance? Maybe they changed something. |
It's working again, but I can only guess what the problem was. Maybe I was just stupid somewhere. One of my two access tokens expired a few hours ago, that might be related. Interestingly though, all the other topics kept working on both clients and the sendkey topic stopped working on both clients. |
Hi, I've tried the above Python routine to generate an access token, and at first I was overjoyed as it connected from MQTT Explorer. However, if I publish events, they don't seem to be actioned on the TV. I have also tried the same configuration in Home Assistant, and it seems to like the configuration (it doesn't error), but again it does not appear to be receiving any events or allowing me to publish simple events to the TV. I've also tried MQTTX, which connects fine but gives the following error when I try to subscribe to anything: "Failed to Subscribe TOPIC, Error: Not authorized(Code: 135). Make sure the permissions are correct, and check MQTT broker ACL configuration". I am running a brand new 2023 model, a Hisense 55U8KQTUK. However, the scripts that were written on this thread appear to work fine and they are publishing and subscribing with no problem. Thanks Steve |
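For reference, code 135 in that MQTTX error is the MQTT 5 SUBACK reason code 0x87 ("Not authorized"); an MQTT 3.1.1 client would only see the generic failure code 0x80. A small lookup sketch follows; the table covers just a handful of codes from the MQTT specifications, and how the TV's broker chooses between them is not known from this thread.

```python
# A few SUBACK codes from the MQTT 3.1.1 and MQTT 5.0 specifications.
SUBACK_MEANINGS = {
    0x00: "granted, QoS 0",
    0x01: "granted, QoS 1",
    0x02: "granted, QoS 2",
    0x80: "failure (MQTT 3.1.1) / unspecified error (MQTT 5)",
    0x87: "not authorized (MQTT 5)",
}


def describe_suback(code):
    """Human-readable meaning of a SUBACK return/reason code."""
    return SUBACK_MEANINGS.get(code, f"unrecognised code {code:#04x}")


def subscription_granted(code):
    """Codes 0, 1 and 2 mean the subscription was accepted at that QoS."""
    return code in (0x00, 0x01, 0x02)


print(describe_suback(135))
print(subscription_granted(135))
```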
I have a Hisense 65E7KQ PRO, bought via Amazon DE in Dec 2023 with software V0002.07.50B.N0715. Thanks to this thread I can connect to it via MQTT. I slightly modified the two scripts provided, call refresh_token.py and it will either create a new set of credentials or refresh an existing one. The credentials will be saved in credentials.json. You will be asked for the PIN code shown on the TV if a fresh set is created. I confirmed I can control and view the TV state via MQTT Explorer. Some notes, in case they help someone:
Next steps (in the medium term) for me are to pretty up this code and try my hand at creating my first HASS integration or modifying an existing one. Not sure how to handle the client certs yet. Sadly you can't easily extract them from the APK.
generate_token.py:
import re, uuid
import hashlib
import time
import paho.mqtt.client as mqtt
import json
from pprint import pprint

reply = None
authentication_payload = None
authentication_code_payload = None
tokenissuance = None
topicTVUIBasepath = None
topicTVPlatformBasepath = None
topicMobileBasepath = None

def cross_sum(n):
    r = 0
    while n:
        r, n = r + n % 10, n // 10
    return r

def stringToHash(input: str):
    result = hashlib.md5(input.encode("utf-8"))
    return result.hexdigest().upper()

def on_connect(client, userdata, flags, rc):
    global topicTVUIBasepath
    global topicTVPlatformBasepath
    global topicMobileBasepath
    if rc == 0:
        client.connected_flag=True #set flag
        print("connected ok")
        # Subscribing in on_connect() means that if we lose the connection and
        # reconnect then subscriptions will be renewed.
        client.subscribe("#")
        client.subscribe(topicTVUIBasepath + "actions/vidaa_app_connect")
        client.subscribe(topicMobileBasepath + 'ui_service/data/authentication')
        client.subscribe(topicMobileBasepath + 'ui_service/data/authenticationcode')
        client.subscribe("/remoteapp/mobile/broadcast/ui_service/data/hotelmodechange")
        client.subscribe(topicMobileBasepath + 'platform_service/data/tokenissuance')
    else:
        print("Bad connection Returned code=",rc)
        client.bad_connection_flag=True

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, message):
    print("i'm on message")
    global reply
    print("message received " ,str(message.payload.decode("utf-8")))
    print("message topic=",message.topic)
    print("message qos=",message.qos)
    print("message retain flag=",message.retain)
    reply = message

def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_publish(client, userdata, mid):
    print("Published message " + str(mid))

def on_disconnect(client, userdata, rc):
    print("disconnecting reason " +str(rc))

def on_log(client, userdata, level, buf):
    print("log: ",buf)

def on_message_msgs(mosq, obj, msg):
    print("MESSAGES: " + msg.topic + " " + str(msg.qos) + " " + str(msg.payload))

def on_authentication(mosq, obj, msg):
    global authentication_payload
    authentication_payload = msg

def on_authentication_code(mosq, obj, msg):
    global authentication_code_payload
    authentication_code_payload = msg

def on_tokenissuance(mosq, obj, msg):
    global tokenissuance
    tokenissuance = msg

def write_token_to_creds_file():
    global topicTVUIBasepath
    global topicTVPlatformBasepath
    global topicMobileBasepath
    timestamp = int(time.time())
    #timestamp = 1702583685
    firstHash = stringToHash("&vidaa#^app")
    mac = ':'.join(re.findall('..', '%012x' % uuid.getnode())).upper()
    #mac = "C1:BD:D1:3D:6E:3E"
    print(f'mac {mac}')
    secondHash = stringToHash("38D65DC30F45109A369A86FCE866A85B$" + mac)
    lastDigitOfCrossSum = cross_sum(timestamp)%10
    thirdHash = stringToHash("his"+ str(lastDigitOfCrossSum) +"h*i&s%e!r^v0i1c9")
    fourthHash = stringToHash(str(timestamp) + "$" + thirdHash[:6])
    print(firstHash)
    print(secondHash)
    print(thirdHash)
    print(fourthHash)
    clientID = mac + "$his$" + secondHash[:6] + "_vidaacommon_001"
    client = mqtt.Client(client_id=clientID, clean_session=True, userdata=None, protocol=mqtt.MQTTv311, transport="tcp")
    client.tls_set(ca_certs=None, certfile="./rcm_certchain_pem.cer", keyfile="./rcm_pem_privkey.pkcs8", cert_reqs=mqtt.ssl.CERT_NONE,
                   tls_version=mqtt.ssl.PROTOCOL_TLS, ciphers=None)
    client.on_connect = on_connect
    client.on_message = on_message
    # client.on_subscribe = on_subscribe
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    # client.on_log = on_log
    client.connected_flag=False
    client.tls_insecure_set(True)
    username = "his$" + str(timestamp)
    print(f'username: {username}')
    print(f'password: {fourthHash}')
    print(f'client_id: {clientID}')
    client.username_pw_set(username=username, password=fourthHash)
    topicTVUIBasepath = "/remoteapp/tv/ui_service/" + clientID + "/"
    topicTVPlatformBasepath = "/remoteapp/tv/platform_service/" + clientID + "/"
    topicMobileBasepath = "/remoteapp/mobile/" + clientID + "/"
    client.message_callback_add(topicMobileBasepath + 'ui_service/data/authentication' , on_authentication)
    client.message_callback_add(topicMobileBasepath + 'ui_service/data/authenticationcode' , on_authentication_code)
    client.message_callback_add('/remoteapp/mobile/broadcast/ui_service/data/hotelmodechange' , on_message_msgs)
    client.message_callback_add(topicMobileBasepath + 'platform_service/data/tokenissuance' , on_tokenissuance)
    client.connect_async("192.168.65.61", 36669, 60)
    client.loop_start()
    while not client.connected_flag: #wait in loop
        print("In wait loop")
        time.sleep(1)
    print('publishing message to actions/vidaa_app_connect ...')
    client.publish( topicTVUIBasepath + "actions/vidaa_app_connect", '{"app_version":2,"connect_result":0,"device_type":"Mobile App"}')
    print(f'subscribing to {topicMobileBasepath}ui_service/data/authentication ...')
    while authentication_payload is None:
        time.sleep(0.1)
    if authentication_payload.payload.decode() != '""' :
        print('Problems with the authentication message!')
        print(authentication_payload.payload)
        print('Exiting...')
        exit()
    authNum = input("Enter the four digits displayed on your TV: ")
    print(f'publishing message to {topicTVUIBasepath}actions/authenticationcode ...')
    client.publish( topicTVUIBasepath + "actions/authenticationcode", '{"authNum":' + authNum + '}')
    print(f'subscribing to {topicMobileBasepath}ui_service/data/authenticationcode ...')
    client.subscribe(topicMobileBasepath + 'ui_service/data/authenticationcode')
    while authentication_code_payload is None:
        time.sleep(0.1)
    print(authentication_code_payload.payload.decode())
    if json.loads(authentication_code_payload.payload.decode()) != json.loads('{"result": 1,"info": ""}') :
        print('Problems with the authentication message!')
        print(authentication_code_payload.payload)
        print('Exiting...')
        exit()
    print("Success! Getting access token...")
    print(f'publishing message to {topicTVPlatformBasepath}data/gettoken ...')
    client.publish( topicTVPlatformBasepath + "data/gettoken", '{"refreshtoken": ""}')
    print(f'publishing message to {topicTVUIBasepath}actions/authenticationcodeclose ...')
    client.publish( topicTVUIBasepath + "actions/authenticationcodeclose")
    print(f'subscribing to /remoteapp/mobile/broadcast/ui_service/data/hotelmodechange ...')
    client.subscribe('/remoteapp/mobile/broadcast/ui_service/data/hotelmodechange')
    print(f'subscribing to {topicMobileBasepath}platform_service/data/tokenissuance ...')
    client.subscribe(topicMobileBasepath + 'platform_service/data/tokenissuance')
    while tokenissuance is None:
        time.sleep(0.1)
    t = tokenissuance.payload.decode()
    t2 = json.loads(t)
    t2['client_id'] = clientID
    t2['username'] = username
    t2['password'] = fourthHash
    pprint(t2)
    print('token issued...well done!')
    json.dump(t2, open('credentials.json', 'w'))
    print('credentials saved to credentials.json')
    client.loop_stop()
    client.disconnect()

if __name__ == "__main__":
    write_token_to_creds_file()
refresh_token.py:
import paho.mqtt.client as mqtt
import time
import json
from generate_token import write_token_to_creds_file

tv_ip = "192.168.65.61"
certfile = "./rcm_certchain_pem.cer"
keyfile = "./rcm_pem_privkey.pkcs8"

def load_or_generate_creds(rec=False):
    global oldCreds
    try:
        file = open('credentials.json')
    except FileNotFoundError:
        if not rec:
            print('No stored credentials found, starting auth with TV...')
            write_token_to_creds_file()
            load_or_generate_creds(True)
        else:
            print('Unable to generate credentials.')
    else:
        with file:
            oldCreds = json.load(file)

load_or_generate_creds()
refreshtoken = oldCreds["refreshtoken"]
client_id = oldCreds['client_id']
username = oldCreds['username']
password = oldCreds['password']
credentials = ""

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.connected_flag=True #set flag
        print("connected ok")
        # Subscribing in on_connect() means that if we lose the connection and
        # reconnect then subscriptions will be renewed.
        client.subscribe("#")

def on_message(client, userdata, message):
    print("message received " ,str(message.payload.decode("utf-8")))
    global credentials
    credentials = json.loads(message.payload.decode("utf-8"))

def on_publish(client, userdata, mid):
    print("Published message " + str(mid))

def on_disconnect(client, userdata, rc):
    print("disconnecting reason " +str(rc))

def refresh_token():
    client = mqtt.Client(client_id=client_id, clean_session=True, userdata=None, protocol=mqtt.MQTTv311, transport="tcp")
    client.tls_set(ca_certs=None, certfile=certfile, keyfile=keyfile, cert_reqs=mqtt.ssl.CERT_NONE,
                   tls_version=mqtt.ssl.PROTOCOL_TLS, ciphers=None)
    client.on_connect = on_connect
    client.on_message = on_message
    # client.on_subscribe = on_subscribe
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.connected_flag=False
    client.tls_insecure_set(True)
    client.username_pw_set(username=username, password=password)
    client.connect_async(tv_ip, 36669, 60)
    client.loop_start()
    while not client.connected_flag: #wait in loop
        print("In wait loop")
        time.sleep(1)
    client.subscribe("/remoteapp/mobile/" + client_id + "/platform_service/data/tokenissuance")
    client.publish("/remoteapp/tv/platform_service/" + client_id + "/data/gettoken", '{"refreshtoken": "' + refreshtoken + '"}')
    while credentials == "":
        print("waiting for refreshed credentials...")
        time.sleep(1)
    credentials['client_id'] = client_id
    credentials['username'] = username
    credentials['password'] = password
    with open("credentials.json", "w") as file:
        json.dump(credentials, file, indent= 4)
    print("got new credentials, terminating...")
    client.loop_stop()
    client.disconnect()
    return credentials["accesstoken"]
    #exit()

if __name__ == "__main__":
    refresh_token()
|
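Both scripts above wait for replies in `while ...: time.sleep(...)` loops that never give up if the TV stays silent. One possible alternative, sketched here with only the standard library, is to let the callback set a `threading.Event` and wait on it with a timeout. `ReplyWaiter` is a hypothetical helper, and the fake message below merely stands in for what a real MQTT callback would deliver.

```python
import threading
from types import SimpleNamespace


class ReplyWaiter:
    """Collects one MQTT reply and lets the caller wait for it with a timeout."""

    def __init__(self):
        self._event = threading.Event()
        self.payload = None

    def on_message(self, client, userdata, message):
        # Same (client, userdata, message) signature as the callbacks the
        # scripts above register with message_callback_add().
        self.payload = message.payload
        self._event.set()

    def wait(self, timeout):
        # Event.wait() returns False if the timeout elapsed first.
        if not self._event.wait(timeout):
            raise TimeoutError(f"no reply within {timeout} s")
        return self.payload


# Stand-in for a real reply: a timer thread delivers a fake message.
waiter = ReplyWaiter()
fake = SimpleNamespace(payload=b'{"result":1,"info":""}')
threading.Timer(0.05, waiter.on_message, args=(None, None, fake)).start()
print(waiter.wait(timeout=2.0))
```

With a real client, `waiter.on_message` would be registered for the tokenissuance topic, and a `TimeoutError` would signal that the request should be retried or abandoned.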
That's great, thanks. I worked out what my problem was: I had been sending the password instead of the access key in the password field. I've now got the same issue you mention, in that I cannot subscribe using a wildcard #. That's fine though, as the spreadsheet you mention has all of the useful ones in it. I found some more in the source code of this plugin. https://github.com/sehaas/ha_hisense_tv Thanks Steve |
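The fix described above (logging in with the access token rather than the pairing password) could look like the sketch below. It assumes the credentials.json layout written by the scripts in this thread, and that `client_id` and `username` stay the same as during pairing, which this comment does not confirm. `connection_params` is a hypothetical helper and the sample values are made up.

```python
def connection_params(creds):
    """Pick the login fields for normal control connections.

    Assumption: only the password field changes after pairing; it carries
    the access token instead of the MD5-derived pairing password.
    """
    return {
        "client_id": creds["client_id"],
        "username": creds["username"],
        "password": creds["accesstoken"],
    }


# Made-up sample resembling credentials.json from the scripts above.
sample = {
    "client_id": "C0:BD:D1:3D:6E:3E$his$44DE1F_vidaacommon_001",
    "username": "his$1701415028",
    "password": "PAIRING-PASSWORD-HASH",
    "accesstoken": "EXAMPLE-ACCESS-TOKEN",
    "refreshtoken": "EXAMPLE-REFRESH-TOKEN",
}
params = connection_params(sample)
print(params["password"])
```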
Hi guys |
So I have been playing around with a script that initially gets the token etc. and then one that runs every 12 hours to refresh it. I can't get the refresh to work consistently. Sometimes it just fails to get a response. I'm not sure what's going on. I need to find more time to play around. Even if the scripts were sorted (and I think they work for most), someone would have to spend the time building it into some existing Hisense TV plugin in Home Assistant. As far as I know, no one has done that yet. It's definitely possible though. I just wish Hisense would make the TV a bit simpler to connect into home automation. Every other major manufacturer appears to be doing this. I can't even get DLNA working consistently. My 8-year-old Sony TV supported this without issue. |
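Instead of refreshing on a fixed 12-hour schedule, a script could check how long the stored token has left. The sketch below assumes credentials.json contains `accesstoken_time` (Unix seconds) and `accesstoken_duration_day` fields; those names are taken from the combined script posted in this thread and are not verified against an actual TV reply. `seconds_until_expiry`, `needs_refresh` and the one-hour margin are illustrative choices.

```python
import time

SECONDS_PER_DAY = 24 * 60 * 60


def seconds_until_expiry(creds, now=None):
    """Seconds left before the access token expires (negative if already expired)."""
    if now is None:
        now = time.time()
    issued = int(creds["accesstoken_time"])  # assumed field name
    lifetime = int(creds["accesstoken_duration_day"]) * SECONDS_PER_DAY  # assumed field name
    return issued + lifetime - now


def needs_refresh(creds, now=None, margin=60 * 60):
    """True once the token is within `margin` seconds of expiring."""
    return seconds_until_expiry(creds, now) <= margin


# Made-up values for illustration only.
sample = {"accesstoken_time": "1701415028", "accesstoken_duration_day": 7}
print(needs_refresh(sample, now=1701415028 + 6 * SECONDS_PER_DAY))
print(needs_refresh(sample, now=1701415028 + 7 * SECONDS_PER_DAY))
```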
I have combined all scripts into one script (that could also be used for other home automation apps and manual scripts) and tried adding getting the state and power cycling the tv to it as well. I don't think anyone tried that using python from what I can see in the scripts? For me it doesn't work, any thoughts on what I am doing wrong here? import re
import uuid
import hashlib
import time
import json
import logging
import paho.mqtt.client as mqtt
from pprint import pprint
# Configuration
tv_ip = "192.168.178.134"
certfile = "./rcm_certchain_pem.cer"
keyfile = "./rcm_pem_privkey.pkcs8"
check_interval = 0.1
# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class TVAuthenticator:
def __init__(self):
self.reply = None
self.authentication_payload = None
self.authentication_code_payload = None
self.tokenissuance = None
self.credentials = None
self.saved_credentials = None
self.accesstoken = None
self.accesstoken_time = None
self.accesstoken_duration_day = None
self.refreshtoken = None
self.refreshtoken_time = None
self.refreshtoken_duration_day = None
self.client_id = None
self.username = None
self.password = None
self.topicTVUIBasepath = None
self.topicTVPSBasepath = None
self.topicMobiBasepath = None
self.topicBrcsBasepath = None
self.topicRemoBasepath = None
self.tv_state = None
@staticmethod
def cross_sum(n):
return sum(int(digit) for digit in str(n))
@staticmethod
def string_to_hash(input_str):
return hashlib.md5(input_str.encode("utf-8")).hexdigest().upper()
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
client.connected_flag = True
logging.info("Connected to MQTT broker")
client.subscribe([
(self.topicTVUIBasepath + "actions/vidaa_app_connect", 0),
(self.topicMobiBasepath + 'ui_service/data/authentication', 0),
(self.topicMobiBasepath + 'ui_service/data/authenticationcode', 0),
(self.topicBrcsBasepath + "ui_service/data/hotelmodechange", 0),
(self.topicMobiBasepath + 'platform_service/data/tokenissuance', 0),
(self.topicBrcsBasepath + 'ui_service/state', 0),
])
else:
logging.error(f"Bad connection. Returned code: {rc}")
client.bad_connection_flag = True
def on_message(self, client, userdata, msg):
logging.info(f"Message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
self.reply = msg
def on_subscribe(self, client, userdata, mid, granted_qos):
logging.info(f"Subscribed: {mid} {granted_qos}")
def on_publish(self, client, userdata, mid):
logging.info(f"Published message {mid}")
def on_disconnect(self, client, userdata, rc):
logging.info(f"Disconnected. Reason: {rc}")
def on_authentication(self, mosq, obj, msg):
logging.info(f"Authentication message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
self.authentication_payload = msg
def on_authentication_code(self, mosq, obj, msg):
logging.info(f"Authentication code message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
self.authentication_code_payload = msg
def on_tokenissuance(self, mosq, obj, msg):
logging.info(f"Token issuance message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
self.tokenissuance = msg
def on_tv_state(self, mosq, obj, msg):
logging.info(f"TV State message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
self.tv_state = msg.payload.decode('utf-8')
def wait_for_message(self, condition):
time.sleep(1) # Initial delay to prevent false negatives
while condition():
time.sleep(check_interval)
def create_mqtt_client(self, client_id, certfile, keyfile, username, password, userdata=None):
logging.info("Creating MQTT client...")
client = mqtt.Client(client_id=client_id, clean_session=True, userdata=userdata, protocol=mqtt.MQTTv311, transport="tcp")
client.tls_set(ca_certs=None, certfile=certfile, keyfile=keyfile, cert_reqs=mqtt.ssl.CERT_NONE, tls_version=mqtt.ssl.PROTOCOL_TLS)
client.tls_insecure_set(True)
client.username_pw_set(username=username, password=password)
# Attach event handlers
client.on_connect = self.on_connect
client.on_message = self.on_message
client.on_publish = self.on_publish
client.on_disconnect = self.on_disconnect
client.connected_flag = False
client.bad_connection_flag = False
return client
def refresh_token(self):
current_time = time.time()
expiration_time = int(self.accesstoken_time) + (int(self.accesstoken_duration_day) * 24 * 60 * 60)
if current_time <= expiration_time:
logging.info("Token still valid, no need to refresh")
time_diff = expiration_time - current_time
days = time_diff // (24 * 60 * 60)
hours = (time_diff % (24 * 60 * 60)) // (60 * 60)
minutes = (time_diff % (60 * 60)) // 60
seconds = time_diff % 60
logging.info(f"Token expires in {int(days)} days, {int(hours)} hours, {int(minutes)} minutes, and {int(seconds)} seconds")
logging.info(f"Token expires at {time.ctime(expiration_time)}")
return self.accesstoken
logging.info("Token not valid, refreshing the token")
client = self.create_mqtt_client(client_id=self.client_id, certfile=certfile, keyfile=keyfile, username=self.username, password=self.password)
client.connect_async(tv_ip, 36669, 60)
client.loop_start()
while not client.connected_flag:
logging.info("Waiting for connection...")
time.sleep(1)
client.publish(f"/remoteapp/tv/platform_service/{self.client_id}/data/gettoken", json.dumps({"refreshtoken": self.refreshtoken}))
while self.credentials is None:
logging.info("Waiting for refreshed credentials...")
time.sleep(1)
self.credentials.update({"client_id": self.client_id, "username": self.username, "password": self.password})
with open("credentials.json", "w") as file:
json.dump(self.credentials, file, indent=4)
logging.info("Got new credentials, terminating...")
client.loop_stop()
client.disconnect()
return self.credentials["accesstoken"]
    def write_token_to_creds_file(self):
        timestamp = int(time.time())
        mac = ':'.join(re.findall('..', '%012x' % uuid.getnode())).upper()
        logging.info(f'MAC Address: {mac}')
        first_hash = self.string_to_hash("&vidaa#^app")
        second_hash = self.string_to_hash(f"38D65DC30F45109A369A86FCE866A85B${mac}")
        last_digit_of_cross_sum = self.cross_sum(timestamp) % 10
        third_hash = self.string_to_hash(f"his{last_digit_of_cross_sum}h*i&s%e!r^v0i1c9")
        fourth_hash = self.string_to_hash(f"{timestamp}${third_hash[:6]}")
        logging.info(f'First Hash: {first_hash}')
        logging.info(f'Second Hash: {second_hash}')
        logging.info(f'Third Hash: {third_hash}')
        logging.info(f'Fourth Hash: {fourth_hash}')
        client_id = f"{mac}$his${second_hash[:6]}_vidaacommon_001"
        logging.info(f'Client ID: {client_id}')
        self.topicTVUIBasepath = f"/remoteapp/tv/ui_service/{client_id}/"
        self.topicTVPSBasepath = f"/remoteapp/tv/platform_service/{client_id}/"
        self.topicMobiBasepath = f"/remoteapp/mobile/{client_id}/"
        self.topicBrcsBasepath = f"/remoteapp/mobile/broadcast/"
        self.topicRemoBasepath = f"/remoteapp/tv/remote_service/{client_id}/"
        client = self.create_mqtt_client(client_id=client_id, certfile=certfile, keyfile=keyfile, username=f"his${timestamp}", password=fourth_hash)
        client.message_callback_add(self.topicMobiBasepath + 'ui_service/data/authentication', self.on_authentication)
        client.message_callback_add(self.topicMobiBasepath + 'ui_service/data/authenticationcode', self.on_authentication_code)
        client.message_callback_add(self.topicBrcsBasepath + 'ui_service/data/hotelmodechange', self.on_message)
        client.message_callback_add(self.topicMobiBasepath + 'platform_service/data/tokenissuance', self.on_tokenissuance)
        client.connect_async(tv_ip, 36669, 60)
        client.loop_start()
        self.wait_for_message(lambda: not client.connected_flag)
        logging.info('Publishing message to actions/vidaa_app_connect...')
        client.publish(self.topicTVUIBasepath + "actions/vidaa_app_connect", '{"app_version":2,"connect_result":0,"device_type":"Mobile App"}')
        self.wait_for_message(lambda: self.authentication_payload is None)
        if self.authentication_payload.payload.decode() != '""':
            logging.error('Problems with the authentication message!')
            logging.error(self.authentication_payload.payload)
            return
        logging.info(f'Subscribing to {self.topicMobiBasepath}ui_service/data/authenticationcode...')
        client.subscribe(self.topicMobiBasepath + 'ui_service/data/authenticationcode')
        authsuccess = False
        while not authsuccess:
            auth_num = input("Enter the four digits displayed on your TV: ")
            client.publish(self.topicTVUIBasepath + "actions/authenticationcode", f'{{"authNum":{auth_num}}}')
            self.wait_for_message(lambda: self.authentication_code_payload is None)
            if json.loads(self.authentication_code_payload.payload.decode()) != {"result": 1, "info": ""}:
                logging.error('Problems with the authentication message!')
                logging.error(self.authentication_code_payload.payload)
            else:
                authsuccess = True
        logging.info("Success! Getting access token...")
        client.publish(self.topicTVPSBasepath + "data/gettoken", '{"refreshtoken": ""}')
        client.publish(self.topicTVUIBasepath + "actions/authenticationcodeclose")
        client.subscribe(self.topicBrcsBasepath + 'ui_service/data/hotelmodechange')
        client.subscribe(self.topicMobiBasepath + 'platform_service/data/tokenissuance')
        self.wait_for_message(lambda: self.tokenissuance is None)
        token_data = json.loads(self.tokenissuance.payload.decode())
        token_data.update({"client_id": client_id, "username": f"his${timestamp}", "password": fourth_hash})
        pprint(token_data)
        logging.info('Token issued successfully')
        with open('credentials.json', 'w') as file:
            json.dump(token_data, file)
        logging.info('Credentials saved to credentials.json')
        client.loop_stop()
        client.disconnect()
    def load_or_generate_creds(self, rec=False):
        try:
            with open('credentials.json', 'r') as file:
                self.saved_credentials = json.load(file)
            self.accesstoken = self.saved_credentials["accesstoken"]
            self.accesstoken_time = self.saved_credentials["accesstoken_time"]
            self.accesstoken_duration_day = self.saved_credentials["accesstoken_duration_day"]
            self.refreshtoken = self.saved_credentials["refreshtoken"]
            self.refreshtoken_time = self.saved_credentials["refreshtoken_time"]
            self.refreshtoken_duration_day = self.saved_credentials["refreshtoken_duration_day"]
            self.client_id = self.saved_credentials['client_id']
            self.username = self.saved_credentials['username']
            self.password = self.saved_credentials['password']
            self.topicTVUIBasepath = f"/remoteapp/tv/ui_service/{self.client_id}/"
            self.topicTVPSBasepath = f"/remoteapp/tv/platform_service/{self.client_id}/"
            self.topicMobiBasepath = f"/remoteapp/mobile/{self.client_id}/"
            self.topicBrcsBasepath = f"/remoteapp/mobile/broadcast/"
            self.topicRemoBasepath = f"/remoteapp/tv/remote_service/{self.client_id}/"
        except FileNotFoundError:
            if not rec:
                logging.info('No stored credentials found, starting auth with TV...')
                self.write_token_to_creds_file()
                self.load_or_generate_creds(True)
            else:
                logging.error('Unable to generate credentials.')
                raise
    # Action: Get TV State
    # Publish: /remoteapp/tv/ui_service/36:BD:7A:BA:29:16$his$9D286C_vidaacommon_001/actions/gettvstate
    # Subscribe: /remoteapp/mobile/broadcast/ui_service/state
    # Response example: {"statetype":"remote_launcher"}
    def get_tv_state(self):
        client = self.create_mqtt_client(client_id=self.client_id, certfile=certfile, keyfile=keyfile, username=self.username, password=self.password)
        client.message_callback_add(self.topicBrcsBasepath + 'ui_service/state', self.on_tv_state)
        client.connect_async(tv_ip, 36669, 60)
        client.loop_start()
        while not client.connected_flag:
            logging.info("Waiting for connection...")
            time.sleep(1)
        logging.info(f"Publishing get TV state message to {self.topicTVUIBasepath}actions/gettvstate")
        client.publish(self.topicTVUIBasepath + "actions/gettvstate")
        self.wait_for_message(lambda: self.tv_state is None)
        # Stop the network loop and disconnect so repeated calls do not leave connections open
        client.loop_stop()
        client.disconnect()
        if self.tv_state:
            logging.info(f"TV State received: {self.tv_state}")
            return json.loads(self.tv_state)
        else:
            logging.error("Failed to get TV state")
            return None
    # Action: Power Cycle
    # Publish: /remoteapp/tv/remote_service/C1:BE:D1:3D:6E:3E$his$7E1991_vidaacommon_001/actions/sendkey
    # Message: KEY_POWER
    def power_cycle_tv(self):
        client = self.create_mqtt_client(client_id=self.client_id, certfile=certfile, keyfile=keyfile, username=self.username, password=self.password)
        client.connect_async(tv_ip, 36669, 60)
        client.loop_start()
        while not client.connected_flag:
            logging.info("Waiting for connection...")
            time.sleep(1)
        logging.info(f"Publishing power cycle message to {self.topicRemoBasepath}actions/sendkey")
        client.publish(self.topicRemoBasepath + "actions/sendkey", "KEY_POWER")
        logging.info("Power cycle command sent.")
        client.loop_stop()
        client.disconnect()
if __name__ == "__main__":
    # Initialize the TVAuthenticator class
    auth = TVAuthenticator()
    # Load or generate credentials
    auth.load_or_generate_creds()
    # Refresh the token if needed
    auth.refresh_token()
    # Power cycle the TV
    auth.power_cycle_tv()
    # Get TV State
    tv_state = auth.get_tv_state()
    if tv_state:
        logging.info(f"TV State: {tv_state}")
# Action: Get Source List
# Publish: /remoteapp/tv/ui_service/36:BD:7A:BA:29:16$his$9D286C_vidaacommon_001/actions/sourcelist
# Subscribe: /remoteapp/mobile/C1:BE:D1:3D:6E:3E$his$7E1991_vidaacommon_001/ui_service/data/sourcelist
# Response example: [{"sourceid":"vidaahome","sourcename":"Home","displayname":"Home","displayname2":"","httpIcon":"","is_signal":"1","has_signal":"1","is_lock":"0","hotel_mode":"0"},{"sourceid":"TV","sourcename":"TV","displayname":"TV","displayname2":"","httpIcon":"","is_signal":"0","has_signal":"0","is_lock":"0","hotel_mode":"0"},{"sourceid":"284","sourcename":"VIDAA tv","displayname":"VIDAA tv","displayname2":"","httpIcon":"https://img.vidaahub.com/vidaa/2022/7/202207190701576882.png","is_signal":"0","has_signal":"1","is_lock":"0","hotel_mode":"0"},{"sourceid":"HDMI1","sourcename":"HDMI1","displayname":"Apple TV","displayname2":"Wohnzimmer","httpIcon":"","is_signal":"0","has_signal":"0","is_lock":"0","hotel_mode":"0"},{"sourceid":"HDMI2","sourcename":"HDMI2","displayname":"HDMI2","displayname2":"","httpIcon":"","is_signal":"0","has_signal":"0","is_lock":"0","hotel_mode":"0"},{"sourceid":"HDMI3","sourcename":"HDMI3","displayname":"PS5","displayname2":"PlayStation 5","httpIcon":"","is_signal":"0","has_signal":"0","is_lock":"0","hotel_mode":"0"},{"sourceid":"HDMI4","sourcename":"HDMI4","displayname":"PC","displayname2":"","httpIcon":"","is_signal":"0","has_signal":"0","is_lock":"0","hotel_mode":"0"},{"sourceid":"AVS","sourcename":"AV","displayname":"AV","displayname2":"","httpIcon":"","is_signal":"0","has_signal":"0","is_lock":"0","hotel_mode":"0"},{"sourceid":"MusicSharing","sourcename":"Music Sharing","displayname":"Music Sharing","displayname2":"","httpIcon":"","is_signal":"0","has_signal":"1","is_lock":"0","hotel_mode":"0"}]
# Action: Change Source
# Publish: /remoteapp/tv/ui_service/C1:BE:D1:3D:6E:3E$his$7E1991_vidaacommon_001/actions/changesource
# Message: {"sourceid":"HDMI3"}
# Action: Get Volume
# Publish: /remoteapp/tv/platform_service/C1:BE:D1:3D:6E:3E$his$7E1991_vidaacommon_001/actions/getvolume
# Subscribe: /remoteapp/mobile/broadcast/ui_service/volume
# Action: Change Volume
# Publish: /remoteapp/tv/platform_service/C1:BE:D1:3D:6E:3E$his$7E1991_vidaacommon_001/actions/changevolume
# Subscribe: /remoteapp/mobile/broadcast/ui_service/volume
# Message: 0-100
# Action: Get App List
# Publish: /remoteapp/tv/ui_service/C1:BE:D1:3D:6E:3E$his$7E1991_vidaacommon_001/actions/applist
# Subscribe: /remoteapp/mobile/C1:BE:D1:3D:6E:3E$his$7E1991_vidaacommon_001/ui_service/data/applist
# Response example: [{"url":"https://app.plex.tv/tv-vidaa","isunInstalled":false,"name":"Plex","from":"","storeType":98,"appId":"42","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090915047445.png"},{"url":"netflix","isunInstalled":false,"name":"Netflix","from":"","storeType":98,"appId":"1","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090857551807.png"},{"url":"youtube","isunInstalled":false,"name":"YouTube","from":"","storeType":98,"appId":"3","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090858299321.png"},{"url":"https://cd-dmgz.bamgrid.com/bbd/hisense_tv/index.html","isunInstalled":false,"name":"Disney+","from":"","storeType":99,"appId":"295","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090928459990.png"},{"url":"https://atve.tv.apple.com/94819831-5404-4438-810e-afb648d6a826/tvw_1dc2aab0a313427dbc11818be8a18bd9/","isunInstalled":false,"name":"Apple TV+","from":"","storeType":99,"appId":"351","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2022/1/202201280255184463.png"},{"url":"amazon","isunInstalled":false,"name":"Prime Video","from":"","storeType":98,"appId":"2","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090859207066.png"},{"url":"https://api-portal-v3.netrange.com/portal/v3/appredir?PrKey=645e94eb48044bb07a88090c250b472f842d77ee&AppId=190","isunInstalled":false,"name":"ARD 
Mediathek","from":"","storeType":93,"appId":"crawler_60061_nrmmh-190","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311060832013925.jpg"},{"url":"http://hbbtv.zdf.de/zdfm3/index.php?html5=1&portal=1&live=1","isunInstalled":false,"name":"ZDF","from":"","storeType":97,"appId":"187","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090927051313.png"},{"url":"https://tv.client.ott.sky.com/?MSoQ8ri2lztP50gc1YwprgPtZT3PkP&territory=DE","isunInstalled":false,"name":"WOW TV","from":"","storeType":99,"appId":"2201","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/12/202312131406047409.png?resize=w_300,h_300"},{"url":"https://app.hisense.rtl-smart.tv/","isunInstalled":false,"name":"RTL TVNOW","from":"","storeType":97,"appId":"300","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090925176358.png"},{"url":"vidaa-store","isunInstalled":false,"name":"APP STORE","from":"","storeType":99,"appId":"164","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311210659518385.png"},{"url":"https://www.intl.paramountplus.com/smart-console-apps/vidaa/","isunInstalled":false,"name":"Paramount+","from":"","storeType":99,"appId":"334","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2022/5/202205090739158843.png"},{"url":"browser","isunInstalled":false,"name":"TV Browser","from":"","storeType":100,"appId":"16","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311210704287492.png"},{"url":"https://prod-hisense-ui40-app.rakuten.tv/","isunInstalled":false,"name":"Rakuten 
TV","from":"","storeType":99,"appId":"65","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090900492844.png"},{"url":"https://zattoo-vidaa.zattoo.com","isunInstalled":false,"name":"ZATTOO","from":"","storeType":99,"appId":"1748","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311060834575429.png"},{"url":"https://tv.dazn.com/app/vidaa/","isunInstalled":false,"name":"DAZN","from":"","storeType":99,"appId":"162","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090902316899.png"},{"url":"https://tvmodules-vidaa.vidaahub.com/publicvalue/V1.0/index-publicvalue.html","isunInstalled":false,"name":"Public value","from":"","storeType":99,"appId":"2081","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311210713198850.png"},{"url":"https://app-hisense.pluto.tv/","isunInstalled":false,"name":"PlutoTV","from":"","storeType":98,"appId":"173","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090917583451.png"},{"url":"https://tv2.deezer.com/hisense","isunInstalled":false,"name":"Deezer","from":"","storeType":98,"appId":"47","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090916047381.png"},{"url":"https://live.prd.ott.s.joyn.de/?platform=hisense","isunInstalled":false,"name":"JOYN","from":"","storeType":99,"appId":"1908","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311060836507140.jpg"},{"url":"https://hisense.chili.com","isunInstalled":false,"name":"Chili","from":"","storeType":98,"appId":"73","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090913449937.png
"},{"url":"hdplus","isunInstalled":false,"name":"HD+ LIVE TV","from":"","storeType":98,"appId":"325","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311060833285324.png"},{"url":"https://dn6q4f0q0kh23.cloudfront.net/receiver/hisense/haystackreceiver.html","isunInstalled":false,"name":"Haystack TV Local + World News","from":"","storeType":98,"appId":"227","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090918322670.png"},{"url":"https://hisense2-1080.rlaxxtv.com/","isunInstalled":false,"name":"RLAXX","from":"","storeType":99,"appId":"279","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090921287664.png"},{"url":"https://ctv-vidaa.kidoodle.tv","isunInstalled":false,"name":"Kidoodle.TV","from":"","storeType":98,"appId":"233","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090917185351.png"},{"url":"https://maxdomesmarttv3.meinvod.de/","isunInstalled":false,"name":"Maxdome","from":"","storeType":98,"appId":"143","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311060838581821.png"},{"url":"https://france24.tv.dotscreen.com/hisense/index.html?languages=EN,FR,ES,AR","isunInstalled":false,"name":"France24","from":"","storeType":99,"appId":"372","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2022/2/202202070800239017.png"},{"url":"https://hisense.vevo.com","isunInstalled":false,"name":"VEVO","from":"","storeType":98,"appId":"306","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311060832526783.png"},{"url":"https://hisense.plus.fifa.com/","isunInstalled":false,"name":"FIFA+","from":"","storeType":99,"appId":"1714","move":true,"isF
av":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311060834007733.png"},{"url":"https://ott-apps.fite.tv/hisense/index.html","isunInstalled":false,"name":"FITE - Boxing, Wrestling, MMA","from":"","storeType":99,"appId":"245","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090919469239.png"},{"url":"https://smarttv.uefa.tv","isunInstalled":false,"name":"UEFA.tv","from":"","storeType":98,"appId":"244","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090919094943.png"},{"url":"https://smarttv3.videociety.de","isunInstalled":false,"name":"Videociety","from":"","storeType":98,"appId":"78","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090912274669.png"},{"url":"youtube_kids","isunInstalled":false,"name":"YouTube Kids","from":"","storeType":98,"appId":"38","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090911395201.png"},{"url":"https://vidaa.servustv.com/","isunInstalled":false,"name":"ServusTV","from":"","storeType":98,"appId":"323","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090927409428.png"},{"url":"https://vidaa.redbull.tv/","isunInstalled":false,"name":"Red Bull TV","from":"","storeType":98,"appId":"79","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090916463905.png"},{"url":"https://html5tv.music.amazon.dev/?deviceModel=A27F3L5WMHDOWD","isunInstalled":false,"name":"Amazon Music","from":"","storeType":99,"appId":"376","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311060839281117.png"},{"url":"vidaa-plus","isunInstalled":false,"name":"VIDAA 
tv","from":"","storeType":98,"appId":"284","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311210736143340.png"},{"url":"vidaa-art","isunInstalled":false,"name":"VIDAA Art","from":"","storeType":98,"appId":"185","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311210729076057.png"},{"url":"screensaver","isunInstalled":false,"name":"Screensaver","from":"","storeType":98,"appId":"1672","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311210727367557.png"},{"url":"vidaa-free","isunInstalled":false,"name":"VIDAA Free","from":"","storeType":98,"appId":"184","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311210732337444.png"},{"url":"https://erosnowhtml.erosnow.com/main.html?partner_code=VIDD","isunInstalled":false,"name":"Eros Now","from":"","storeType":98,"appId":"178","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090924148666.png"},{"url":"accuweather","isunInstalled":false,"name":"AccuWeather","from":"","storeType":98,"appId":"15","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311091029582894.png"},{"url":"https://cweb-ott.nba.com/","isunInstalled":false,"name":"NBA","from":"","storeType":99,"appId":"293","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2022/9/202209160010272173.png"},{"url":"http://apps.tvgam.es/tv_games/hisense_portal/production/portal/index.html","isunInstalled":false,"name":"Game 
Center","from":"","storeType":98,"appId":"35","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090915362271.png"},{"url":"https://smarttv.molotov.tv/","isunInstalled":false,"name":"MOLOTOV.TV","from":"","storeType":99,"appId":"287","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090922406418.png"},{"url":"https://tv-production.mubi.com/vidaa/4.0.0/","isunInstalled":false,"name":"MUBI","from":"","storeType":98,"appId":"230","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2022/4/202204300235256422.png"},{"url":"https://html5.toongoggles.com/","isunInstalled":false,"name":"Toon Goggles","from":"","storeType":99,"appId":"36","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311060906446498.png"},{"url":"https://vewd.global.united.cloud/","isunInstalled":false,"name":"EON TV","from":"","storeType":99,"appId":"280","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090922112374.png"},{"url":"https://dice-tv.imggaming.com/vidaa/dce.ufc/index.html","isunInstalled":false,"name":"UFC","from":"","storeType":99,"appId":"355","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090930102217.png"},{"url":"https://prod.hisense.hisense.vidmind.cc/","isunInstalled":false,"name":"Kyivstar 
TV","from":"","storeType":98,"appId":"288","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090923183570.png"},{"url":"file:///APPS/emanual/Local_ebook/index.html","isunInstalled":false,"name":"E-Manual","from":"","storeType":99,"appId":"166","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311210706069065.png"},{"url":"file:///APPS/emanual/Local_ebook/index.html?fromTransparency=1","isunInstalled":false,"name":"Richtlinien","from":"","storeType":99,"appId":"296","move":true,"isFav":true,"httpIcon":"data:image/png;base64,iconDownloadhttps://img.vidaahub.com/vidaa/2023/11/202311090924486903.png"}]
# Action: Launch App Netflix
# Publish: /remoteapp/tv/ui_service/C1:BE:D1:3D:6E:3E$his$7E1991_vidaacommon_001/actions/launchapp
# Message: { "url": "netflix" }
# Action: Launch App Youtube
# Publish: /remoteapp/tv/ui_service/C1:BE:D1:3D:6E:3E$his$7E1991_vidaacommon_001/actions/launchapp
# Message: { "url" : "youtube" } |
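For anyone scripting the actions listed in the comments above, here is a minimal sketch of how a topic and message could be assembled from the client ID. `build_action` is a hypothetical helper, not part of the script, and the client ID is a placeholder. The topic layout and payloads are taken only from the examples in this thread, so other models or firmware versions may expect something different.

```python
import json


def build_action(client_id, service, action, payload=None):
    """Return (topic, message) for one TV action.

    Hypothetical helper; topic layout follows the comment block above.
    """
    topic = f"/remoteapp/tv/{service}/{client_id}/actions/{action}"
    if payload is None:
        # Actions such as sourcelist or applist are published without a message
        message = None
    elif isinstance(payload, (dict, list)):
        # Compact JSON, e.g. {"sourceid":"HDMI3"}
        message = json.dumps(payload, separators=(",", ":"))
    else:
        # Plain values such as a volume level or a key name
        message = str(payload)
    return topic, message


# Placeholder client ID; use the one stored in credentials.json
client_id = "AA:BB:CC:DD:EE:FF$his$ABCDEF_vidaacommon_001"

print(build_action(client_id, "ui_service", "changesource", {"sourceid": "HDMI3"}))
print(build_action(client_id, "platform_service", "changevolume", 25))
print(build_action(client_id, "remote_service", "sendkey", "KEY_POWER"))
```

The returned pair can be passed straight to `client.publish(topic, message)` on a connected client.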
Lol, found it... was using the password as password but should be using the access token ;-) |
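A small sketch of that fix, assuming the `credentials.json` layout written by the scripts in this thread: once the TV has issued tokens, the MQTT password is the access token, not the hash-based password used for the first pairing. `mqtt_login` and `access_token_expired` are hypothetical helper names and the values below are placeholders.

```python
import time


def mqtt_login(creds):
    """Return (username, password) for a client that is already paired."""
    # After pairing, the access token takes the place of the hash-based password
    return creds["username"], creds["accesstoken"]


def access_token_expired(creds, now=None):
    """True if the access token lifetime (stored in days) has elapsed."""
    now = time.time() if now is None else now
    issued = int(creds["accesstoken_time"])
    lifetime = int(creds["accesstoken_duration_day"]) * 24 * 60 * 60
    return now > issued + lifetime


# Placeholder values shaped like the saved credentials
creds = {
    "username": "his$1700000000",
    "password": "HASH-BASED-PASSWORD",
    "accesstoken": "example-access-token",
    "accesstoken_time": "1700000000",
    "accesstoken_duration_day": 7,
}

username, password = mqtt_login(creds)
print(username, password)
print(access_token_expired(creds, now=1700000000 + 24 * 60 * 60))
```

If the token has expired, the refresh token has to be sent to `data/gettoken` first, as the script does.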
In case someone needs an "all in one" script, here it is. The script authenticates with the 4-digit code if it doesn't find the credentials file, refreshes the access token if it has expired, and lets you run a set of commands either by choosing an option from a menu or by passing the matching command-line argument. All options currently provided were tested on my TV and seem to work :) Any questions? Let me know...
import re
import uuid
import hashlib
import time
import json
import logging
import paho.mqtt.client as mqtt
from pprint import pprint
import keyboard
import argparse
# Configuration
tv_ip = "192.168.178.134"
certfile = "./rcm_certchain_pem.cer"
keyfile = "./rcm_pem_privkey.pkcs8"
check_interval = 0.1
debug = True
# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class TVAuthenticator:
    def __init__(self):
        self.reply = None
        self.authentication_payload = None
        self.authentication_code_payload = None
        self.tokenissuance = None
        self.accesstoken = None
        self.accesstoken_time = None
        self.accesstoken_duration_day = None
        self.refreshtoken = None
        self.refreshtoken_time = None
        self.refreshtoken_duration_day = None
        self.client_id = None
        self.username = None
        self.password = None
        self.timestamp = None
        self.authenticated = False
        self.topicTVUIBasepath = None
        self.topicTVPSBasepath = None
        self.topicMobiBasepath = None
        self.topicBrcsBasepath = None
        self.topicRemoBasepath = None
        self.info = None
    # Sum all digits of a number
    @staticmethod
    def cross_sum(n):
        return sum(int(digit) for digit in str(n))
    # Convert a string to a hash
    @staticmethod
    def string_to_hash(input_str):
        return hashlib.md5(input_str.encode("utf-8")).hexdigest().upper()
    # Action when connected
    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            client.connected_flag = True
            if debug:
                logging.info("Connected to MQTT broker")
        else:
            logging.error(f"Bad connection. Returned code: {rc}")
            client.bad_connection_flag = True
    # Action when message received
    def on_message(self, client, userdata, msg):
        if debug:
            logging.info(f"Message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
        self.authenticated = False
        self.reply = msg
    # Action when subscribed
    def on_subscribe(self, client, userdata, mid, granted_qos):
        if debug:
            logging.info(f"Subscribed: {mid} {granted_qos}")
    # Action when published
    def on_publish(self, client, userdata, mid):
        if debug:
            logging.info(f"Published message {mid}")
    # Action when disconnected
    def on_disconnect(self, client, userdata, rc):
        if debug:
            logging.info(f"Disconnected. Reason: {rc}")
    # Action when authentication message received
    def on_authentication(self, mosq, obj, msg):
        if debug:
            logging.info(f"Authentication message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
        self.authentication_payload = msg
    # Action when authentication code message received
    def on_authentication_code(self, mosq, obj, msg):
        if debug:
            logging.info(f"Authentication code message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
        self.authentication_code_payload = msg
    # Action when token issuance message received
    def on_tokenissuance(self, mosq, obj, msg):
        if debug:
            logging.info(f"Token issuance message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
        self.tokenissuance = msg
    # Action when information message received
    def on_info(self, mosq, obj, msg):
        if debug:
            logging.info(f"Information message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
        self.info = msg.payload.decode('utf-8')
    # Wait for a message (condition is a lambda function that returns True or False)
    def wait_for_message(self, condition, check_interval=1, debug=False):
        time.sleep(1)  # Initial delay to prevent false negatives
        print("Waiting for message... (press and hold escape to cancel waiting)")
        start_time = time.time()
        print("Waiting...", end='', flush=True)
        while condition():
            if keyboard.is_pressed('esc'):
                print("\nEscape pressed. Exiting...")
                break
            current_time = time.time()
            if current_time - start_time >= 3:
                print(".", end='', flush=True)
                start_time = current_time
            time.sleep(check_interval)
        print("")
    # Open the client and connect to the TV
    def create_mqtt_client(self, client_id, certfile, keyfile, username, password, userdata=None):
        if debug:
            logging.info("Creating MQTT client...")
        client = mqtt.Client(client_id=client_id, clean_session=True, userdata=userdata, protocol=mqtt.MQTTv311, transport="tcp")
        client.tls_set(ca_certs=None, certfile=certfile, keyfile=keyfile, cert_reqs=mqtt.ssl.CERT_NONE, tls_version=mqtt.ssl.PROTOCOL_TLS)
        client.tls_insecure_set(True)
        client.username_pw_set(username=username, password=password)
        # Attach event handlers
        client.on_connect = self.on_connect
        client.on_message = self.on_message
        client.on_publish = self.on_publish
        client.on_disconnect = self.on_disconnect
        client.enable_logger()
        client.connected_flag = False
        client.bad_connection_flag = False
        return client
    # Refresh the token
    def refresh_token(self):
        if debug:
            logging.info("Refreshing token...")
        client = self.create_mqtt_client(client_id=self.client_id, certfile=certfile, keyfile=keyfile, username=self.username, password=self.accesstoken)
        if debug:
            logging.info(f"Adding callback message to {self.topicMobiBasepath}platform_service/data/tokenissuance")
        client.message_callback_add(self.topicMobiBasepath + 'platform_service/data/tokenissuance', self.on_tokenissuance)
        client.connect_async(tv_ip, 36669, 60)
        client.loop_start()
        self.wait_for_message(lambda: not client.connected_flag)
        client.subscribe(self.topicMobiBasepath + 'platform_service/data/tokenissuance')
        client.publish(f"/remoteapp/tv/platform_service/{self.client_id}/data/gettoken", json.dumps({"refreshtoken": self.refreshtoken}))
        self.wait_for_message(lambda: self.tokenissuance is None)
        token_data = json.loads(self.tokenissuance.payload.decode())
        token_data.update({"client_id": self.client_id, "username": self.username, "password": self.password})
        pprint(token_data)
        if debug:
            logging.info('Token issued successfully')
        with open('credentials.json', 'w') as file:
            json.dump(token_data, file, indent=4)
        if debug:
            logging.info('Credentials saved to credentials.json')
        client.loop_stop()
        client.disconnect()
        self.authenticated = True
        return token_data["accesstoken"]
    # Check and refresh the token if needed
    def check_and_refresh_token(self):
        current_time = time.time()
        if debug:
            logging.info(f"Current time is {time.ctime(current_time)}")
        expiration_time = int(self.accesstoken_time) + (int(self.accesstoken_duration_day) * 24 * 60 * 60)
        if debug:
            logging.info(f"Access Token expires at {time.ctime(expiration_time)}")
        refresh_expiration_time = int(self.refreshtoken_time) + (int(self.refreshtoken_duration_day) * 24 * 60 * 60)
        if debug:
            logging.info(f"Refresh Token expires at {time.ctime(refresh_expiration_time)}")
        if current_time <= expiration_time:
            if debug:
                logging.info("Token still valid, no need to refresh")
            time_diff = expiration_time - current_time
            days = time_diff // (24 * 60 * 60)
            hours = (time_diff % (24 * 60 * 60)) // (60 * 60)
            minutes = (time_diff % (60 * 60)) // 60
            seconds = time_diff % 60
            if debug:
                logging.info(f"Token expires in {int(days)} days, {int(hours)} hours, {int(minutes)} minutes, and {int(seconds)} seconds")
            return self.accesstoken
        if debug:
            logging.info("Token not valid, refreshing the token")
        return self.refresh_token()
    # Define the hashes, username, password and client_id
    def define_hashes(self):
        self.timestamp = int(time.time())
        mac = ':'.join(re.findall('..', '%012x' % uuid.getnode())).upper()
        if debug:
            logging.info(f'MAC Address: {mac}')
        first_hash = self.string_to_hash("&vidaa#^app")
        second_hash = self.string_to_hash(f"38D65DC30F45109A369A86FCE866A85B${mac}")
        last_digit_of_cross_sum = self.cross_sum(self.timestamp) % 10
        third_hash = self.string_to_hash(f"his{last_digit_of_cross_sum}h*i&s%e!r^v0i1c9")
        fourth_hash = self.string_to_hash(f"{self.timestamp}${third_hash[:6]}")
        self.username = f"his${self.timestamp}"
        self.password = fourth_hash
        if debug:
            logging.info(f'First Hash: {first_hash}')
            logging.info(f'Second Hash: {second_hash}')
            logging.info(f'Third Hash: {third_hash}')
            logging.info(f'Fourth Hash: {fourth_hash}')
        self.client_id = f"{mac}$his${second_hash[:6]}_vidaacommon_001"
        if debug:
            logging.info(f'Client ID: {self.client_id}')
    # Define the topic paths
    def define_topic_paths(self):
        self.topicTVUIBasepath = f"/remoteapp/tv/ui_service/{self.client_id}/"
        self.topicTVPSBasepath = f"/remoteapp/tv/platform_service/{self.client_id}/"
        self.topicMobiBasepath = f"/remoteapp/mobile/{self.client_id}/"
        self.topicBrcsBasepath = f"/remoteapp/mobile/broadcast/"
        self.topicRemoBasepath = f"/remoteapp/tv/remote_service/{self.client_id}/"
# Authenticate with the TV and write the credentials to the credentials file
def generate_creds(self):
self.define_hashes()
self.define_topic_paths()
client = self.create_mqtt_client(client_id=self.client_id, certfile=certfile, keyfile=keyfile, username=self.username, password=self.password)
if debug:
logging.info(f"Adding callback messages for authentication...")
client.message_callback_add(self.topicMobiBasepath + 'ui_service/data/authentication', self.on_authentication)
client.message_callback_add(self.topicMobiBasepath + 'ui_service/data/authenticationcode', self.on_authentication_code)
client.message_callback_add(self.topicBrcsBasepath + 'ui_service/data/hotelmodechange', self.on_message)
client.message_callback_add(self.topicMobiBasepath + 'platform_service/data/tokenissuance', self.on_tokenissuance)
client.connect_async(tv_ip, 36669, 60)
client.loop_start()
self.wait_for_message(lambda: not client.connected_flag)
client.subscribe([
(self.topicBrcsBasepath + 'ui_service/state', 0),
(self.topicTVUIBasepath + 'actions/vidaa_app_connect', 0),
(self.topicMobiBasepath + 'ui_service/data/authentication', 0),
(self.topicMobiBasepath + 'ui_service/data/authenticationcode', 0),
(self.topicBrcsBasepath + "ui_service/data/hotelmodechange", 0),
(self.topicMobiBasepath + 'platform_service/data/tokenissuance', 0),
])
if debug:
logging.info('Publishing message to actions/vidaa_app_connect...')
client.publish(self.topicTVUIBasepath + "actions/vidaa_app_connect", '{"app_version":2,"connect_result":0,"device_type":"Mobile App"}')
self.wait_for_message(lambda: self.authentication_payload is None)
if self.authentication_payload.payload.decode() != '""':
logging.error('Problems with the authentication message!')
logging.error(self.authentication_payload.payload)
return
if debug:
logging.info(f'Subscribing to {self.topicMobiBasepath}ui_service/data/authenticationcode...')
client.subscribe(self.topicMobiBasepath + 'ui_service/data/authenticationcode')
authsuccess = False
while not authsuccess:
auth_num = input("Enter the four digits displayed on your TV: ")
client.publish(self.topicTVUIBasepath + "actions/authenticationcode", f'{{"authNum":{auth_num}}}')
self.wait_for_message(lambda: self.authentication_code_payload is None)
if json.loads(self.authentication_code_payload.payload.decode()) != {"result": 1, "info": ""}:
if debug:
logging.error('Problems with the authentication message!')
logging.error(self.authentication_code_payload.payload)
else:
authsuccess = True
if debug:
logging.info("Success! Getting access token...")
client.publish(self.topicTVPSBasepath + "data/gettoken", '{"refreshtoken": ""}')
client.publish(self.topicTVUIBasepath + "actions/authenticationcodeclose")
client.subscribe(self.topicBrcsBasepath + 'ui_service/data/hotelmodechange')
client.subscribe(self.topicMobiBasepath + 'platform_service/data/tokenissuance')
self.wait_for_message(lambda: self.tokenissuance is None)
token_data = json.loads(self.tokenissuance.payload.decode())
token_data.update({"client_id": self.client_id, "username": self.username, "password": self.password})
pprint(token_data)
if debug:
logging.info('Token issued successfully')
with open('credentials.json', 'w') as file:
json.dump(token_data, file, indent=4)
if debug:
logging.info('Credentials saved to credentials.json')
client.loop_stop()
client.disconnect()
self.authenticated = True
return token_data["accesstoken"]
# Load the credentials from the credentials file or generate new ones
def load_or_generate_creds(self, rec=False):
try:
with open('credentials.json', 'r') as file:
if debug:
logging.info('Loading stored credentials...')
saved_credentials = json.load(file)
self.accesstoken = saved_credentials["accesstoken"]
self.accesstoken_time = saved_credentials["accesstoken_time"]
self.accesstoken_duration_day = saved_credentials["accesstoken_duration_day"]
self.refreshtoken = saved_credentials["refreshtoken"]
self.refreshtoken_time = saved_credentials["refreshtoken_time"]
self.refreshtoken_duration_day = saved_credentials["refreshtoken_duration_day"]
self.client_id = saved_credentials['client_id']
self.username = saved_credentials['username']
self.password = saved_credentials['password']
self.authenticated = True
except FileNotFoundError:
if not rec:
if debug:
logging.info('No stored credentials found, starting auth with TV...')
self.generate_creds()
self.load_or_generate_creds(True)
else:
if debug:
logging.error('Unable to generate credentials.')
raise
# Show the credentials
def show_credentials(self):
current_time = time.time()
print(f"Current time is {time.ctime(current_time)}")
print("")
print("client_id: " + self.client_id)
print("username: " + self.username)
print("password: " + self.password)
print("")
print("accesstoken: " + self.accesstoken)
print("accesstoken_time: " + self.accesstoken_time)
print("accesstoken_duration_day: " + str(self.accesstoken_duration_day))
expiration_time = int(self.accesstoken_time) + (int(self.accesstoken_duration_day) * 24 * 60 * 60)
print(f"Access Token expires at {time.ctime(expiration_time)}")
print("")
print("refreshtoken: " + self.refreshtoken)
print("refreshtoken_time: " + self.refreshtoken_time)
print("refreshtoken_duration_day: " + str(self.refreshtoken_duration_day))
refresh_expiration_time = int(self.refreshtoken_time) + (int(self.refreshtoken_duration_day) * 24 * 60 * 60)
print(f"Refresh Token expires at {time.ctime(refresh_expiration_time)}")
print("")
# Get requested information from the TV
def get_info(self, callback_message, subscribe_topic, publish_topic):
if debug:
logging.info("Getting information...")
client = self.create_mqtt_client(client_id=self.client_id, certfile=certfile, keyfile=keyfile, username=self.username, password=self.accesstoken)
if debug:
logging.info(f"Adding callback to {callback_message}")
client.message_callback_add(callback_message, self.on_info)
client.connect_async(tv_ip, 36669, 60)
client.loop_start()
self.wait_for_message(lambda: not client.connected_flag)
if debug:
logging.info(f"Subscribing for {subscribe_topic}")
# client.subscribe(subscribe_topic)
client.subscribe([
(subscribe_topic, 0),
(self.topicMobiBasepath + 'ui_service/data/authentication', 0), # if authentication fails, this will return a message
])
if debug:
logging.info(f"Publishing message to {publish_topic}")
client.publish(publish_topic, None)
self.wait_for_message(lambda: self.info is None)
# self.wait_for_message(lambda: True is True)
if not self.authenticated:
if debug:
logging.info("NOT Authenticated")
self.authenticate(client)
client.loop_stop()
client.disconnect()
if self.info:
if debug:
logging.info(f"Information received: {self.info}")
return json.loads(self.info)
else:
logging.error("Failed to get information")
return None
# Send a command to the TV
def send_command(self, publish_topic, command = None):
if debug:
logging.info("Sending command to TV...")
client = self.create_mqtt_client(client_id=self.client_id, certfile=certfile, keyfile=keyfile, username=self.username, password=self.accesstoken)
if debug:
logging.info("No callback message needed for command sending.")
client.connect_async(tv_ip, 36669, 60)
client.loop_start()
self.wait_for_message(lambda: not client.connected_flag)
if debug:
logging.info(f"Publishing {command} command to {publish_topic}")
client.publish(publish_topic, command)
if debug:
logging.info("Command sent.")
client.loop_stop()
client.disconnect()
# Get the current state of the TV
def get_tv_state(self):
if debug:
logging.info("Getting TV state...")
get_tv_state_subscribe = self.topicBrcsBasepath + "ui_service/state"
get_tv_state_callback = self.topicBrcsBasepath + "ui_service/state"
get_tv_state_publish = self.topicTVUIBasepath + "actions/gettvstate"
tv_state = self.get_info(get_tv_state_callback, get_tv_state_subscribe, get_tv_state_publish)
return tv_state
# Get the source list of the TV
def get_source_list(self):
if debug:
logging.info("Getting source list...")
get_source_list_callback = self.topicMobiBasepath + "ui_service/data/sourcelist"
get_source_list_subscribe = self.topicMobiBasepath + "ui_service/data/sourcelist"
get_source_list_publish = self.topicTVUIBasepath + "actions/sourcelist"
source_list = self.get_info(get_source_list_callback, get_source_list_subscribe, get_source_list_publish)
return source_list
# Get the volume of the TV
def get_volume(self):
if debug:
logging.info("Getting volume...")
get_volume_callback = self.topicBrcsBasepath + "platform_service/actions/volumechange"
get_volume_subscribe = self.topicBrcsBasepath + "platform_service/actions/volumechange"
get_volume_publish = self.topicTVPSBasepath + "actions/getvolume"
volume = self.get_info(get_volume_callback, get_volume_subscribe, get_volume_publish)
return volume
# Get the app list of the TV
def get_app_list(self):
if debug:
logging.info("Getting app list...")
get_app_list_callback = self.topicMobiBasepath + "ui_service/data/applist"
get_app_list_subscribe = self.topicMobiBasepath + "ui_service/data/applist"
get_app_list_publish = self.topicTVUIBasepath + "actions/applist"
app_list = self.get_info(get_app_list_callback, get_app_list_subscribe, get_app_list_publish)
return app_list
# Power Cycle the TV
def power_cycle_tv(self):
if debug:
logging.info("Power cycling the TV...")
power_cycle_command = "KEY_POWER"
power_cycle_publish = self.topicRemoBasepath + "actions/sendkey"
self.send_command(power_cycle_publish, power_cycle_command)
return True
# Change the source of the TV
def change_source(self, source_id):
if debug:
logging.info(f"Changing source to {source_id}...")
change_source_publish = self.topicTVUIBasepath + "actions/changesource"
change_source_command = json.dumps({"sourceid": source_id})
self.send_command(change_source_publish, change_source_command)
return True
# Change the volume of the TV
def change_volume(self, volume):
if debug:
logging.info(f"Changing volume to {volume}...")
change_volume_publish = self.topicTVPSBasepath + "actions/changevolume"
change_volume_command = str(volume)
self.send_command(change_volume_publish, change_volume_command)
return True
# Launch an app on the TV
def launch_app(self, app_name, app_list = None):
if debug:
logging.info(f"Launching app {app_name}...")
if not app_list:
app_list = self.get_app_list()
if not app_list:
print("Failed to get app list.")
return False
app_id = None
app_url = None
for app in app_list:
if app["name"].upper() == app_name.upper():
app_id = app["appId"]
app_url = app["url"]
app_name = app["name"]
break
if app_id is None or app_url is None:
print("Failed to find app in app list.")
return False
launch_app_publish = self.topicTVUIBasepath + "actions/launchapp"
launch_app_command = json.dumps({"appId": app_id, "name": app_name, "url": app_url})
self.send_command(launch_app_publish, launch_app_command)
return True
# Show the help message
def show_help(self):
print("1. Get TV State, from command line: --action getstate")
print("2. Power Cycle TV, from command line: --action powercycle (or use poweron or poweroff, which first check the TV state)")
print("3. Get Source List, from command line: --action getsourcelist")
print("4. Change Source, from command line: --action changesource --parameter <source_name>")
print("5. Get Volume, from command line: --action getvolume")
print("6. Change Volume, from command line: --action changevolume --parameter <volume>")
print("7. Get App List, from command line: --action getapplist")
print("8. Launch App, from command line: --action launchapp --parameter <app_name>\n")
print("C. Show Credentials, from command line: --action showcredentials")
print("R. Refresh Token, from command line: --action refreshtoken")
print("F. Force Refresh Token, from command line: --action forcerefresh")
print("S. Save Credentials, from command line: --action save")
print("L. Load Credentials, from command line: --action load")
print("A. Authenticate, from command line: --action authenticate\n")
print("H. Help, from command line: --action help\n")
print("0. Exit, from command line: --action exit\n")
# Main function
if __name__ == "__main__":
# Initialize the TVAuthenticator class
auth = TVAuthenticator()
# Parse command line arguments
parser = argparse.ArgumentParser(description='Hisense TV Control')
parser.add_argument('--action', type=str, help='Action to perform', choices=['getstate', 'powercycle', 'poweron', 'poweroff', 'getsourcelist', 'changesource', 'getvolume', 'changevolume', 'getapplist', 'launchapp', 'showcredentials', 'forcerefresh', 'refreshtoken', 'save', 'load', 'authenticate', 'help', 'exit'])
parser.add_argument('--parameter', type=str, help='Parameter for the action')
parser.add_argument('--debug', type=str, help='Set debug mode on or off (True/False)', choices=['True', 'False'], default='False')
args = parser.parse_args()
# Set debug mode
if args.debug == "True":
debug = True
elif args.debug == "False":
debug = False
# Load or generate credentials
auth.load_or_generate_creds()
# Show the credentials
auth.show_credentials()
# Define topic paths (the client_id comes from the loaded credentials)
auth.define_topic_paths()
# Refresh the token if needed
auth.check_and_refresh_token()
# Main loop
action = None
while action != "0":
if not args.action:
print("\nChoose an action:\n")
auth.show_help()
action = input("Action: ")
else:
action = args.action
action = action.upper()
if debug:
logging.info(f"Action: {action}")
if action == "1" or action == "GETSTATE":
# Get TV State
tv_state = auth.get_tv_state()
if tv_state:
print(f"TV State: \n{json.dumps(tv_state, indent=4)}")
else:
print("Failed to get TV state.")
elif action == "2" or action == "POWERCYCLE":
# Power cycle the TV
command_sent = auth.power_cycle_tv()
if command_sent:
print("Power cycle command sent.")
else:
print("Failed to send power cycle command.")
elif action == "3" or action == "GETSOURCELIST":
# Get source list
source_list = auth.get_source_list()
if source_list:
print(f"Source list: \n{json.dumps(source_list, indent=4)}")
else:
print("Failed to get source list.")
elif action == "4" or action == "CHANGESOURCE":
# Change Source
if not args.parameter:
source_list = auth.get_source_list()
if source_list:
print(f"Source list: \n{json.dumps(source_list, indent=4)}")
source_id = input("Enter the source ID: ")
else:
print("Failed to get source list.")
else:
source_id = args.parameter
source_changed = auth.change_source(source_id)
if source_changed:
print(f"Source changed to {source_id}")
else:
print("Failed to change source.")
elif action == "5" or action == "GETVOLUME":
# Get Volume
volume = auth.get_volume()
if volume:
print(f"Volume: \n{json.dumps(volume, indent=4)}")
else:
print("Failed to get volume.")
elif action == "6" or action == "CHANGEVOLUME":
# Change Volume
if not args.parameter:
volume = auth.get_volume()
if volume:
print(f"Volume: \n{json.dumps(volume, indent=4)}")
volume = input("Enter the volume level (0-100): ")
else:
print("Failed to get volume.")
else:
volume = args.parameter
volume_changed = auth.change_volume(volume)
if volume_changed:
print(f"Volume changed to {volume}")
else:
print("Failed to change volume.")
elif action == "7" or action == "GETAPPLIST":
# Get App List
app_list = auth.get_app_list()
if app_list:
print(f"App list: \n{json.dumps(app_list, indent=4)}")
print("\nApps:\n")
for app in app_list:
print(app['name'])
else:
print("Failed to get app list.")
elif action == "8" or action == "LAUNCHAPP":
# Launch App
app_list = None
if not args.parameter:
app_list = auth.get_app_list()
if app_list:
print(f"App list: \n{json.dumps(app_list, indent=4)}")
app_name = input("Enter the app name to launch: ")
else:
print("Failed to get app list.")
else:
app_name = args.parameter
app_launched = auth.launch_app(app_name, app_list)
if app_launched:
print(f"App launched: {app_name}")
else:
print("Failed to launch app.")
elif action == "C" or action == "SHOWCREDENTIALS":
# Show credentials
auth.show_credentials()
elif action == "F" or action == "FORCEREFRESH":
# Refresh token
auth.refresh_token()
print("Token refreshed.")
elif action == "R" or action == "REFRESHTOKEN":
# Refresh token only if it has expired
auth.check_and_refresh_token()
print("Token checked; refreshed if it had expired.")
elif action == "S" or action == "SAVE":
# Save credentials
auth.write_token_to_creds_file()
print("Credentials saved.")
elif action == "L" or action == "LOAD":
# Load credentials
auth.load_or_generate_creds()
print("Credentials loaded.")
elif action == "A" or action == "AUTHENTICATE":
# Authenticate with the TV and generate new credentials
auth.generate_creds()
print("Credentials generated.")
elif action == "H" or action == "HELP":
# Help
auth.show_help()
elif action == "0" or action == "EXIT":
if debug:
logging.info("Exiting...")
# Exit
break
elif action == "POWERON":
# Power on the TV
print("Powering on the TV...")
# Get TV State
tv_state = auth.get_tv_state()
if tv_state:
if "statetype" in tv_state and tv_state["statetype"] == "fake_sleep_0":
# Power cycle the TV
command_sent = auth.power_cycle_tv()
if command_sent:
print("Power cycle command sent.")
else:
print("Failed to send power cycle command.")
else:
print("TV is already on.")
else:
print("Failed to get TV state.")
elif action == "POWEROFF":
# Power off the TV
print("Powering off the TV...")
# Get TV State
tv_state = auth.get_tv_state()
if tv_state:
if "statetype" in tv_state and tv_state["statetype"] != "fake_sleep_0":
# Power cycle the TV
command_sent = auth.power_cycle_tv()
if command_sent:
print("Power cycle command sent.")
else:
print("Failed to send power cycle command.")
else:
print("TV is already off.")
else:
print("Failed to get TV state.")
# Exit when passed from command line
if args.action:
if debug:
logging.info("Command from command line done - Exiting...")
break
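For anyone trying to follow the login derivation in `define_hashes` from the script above, here is a minimal standalone sketch of the same steps. It is only a restatement of what that method does, not a confirmed description of the Vidaa protocol. The function names (`md5_upper`, `digit_sum`, `derive_login`) and the example MAC address are hypothetical; the constant strings are copied from the script.

```python
import hashlib


def md5_upper(text: str) -> str:
    """Return the uppercase hex MD5 digest of a UTF-8 string."""
    return hashlib.md5(text.encode("utf-8")).hexdigest().upper()


def digit_sum(n: int) -> int:
    """Sum the decimal digits of a non-negative integer."""
    return sum(int(d) for d in str(n))


def derive_login(mac: str, timestamp: int) -> dict:
    """Derive client_id, username and password as define_hashes does.

    mac is an uppercase, colon-separated MAC address and timestamp is
    Unix time in seconds. Both are plain inputs here so the result is
    reproducible; the script reads them from uuid.getnode() and time.time().
    """
    # Hash of a fixed prefix plus the MAC; its first six characters go into the client ID.
    device_hash = md5_upper(f"38D65DC30F45109A369A86FCE866A85B${mac}")
    # The last digit of the timestamp's digit sum selects the salt string.
    salt_digit = digit_sum(timestamp) % 10
    salt_hash = md5_upper(f"his{salt_digit}h*i&s%e!r^v0i1c9")
    return {
        "client_id": f"{mac}$his${device_hash[:6]}_vidaacommon_001",
        "username": f"his${timestamp}",
        "password": md5_upper(f"{timestamp}${salt_hash[:6]}"),
    }
```

Calling `derive_login("AA:BB:CC:DD:EE:FF", 1700000000)` returns a dict whose `username` is `his$1700000000`. Keeping the MAC and timestamp as parameters makes it easy to compare the output against values captured from the app.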
Updated script... I guess I should add it to my GitHub?
import re
import uuid
import hashlib
import time
import json
import logging
import paho.mqtt.client as mqtt
from pprint import pprint
import keyboard
import argparse
import os
import sys
script_directory = os.path.dirname(os.path.abspath(sys.argv[0]))
# Configuration
tv_ip = "192.168.178.134"
certfile = os.path.join(script_directory, "./rcm_certchain_pem.cer")
keyfile = os.path.join(script_directory, "./rcm_pem_privkey.pkcs8")
credentialsfile = os.path.join(script_directory, "credentials.json")
check_interval = 0.1
debug = True
# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class TVAuthenticator:
def __init__(self):
self.reply = None
self.authentication_payload = None
self.authentication_code_payload = None
self.tokenissuance = None
self.accesstoken = None
self.accesstoken_time = None
self.accesstoken_duration_day = None
self.refreshtoken = None
self.refreshtoken_time = None
self.refreshtoken_duration_day = None
self.client_id = None
self.username = None
self.password = None
self.timestamp = None
self.authenticated = False
self.topicTVUIBasepath = None
self.topicTVPSBasepath = None
self.topicMobiBasepath = None
self.topicBrcsBasepath = None
self.topicRemoBasepath = None
self.info = None
# Sum all digits of a number
@staticmethod
def cross_sum(n):
return sum(int(digit) for digit in str(n))
# Convert a string to an uppercase MD5 hex digest
@staticmethod
def string_to_hash(input_str):
return hashlib.md5(input_str.encode("utf-8")).hexdigest().upper()
# Action when connected
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
client.connected_flag = True
if debug:
logging.info("Connected to MQTT broker")
else:
logging.error(f"Bad connection. Returned code: {rc}")
client.bad_connection_flag = True
# Action when message received
def on_message(self, client, userdata, msg):
if debug:
logging.info(f"Message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
self.authenticated = False
self.reply = msg
# Action when subscribed
def on_subscribe(self, client, userdata, mid, granted_qos):
if debug:
logging.info(f"Subscribed: {mid} {granted_qos}")
# Action when published
def on_publish(self, client, userdata, mid):
if debug:
logging.info(f"Published message {mid}")
# Action when disconnected
def on_disconnect(self, client, userdata, rc):
if debug:
logging.info(f"Disconnected. Reason: {rc}")
# Action when authentication message received
def on_authentication(self, mosq, obj, msg):
if debug:
logging.info(f"Authentication message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
self.authentication_payload = msg
# Action when authentication code message received
def on_authentication_code(self, mosq, obj, msg):
if debug:
logging.info(f"Authentication code message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
self.authentication_code_payload = msg
# Action when token issuance message received
def on_tokenissuance(self, mosq, obj, msg):
if debug:
logging.info(f"Token issuance message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
self.tokenissuance = msg
# Action when information message received
def on_info(self, mosq, obj, msg):
if debug:
logging.info(f"Information message received: {msg.payload.decode('utf-8')} on topic {msg.topic}")
self.info = msg.payload.decode('utf-8')
# Wait for a message (condition is a lambda function that returns True or False)
def wait_for_message(self, condition, check_interval=1, debug=False):
time.sleep(1) # Initial delay to prevent false negatives
print("Waiting for message... (press and hold escape to cancel waiting)")
start_time = time.time()
print("Waiting...", end='', flush=True)
while condition():
if keyboard.is_pressed('esc'):
print("\nEscape pressed. Exiting...")
break
current_time = time.time()
if current_time - start_time >= 3:
print(".", end='', flush=True)
start_time = current_time
time.sleep(check_interval)
print("")
# Open the client and connect to the TV
def create_mqtt_client(self, client_id, certfile, keyfile, username, password, userdata=None):
if debug:
logging.info("Creating MQTT client...")
client = mqtt.Client(client_id=client_id, clean_session=True, userdata=userdata, protocol=mqtt.MQTTv311, transport="tcp")
client.tls_set(ca_certs=None, certfile=certfile, keyfile=keyfile, cert_reqs=mqtt.ssl.CERT_NONE, tls_version=mqtt.ssl.PROTOCOL_TLS)
client.tls_insecure_set(True)
client.username_pw_set(username=username, password=password)
# Attach event handlers
client.on_connect = self.on_connect
client.on_message = self.on_message
client.on_publish = self.on_publish
client.on_disconnect = self.on_disconnect
client.enable_logger()
client.connected_flag = False
client.bad_connection_flag = False
return client
# Refresh the token
def refresh_token(self):
if debug:
logging.info("Refreshing token...")
client = self.create_mqtt_client(client_id=self.client_id, certfile=certfile, keyfile=keyfile, username=self.username, password=self.refreshtoken)
if debug:
logging.info(f"Adding callback message to {self.topicMobiBasepath}platform_service/data/tokenissuance")
client.message_callback_add(self.topicMobiBasepath + 'platform_service/data/tokenissuance', self.on_tokenissuance)
client.connect_async(tv_ip, 36669, 60)
client.loop_start()
self.wait_for_message(lambda: not client.connected_flag)
client.subscribe(self.topicMobiBasepath + 'platform_service/data/tokenissuance')
client.publish(f"/remoteapp/tv/platform_service/{self.client_id}/data/gettoken", json.dumps({"refreshtoken": self.refreshtoken}))
self.wait_for_message(lambda: self.tokenissuance is None)
credentials = json.loads(self.tokenissuance.payload.decode())
credentials.update({"client_id": self.client_id, "username": self.username, "password": self.password})
pprint(credentials)
if debug:
logging.info('Token issued successfully')
with open(credentialsfile, 'w') as file:
json.dump(credentials, file, indent=4)
if debug:
logging.info(f'Credentials saved to {credentialsfile}')
client.loop_stop()
client.disconnect()
self.accesstoken = credentials["accesstoken"]
self.accesstoken_time = credentials["accesstoken_time"]
self.accesstoken_duration_day = credentials["accesstoken_duration_day"]
self.refreshtoken = credentials["refreshtoken"]
self.refreshtoken_time = credentials["refreshtoken_time"]
self.refreshtoken_duration_day = credentials["refreshtoken_duration_day"]
self.client_id = credentials['client_id']
self.username = credentials['username']
self.password = credentials['password']
self.authenticated = True
return credentials["accesstoken"]
# Check and refresh the token if needed
def check_and_refresh_token(self):
current_time = time.time()
if debug:
logging.info(f"Current time is {time.ctime(current_time)}")
expiration_time = int(self.accesstoken_time) + (int(self.accesstoken_duration_day) * 24 * 60 * 60)
if debug:
logging.info(f"Access Token expires at {time.ctime(expiration_time)}")
refresh_expiration_time = int(self.refreshtoken_time) + (int(self.refreshtoken_duration_day) * 24 * 60 * 60)
if debug:
logging.info(f"Refresh Token expires at {time.ctime(refresh_expiration_time)}")
if current_time <= expiration_time:
if debug:
logging.info("Token still valid, no need to refresh")
time_diff = expiration_time - current_time
days = time_diff // (24 * 60 * 60)
hours = (time_diff % (24 * 60 * 60)) // (60 * 60)
minutes = (time_diff % (60 * 60)) // 60
seconds = time_diff % 60
if debug:
logging.info(f"Token expires in {int(days)} days, {int(hours)} hours, {int(minutes)} minutes, and {int(seconds)} seconds")
return self.accesstoken
if debug:
logging.info("Token not valid, refreshing the token")
return self.refresh_token()
# Define the hashes, username, password and client_id
def define_hashes(self):
self.timestamp = int(time.time())
mac = ':'.join(re.findall('..', '%012x' % uuid.getnode())).upper()
if debug:
logging.info(f'MAC Address: {mac}')
first_hash = self.string_to_hash("&vidaa#^app")
second_hash = self.string_to_hash(f"38D65DC30F45109A369A86FCE866A85B${mac}")
last_digit_of_cross_sum = self.cross_sum(self.timestamp) % 10
third_hash = self.string_to_hash(f"his{last_digit_of_cross_sum}h*i&s%e!r^v0i1c9")
fourth_hash = self.string_to_hash(f"{self.timestamp}${third_hash[:6]}")
self.username = f"his${self.timestamp}"
self.password = fourth_hash
if debug:
logging.info(f'First Hash: {first_hash}')
logging.info(f'Second Hash: {second_hash}')
logging.info(f'Third Hash: {third_hash}')
logging.info(f'Fourth Hash: {fourth_hash}')
self.client_id = f"{mac}$his${second_hash[:6]}_vidaacommon_001"
if debug:
logging.info(f'Client ID: {self.client_id}')
# Define the topic paths
def define_topic_paths(self):
self.topicTVUIBasepath = f"/remoteapp/tv/ui_service/{self.client_id}/"
self.topicTVPSBasepath = f"/remoteapp/tv/platform_service/{self.client_id}/"
self.topicMobiBasepath = f"/remoteapp/mobile/{self.client_id}/"
self.topicBrcsBasepath = f"/remoteapp/mobile/broadcast/"
self.topicRemoBasepath = f"/remoteapp/tv/remote_service/{self.client_id}/"
# Authenticate with the TV and write the credentials to the credentials file
def generate_creds(self):
self.define_hashes()
self.define_topic_paths()
client = self.create_mqtt_client(client_id=self.client_id, certfile=certfile, keyfile=keyfile, username=self.username, password=self.password)
if debug:
logging.info(f"Adding callback messages for authentication...")
client.message_callback_add(self.topicMobiBasepath + 'ui_service/data/authentication', self.on_authentication)
client.message_callback_add(self.topicMobiBasepath + 'ui_service/data/authenticationcode', self.on_authentication_code)
client.message_callback_add(self.topicBrcsBasepath + 'ui_service/data/hotelmodechange', self.on_message)
client.message_callback_add(self.topicMobiBasepath + 'platform_service/data/tokenissuance', self.on_tokenissuance)
client.connect_async(tv_ip, 36669, 60)
client.loop_start()
self.wait_for_message(lambda: not client.connected_flag)
client.subscribe([
(self.topicBrcsBasepath + 'ui_service/state', 0),
(self.topicTVUIBasepath + 'actions/vidaa_app_connect', 0),
(self.topicMobiBasepath + 'ui_service/data/authentication', 0),
(self.topicMobiBasepath + 'ui_service/data/authenticationcode', 0),
(self.topicBrcsBasepath + "ui_service/data/hotelmodechange", 0),
(self.topicMobiBasepath + 'platform_service/data/tokenissuance', 0),
])
if debug:
logging.info('Publishing message to actions/vidaa_app_connect...')
client.publish(self.topicTVUIBasepath + "actions/vidaa_app_connect", '{"app_version":2,"connect_result":0,"device_type":"Mobile App"}')
self.wait_for_message(lambda: self.authentication_payload is None)
if self.authentication_payload.payload.decode() != '""':
logging.error('Problems with the authentication message!')
logging.error(self.authentication_payload.payload)
return
if debug:
logging.info(f'Subscribing to {self.topicMobiBasepath}ui_service/data/authenticationcode...')
client.subscribe(self.topicMobiBasepath + 'ui_service/data/authenticationcode')
authsuccess = False
while not authsuccess:
auth_num = input("Enter the four digits displayed on your TV: ")
client.publish(self.topicTVUIBasepath + "actions/authenticationcode", f'{{"authNum":{auth_num}}}')
self.wait_for_message(lambda: self.authentication_code_payload is None)
if json.loads(self.authentication_code_payload.payload.decode()) != {"result": 1, "info": ""}:
if debug:
logging.error('Problems with the authentication message!')
logging.error(self.authentication_code_payload.payload)
else:
authsuccess = True
if debug:
logging.info("Success! Getting access token...")
client.publish(self.topicTVPSBasepath + "data/gettoken", '{"refreshtoken": ""}')
client.publish(self.topicTVUIBasepath + "actions/authenticationcodeclose")
client.subscribe(self.topicBrcsBasepath + 'ui_service/data/hotelmodechange')
client.subscribe(self.topicMobiBasepath + 'platform_service/data/tokenissuance')
self.wait_for_message(lambda: self.tokenissuance is None)
credentials = json.loads(self.tokenissuance.payload.decode())
credentials.update({"client_id": self.client_id, "username": self.username, "password": self.password})
pprint(credentials)
if debug:
logging.info('Token issued successfully')
with open(credentialsfile, 'w') as file:
json.dump(credentials, file, indent=4)
if debug:
logging.info(f'Credentials saved to {credentialsfile}')
client.loop_stop()
client.disconnect()
self.accesstoken = credentials["accesstoken"]
self.accesstoken_time = credentials["accesstoken_time"]
self.accesstoken_duration_day = credentials["accesstoken_duration_day"]
self.refreshtoken = credentials["refreshtoken"]
self.refreshtoken_time = credentials["refreshtoken_time"]
self.refreshtoken_duration_day = credentials["refreshtoken_duration_day"]
self.client_id = credentials['client_id']
self.username = credentials['username']
self.password = credentials['password']
self.authenticated = True
return credentials["accesstoken"]
# Load the credentials from the credentials file or generate new ones
def load_or_generate_creds(self, rec=False):
try:
with open(credentialsfile, 'r') as file:
if debug:
logging.info(f'Loading stored credentials from {credentialsfile}...')
credentials = json.load(file)
self.accesstoken = credentials["accesstoken"]
self.accesstoken_time = credentials["accesstoken_time"]
self.accesstoken_duration_day = credentials["accesstoken_duration_day"]
self.refreshtoken = credentials["refreshtoken"]
self.refreshtoken_time = credentials["refreshtoken_time"]
self.refreshtoken_duration_day = credentials["refreshtoken_duration_day"]
self.client_id = credentials['client_id']
self.username = credentials['username']
self.password = credentials['password']
self.authenticated = True
except FileNotFoundError:
if not rec:
if debug:
logging.info('No stored credentials found, starting auth with TV...')
self.generate_creds()
self.load_or_generate_creds(True)
else:
if debug:
logging.error('Unable to generate credentials.')
raise
# Show the credentials
def show_credentials(self):
current_time = time.time()
print(f"Current time is {time.ctime(current_time)}")
print("")
print("client_id: " + self.client_id)
print("username: " + self.username)
print("password: " + self.password)
print("")
print("accesstoken: " + self.accesstoken)
print("accesstoken_time: " + str(self.accesstoken_time))
print("accesstoken_duration_day: " + str(self.accesstoken_duration_day))
expiration_time = int(self.accesstoken_time) + (int(self.accesstoken_duration_day) * 24 * 60 * 60)
print(f"Access Token expires at {time.ctime(expiration_time)}")
print("")
print("refreshtoken: " + self.refreshtoken)
print("refreshtoken_time: " + self.refreshtoken_time)
print("refreshtoken_duration_day: " + str(self.refreshtoken_duration_day))
refresh_expiration_time = int(self.refreshtoken_time) + (int(self.refreshtoken_duration_day) * 24 * 60 * 60)
print(f"Refresh Token expires at {time.ctime(refresh_expiration_time)}")
print("")
    # Get requested information from the TV
    def get_info(self, callback_message, subscribe_topic, publish_topic):
        if debug:
            logging.info("Getting information...")
        client = self.create_mqtt_client(client_id=self.client_id, certfile=certfile, keyfile=keyfile, username=self.username, password=self.accesstoken)
        if debug:
            logging.info(f"Adding callback to {callback_message}")
        client.message_callback_add(callback_message, self.on_info)
        client.connect_async(tv_ip, 36669, 60)
        client.loop_start()
        self.wait_for_message(lambda: not client.connected_flag)
        if debug:
            logging.info(f"Subscribing for {subscribe_topic}")
        # client.subscribe(subscribe_topic)
        client.subscribe([
            (subscribe_topic, 0),
            (self.topicMobiBasepath + 'ui_service/data/authentication', 0),  # if authentication fails, this will return a message
        ])
        if debug:
            logging.info(f"Publishing message to {publish_topic}")
        client.publish(publish_topic, None)
        self.wait_for_message(lambda: self.info is None)
        # self.wait_for_message(lambda: True is True)
        if not self.authenticated:
            if debug:
                logging.info("NOT Authenticated")
            self.authenticate(client)
        client.loop_stop()
        client.disconnect()
        if self.info:
            if debug:
                logging.info(f"Information received: {self.info}")
            return json.loads(self.info)
        else:
            logging.error("Failed to get information")
            return None
    # Send a command to the TV
    def send_command(self, publish_topic, command=None):
        if debug:
            logging.info("Sending command to TV...")
        client = self.create_mqtt_client(client_id=self.client_id, certfile=certfile, keyfile=keyfile, username=self.username, password=self.accesstoken)
        if debug:
            logging.info("No callback message needed for command sending.")
        client.connect_async(tv_ip, 36669, 60)
        client.loop_start()
        self.wait_for_message(lambda: not client.connected_flag)
        if debug:
            logging.info(f"Publishing {command} command to {publish_topic}")
        client.publish(publish_topic, command)
        if debug:
            logging.info("Command sent.")
        client.loop_stop()
        client.disconnect()
    # Get the current state of the TV
    def get_tv_state(self):
        if debug:
            logging.info("Getting TV state...")
        get_tv_state_subscribe = self.topicBrcsBasepath + "ui_service/state"
        get_tv_state_callback = self.topicBrcsBasepath + "ui_service/state"
        get_tv_state_publish = self.topicTVUIBasepath + "actions/gettvstate"
        tv_state = self.get_info(get_tv_state_callback, get_tv_state_subscribe, get_tv_state_publish)
        return tv_state

    # Get the source list of the TV
    def get_source_list(self):
        if debug:
            logging.info("Getting source list...")
        get_source_list_callback = self.topicMobiBasepath + "ui_service/data/sourcelist"
        get_source_list_subscribe = self.topicMobiBasepath + "ui_service/data/sourcelist"
        get_source_list_publish = self.topicTVUIBasepath + "actions/sourcelist"
        source_list = self.get_info(get_source_list_callback, get_source_list_subscribe, get_source_list_publish)
        return source_list

    # Get the volume of the TV
    def get_volume(self):
        if debug:
            logging.info("Getting volume...")
        get_volume_callback = self.topicBrcsBasepath + "platform_service/actions/volumechange"
        get_volume_subscribe = self.topicBrcsBasepath + "platform_service/actions/volumechange"
        get_volume_publish = self.topicTVPSBasepath + "actions/getvolume"
        volume = self.get_info(get_volume_callback, get_volume_subscribe, get_volume_publish)
        return volume

    # Get the app list of the TV
    def get_app_list(self):
        if debug:
            logging.info("Getting app list...")
        get_app_list_callback = self.topicMobiBasepath + "ui_service/data/applist"
        get_app_list_subscribe = self.topicMobiBasepath + "ui_service/data/applist"
        get_app_list_publish = self.topicTVUIBasepath + "actions/applist"
        app_list = self.get_info(get_app_list_callback, get_app_list_subscribe, get_app_list_publish)
        return app_list

    # Power Cycle the TV
    def power_cycle_tv(self):
        if debug:
            logging.info("Power cycling the TV...")
        power_cycle_command = "KEY_POWER"
        power_cycle_publish = self.topicRemoBasepath + "actions/sendkey"
        self.send_command(power_cycle_publish, power_cycle_command)
        return True
    # Change the source of the TV
    def change_source(self, source_id):
        if debug:
            logging.info(f"Changing source to {source_id}...")
        # Use self rather than the module-level auth instance so the method works on any object
        tv_state = self.get_tv_state()
        if tv_state:
            if "statetype" in tv_state and tv_state["statetype"] == "fake_sleep_0":
                logging.info("TV is off. Not changing source...")
                return False
            else:
                logging.info("TV is on. Changing source...")
                change_source_publish = self.topicTVUIBasepath + "actions/changesource"
                change_source_command = json.dumps({"sourceid": source_id})
                self.send_command(change_source_publish, change_source_command)
                return True
        else:
            logging.error("Failed to get TV state.")
            return False
    # Change the volume of the TV
    def change_volume(self, volume):
        if debug:
            logging.info(f"Changing volume to {volume}...")
        # Use self rather than the module-level auth instance so the method works on any object
        tv_state = self.get_tv_state()
        if tv_state:
            if "statetype" in tv_state and tv_state["statetype"] == "fake_sleep_0":
                logging.info("TV is off. Not changing volume...")
                return False
            else:
                change_volume_publish = self.topicTVPSBasepath + "actions/changevolume"
                change_volume_command = str(volume)
                self.send_command(change_volume_publish, change_volume_command)
                return True
        else:
            logging.error("Failed to get TV state.")
            return False
    # Launch an app on the TV
    def launch_app(self, app_name, app_list=None):
        if debug:
            logging.info(f"Launching app {app_name}...")
        if not app_list:
            app_list = self.get_app_list()
        if not app_list:
            print("Failed to get app list.")
            return False
        app_id = None
        app_url = None
        # Match the requested name case-insensitively against the TV's app list
        for app in app_list:
            if app["name"].upper() == app_name.upper():
                app_id = app["appId"]
                app_url = app["url"]
                app_name = app["name"]
                break
        if app_id is None or app_url is None:
            print("Failed to find app in app list.")
            return False
        # Use self rather than the module-level auth instance so the method works on any object
        tv_state = self.get_tv_state()
        if tv_state:
            if "statetype" in tv_state and tv_state["statetype"] == "fake_sleep_0":
                logging.info("TV is off. Not launching app...")
                return False
            else:
                launch_app_publish = self.topicTVUIBasepath + "actions/launchapp"
                launch_app_command = json.dumps({"appId": app_id, "name": app_name, "url": app_url})
                self.send_command(launch_app_publish, launch_app_command)
                return True
        else:
            logging.error("Failed to get TV state.")
            return False
    # Show the help message
    def show_help(self):
        print("1. Get TV State, from command line: --action getstate")
        print("2. Power Cycle TV, from command line: --action powercycle (or use poweron or poweroff, which first get the state)")
        print("3. Get Source List, from command line: --action getsourcelist")
        print("4. Change Source, from command line: --action changesource --parameter <source_id>")
        print("5. Get Volume, from command line: --action getvolume")
        print("6. Change Volume, from command line: --action changevolume --parameter <volume>")
        print("7. Get App List, from command line: --action getapplist")
        print("8. Launch App, from command line: --action launchapp --parameter <app_name>\n")
        print("C. Show Credentials, from command line: --action showcredentials")
        print("R. Refresh Token, from command line: --action refreshtoken")
        print("F. Force Refresh Token, from command line: --action forcerefresh")
        print("S. Save Credentials, from command line: --action save")
        print("L. Load Credentials, from command line: --action load")
        print("A. Authenticate, from command line: --action authenticate\n")
        print("H. Help, from command line: --action help\n")
        print("0. Exit, from command line: --action exit\n")
# Main function
if __name__ == "__main__":
    # Initialize the TVAuthenticator class
    auth = TVAuthenticator()
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Hisense TV Control')
    parser.add_argument('--action', type=str, help='Action to perform', choices=['getstate', 'powercycle', 'poweron', 'poweroff', 'getsourcelist', 'changesource', 'getvolume', 'changevolume', 'getapplist', 'launchapp', 'showcredentials', 'forcerefresh', 'refreshtoken', 'save', 'load', 'authenticate', 'help', 'exit'])
    parser.add_argument('--parameter', type=str, help='Parameter for the action')
    parser.add_argument('--debug', type=str, help='Set debug mode on or off (True/False)', choices=['True', 'False'], default='False')
    args = parser.parse_args()
    # Set debug mode
    if args.debug == "True":
        debug = True
    elif args.debug == "False":
        debug = False
    # Load or generate credentials
    auth.load_or_generate_creds()
    # Show the credentials
    auth.show_credentials()
    # Define hashes and topic paths
    auth.define_topic_paths()
    # Refresh the token if needed
    auth.check_and_refresh_token()
    # Main loop
    action = None
    while action != "0":
        if not args.action:
            print("\nChoose an action:\n")
            auth.show_help()
            action = input("Action: ")
        else:
            action = args.action
        action = action.upper()
        if debug:
            logging.info(f"Action: {action}")
        if action == "1" or action == "GETSTATE":
            # Get TV State
            tv_state = auth.get_tv_state()
            if tv_state:
                print(f"TV State: \n{json.dumps(tv_state, indent=4)}")
            else:
                print("Failed to get TV state.")
        elif action == "2" or action == "POWERCYCLE":
            # Power cycle the TV
            command_sent = auth.power_cycle_tv()
            if command_sent:
                print("Power cycle command sent.")
            else:
                print("Failed to send power cycle command.")
        elif action == "3" or action == "GETSOURCELIST":
            # Get source list
            source_list = auth.get_source_list()
            if source_list:
                print(f"Source list: \n{json.dumps(source_list, indent=4)}")
            else:
                print("Failed to get source list.")
        elif action == "4" or action == "CHANGESOURCE":
            # Change Source
            if not args.parameter:
                # Stays None if the source list cannot be fetched, so no undefined name is used below
                source_id = None
                source_list = auth.get_source_list()
                if source_list:
                    print(f"Source list: \n{json.dumps(source_list, indent=4)}")
                    source_id = input("Enter the source ID: ")
                else:
                    print("Failed to get source list.")
            else:
                source_id = args.parameter
            source_changed = auth.change_source(source_id) if source_id else False
            if source_changed:
                print(f"Source changed to {source_id}")
            else:
                print("Failed to change source.")
        elif action == "5" or action == "GETVOLUME":
            # Get Volume
            volume = auth.get_volume()
            if volume:
                print(f"Volume: \n{json.dumps(volume, indent=4)}")
            else:
                print("Failed to get volume.")
        elif action == "6" or action == "CHANGEVOLUME":
            # Change Volume
            if not args.parameter:
                # get_volume is a method of the authenticator, not a module-level function
                volume = auth.get_volume()
                if volume:
                    print(f"Volume: \n{json.dumps(volume, indent=4)}")
                    volume = input("Enter the volume level (0-100): ")
                else:
                    print("Failed to get volume.")
            else:
                volume = args.parameter
            # Skip the command if no volume level was obtained
            volume_changed = auth.change_volume(volume) if volume else False
            if volume_changed:
                print(f"Volume changed to {volume}")
            else:
                print("Failed to change volume.")
        elif action == "7" or action == "GETAPPLIST":
            # Get App List
            app_list = auth.get_app_list()
            if app_list:
                print(f"App list: \n{json.dumps(app_list, indent=4)}")
                print("\nApps:\n")
                for app in app_list:
                    print(app['name'])
            else:
                print("Failed to get app list.")
        elif action == "8" or action == "LAUNCHAPP":
            # Launch App
            app_list = None
            app_name = None
            if not args.parameter:
                app_list = auth.get_app_list()
                if app_list:
                    print(f"App list: \n{json.dumps(app_list, indent=4)}")
                    app_name = input("Enter the app name to launch: ")
                else:
                    print("Failed to get app list.")
            else:
                app_name = args.parameter
            # Skip the launch if no app name was obtained
            app_launched = auth.launch_app(app_name, app_list) if app_name else False
            if app_launched:
                print(f"App launched: {app_name}")
            else:
                print("Failed to launch app.")
        elif action == "C" or action == "SHOWCREDENTIALS":
            # Show credentials
            auth.show_credentials()
        elif action == "F" or action == "FORCEREFRESH":
            # Force a token refresh
            auth.refresh_token()
            print("Token refreshed.")
        elif action == "R" or action == "REFRESHTOKEN":
            # Refresh the token only if needed
            auth.check_and_refresh_token()
            print("Token refresh check done.")
        elif action == "S" or action == "SAVE":
            # Save credentials
            auth.write_token_to_creds_file()
            print("Credentials saved.")
        elif action == "L" or action == "LOAD":
            # Load credentials
            auth.load_or_generate_creds()
            print("Credentials loaded.")
        elif action == "A" or action == "AUTHENTICATE":
            # Authenticate with the TV again and generate new credentials
            auth.generate_creds()
            print("New credentials generated.")
        elif action == "H" or action == "HELP":
            # Help
            auth.show_help()
        elif action == "0" or action == "EXIT":
            if debug:
                logging.info("Exiting...")
            # Exit
            break
        elif action == "POWERON":
            # Power on the TV
            print("Powering on the TV...")
            # Get TV State
            tv_state = auth.get_tv_state()
            if tv_state:
                if "statetype" in tv_state and tv_state["statetype"] == "fake_sleep_0":
                    # Power cycle the TV
                    command_sent = auth.power_cycle_tv()
                    if command_sent:
                        print("Power cycle command sent.")
                    else:
                        print("Failed to send power cycle command.")
                else:
                    print("TV is already on.")
            else:
                print("Failed to get TV state.")
        elif action == "POWEROFF":
            # Power off the TV
            print("Powering off the TV...")
            # Get TV State
            tv_state = auth.get_tv_state()
            if tv_state:
                if "statetype" in tv_state and tv_state["statetype"] != "fake_sleep_0":
                    # Power cycle the TV
                    command_sent = auth.power_cycle_tv()
                    if command_sent:
                        print("Power cycle command sent.")
                    else:
                        print("Failed to send power cycle command.")
                else:
                    print("TV is already off.")
            else:
                print("Failed to get TV state.")
        # Exit when passed from command line
        if args.action:
            if debug:
                logging.info("Command from command line done - Exiting...")
            break |
Here it is: |
Out of curiosity, do you also get the "Reason: 7" on_disconnect error? After I power off and on the TV I need to reauthenticate (with 4-digit number) to get a new token, it doesn't refresh anymore.... |
In my case it looks like it all started when the script got no response to a state request:
So my script requested a state callback at 00:29, and 1.5 hours later it decided to connect again. Then it disconnected with reason 16, and after 12 hours it started to generate reason 7...
Got it from here: |
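For reference, if the script talks to the TV through paho-mqtt (an assumption, based on calls such as `message_callback_add` and `loop_start`), the numbers passed to `on_disconnect` in its 1.x releases are the library's `MQTT_ERR_*` return codes. Under that assumption, reason 7 would be a lost connection and reason 16 a keepalive timeout. A minimal lookup sketch; `describe_disconnect` is a hypothetical helper and the table lists only a subset of the codes:

```python
# Subset of the MQTT_ERR_* return codes defined in paho.mqtt.client (1.x releases).
# Assumption: the script uses paho-mqtt with the 1.x on_disconnect callback signature.
PAHO_ERRORS = {
    0: "MQTT_ERR_SUCCESS",
    4: "MQTT_ERR_NO_CONN",
    5: "MQTT_ERR_CONN_REFUSED",
    7: "MQTT_ERR_CONN_LOST",
    16: "MQTT_ERR_KEEPALIVE",
}


def describe_disconnect(rc):
    """Return a readable label for an on_disconnect reason number."""
    return f"{rc}: {PAHO_ERRORS.get(rc, 'unknown')}"


if __name__ == "__main__":
    for reason in (7, 16):
        print(describe_disconnect(reason))
```

Logging this label from the `on_disconnect` callback would make it easier to tell a dropped connection from a rejected one.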
@nikagl Fantastic work with the script, much better than what I came up with! So far no disconnects, guess I'll have to leave it running for a few days. Don't know if that could be the cause in your case, but we noticed there seems to be some sort of blacklisting happening when a lot of requests fail for a given identifier. In that case, you should be able to just generate a new identifier using a different MAC address. Also, quick PSA: After I recently replaced my router, the TV got internet access for a brief period and ran a software update, but the authentication mechanism seems to be unchanged, I was able to generate a new set of credentials using the scripts from this thread. |
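If a fresh identifier is needed, one option is to generate a random, locally administered MAC address. How the MAC address is turned into the MQTT client ID is not shown in this part of the thread, so the sketch below only produces the address; `random_mac` is a hypothetical helper, not part of the script.

```python
import random


def random_mac(rng=None):
    """Return a random unicast, locally administered MAC address as a string."""
    rng = rng or random.Random()
    octets = [rng.randrange(256) for _ in range(6)]
    # Set the locally-administered bit (0x02) and clear the multicast bit (0x01)
    # so the address cannot collide with a vendor-assigned one.
    octets[0] = (octets[0] | 0x02) & 0xFE
    return ":".join(f"{octet:02x}" for octet in octets)


if __name__ == "__main__":
    print(random_mac())
```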
I have been running your script, @nikagl, since it came out, with a cron job every 8 hours to refresh the token, and it runs fine. I never had to reauthenticate again. Thank you very much! Sadly, there seems to be no way to get a topic like /remoteapp/tv/platform_service/ /actions/picturesetting with action: { "action": "get_menu_info" } to work, or is there? I had a script to change gamma while watching on my old 55U8QF... @LeoKlaus I stayed on the first firmware, N0519, after reading here in the thread that MQTT should no longer work on higher ones. But it seems to work after all. Which firmware is currently installed on your 55U8KQ? |
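A cron-driven refresh like the one described above only has to decide whether the access token is close to expiring. A minimal sketch that reuses the expiry arithmetic from `show_credentials` (a timestamp in seconds plus a duration in days); the `needs_refresh` helper and its one-hour safety margin are assumptions, not part of the script:

```python
import time

SECONDS_PER_DAY = 24 * 60 * 60


def token_expiry(token_time, duration_days):
    """Expiry as a Unix timestamp: issue time plus the validity period in days."""
    return int(token_time) + int(duration_days) * SECONDS_PER_DAY


def needs_refresh(token_time, duration_days, now=None, margin_seconds=3600):
    """True once the token is within margin_seconds of expiring (or already expired)."""
    now = time.time() if now is None else now
    return now >= token_expiry(token_time, duration_days) - margin_seconds


if __name__ == "__main__":
    # Hypothetical values; real ones come from the stored credentials file.
    print(needs_refresh(token_time=time.time(), duration_days=1))
```

With an 8-hour cron interval, the margin would need to be at least 8 hours to guarantee that a token never expires between two runs.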
@derdershat I have the 55U8KQ on V0000.07.05T.N1222. So far, everything seems to work fine, including the token refresh over night. I'll report back if anything breaks. Slightly off-topic: They have decreased the size of the volume indicator on screen when using external speakers (an eARC soundbar in my case), but only by a tiny bit. There's still a big ugly indicator every time you change the volume... |
Has anyone done sendtext mqtt for vidaa yet? |
I have a question about discovering Hisense TVs running Vidaa OS on the network. It's possible to find a Vidaa TV with UPnP by searching for urn:schemas-upnp-org:device:MediaRenderer:1 and looking at the modelDescription XML tag. Is there any better solution? |
You guys are the real MVPs. I have found this thread extremely useful. I can read and run the Python script, but I am not a Python dev, so I decided to port the script to C#, if anyone is interested. @maksymsiruk you should be able to find the TV via UPnP. I can find mine; it shows up like this. To get details of the TV, you can append /rendererdevicedesc.xml to the URL, so it becomes: http://192.168.1.xxx:38400/MediaServer/rendererdevicedesc.xml This will give you the MAC, model, friendly name, etc. Hope this helps. |
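The UPnP lookup described in the last two comments can be sketched with the standard library alone: send an SSDP M-SEARCH for the MediaRenderer device type, read the LOCATION header of each reply, and parse the device description it points to. The function names are hypothetical, and what a Vidaa TV actually reports in `modelDescription` is not confirmed here.

```python
import socket
import xml.etree.ElementTree as ET
from urllib.request import urlopen

SSDP_ADDR = ("239.255.255.250", 1900)  # standard SSDP multicast group and port
SEARCH_TARGET = "urn:schemas-upnp-org:device:MediaRenderer:1"
NS = {"d": "urn:schemas-upnp-org:device-1-0"}  # UPnP device description namespace


def build_msearch(st=SEARCH_TARGET, mx=2):
    """Build the SSDP M-SEARCH request as bytes."""
    return (
        "M-SEARCH * HTTP/1.1\r\n"
        f"HOST: {SSDP_ADDR[0]}:{SSDP_ADDR[1]}\r\n"
        'MAN: "ssdp:discover"\r\n'
        f"MX: {mx}\r\n"
        f"ST: {st}\r\n\r\n"
    ).encode("ascii")


def parse_location(response):
    """Extract the LOCATION header value from a raw SSDP response, or None."""
    for line in response.decode("utf-8", "replace").split("\r\n"):
        name, _, value = line.partition(":")
        if name.strip().lower() == "location":
            return value.strip()
    return None


def parse_description(xml_text):
    """Pull a few identifying fields out of a UPnP device description document."""
    device = ET.fromstring(xml_text).find("d:device", NS)
    if device is None:
        return {}
    fields = ("friendlyName", "modelName", "modelDescription")
    return {f: device.findtext(f"d:{f}", default="", namespaces=NS) for f in fields}


def discover(timeout=3.0):
    """Return {location_url: description_fields} for every renderer that answers."""
    found = {}
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout)
    try:
        sock.sendto(build_msearch(), SSDP_ADDR)
        while True:
            data, _addr = sock.recvfrom(65507)
            location = parse_location(data)
            if not location or location in found:
                continue
            try:
                with urlopen(location, timeout=timeout) as resp:
                    found[location] = parse_description(resp.read())
            except (OSError, ET.ParseError):
                continue  # unreachable or malformed description: skip this device
    except socket.timeout:
        pass  # no more replies within the timeout
    finally:
        sock.close()
    return found


if __name__ == "__main__":
    for url, info in discover().items():
        print(url, info)
```

Filtering the results on `modelDescription` or `friendlyName` would then narrow the list down to the TV.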
Hi everyone, is there a guide for using this script? |
Not really, but this was what helped me most: Based on this I created the following (Python) tool: It has a menu interface and also allows running all commands from the command line... |
Is anyone able to create a fork of the https://github.com/sehaas/ha_hisense_tv repo and integrate the script from @nikagl? |
I managed to get the script working with the shell integration. With the template integration I configured some buttons to turn the TV on/off or change the source. Everything is stateless, so it's not that good, but I can turn the TV on/off through Alexa, and that was all I wanted. |
I bought a 55U7KQ that came out this year. It looks like Hisense made some changes to MQTT again: I am getting “Connection refused: Identifier rejected” in MQTT Explorer as well as in the MQTT logs on my Raspberry Pi, although I am using my phone’s MAC address as the client ID.
Also, on this newer model I cannot use the RemoteNOW app, only the VIDAA app. Any help would be highly appreciated.