diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 9ff73a7b5..f1175d808 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -5,7 +5,7 @@ default_language_version:
repos:
# Run manually in CI skipping the branch checks
- repo: https://github.com/astral-sh/ruff-pre-commit
- rev: v0.12.0
+ rev: v0.12.1
hooks:
- id: ruff
name: "Ruff check"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5d47c3971..858855752 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
# Changelog
+## v1.7.7
+
+- Implement code quality improvements as suggested by SonarCloud via [#762](https://github.com/plugwise/python-plugwise/pull/762), [#763](https://github.com/plugwise/python-plugwise/pull/763), [#764](https://github.com/plugwise/python-plugwise/pull/764), and [#765](https://github.com/plugwise/python-plugwise/pull/765)
+
## v1.7.6
- Maintenance chores (mostly reworking Github CI Actions) backporting from efforts on Python Plugwise [USB: #264](https://github.com/plugwise/python-plugwise-usb/pull/264) after porting our progress using [USB: #263](https://github.com/plugwise/python-plugwise-usb/pull/263)
diff --git a/plugwise/__init__.py b/plugwise/__init__.py
index 9c56faa6a..47abbd837 100644
--- a/plugwise/__init__.py
+++ b/plugwise/__init__.py
@@ -5,6 +5,8 @@
from __future__ import annotations
+from typing import cast
+
from plugwise.constants import (
DEFAULT_LEGACY_TIMEOUT,
DEFAULT_PORT,
@@ -36,6 +38,7 @@
import aiohttp
from defusedxml import ElementTree as etree
+from munch import Munch
from packaging.version import Version, parse
@@ -72,16 +75,17 @@ def __init__(
self._smile_api: SmileAPI | SmileLegacyAPI
self._stretch_v2 = False
self._target_smile: str = NONE
- self.smile_hostname: str = NONE
- self.smile_hw_version: str | None = None
- self.smile_legacy = False
- self.smile_mac_address: str | None = None
- self.smile_model: str = NONE
- self.smile_model_id: str | None = None
- self.smile_name: str = NONE
- self.smile_type: str = NONE
- self.smile_version: Version = Version("0.0.0")
- self.smile_zigbee_mac_address: str | None = None
+ self.smile: Munch = Munch()
+ self.smile.hostname = NONE
+ self.smile.hw_version = None
+ self.smile.legacy = False
+ self.smile.mac_address = None
+ self.smile.model = NONE
+ self.smile.model_id = None
+ self.smile.name = NONE
+ self.smile.type = NONE
+ self.smile.version = Version("0.0.0")
+ self.smile.zigbee_mac_address = None
@property
def cooling_present(self) -> bool:
@@ -109,7 +113,7 @@ def reboot(self) -> bool:
All non-legacy devices support gateway-rebooting.
"""
- return not self.smile_legacy
+ return not self.smile.legacy
async def connect(self) -> Version:
"""Connect to the Plugwise Gateway and determine its name, type, version, and other data."""
@@ -158,16 +162,9 @@ async def connect(self) -> Version:
self._opentherm_device,
self._request,
self._schedule_old_states,
- self.smile_hostname,
- self.smile_hw_version,
- self.smile_mac_address,
- self.smile_model,
- self.smile_model_id,
- self.smile_name,
- self.smile_type,
- self.smile_version,
+ self.smile,
)
- if not self.smile_legacy
+ if not self.smile.legacy
else SmileLegacyAPI(
self._is_thermostat,
self._loc_data,
@@ -176,21 +173,14 @@ async def connect(self) -> Version:
self._request,
self._stretch_v2,
self._target_smile,
- self.smile_hostname,
- self.smile_hw_version,
- self.smile_mac_address,
- self.smile_model,
- self.smile_name,
- self.smile_type,
- self.smile_version,
- self.smile_zigbee_mac_address,
+ self.smile,
)
)
# Update all endpoints on first connect
await self._smile_api.full_xml_update()
- return self.smile_version
+ return cast(Version, self.smile.version)
async def _smile_detect(
self, result: etree.Element, dsmrmain: etree.Element
@@ -203,15 +193,17 @@ async def _smile_detect(
if (gateway := result.find("./gateway")) is not None:
if (v_model := gateway.find("vendor_model")) is not None:
model = v_model.text
- self.smile_version = parse(gateway.find("firmware_version").text)
- self.smile_hw_version = gateway.find("hardware_version").text
- self.smile_hostname = gateway.find("hostname").text
- self.smile_mac_address = gateway.find("mac_address").text
- self.smile_model_id = gateway.find("vendor_model").text
+ self.smile.version = parse(gateway.find("firmware_version").text)
+ self.smile.hw_version = gateway.find("hardware_version").text
+ self.smile.hostname = gateway.find("hostname").text
+ self.smile.mac_address = gateway.find("mac_address").text
+ self.smile.model_id = gateway.find("vendor_model").text
else:
model = await self._smile_detect_legacy(result, dsmrmain, model)
- if model == "Unknown" or self.smile_version is None: # pragma: no cover
+ if model == "Unknown" or self.smile.version == Version(
+ "0.0.0"
+ ): # pragma: no cover
# Corner case check
LOGGER.error(
"Unable to find model or version information, please create"
@@ -219,7 +211,7 @@ async def _smile_detect(
)
raise UnsupportedDeviceError
- version_major = str(self.smile_version.major)
+ version_major = str(self.smile.version.major)
self._target_smile = f"{model}_v{version_major}"
LOGGER.debug("Plugwise identified as %s", self._target_smile)
if self._target_smile not in SMILES:
@@ -230,7 +222,7 @@ async def _smile_detect(
)
raise UnsupportedDeviceError
- if not self.smile_legacy:
+ if not self.smile.legacy:
self._timeout = DEFAULT_TIMEOUT
if self._target_smile in ("smile_open_therm_v2", "smile_thermo_v3"):
@@ -240,14 +232,14 @@ async def _smile_detect(
) # pragma: no cover
raise UnsupportedDeviceError # pragma: no cover
- self.smile_model = "Gateway"
- self.smile_name = SMILES[self._target_smile].smile_name
- self.smile_type = SMILES[self._target_smile].smile_type
+ self.smile.model = "Gateway"
+ self.smile.name = SMILES[self._target_smile].smile_name
+ self.smile.type = SMILES[self._target_smile].smile_type
- if self.smile_type == "stretch":
+ if self.smile.type == "stretch":
self._stretch_v2 = int(version_major) == 2
- if self.smile_type == "thermostat":
+ if self.smile.type == "thermostat":
self._is_thermostat = True
# For Adam, Anna, determine the system capabilities:
# Find the connected heating/cooling device (heater_central),
@@ -275,13 +267,13 @@ async def _smile_detect_legacy(
return_model = model
# Stretch: find the MAC of the zigbee master_controller (= Stick)
if (network := result.find("./module/protocols/master_controller")) is not None:
- self.smile_zigbee_mac_address = network.find("mac_address").text
+ self.smile.zigbee_mac_address = network.find("mac_address").text
# Find the active MAC in case there is an orphaned Stick
if zb_networks := result.findall("./network"):
for zb_network in zb_networks:
if zb_network.find("./nodes/network_router") is not None:
network = zb_network.find("./master_controller")
- self.smile_zigbee_mac_address = network.find("mac_address").text
+ self.smile.zigbee_mac_address = network.find("mac_address").text
# Legacy Anna or Stretch:
if (
@@ -289,22 +281,22 @@ async def _smile_detect_legacy(
or network is not None
):
system = await self._request(SYSTEM)
- self.smile_version = parse(system.find("./gateway/firmware").text)
+ self.smile.version = parse(system.find("./gateway/firmware").text)
return_model = str(system.find("./gateway/product").text)
- self.smile_hostname = system.find("./gateway/hostname").text
+ self.smile.hostname = system.find("./gateway/hostname").text
# If wlan0 contains data it's active, eth0 should be checked last as is preferred
for network in ("wlan0", "eth0"):
locator = f"./{network}/mac"
if (net_locator := system.find(locator)) is not None:
- self.smile_mac_address = net_locator.text
+ self.smile.mac_address = net_locator.text
# P1 legacy:
elif dsmrmain is not None:
status = await self._request(STATUS)
- self.smile_version = parse(status.find("./system/version").text)
+ self.smile.version = parse(status.find("./system/version").text)
return_model = str(status.find("./system/product").text)
- self.smile_hostname = status.find("./network/hostname").text
- self.smile_mac_address = status.find("./network/mac_address").text
+ self.smile.hostname = status.find("./network/hostname").text
+ self.smile.mac_address = status.find("./network/mac_address").text
else: # pragma: no cover
# No cornercase, just end of the line
LOGGER.error(
@@ -313,7 +305,7 @@ async def _smile_detect_legacy(
)
raise ResponseError
- self.smile_legacy = True
+ self.smile.legacy = True
return return_model
async def async_update(self) -> dict[str, GwEntityData]:
diff --git a/plugwise/common.py b/plugwise/common.py
index c261eeb4b..9534a2263 100644
--- a/plugwise/common.py
+++ b/plugwise/common.py
@@ -10,6 +10,7 @@
from plugwise.constants import (
ANNA,
NONE,
+ PRIORITY_DEVICE_CLASSES,
SPECIAL_PLUG_TYPES,
SWITCH_GROUP_TYPES,
ApplianceType,
@@ -55,17 +56,16 @@ def __init__(self) -> None:
self._heater_id: str = NONE
self._on_off_device: bool
self.gw_entities: dict[str, GwEntityData] = {}
- self.smile_name: str
- self.smile_type: str
+ self.smile: Munch
@property
def heater_id(self) -> str:
"""Return the heater-id."""
return self._heater_id
- def smile(self, name: str) -> bool:
+ def check_name(self, name: str) -> bool:
"""Helper-function checking the smile-name."""
- return self.smile_name == name
+ return bool(self.smile.name == name)
def _appl_heater_central_info(
self,
@@ -153,6 +153,16 @@ def _create_gw_entities(self, appl: Munch) -> None:
self.gw_entities[appl.entity_id][appl_key] = value
self._count += 1
+ def _reorder_devices(self) -> None:
+ """Place the gateway and optional heater_central devices as 1st and 2nd."""
+ reordered = {}
+ for dev_class in PRIORITY_DEVICE_CLASSES:
+ for entity_id, entity in dict(self.gw_entities).items():
+ if entity["dev_class"] == dev_class:
+ reordered[entity_id] = self.gw_entities.pop(entity_id)
+ break
+ self.gw_entities = {**reordered, **self.gw_entities}
+
def _entity_switching_group(self, entity: GwEntityData, data: GwEntityData) -> None:
"""Helper-function for _get_device_zone_data().
@@ -173,7 +183,7 @@ def _get_group_switches(self) -> dict[str, GwEntityData]:
"""
switch_groups: dict[str, GwEntityData] = {}
# P1 and Anna don't have switchgroups
- if self.smile_type == "power" or self.smile(ANNA):
+ if self.smile.type == "power" or self.check_name(ANNA):
return switch_groups
for group in self._domain_objects.findall("./group"):
diff --git a/plugwise/constants.py b/plugwise/constants.py
index 4e1becd56..4710229e8 100644
--- a/plugwise/constants.py
+++ b/plugwise/constants.py
@@ -86,7 +86,7 @@
MODULE_LOCATOR: Final = "./logs/point_log/*[@id]"
NONE: Final = "None"
OFF: Final = "off"
-PRIORITY_DEVICE_CLASSES = ("heater_central", "gateway")
+PRIORITY_DEVICE_CLASSES = ("gateway", "heater_central")
# XML data paths
APPLIANCES: Final = "/core/appliances"
diff --git a/plugwise/data.py b/plugwise/data.py
index 97ec5b51a..5fafb14ff 100644
--- a/plugwise/data.py
+++ b/plugwise/data.py
@@ -35,7 +35,7 @@ def _all_entity_data(self) -> None:
Collect data for each entity and add to self.gw_entities.
"""
self._update_gw_entities()
- if self.smile(ADAM):
+ if self.check_name(ADAM):
self._update_zones()
self.gw_entities.update(self._zones)
@@ -86,7 +86,7 @@ def _detect_low_batteries(self) -> list[str]:
mac_pattern = re.compile(r"(?:[0-9A-F]{2}){8}")
matches = ["Battery", "below"]
if self._notifications:
- for msg_id, notification in list(self._notifications.items()):
+ for msg_id, notification in self._notifications.copy().items():
mac_address: str | None = None
message: str | None = notification.get("message")
warning: str | None = notification.get("warning")
@@ -111,7 +111,7 @@ def _add_or_update_notifications(
"""Helper-function adding or updating the Plugwise notifications."""
if (
entity_id == self._gateway_id
- and (self._is_thermostat or self.smile_type == "power")
+ and (self._is_thermostat or self.smile.type == "power")
) or (
"binary_sensors" in entity
and "plugwise_notification" in entity["binary_sensors"]
@@ -124,7 +124,7 @@ def _update_for_cooling(self, entity: GwEntityData) -> None:
"""Helper-function for adding/updating various cooling-related values."""
# For Anna and heating + cooling, replace setpoint with setpoint_high/_low
if (
- self.smile(ANNA)
+ self.check_name(ANNA)
and self._cooling_present
and entity["dev_class"] == "thermostat"
):
@@ -194,11 +194,11 @@ def _get_entity_data(self, entity_id: str) -> GwEntityData:
# Switching groups data
self._entity_switching_group(entity, data)
# Adam data
- if self.smile(ADAM):
+ if self.check_name(ADAM):
self._get_adam_data(entity, data)
# Thermostat data for Anna (presets, temperatures etc)
- if self.smile(ANNA) and entity["dev_class"] == "thermostat":
+ if self.check_name(ANNA) and entity["dev_class"] == "thermostat":
self._climate_data(entity_id, entity, data)
self._get_anna_control_state(data)
@@ -232,12 +232,12 @@ def _get_adam_data(self, entity: GwEntityData, data: GwEntityData) -> None:
if self._on_off_device and isinstance(self._heating_valves(), int):
data["binary_sensors"]["heating_state"] = self._heating_valves() != 0
# Add cooling_enabled binary_sensor
- if "binary_sensors" in data:
- if (
- "cooling_enabled" not in data["binary_sensors"]
- and self._cooling_present
- ):
- data["binary_sensors"]["cooling_enabled"] = self._cooling_enabled
+ if (
+ "binary_sensors" in data
+ and "cooling_enabled" not in data["binary_sensors"]
+ and self._cooling_present
+ ):
+ data["binary_sensors"]["cooling_enabled"] = self._cooling_enabled
# Show the allowed regulation_modes and gateway_modes
if entity["dev_class"] == "gateway":
diff --git a/plugwise/helper.py b/plugwise/helper.py
index 87c4e986e..16ef1f0a3 100644
--- a/plugwise/helper.py
+++ b/plugwise/helper.py
@@ -28,7 +28,6 @@
NONE,
OFF,
P1_MEASUREMENTS,
- PRIORITY_DEVICE_CLASSES,
TEMP_CELSIUS,
THERMOSTAT_CLASSES,
TOGGLES,
@@ -85,11 +84,7 @@ def __init__(self) -> None:
self._gateway_id: str = NONE
self._zones: dict[str, GwEntityData]
self.gw_entities: dict[str, GwEntityData]
- self.smile_hw_version: str | None
- self.smile_mac_address: str | None
- self.smile_model: str
- self.smile_model_id: str | None
- self.smile_version: version.Version
+ self.smile: Munch = Munch()
@property
def gateway_id(self) -> str:
@@ -160,11 +155,11 @@ def _all_appliances(self) -> None:
self._create_gw_entities(appl)
- if self.smile_type == "power":
+ if self.smile.type == "power":
self._get_p1_smartmeter_info()
# Sort the gw_entities
- self._sort_gw_entities()
+ self._reorder_devices()
def _get_p1_smartmeter_info(self) -> None:
"""For P1 collect the connected SmartMeter info from the Home/building location.
@@ -197,18 +192,6 @@ def _get_p1_smartmeter_info(self) -> None:
self._create_gw_entities(appl)
- def _sort_gw_entities(self) -> None:
- """Place the gateway and optional heater_central entities as 1st and 2nd."""
- for dev_class in PRIORITY_DEVICE_CLASSES:
- for entity_id, entity in dict(self.gw_entities).items():
- if entity["dev_class"] == dev_class:
- priority_entity = entity
- self.gw_entities.pop(entity_id)
- other_entities = self.gw_entities
- priority_entities = {entity_id: priority_entity}
- self.gw_entities = {**priority_entities, **other_entities}
- break
-
def _all_locations(self) -> None:
"""Collect all locations."""
loc = Munch()
@@ -268,16 +251,16 @@ def _appliance_info_finder(self, appl: Munch, appliance: etree.Element) -> Munch
def _appl_gateway_info(self, appl: Munch, appliance: etree.Element) -> Munch:
"""Helper-function for _appliance_info_finder()."""
self._gateway_id = appliance.attrib["id"]
- appl.firmware = str(self.smile_version)
- appl.hardware = self.smile_hw_version
- appl.mac = self.smile_mac_address
- appl.model = self.smile_model
- appl.model_id = self.smile_model_id
- appl.name = self.smile_name
+ appl.firmware = str(self.smile.version)
+ appl.hardware = self.smile.hw_version
+ appl.mac = self.smile.mac_address
+ appl.model = self.smile.model
+ appl.model_id = self.smile.model_id
+ appl.name = self.smile.name
appl.vendor_name = "Plugwise"
# Adam: collect the ZigBee MAC address of the Smile
- if self.smile(ADAM):
+ if self.check_name(ADAM):
if (
found := self._domain_objects.find(".//protocols/zig_bee_coordinator")
) is not None:
@@ -346,7 +329,7 @@ def _get_measurement_data(self, entity_id: str) -> GwEntityData:
# Get P1 smartmeter data from LOCATIONS
entity = self.gw_entities[entity_id]
# !! DON'T CHANGE below two if-lines, will break stuff !!
- if self.smile_type == "power":
+ if self.smile.type == "power":
if entity["dev_class"] == "smartmeter":
data.update(self._power_data_from_location())
@@ -383,7 +366,7 @@ def _get_measurement_data(self, entity_id: str) -> GwEntityData:
data.pop("c_heating_state")
self._count -= 1
- if self._is_thermostat and self.smile(ANNA):
+ if self._is_thermostat and self.check_name(ANNA):
self._update_anna_cooling(entity_id, data)
self._cleanup_data(data)
@@ -484,7 +467,7 @@ def _get_actuator_functionalities(
item == "thermostat"
and (
entity["dev_class"] != "climate"
- if self.smile(ADAM)
+ if self.check_name(ADAM)
else entity["dev_class"] != "thermostat"
)
):
@@ -539,7 +522,7 @@ def _get_actuator_mode(
Collect the requested gateway mode.
"""
- if not (self.smile(ADAM) and entity_id == self._gateway_id):
+ if not (self.check_name(ADAM) and entity_id == self._gateway_id):
return None
if (search := search_actuator_functionalities(appliance, key)) is not None:
@@ -605,10 +588,10 @@ def _process_on_off_device_c_heating_state(self, data: GwEntityData) -> None:
Solution for Core issue #81839.
"""
- if self.smile(ANNA):
+ if self.check_name(ANNA):
data["binary_sensors"]["heating_state"] = data["c_heating_state"]
- if self.smile(ADAM):
+ if self.check_name(ADAM):
# First count when not present, then create and init to False.
# When present init to False
if "heating_state" not in data["binary_sensors"]:
@@ -723,7 +706,7 @@ def _scan_thermostats(self) -> None:
for entity_id, entity in self.gw_entities.items():
self._rank_thermostat(thermo_matching, loc_id, entity_id, entity)
- for loc_id, loc_data in list(self._thermo_locs.items()):
+ for loc_id, loc_data in self._thermo_locs.items():
if loc_data["primary_prio"] != 0:
self._zones[loc_id] = {
"dev_class": "climate",
@@ -799,8 +782,8 @@ def _control_state(self, data: GwEntityData, loc_id: str) -> str | bool:
# Handle missing control_state in regulation_mode off for firmware >= 3.2.0 (issue #776)
# In newer firmware versions, default to "off" when control_state is not present
- if self.smile_version != version.Version("0.0.0"):
- if self.smile_version >= version.parse("3.2.0"):
+ if self.smile.version != version.Version("0.0.0"):
+ if self.smile.version >= version.parse("3.2.0"):
return "off"
# Older Adam firmware does not have the control_state xml-key
diff --git a/plugwise/legacy/helper.py b/plugwise/legacy/helper.py
index e22b489e5..d5e6918ab 100644
--- a/plugwise/legacy/helper.py
+++ b/plugwise/legacy/helper.py
@@ -23,7 +23,6 @@
NONE,
OFF,
P1_LEGACY_MEASUREMENTS,
- PRIORITY_DEVICE_CLASSES,
TEMP_CELSIUS,
THERMOSTAT_CLASSES,
UOM,
@@ -47,7 +46,6 @@
# This way of importing aiohttp is because of patch/mocking in testing (aiohttp timeouts)
from defusedxml import ElementTree as etree
from munch import Munch
-from packaging.version import Version
def etree_to_dict(element: etree.Element) -> dict[str, str]:
@@ -73,10 +71,7 @@ def __init__(self) -> None:
self._modules: etree.Element
self._stretch_v2: bool
self.gw_entities: dict[str, GwEntityData] = {}
- self.smile_mac_address: str | None
- self.smile_model: str
- self.smile_version: Version
- self.smile_zigbee_mac_address: str | None
+ self.smile: Munch = Munch()
@property
def gateway_id(self) -> str:
@@ -95,7 +90,7 @@ def _all_appliances(self) -> None:
self._create_legacy_gateway()
# For legacy P1 collect the connected SmartMeter info
- if self.smile_type == "power":
+ if self.smile.type == "power":
appl = Munch()
self._p1_smartmeter_info_finder(appl)
# Legacy P1 has no more devices
@@ -140,17 +135,7 @@ def _all_appliances(self) -> None:
continue # pragma: no cover
self._create_gw_entities(appl)
-
- # Place the gateway and optional heater_central devices as 1st and 2nd
- for dev_class in PRIORITY_DEVICE_CLASSES:
- for entity_id, entity in dict(self.gw_entities).items():
- if entity["dev_class"] == dev_class:
- tmp_entity = entity
- self.gw_entities.pop(entity_id)
- cleared_dict = self.gw_entities
- add_to_front = {entity_id: tmp_entity}
- self.gw_entities = {**add_to_front, **cleared_dict}
- break
+ self._reorder_devices()
def _all_locations(self) -> None:
"""Collect all locations."""
@@ -167,13 +152,13 @@ def _all_locations(self) -> None:
loc.loc_id = location.attrib["id"]
# Filter the valid single location for P1 legacy: services not empty
locator = "./services"
- if self.smile_type == "power" and len(location.find(locator)) == 0:
+ if self.smile.type == "power" and len(location.find(locator)) == 0:
continue
if loc.name == "Home":
self._home_loc_id = loc.loc_id
# Replace location-name for P1 legacy, can contain privacy-related info
- if self.smile_type == "power":
+ if self.smile.type == "power":
loc.name = "Home"
self._home_loc_id = loc.loc_id
@@ -185,18 +170,18 @@ def _create_legacy_gateway(self) -> None:
Use the home_location or FAKE_APPL as entity id.
"""
self._gateway_id = self._home_loc_id
- if self.smile_type == "power":
+ if self.smile.type == "power":
self._gateway_id = FAKE_APPL
self.gw_entities[self._gateway_id] = {"dev_class": "gateway"}
self._count += 1
for key, value in {
- "firmware": str(self.smile_version),
+ "firmware": str(self.smile.version),
"location": self._home_loc_id,
- "mac_address": self.smile_mac_address,
- "model": self.smile_model,
- "name": self.smile_name,
- "zigbee_mac_address": self.smile_zigbee_mac_address,
+ "mac_address": self.smile.mac_address,
+ "model": self.smile.model,
+ "name": self.smile.name,
+ "zigbee_mac_address": self.smile.zigbee_mac_address,
"vendor": "Plugwise",
}.items():
if value is not None:
@@ -224,14 +209,14 @@ def _energy_entity_info_finder(self, appliance: etree, appl: Munch) -> Munch:
Collect energy entity info (Smartmeter, Circle, Stealth, etc.): firmware, model and vendor name.
"""
- if self.smile_type in ("power", "stretch"):
+ if self.smile.type in ("power", "stretch"):
locator = "./services/electricity_point_meter"
module_data = self._get_module_data(
appliance, locator, self._modules, legacy=True
)
appl.zigbee_mac = module_data["zigbee_mac_address"]
# Filter appliance without zigbee_mac, it's an orphaned device
- if appl.zigbee_mac is None and self.smile_type != "power":
+ if appl.zigbee_mac is None and self.smile.type != "power":
return None
appl.hardware = module_data["hardware_version"]
@@ -253,7 +238,7 @@ def _p1_smartmeter_info_finder(self, appl: Munch) -> None:
appl.entity_id = loc_id
appl.location = loc_id
appl.mac = None
- appl.model = self.smile_model
+ appl.model = self.smile.model
appl.model_id = None
appl.name = "P1"
appl.pwclass = "smartmeter"
@@ -272,7 +257,7 @@ def _get_measurement_data(self, entity_id: str) -> GwEntityData:
# Get P1 smartmeter data from MODULES
entity = self.gw_entities[entity_id]
# !! DON'T CHANGE below two if-lines, will break stuff !!
- if self.smile_type == "power":
+ if self.smile.type == "power":
if entity["dev_class"] == "smartmeter":
data.update(self._power_data_from_modules())
diff --git a/plugwise/legacy/smile.py b/plugwise/legacy/smile.py
index fc4f3a2c5..61f219521 100644
--- a/plugwise/legacy/smile.py
+++ b/plugwise/legacy/smile.py
@@ -27,7 +27,6 @@
from plugwise.legacy.data import SmileLegacyData
from munch import Munch
-from packaging.version import Version
class SmileLegacyAPI(SmileLegacyData):
@@ -44,14 +43,7 @@ def __init__(
_request: Callable[..., Awaitable[Any]],
_stretch_v2: bool,
_target_smile: str,
- smile_hostname: str,
- smile_hw_version: str | None,
- smile_mac_address: str | None,
- smile_model: str,
- smile_name: str,
- smile_type: str,
- smile_version: Version,
- smile_zigbee_mac_address: str | None,
+ smile: Munch,
) -> None:
"""Set the constructor for this class."""
super().__init__()
@@ -63,14 +55,7 @@ def __init__(
self._request = _request
self._stretch_v2 = _stretch_v2
self._target_smile = _target_smile
- self.smile_hostname = smile_hostname
- self.smile_hw_version = smile_hw_version
- self.smile_mac_address = smile_mac_address
- self.smile_model = smile_model
- self.smile_name = smile_name
- self.smile_type = smile_type
- self.smile_version = smile_version
- self.smile_zigbee_mac_address = smile_zigbee_mac_address
+ self.smile = smile
self._first_update = True
self._previous_day_number: str = "0"
@@ -86,7 +71,7 @@ async def full_xml_update(self) -> None:
self._locations = await self._request(LOCATIONS)
self._modules = await self._request(MODULES)
# P1 legacy has no appliances
- if self.smile_type != "power":
+ if self.smile.type != "power":
self._appliances = await self._request(APPLIANCES)
def get_all_gateway_entities(self) -> None:
diff --git a/plugwise/smile.py b/plugwise/smile.py
index 0947401d4..970181ca1 100644
--- a/plugwise/smile.py
+++ b/plugwise/smile.py
@@ -35,7 +35,27 @@
# Dict as class
from munch import Munch
-from packaging.version import Version
+
+
+def model_to_switch_items(model: str, state: str, switch: Munch) -> tuple[str, Munch]:
+ """Translate state and switch attributes based on model name.
+
+ Helper function for set_switch_state().
+ """
+ match model:
+ case "dhw_cm_switch":
+ switch.device = "toggle"
+ switch.func_type = "toggle_functionality"
+ switch.act_type = "domestic_hot_water_comfort_mode"
+ case "cooling_ena_switch":
+ switch.device = "toggle"
+ switch.func_type = "toggle_functionality"
+ switch.act_type = "cooling_enabled"
+ case "lock":
+ switch.func = "lock"
+ state = "true" if state == STATE_ON else "false"
+
+ return state, switch
class SmileAPI(SmileData):
@@ -54,14 +74,7 @@ def __init__(
_opentherm_device: bool,
_request: Callable[..., Awaitable[Any]],
_schedule_old_states: dict[str, dict[str, str]],
- smile_hostname: str | None,
- smile_hw_version: str | None,
- smile_mac_address: str | None,
- smile_model: str,
- smile_model_id: str | None,
- smile_name: str,
- smile_type: str,
- smile_version: Version,
+ smile: Munch,
) -> None:
"""Set the constructor for this class."""
super().__init__()
@@ -74,14 +87,7 @@ def __init__(
self._opentherm_device = _opentherm_device
self._request = _request
self._schedule_old_states = _schedule_old_states
- self.smile_hostname = smile_hostname
- self.smile_hw_version = smile_hw_version
- self.smile_mac_address = smile_mac_address
- self.smile_model = smile_model
- self.smile_model_id = smile_model_id
- self.smile_name = smile_name
- self.smile_type = smile_type
- self.smile_version = smile_version
+ self.smile = smile
self.therms_with_offset_func: list[str] = []
@property
@@ -107,7 +113,7 @@ def get_all_gateway_entities(self) -> None:
self.therms_with_offset_func = (
self._get_appliances_with_offset_functionality()
)
- if self.smile(ADAM):
+ if self.check_name(ADAM):
self._scan_thermostats()
if group_data := self._get_group_switches():
@@ -340,7 +346,7 @@ async def set_schedule_state(
template = (
''
)
- if self.smile(ANNA):
+ if self.check_name(ANNA):
locator = f'.//*[@id="{schedule_rule_id}"]/template'
template_id = self._domain_objects.find(locator).attrib["id"]
template = f''
@@ -396,20 +402,7 @@ async def set_switch_state(
switch.device = "relay"
switch.func_type = "relay_functionality"
switch.func = "state"
- if model == "dhw_cm_switch":
- switch.device = "toggle"
- switch.func_type = "toggle_functionality"
- switch.act_type = "domestic_hot_water_comfort_mode"
-
- if model == "cooling_ena_switch":
- switch.device = "toggle"
- switch.func_type = "toggle_functionality"
- switch.act_type = "cooling_enabled"
-
- if model == "lock":
- switch.func = "lock"
- state = "true" if state == STATE_ON else "false"
-
+ state, switch = model_to_switch_items(model, state, switch)
data = (
f"<{switch.func_type}>"
f"<{switch.func}>{state}{switch.func}>"
@@ -476,7 +469,7 @@ async def set_temperature(self, loc_id: str, items: dict[str, float]) -> None:
if "setpoint" in items:
setpoint = items["setpoint"]
- if self.smile(ANNA) and self._cooling_present:
+ if self.check_name(ANNA) and self._cooling_present:
if "setpoint_high" not in items:
raise PlugwiseError(
"Plugwise: failed setting temperature: no valid input provided"
diff --git a/pyproject.toml b/pyproject.toml
index ede4e456a..4fc91a160 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "plugwise"
-version = "1.7.6"
+version = "1.7.7"
license = "MIT"
description = "Plugwise Smile (Adam/Anna/P1) and Stretch module for Python 3."
readme = "README.md"
@@ -25,6 +25,7 @@ maintainers = [
]
requires-python = ">=3.13"
dependencies = [
+ "aiofiles",
"aiohttp",
"defusedxml",
"munch",
diff --git a/tests/test_adam.py b/tests/test_adam.py
index 96b015ce5..46287b84a 100644
--- a/tests/test_adam.py
+++ b/tests/test_adam.py
@@ -21,9 +21,9 @@ async def test_connect_adam_plus_anna_new(self):
"""Test extended Adam (firmware 3.7) with Anna and a switch-group setup."""
self.smile_setup = "adam_plus_anna_new"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -139,7 +139,7 @@ async def test_connect_adam_plus_anna_new(self):
# Now change some data and change directory reading xml from
# emulating reading newer dataset after an update_interval
- testdata_updated = self.load_testdata(
+ testdata_updated = await self.load_testdata(
SMILE_TYPE, f"{self.smile_setup}_UPDATED_DATA"
)
self.smile_setup = "updated/adam_plus_anna_new"
@@ -167,7 +167,7 @@ async def test_connect_adam_plus_anna_new(self):
await self.disconnect(server, client)
self.smile_setup = "adam_plus_anna_new"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper(raise_timeout=True)
await self.device_test(
smile, "2023-12-17 00:00:01", testdata, skip_testing=True
@@ -190,9 +190,9 @@ async def test_connect_adam_plus_anna_new_regulation_off(self):
"""Test regultaion_mode off with control_state key missing for Adam."""
self.smile_setup = "adam_plus_anna_new_regulation_off"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
await self.device_test(smile, "2023-12-17 00:00:01", testdata)
@@ -204,9 +204,9 @@ async def test_connect_adam_zone_per_device(self):
"""Test an extensive setup of Adam with a zone per device."""
self.smile_setup = "adam_zone_per_device"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -285,9 +285,9 @@ async def test_connect_adam_multiple_devices_per_zone(self):
"""Test an extensive setup of Adam with multiple devices per zone."""
self.smile_setup = "adam_multiple_devices_per_zone"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -333,7 +333,7 @@ async def test_adam_heatpump_cooling(self):
"""Test Adam with heatpump in cooling mode and idle."""
self.smile_setup = "adam_heatpump_cooling"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
await self.device_test(smile, "2022-01-02 00:00:01", testdata)
@@ -359,9 +359,9 @@ async def test_connect_adam_onoff_cooling_fake_firmware(self):
"""Test an Adam with a fake OnOff cooling device in cooling mode."""
self.smile_setup = "adam_onoff_cooling_fake_firmware"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -382,9 +382,9 @@ async def test_connect_adam_plus_anna(self):
"""Test Adam (firmware 3.0) with Anna setup."""
self.smile_setup = "adam_plus_anna"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -432,7 +432,7 @@ async def test_adam_plus_jip(self):
"""Test Adam with Jip setup."""
self.smile_setup = "adam_jip"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
await self.device_test(smile, "2021-06-20 00:00:01", testdata)
diff --git a/tests/test_anna.py b/tests/test_anna.py
index bce21b1ab..e2210d91a 100644
--- a/tests/test_anna.py
+++ b/tests/test_anna.py
@@ -17,9 +17,9 @@ async def test_connect_anna_v4(self):
"""Test an Anna firmware 4 setup."""
self.smile_setup = "anna_v4"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -55,7 +55,7 @@ async def test_connect_anna_v4(self):
# Now change some data and change directory reading xml from
# emulating reading newer dataset after an update_interval
- testdata_updated = self.load_testdata(
+ testdata_updated = await self.load_testdata(
SMILE_TYPE, f"{self.smile_setup}_UPDATED_DATA"
)
@@ -97,9 +97,9 @@ async def test_connect_anna_v4_dhw(self):
"""Test an Anna firmware 4 setup for domestic hot water."""
self.smile_setup = "anna_v4_dhw"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -127,9 +127,9 @@ async def test_connect_anna_v4_no_tag(self):
"""Test an Anna firmware 4 setup - missing tag (issue)."""
self.smile_setup = "anna_v4_no_tag"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -155,9 +155,9 @@ async def test_connect_anna_without_boiler_fw441(self):
"""Test an Anna with firmware 4.4, without a boiler."""
self.smile_setup = "anna_without_boiler_fw441"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -183,9 +183,9 @@ async def test_connect_anna_heatpump_heating(self):
self.smile_setup = "anna_heatpump_heating"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -218,7 +218,7 @@ async def test_connect_anna_heatpump_heating(self):
# Now change some data and change directory reading xml from
# emulating reading newer dataset after an update_interval,
# set testday to Monday to force an incremental update
- testdata_updated = self.load_testdata(
+ testdata_updated = await self.load_testdata(
SMILE_TYPE, f"{self.smile_setup}_UPDATED_DATA"
)
@@ -240,9 +240,9 @@ async def test_connect_anna_heatpump_cooling(self):
"""
self.smile_setup = "anna_heatpump_cooling"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -287,9 +287,9 @@ async def test_connect_anna_heatpump_cooling_fake_firmware(self):
"""
self.smile_setup = "anna_heatpump_cooling_fake_firmware"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -312,9 +312,9 @@ async def test_connect_anna_elga_no_cooling(self):
self.smile_setup = "anna_elga_no_cooling"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -337,9 +337,9 @@ async def test_connect_anna_elga_2(self):
"""Test a 2nd Anna with Elga setup, cooling off, in idle mode (with missing outdoor temperature - solved)."""
self.smile_setup = "anna_elga_2"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -366,9 +366,9 @@ async def test_connect_anna_elga_2_schedule_off(self):
"""Test Anna with Elga setup, cooling off, in idle mode, modified to schedule off."""
self.smile_setup = "anna_elga_2_schedule_off"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
await self.device_test(smile, "2022-03-13 00:00:01", testdata)
assert (
@@ -391,9 +391,9 @@ async def test_connect_anna_elga_2_cooling(self):
"""
self.smile_setup = "anna_elga_2_cooling"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -421,7 +421,7 @@ async def test_connect_anna_elga_2_cooling(self):
assert result
# Simulate a change of season: from cooling to heating after an update_interval
- testdata_updated = self.load_testdata(
+ testdata_updated = await self.load_testdata(
SMILE_TYPE, f"{self.smile_setup}_UPDATED_DATA"
)
@@ -448,9 +448,9 @@ async def test_connect_anna_loria_heating_idle(self):
"""Test an Anna with a Loria in heating mode - state idle."""
self.smile_setup = "anna_loria_heating_idle"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -516,9 +516,9 @@ async def test_connect_anna_loria_cooling_active(self):
"""Test an Anna with a Loria in heating mode - state idle."""
self.smile_setup = "anna_loria_cooling_active"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -540,9 +540,9 @@ async def test_connect_anna_loria_driessens(self):
"""Test an Anna with a Loria in heating mode - state idle."""
self.smile_setup = "anna_loria_driessens"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
diff --git a/tests/test_init.py b/tests/test_init.py
index c80468ec6..a6aac745f 100644
--- a/tests/test_init.py
+++ b/tests/test_init.py
@@ -16,6 +16,7 @@
import pytest
# Testing
+import aiofiles
import aiohttp
from freezegun import freeze_time
from packaging import version
@@ -50,7 +51,7 @@
class TestPlugwise: # pylint: disable=attribute-defined-outside-init
"""Tests for Plugwise Smile."""
- def _write_json(self, call, data):
+ async def _write_json(self, call, data):
"""Store JSON data to per-setup files for HA component testing."""
no_fixtures = os.getenv("NO_FIXTURES") == "1"
if no_fixtures:
@@ -65,8 +66,8 @@ def _write_json(self, call, data):
if not os.path.exists(os.path.dirname(datafile)): # pragma: no cover
os.mkdir(os.path.dirname(datafile))
- with open(datafile, "w", encoding="utf-8") as fixture_file:
- fixture_file.write(
+ async with aiofiles.open(datafile, "w", encoding="utf-8") as fixture_file:
+ await fixture_file.write(
json.dumps(
data,
indent=2,
@@ -77,23 +78,24 @@ def _write_json(self, call, data):
+ "\n"
)
- def load_testdata(
+ async def load_testdata(
self, smile_type: str = "adam", smile_setup: str = "adam_zone_per_device"
):
"""Load JSON data from setup, return as object."""
path = os.path.join(
os.path.dirname(__file__), f"../tests/data/{smile_type}/{smile_setup}.json"
)
- with open(path, encoding="utf-8") as testdata_file:
- return json.load(testdata_file)
+ async with aiofiles.open(path, encoding="utf-8") as testdata_file:
+ content = await testdata_file.read()
+ return json.loads(content)
- async def setup_app(
+ def setup_app(
self,
broken=False,
- timeout=False,
- raise_timeout=False,
fail_auth=False,
+ raise_timeout=False,
stretch=False,
+ timeout_happened=False,
):
"""Create mock webserver for Smile to interface with."""
app = aiohttp.web.Application()
@@ -106,7 +108,7 @@ async def setup_app(
if broken:
app.router.add_get(CORE_DOMAIN_OBJECTS, self.smile_broken)
- elif timeout:
+ elif timeout_happened:
app.router.add_get(CORE_DOMAIN_OBJECTS, self.smile_timeout)
else:
app.router.add_get(CORE_DOMAIN_OBJECTS, self.smile_domain_objects)
@@ -130,13 +132,13 @@ async def setup_app(
return app
- async def setup_legacy_app(
+ def setup_legacy_app(
self,
broken=False,
- timeout=False,
- raise_timeout=False,
fail_auth=False,
+ raise_timeout=False,
stretch=False,
+ timeout_happened=False,
):
"""Create mock webserver for Smile to interface with."""
app = aiohttp.web.Application()
@@ -149,7 +151,7 @@ async def setup_legacy_app(
if broken:
app.router.add_get(CORE_LOCATIONS, self.smile_broken)
- elif timeout:
+ elif timeout_happened:
app.router.add_get(CORE_LOCATIONS, self.smile_timeout)
else:
app.router.add_get(CORE_LOCATIONS, self.smile_locations)
@@ -187,8 +189,8 @@ async def smile_appliances(self, request):
os.path.dirname(__file__),
f"../userdata/{self.smile_setup}/core.appliances.xml",
)
- with open(userdata, encoding="utf-8") as filedata:
- data = filedata.read()
+ async with aiofiles.open(userdata, encoding="utf-8") as filedata:
+ data = await filedata.read()
return aiohttp.web.Response(text=data)
async def smile_domain_objects(self, request):
@@ -197,8 +199,8 @@ async def smile_domain_objects(self, request):
os.path.dirname(__file__),
f"../userdata/{self.smile_setup}/core.domain_objects.xml",
)
- with open(userdata, encoding="utf-8") as filedata:
- data = filedata.read()
+ async with aiofiles.open(userdata, encoding="utf-8") as filedata:
+ data = await filedata.read()
return aiohttp.web.Response(text=data)
async def smile_locations(self, request):
@@ -207,8 +209,8 @@ async def smile_locations(self, request):
os.path.dirname(__file__),
f"../userdata/{self.smile_setup}/core.locations.xml",
)
- with open(userdata, encoding="utf-8") as filedata:
- data = filedata.read()
+ async with aiofiles.open(userdata, encoding="utf-8") as filedata:
+ data = await filedata.read()
return aiohttp.web.Response(text=data)
async def smile_modules(self, request):
@@ -217,8 +219,8 @@ async def smile_modules(self, request):
os.path.dirname(__file__),
f"../userdata/{self.smile_setup}/core.modules.xml",
)
- with open(userdata, encoding="utf-8") as filedata:
- data = filedata.read()
+ async with aiofiles.open(userdata, encoding="utf-8") as filedata:
+ data = await filedata.read()
return aiohttp.web.Response(text=data)
async def smile_status(self, request):
@@ -228,8 +230,8 @@ async def smile_status(self, request):
os.path.dirname(__file__),
f"../userdata/{self.smile_setup}/system_status_xml.xml",
)
- with open(userdata, encoding="utf-8") as filedata:
- data = filedata.read()
+ async with aiofiles.open(userdata, encoding="utf-8") as filedata:
+ data = await filedata.read()
return aiohttp.web.Response(text=data)
except OSError as exc:
raise aiohttp.web.HTTPNotFound from exc
@@ -262,12 +264,12 @@ async def smile_fail_auth(cls, request):
raise aiohttp.web.HTTPUnauthorized()
@staticmethod
- def connect_status(broken, timeout, fail_auth):
+ def connect_status(broken, fail_auth, timeout_happened):
"""Determine assumed status from settings."""
assumed_status = 200
if broken:
assumed_status = 500
- if timeout:
+ if timeout_happened:
assumed_status = 504
if fail_auth:
assumed_status = 401
@@ -275,95 +277,14 @@ def connect_status(broken, timeout, fail_auth):
async def connect(
self,
+ function,
broken=False,
- timeout=False,
- raise_timeout=False,
fail_auth=False,
- stretch=False,
- ):
- """Connect to a smile environment and perform basic asserts."""
- port = aiohttp.test_utils.unused_port()
- test_password = "".join(
- secrets.choice(string.ascii_lowercase) for _ in range(8)
- )
-
- # Happy flow
- app = await self.setup_app(broken, timeout, raise_timeout, fail_auth, stretch)
-
- server = aiohttp.test_utils.TestServer(
- app, port=port, scheme="http", host="127.0.0.1"
- )
- await server.start_server()
-
- client = aiohttp.test_utils.TestClient(server)
- websession = client.session
-
- url = f"{server.scheme}://{server.host}:{server.port}{CORE_DOMAIN_OBJECTS}"
-
- # Try/exceptpass to accommodate for Timeout of aoihttp
- try:
- resp = await websession.get(url)
- assumed_status = self.connect_status(broken, timeout, fail_auth)
- assert resp.status == assumed_status
- timeoutpass_result = False
- assert timeoutpass_result
- except Exception: # pylint: disable=broad-except
- timeoutpass_result = True
- assert timeoutpass_result
-
- if not broken and not timeout and not fail_auth:
- text = await resp.text()
- assert "xml" in text
-
- # Test lack of websession
- try:
- smile = pw_smile.Smile(
- host=server.host,
- username=pw_constants.DEFAULT_USERNAME,
- password=test_password,
- port=server.port,
- websession=None,
- )
- lack_of_websession = False
- assert lack_of_websession
- except Exception: # pylint: disable=broad-except
- lack_of_websession = True
- assert lack_of_websession
-
- smile = pw_smile.Smile(
- host=server.host,
- username=pw_constants.DEFAULT_USERNAME,
- password=test_password,
- port=server.port,
- websession=websession,
- )
-
- if not timeout:
- assert smile._timeout == 30
-
- # Connect to the smile
- smile_version = None
- try:
- smile_version = await smile.connect()
- assert smile_version is not None
- assert smile._timeout == 10
- return server, smile, client
- except (
- pw_exceptions.ConnectionFailedError,
- pw_exceptions.InvalidXMLError,
- pw_exceptions.InvalidAuthentication,
- ) as exception:
- assert smile_version is None
- await self.disconnect(server, client)
- raise exception
-
- async def connect_legacy(
- self,
- broken=False,
- timeout=False,
raise_timeout=False,
- fail_auth=False,
+ smile_timeout_value=10,
stretch=False,
+ timeout_happened=False,
+ url_part=CORE_DOMAIN_OBJECTS,
):
"""Connect to a smile environment and perform basic asserts."""
port = aiohttp.test_utils.unused_port()
@@ -372,9 +293,7 @@ async def connect_legacy(
)
# Happy flow
- app = await self.setup_legacy_app(
- broken, timeout, raise_timeout, fail_auth, stretch
- )
+ app = function(broken, fail_auth, raise_timeout, stretch, timeout_happened)
server = aiohttp.test_utils.TestServer(
app, port=port, scheme="http", host="127.0.0.1"
@@ -384,12 +303,12 @@ async def connect_legacy(
client = aiohttp.test_utils.TestClient(server)
websession = client.session
- url = f"{server.scheme}://{server.host}:{server.port}{CORE_LOCATIONS}"
+ url = f"{server.scheme}://{server.host}:{server.port}{url_part}"
# Try/exceptpass to accommodate for Timeout of aoihttp
try:
resp = await websession.get(url)
- assumed_status = self.connect_status(broken, timeout, fail_auth)
+ assumed_status = self.connect_status(broken, fail_auth, timeout_happened)
assert resp.status == assumed_status
timeoutpass_result = False
assert timeoutpass_result
@@ -397,7 +316,7 @@ async def connect_legacy(
timeoutpass_result = True
assert timeoutpass_result
- if not broken and not timeout and not fail_auth:
+ if not broken and not timeout_happened and not fail_auth:
text = await resp.text()
assert "xml" in text
@@ -424,7 +343,7 @@ async def connect_legacy(
websession=websession,
)
- if not timeout:
+ if not timeout_happened:
assert smile._timeout == 30
# Connect to the smile
@@ -432,7 +351,7 @@ async def connect_legacy(
try:
smile_version = await smile.connect()
assert smile_version is not None
- assert smile._timeout == 30
+ assert smile._timeout == smile_timeout_value
return server, smile, client
except (
pw_exceptions.ConnectionFailedError,
@@ -451,7 +370,7 @@ async def connect_wrapper(
if fail_auth:
try:
_LOGGER.warning("Connecting to device with invalid credentials:")
- await self.connect(fail_auth=fail_auth)
+ await self.connect(self.setup_app, fail_auth=fail_auth)
_LOGGER.error(" - invalid credentials not handled") # pragma: no cover
raise self.ConnectError # pragma: no cover
except pw_exceptions.InvalidAuthentication as exc:
@@ -460,11 +379,11 @@ async def connect_wrapper(
if raise_timeout:
_LOGGER.warning("Connecting to device exceeding timeout in handling:")
- return await self.connect(raise_timeout=True)
+ return await self.connect(self.setup_app, raise_timeout=True)
try:
_LOGGER.warning("Connecting to device exceeding timeout in response:")
- await self.connect(timeout=True)
+ await self.connect(self.setup_app, timeout_happened=True)
_LOGGER.error(" - timeout not handled") # pragma: no cover
raise self.ConnectError # pragma: no cover
except pw_exceptions.ConnectionFailedError:
@@ -472,14 +391,14 @@ async def connect_wrapper(
try:
_LOGGER.warning("Connecting to device with missing data:")
- await self.connect(broken=True)
+ await self.connect(self.setup_app, broken=True)
_LOGGER.error(" - broken information not handled") # pragma: no cover
raise self.ConnectError # pragma: no cover
except pw_exceptions.InvalidXMLError:
_LOGGER.info(" + successfully passed XML issue handling.")
_LOGGER.info("Connecting to functioning device:")
- return await self.connect(stretch=stretch)
+ return await self.connect(self.setup_app, stretch=stretch)
async def connect_legacy_wrapper(
self, raise_timeout=False, fail_auth=False, stretch=False
@@ -487,11 +406,21 @@ async def connect_legacy_wrapper(
"""Wrap connect to try negative testing before positive testing."""
if raise_timeout:
_LOGGER.warning("Connecting to device exceeding timeout in handling:")
- return await self.connect_legacy(raise_timeout=True)
+ return await self.connect(
+ self.setup_legacy_app,
+ raise_timeout=True,
+ smile_timeout_value=30,
+ url_part=CORE_LOCATIONS,
+ )
try:
_LOGGER.warning("Connecting to device exceeding timeout in response:")
- await self.connect_legacy(timeout=True)
+ await self.connect(
+ self.setup_legacy_app,
+ smile_timeout_value=30,
+ timeout_happened=True,
+ url_part=CORE_LOCATIONS,
+ )
_LOGGER.error(" - timeout not handled") # pragma: no cover
raise self.ConnectError # pragma: no cover
except pw_exceptions.ConnectionFailedError:
@@ -499,14 +428,24 @@ async def connect_legacy_wrapper(
try:
_LOGGER.warning("Connecting to device with missing data:")
- await self.connect_legacy(broken=True)
+ await self.connect(
+ self.setup_legacy_app,
+ broken=True,
+ smile_timeout_value=30,
+ url_part=CORE_LOCATIONS,
+ )
_LOGGER.error(" - broken information not handled") # pragma: no cover
raise self.ConnectError # pragma: no cover
except pw_exceptions.InvalidXMLError:
_LOGGER.info(" + successfully passed XML issue handling.")
_LOGGER.info("Connecting to functioning device:")
- return await self.connect_legacy(stretch=stretch)
+ return await self.connect(
+ self.setup_legacy_app,
+ smile_timeout_value=30,
+ stretch=stretch,
+ url_part=CORE_LOCATIONS,
+ )
# Generic disconnect
@classmethod
@@ -608,7 +547,7 @@ def test_and_assert(test_dict, data, header):
if initialize:
_LOGGER.info("Asserting testdata:")
data = await smile.async_update()
- if smile.smile_legacy:
+ if smile.smile.legacy:
assert smile._timeout == 30
else:
assert smile._timeout == 10
@@ -618,7 +557,7 @@ def test_and_assert(test_dict, data, header):
_LOGGER.info("Gateway id = %s", smile.gateway_id)
_LOGGER.info("Heater id = %s", smile.heater_id)
- _LOGGER.info("Hostname = %s", smile.smile_hostname)
+ _LOGGER.info("Hostname = %s", smile.smile.hostname)
_LOGGER.info("Entities list = %s", data)
self.cooling_present = smile.cooling_present
@@ -641,7 +580,7 @@ def test_and_assert(test_dict, data, header):
"cooling_state"
]
- self._write_json("data", data)
+ await self._write_json("data", data)
if "FIXTURES" in os.environ:
_LOGGER.info("Skipping tests: Requested fixtures only") # pragma: no cover
@@ -731,7 +670,7 @@ async def tinker_thermostat_temp(
tinker_temp_passed = False
test_temp = {"setpoint": 22.9}
if self.cooling_present and not block_cooling:
- if smile.smile_name == "Smile Anna":
+ if smile.smile.name == "Smile Anna":
if self._cooling_enabled:
test_temp = {"setpoint_low": 4.0, "setpoint_high": 23.0}
else:
@@ -1060,17 +999,17 @@ def validate_test_basics(
if smile_type:
log_msg = f" # Assert type matching {smile_type}"
parent_logger.info(log_msg)
- assert smile.smile_type == smile_type
+ assert smile.smile.type == smile_type
if smile_version:
log_msg = f" # Assert version matching '{smile_version}"
parent_logger.info(log_msg)
- assert smile.smile_version == version.parse(smile_version)
+ assert smile.smile.version == version.parse(smile_version)
log_msg = f" # Assert legacy {smile_legacy}"
parent_logger.info(log_msg)
if smile_legacy:
- assert smile.smile_legacy
+ assert smile.smile.legacy
else:
- assert not smile.smile_legacy
+ assert not smile.smile.legacy
class PlugwiseTestError(Exception):
"""Plugwise test exceptions class."""
diff --git a/tests/test_legacy_anna.py b/tests/test_legacy_anna.py
index a3a7b8469..8efecf2a5 100644
--- a/tests/test_legacy_anna.py
+++ b/tests/test_legacy_anna.py
@@ -16,10 +16,10 @@ class TestPlugwiseAnna(TestPlugwise): # pylint: disable=attribute-defined-outsi
async def test_connect_legacy_anna(self):
"""Test a legacy Anna device."""
self.smile_setup = "legacy_anna"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_legacy_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -53,9 +53,9 @@ async def test_connect_legacy_anna_2(self):
"""Test another legacy Anna device."""
self.smile_setup = "legacy_anna_2"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_legacy_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
diff --git a/tests/test_legacy_p1.py b/tests/test_legacy_p1.py
index d569ca52b..95bd94a93 100644
--- a/tests/test_legacy_p1.py
+++ b/tests/test_legacy_p1.py
@@ -15,9 +15,9 @@ async def test_connect_smile_p1_v2(self):
"""Test a legacy P1 device."""
self.smile_setup = "smile_p1_v2"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_legacy_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -39,9 +39,9 @@ async def test_connect_smile_p1_v2_2(self):
"""Test another legacy P1 device."""
self.smile_setup = "smile_p1_v2_2"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_legacy_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -56,7 +56,7 @@ async def test_connect_smile_p1_v2_2(self):
# Now change some data and change directory reading xml from
# emulating reading newer dataset after an update_interval
- testdata_updated = self.load_testdata(
+ testdata_updated = await self.load_testdata(
SMILE_TYPE, f"{self.smile_setup}_UPDATED_DATA"
)
self.smile_setup = "updated/smile_p1_v2_2"
diff --git a/tests/test_legacy_stretch.py b/tests/test_legacy_stretch.py
index 021cc385b..0d29766cb 100644
--- a/tests/test_legacy_stretch.py
+++ b/tests/test_legacy_stretch.py
@@ -15,9 +15,9 @@ async def test_connect_stretch_v31(self):
"""Test a legacy Stretch with firmware 3.1 setup."""
self.smile_setup = "stretch_v31"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_legacy_wrapper(stretch=True)
- assert smile.smile_hostname == "stretch000000"
+ assert smile.smile.hostname == "stretch000000"
self.validate_test_basics(
_LOGGER,
@@ -39,7 +39,7 @@ async def test_connect_stretch_v31(self):
# Now change some data and change directory reading xml from
# emulating reading newer dataset after an update_interval
- testdata_updated = self.load_testdata(
+ testdata_updated = await self.load_testdata(
SMILE_TYPE, f"{self.smile_setup}_UPDATED_DATA"
)
self.smile_setup = "updated/stretch_v31"
@@ -55,9 +55,9 @@ async def test_connect_stretch_v23(self):
"""Test a legacy Stretch with firmware 2.3 setup."""
self.smile_setup = "stretch_v23"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_legacy_wrapper(stretch=True)
- assert smile.smile_hostname == "stretch000000"
+ assert smile.smile.hostname == "stretch000000"
self.validate_test_basics(
_LOGGER,
@@ -94,9 +94,9 @@ async def test_connect_stretch_v27_no_domain(self):
# testdata dictionary with key ctrl_id_dev_id => keys:values
self.smile_setup = "stretch_v27_no_domain"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_legacy_wrapper(stretch=True)
- assert smile.smile_hostname == "stretch000000"
+ assert smile.smile.hostname == "stretch000000"
self.validate_test_basics(
_LOGGER,
diff --git a/tests/test_p1.py b/tests/test_p1.py
index 562ebc4fd..e76b619c6 100644
--- a/tests/test_p1.py
+++ b/tests/test_p1.py
@@ -15,9 +15,9 @@ async def test_connect_p1v4_442_single(self):
"""Test a P1 firmware 4.4 single-phase setup."""
self.smile_setup = "p1v4_442_single"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,
@@ -33,7 +33,7 @@ async def test_connect_p1v4_442_single(self):
# Now change some data and change directory reading xml from
# emulating reading newer dataset after an update_interval
- testdata_updated = self.load_testdata(
+ testdata_updated = await self.load_testdata(
SMILE_TYPE, f"{self.smile_setup}_UPDATED_DATA"
)
self.smile_setup = "updated/p1v4_442_single"
@@ -65,9 +65,9 @@ async def test_connect_p1v4_442_triple(self):
"""Test a P1 firmware 4 3-phase setup."""
self.smile_setup = "p1v4_442_triple"
- testdata = self.load_testdata(SMILE_TYPE, self.smile_setup)
+ testdata = await self.load_testdata(SMILE_TYPE, self.smile_setup)
server, smile, client = await self.connect_wrapper()
- assert smile.smile_hostname == "smile000000"
+ assert smile.smile.hostname == "smile000000"
self.validate_test_basics(
_LOGGER,