Skip to content

Commit

Permalink
Add WSManFaultError (#382)
Browse files Browse the repository at this point in the history
Adds the new exception WSManFaultError which inherits from WinRMError
and will be raised when receiving a WSManFault from the server. This new
exception type contains detailed information that could be relevant to
the caller when trying to handle the specific exception.
  • Loading branch information
jborean93 authored Jun 6, 2024
1 parent fbb05e8 commit e8bb574
Show file tree
Hide file tree
Showing 4 changed files with 368 additions and 28 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
### Version 0.5.0
- Dropped Python 2.7, 3.6, and 3.7 support, minimum supported version is 3.8
- Migrate to PEP 517 compliant build with a `pyproject.toml` file
- Added type annotation
- Added `WSManFaultError` which contains WSManFault specific information when receiving a 500 WSMan fault response
- This contains pre-parsed values like the code, subcode, wsman fault code, wmi error code, and raw response
- It can be used by the caller to implement fallback behaviour based on specific error codes

### Version 0.4.3
- Fix invalid regex escape sequences.
Expand Down
58 changes: 58 additions & 0 deletions winrm/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,64 @@ class WinRMError(Exception):
code = 500


class WSManFaultError(WinRMError):
"""WSMan Fault Error.
Exception that is raised when receiving a WSMan fault message. It
contains the raw response as well as the fault details parsed from the
response.
The wsman_fault_code is returned by the Microsoft WSMan server rather than
the WSMan protocol error code strings. The wmierror_code can contain more
fatal service error codes returned as a MSFT_WmiError object, for example
quota violations.
@param int code: The HTTP status code of the response.
@param str message: The error message.
@param str response: The raw WSMan response text.
@param str reason: The WSMan fault reason.
@param string fault_code: The WSMan fault code.
@param string fault_subcode: The WSMan fault subcode.
@param int wsman_fault_code: The MS WSManFault specific code.
@param int wmierror_code: The MS WMI error code.
"""

def __init__(
self,
code: int,
message: str,
response: str,
reason: str,
fault_code: str | None = None,
fault_subcode: str | None = None,
wsman_fault_code: int | None = None,
wmierror_code: int | None = None,
) -> None:
self.code = code
self.response = response
self.fault_code = fault_code
self.fault_subcode = fault_subcode
self.reason = reason
self.wsman_fault_code = wsman_fault_code
self.wmierror_code = wmierror_code

# Using the dict repr is for backwards compatibility.
fault_data = {
"transport_message": message,
"http_status_code": code,
}
if wsman_fault_code is not None:
fault_data["wsmanfault_code"] = wsman_fault_code

if fault_code is not None:
fault_data["fault_code"] = fault_code

if fault_subcode is not None:
fault_data["fault_subcode"] = fault_subcode

super().__init__("{0} (extended fault data: {1})".format(reason, fault_data))


class WinRMTransportError(Exception):
"""WinRM errors specific to transport-level problems (unexpected HTTP error codes, etc)"""

Expand Down
78 changes: 50 additions & 28 deletions winrm/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,19 @@

import xmltodict

from winrm.exceptions import WinRMError, WinRMOperationTimeoutError, WinRMTransportError
from winrm.exceptions import (
WinRMError,
WinRMOperationTimeoutError,
WinRMTransportError,
WSManFaultError,
)
from winrm.transport import Transport

xmlns = {
"soapenv": "http://www.w3.org/2003/05/soap-envelope",
"soapaddr": "http://schemas.xmlsoap.org/ws/2004/08/addressing",
"wsmanfault": "http://schemas.microsoft.com/wbem/wsman/1/wsmanfault",
"wmierror": "http://schemas.dmtf.org/wbem/wscim/1/cim-schema/2/MSFT_WmiError",
}


Expand Down Expand Up @@ -247,33 +253,49 @@ def send_message(self, message: str) -> bytes:
raise ex

fault = root.find("soapenv:Body/soapenv:Fault", xmlns)
if fault is not None:
fault_data = dict(transport_message=ex.message, http_status_code=ex.code)
wsmanfault_code = fault.find("soapenv:Detail/wsmanfault:WSManFault[@Code]", xmlns)
if wsmanfault_code is not None:
fault_data["wsmanfault_code"] = wsmanfault_code.get("Code")
# convert receive timeout code to WinRMOperationTimeoutError
if fault_data["wsmanfault_code"] == "2150858793":
# TODO: this fault code is specific to the Receive operation; convert all op timeouts?
raise WinRMOperationTimeoutError()

fault_code = fault.find("soapenv:Code/soapenv:Value", xmlns)
if fault_code is not None:
fault_data["fault_code"] = fault_code.text

fault_subcode = fault.find("soapenv:Code/soapenv:Subcode/soapenv:Value", xmlns)
if fault_subcode is not None:
fault_data["fault_subcode"] = fault_subcode.text

error_message_node = fault.find("soapenv:Reason/soapenv:Text", xmlns)
if error_message_node is not None:
error_message = error_message_node.text
else:
error_message = "(no error message in fault)"

raise WinRMError("{0} (extended fault data: {1})".format(error_message, fault_data))

raise
if fault is None:
raise

wsmanfault_code_raw = fault.find("soapenv:Detail/wsmanfault:WSManFault[@Code]", xmlns)
wsmanfault_code: int | None = None
if wsmanfault_code_raw is not None:
wsmanfault_code = int(wsmanfault_code_raw.attrib["Code"])

# convert receive timeout code to WinRMOperationTimeoutError
if wsmanfault_code == 2150858793:
# TODO: this fault code is specific to the Receive operation; convert all op timeouts?
raise WinRMOperationTimeoutError()

fault_code_raw = fault.find("soapenv:Code/soapenv:Value", xmlns)
fault_code: str | None = None
if fault_code_raw is not None and fault_code_raw.text:
fault_code = fault_code_raw.text

fault_subcode_raw = fault.find("soapenv:Code/soapenv:Subcode/soapenv:Value", xmlns)
fault_subcode: str | None = None
if fault_subcode_raw is not None and fault_subcode_raw.text:
fault_subcode = fault_subcode_raw.text

error_message_node = fault.find("soapenv:Reason/soapenv:Text", xmlns)
reason: str | None = None
if error_message_node is not None:
reason = error_message_node.text

wmi_error_code_raw = fault.find("soapenv:Detail/wmierror:MSFT_WmiError/wmierror:error_Code", xmlns)
wmi_error_code: int | None = None
if wmi_error_code_raw is not None and wmi_error_code_raw.text:
wmi_error_code = int(wmi_error_code_raw.text)

raise WSManFaultError(
code=ex.code,
message=ex.message,
response=ex.response_text,
reason=reason or "(no error message in fault)",
fault_code=fault_code,
fault_subcode=fault_subcode,
wsman_fault_code=wsmanfault_code,
wmierror_code=wmi_error_code,
)

def close_shell(self, shell_id: str, close_session: bool = True) -> None:
"""
Expand Down
Loading

0 comments on commit e8bb574

Please # to comment.