Source code for pyads.connection

"""ADS Connection class.

:author: Stefan Lehmann <stlm@posteo.de>
:maintainer: Filippo Boido <filippo.boido@agileautomation.eu> (Agile Automation Technologies GmbH)
:license: MIT, see license file or https://opensource.org/licenses/MIT
:created on: 2018-06-11 18:15:53

"""
from __future__ import annotations
import inspect
import struct
from ctypes import (
    memmove,
    addressof,
    c_ubyte,
    Array,
    Structure,
    sizeof,
    create_string_buffer,
)
from datetime import datetime
from functools import partial
from typing import Optional, Union, Tuple, Any, Type, Callable, Dict, List, Sequence, cast, TypeVar, overload

# noinspection PyUnresolvedReferences
from .constants import (
    ADSIGRP_SYM_UPLOAD,
    ADSIGRP_SYM_UPLOADINFO2,
    ADSIOFFS_DEVDATA_ADSSTATE,
    PLCTYPE_BOOL,
    PLCTYPE_BYTE,
    PLCTYPE_DATE,
    PLCTYPE_DINT,
    PLCTYPE_DT,
    PLCTYPE_DWORD,
    PLCTYPE_INT,
    PLCTYPE_LREAL,
    PLCTYPE_REAL,
    PLCTYPE_SINT,
    PLCTYPE_STRING,
    PLCTYPE_TIME,
    PLCTYPE_TOD,
    PLCTYPE_UDINT,
    PLCTYPE_UINT,
    PLCTYPE_USINT,
    PLCTYPE_WORD,
    PLC_DEFAULT_STRING_SIZE,
    DATATYPE_MAP,
    ADSIGRP_SUMUP_READ,
    ADSIGRP_SUMUP_WRITE,
    ADSIGRP_SYM_VALBYHND,
    MAX_ADS_SUB_COMMANDS,
    ads_type_to_ctype,
    PLCSimpleDataType,
    PLCDataType,
)
from .filetimes import filetime_to_dt
from .pyads_ex import (
    adsAddRoute,
    adsDelRoute,
    adsPortOpenEx,
    adsPortCloseEx,
    adsGetLocalAddressEx,
    adsSyncReadStateReqEx,
    adsSyncReadDeviceInfoReqEx,
    adsSyncWriteControlReqEx,
    adsSyncWriteReqEx,
    adsSyncReadWriteReqEx2,
    adsSyncReadReqEx2,
    adsGetHandle,
    adsGetNetIdForPLC,
    adsGetSymbolInfo,
    adsSumRead,
    adsSumWrite,
    adsReleaseHandle,
    adsSyncReadByNameEx,
    adsSyncWriteByNameEx,
    adsSyncAddDeviceNotificationReqEx,
    adsSyncDelDeviceNotificationReqEx,
    adsSyncSetTimeoutEx,
    type_is_string,
    type_is_wstring,
    ADSError,
)
from .structs import (
    AmsAddr,
    AdsVersion,
    NotificationAttrib,
    SAdsNotificationHeader,
    SAdsSymbolEntry,
)
from .ads import (
    linux,
    StructureDef,
    dict_from_bytes,
    _list_slice_generator,
    _dict_slice_generator,
    bytes_from_dict,
    size_of_structure,
)
from .symbol import AdsSymbol
from .utils import decode_ads
from .rpc_interface import resolve_rpc_interface_definition

RpcInterfaceT = TypeVar("RpcInterfaceT")


[docs] class RpcObject: """Proxy object for calling PLC RPC methods using native attribute syntax.""" def __init__( self, connection: "Connection", object_name: str, method_separator: str = "#", method_prefixes: Tuple[str, ...] = ("", "m_"), method_return_types: Optional[Dict[str, Type["PLCDataType"]]] = None, method_parameters: Optional[ Dict[str, Sequence[Type["PLCDataType"]]] ] = None, ) -> None: self._connection = connection self._object_name = object_name self._method_separator = method_separator self._method_prefixes = method_prefixes self._method_return_types: Dict[str, Type["PLCDataType"]] = ( method_return_types.copy() if method_return_types else {} ) self._method_parameters: Dict[str, Tuple[Type["PLCDataType"], ...]] = ( {name: tuple(values) for name, values in method_parameters.items()} if method_parameters else {} ) # If a method is declared in `method_return_types` but no explicit # parameter signature is provided, treat it as a zero-argument RPC. for method_name in self._method_return_types: self._method_parameters.setdefault(method_name, tuple()) self._return_spec_cache: Dict[ str, Tuple[Optional[Type["PLCDataType"]], bool] ] = {} def _candidate_method_names(self, method_name: str) -> List[str]: names: List[str] = [] for prefix in self._method_prefixes: if method_name.startswith(prefix): candidate = method_name else: candidate = f"{prefix}{method_name}" names.append(f"{self._object_name}{self._method_separator}{candidate}") # Keep order but drop duplicates return list(dict.fromkeys(names)) def _get_symbol_info(self, qualified_name: str) -> Optional[SAdsSymbolEntry]: """Return cached symbol info for a fully qualified RPC method name.""" if not self._connection.is_open or self._connection._port is None: return None cache = self._connection._symbol_info_cache info = cache.get(qualified_name) if info is not None: return info try: info = adsGetSymbolInfo( self._connection._port, self._connection._adr, qualified_name ) except ADSError: return None cache[qualified_name] = info return info
[docs] def set_return_type(self, method_name: str, plc_type: Type["PLCDataType"]) -> None: """Register or override the expected return type for a method.""" self._method_return_types[method_name] = plc_type self._return_spec_cache.pop( f"{self._object_name}{self._method_separator}{method_name}", None )
def _manual_return_type(self, method_name: str) -> Optional[Type["PLCDataType"]]: """Look up manually configured return type for a method name.""" candidates = {method_name} for prefix in self._method_prefixes: if method_name.startswith(prefix): candidates.add(method_name[len(prefix):]) else: candidates.add(f"{prefix}{method_name}") for candidate in candidates: plc_type = self._method_return_types.get(candidate) if plc_type is not None: return plc_type return None def _infer_return_spec( self, qualified_name: str ) -> Tuple[Optional[Type["PLCDataType"]], bool]: """Infer the PLC return type for a method, falling back to raw bytes.""" cached = self._return_spec_cache.get(qualified_name) if cached is not None: return cached plc_type: Optional[Type["PLCDataType"]] = None coerce_bytes = False _, _, method_name = qualified_name.partition(self._method_separator) if method_name: plc_type = self._manual_return_type(method_name) if plc_type is None: info = self._get_symbol_info(qualified_name) if info is not None: plc_type = ads_type_to_ctype.get(info.dataType) if plc_type is None: plc_type = AdsSymbol.get_type_from_str(info.symbol_type) if plc_type is None and info.size: plc_type = c_ubyte * info.size coerce_bytes = True spec = (plc_type, coerce_bytes) self._return_spec_cache[qualified_name] = spec return spec @staticmethod def _normalize_control_args( args: Tuple[Any, ...], kwargs: Dict[str, Any] ) -> Tuple[Any, Optional[Type["PLCDataType"]], Optional[Type["PLCDataType"]], Optional[str]]: write_value = kwargs.pop("write_value", None) write_type = kwargs.pop("write_type", None) return_type = kwargs.pop("return_type", None) method_name_override = kwargs.pop("method_name", None) if kwargs: unknown = ", ".join(sorted(kwargs.keys())) raise TypeError(f"Unknown keyword argument(s): {unknown}") if len(args) > 3: raise TypeError("Expected up to 3 positional args: value, write_type, return_type") if len(args) >= 1: if write_value is not None: raise TypeError("write_value provided both positionally and as keyword") write_value = args[0] if len(args) >= 2: if write_type is not None: raise TypeError("write_type provided both positionally and as keyword") write_type = args[1] if len(args) == 3: if return_type is not None: raise TypeError("return_type provided both positionally and as keyword") return_type = args[2] return write_value, write_type, return_type, method_name_override def _parameter_types_for(self, method_name: str) -> Tuple[Type["PLCDataType"], ...]: candidates = {method_name} for prefix in self._method_prefixes: if method_name.startswith(prefix): candidates.add(method_name[len(prefix):]) else: candidates.add(f"{prefix}{method_name}") for candidate in candidates: values = self._method_parameters.get(candidate) if values is not None: return values return tuple() @staticmethod def _pack_single_argument(value: Any, plc_type: Type["PLCDataType"]) -> bytes: if type_is_string(plc_type): return value.encode("utf-8") + b"\x00" if type_is_wstring(plc_type): return value.encode("utf-16-le") if type(plc_type).__name__ == "PyCArrayType": data = plc_type(*value) elif type(value) is plc_type: data = value else: data = plc_type(value) raw = (c_ubyte * sizeof(data))() memmove(raw, addressof(data), sizeof(data)) return bytes(raw) def _pack_method_args( self, method_name: str, args: Tuple[Any, ...], ) -> Tuple[Any, Optional[Type["PLCDataType"]]]: parameter_types = self._parameter_types_for(method_name) if not parameter_types: if len(args) == 0: return None, None if len(args) == 1: raise TypeError( "Parameter types are not configured for this method. " "Pass `method_parameters=` to get_object, " "or call with explicit write_type." ) raise TypeError( "Parameter types are not configured for this method. " "Use explicit write_type or configure method parameters." ) if len(args) != len(parameter_types): raise TypeError( f"{method_name} expects {len(parameter_types)} parameter(s), " f"got {len(args)}." ) payload = bytearray() for value, plc_type in zip(args, parameter_types): payload.extend(self._pack_single_argument(value, plc_type)) if not payload: return None, None write_type = c_ubyte * len(payload) return list(payload), write_type def __getattr__(self, method_name: str) -> Callable[..., Any]: if method_name.startswith("_"): raise AttributeError(method_name) def _invoke(*args: Any, **kwargs: Any) -> Any: control_keys = {"write_value", "write_type", "return_type", "method_name"} has_control_args = any(key in kwargs for key in control_keys) if has_control_args: write_value, write_type, return_type, method_name_override = self._normalize_control_args(args, kwargs) else: if kwargs: unknown = ", ".join(sorted(kwargs.keys())) raise TypeError(f"Unknown keyword argument(s): {unknown}") is_legacy_positional = ( len(args) <= 3 and ( len(args) == 0 or (len(args) >= 2 and isinstance(args[1], type)) ) and not self._parameter_types_for(method_name) ) if is_legacy_positional: write_value = args[0] if len(args) >= 1 else None write_type = args[1] if len(args) >= 2 else None return_type = args[2] if len(args) == 3 else None method_name_override = None else: write_value, write_type = self._pack_method_args(method_name, args) return_type = None method_name_override = None if method_name_override is not None: return self._connection.call_rpc_method( method_name_override, return_type=return_type, write_value=write_value, write_type=write_type, ) last_error: Optional[ADSError] = None for candidate_name in self._candidate_method_names(method_name): resolved_return_type = return_type coerce_bytes = False if resolved_return_type is None: resolved_return_type, coerce_bytes = self._infer_return_spec( candidate_name ) try: result = self._connection.call_rpc_method( candidate_name, return_type=resolved_return_type, write_value=write_value, write_type=write_type, ) except ADSError as exc: if exc.err_code != 1808: # symbol not found raise last_error = exc continue if coerce_bytes and isinstance(result, list): return bytes(result) return result if last_error is not None: raise last_error raise AttributeError(method_name) return _invoke
[docs] class Connection(object): """Class for managing the connection to an ADS device. :ivar str ams_net_id: AMS net id of the remote device :ivar int ams_net_port: port of the remote device :ivar str ip_address: the ip address of the device :note: If no IP address is given the ip address is automatically set to first 4 parts of the Ams net id. """ def __init__( self, ams_net_id: str = None, ams_net_port: int = None, ip_address: str = None ) -> None: self._port = None # type: Optional[int] self._adr = AmsAddr(ams_net_id, ams_net_port) self._open = False if ip_address is None: if ams_net_id is None: raise TypeError("Must provide an IP or net ID") self.ip_address = ".".join(ams_net_id.split(".")[:4]) else: self.ip_address = ip_address self.ams_net_id = ams_net_id self.ams_net_port = ams_net_port self._notifications = {} # type: Dict[int, str] self._symbol_info_cache: Dict[str, SAdsSymbolEntry] = {} @property def ams_netid(self) -> str: return self._adr.netid @ams_netid.setter def ams_netid(self, value: str) -> None: if self._open: raise AttributeError( "Setting netid is not allowed while connection is open." ) self._adr.netid = value @property def ams_port(self) -> int: return self._adr.port @ams_port.setter def ams_port(self, value: int) -> None: if self._open: raise AttributeError( "Setting port is not allowed while connection is open." ) self._adr.port = value def __enter__(self) -> "Connection": """Open on entering with-block.""" self.open() return self def __exit__(self, _type: Type, _val: Any, _traceback: Any) -> None: """Close on leaving with-block.""" self.close() def __del__(self) -> None: """Class destructor. Make sure to close the connection when an instance runs out of scope. """ # If the connection is already closed, nothing new will happen self.close() def _query_plc_datatype_from_name(self, data_name: str, cache_symbol_info: bool) -> Type: """Return the plc_datatype by reading SymbolInfo from the target. If cache_symbol_info is True then the SymbolInfo will be cached and adsGetSymbolInfo will only used once. """ if cache_symbol_info: info = self._symbol_info_cache.get(data_name) if info is None: info = adsGetSymbolInfo(self._port, self._adr, data_name) self._symbol_info_cache[data_name] = info else: info = adsGetSymbolInfo(self._port, self._adr, data_name) return AdsSymbol.get_type_from_str(info.symbol_type)
[docs] def open(self) -> None: """Connect to the TwinCAT message router.""" if self._open: return if self.ams_net_id is None: self.ams_net_id = adsGetNetIdForPLC(self.ip_address) self._adr = AmsAddr(self.ams_net_id, self.ams_net_port) self._port = adsPortOpenEx() if linux: try: adsAddRoute(self._adr.netIdStruct(), self.ip_address) except ADSError: adsPortCloseEx(self._port) self._port = None raise self._open = True
[docs] def close(self) -> None: """:summary: Close the connection to the TwinCAT message router.""" if not self._open: return if linux: adsDelRoute(self._adr.netIdStruct()) if self._port is not None: adsPortCloseEx(self._port) self._port = None self._open = False
[docs] def get_local_address(self) -> Optional[AmsAddr]: """Return the local AMS-address and the port number. :rtype: AmsAddr """ if self._port is not None: return adsGetLocalAddressEx(self._port) return None
[docs] def read_state(self) -> Optional[Tuple[int, int]]: """Read the current ADS-state and the machine-state. Read the current ADS-state and the machine-state from the ADS-server. :rtype: (int, int) :return: adsState, deviceState """ if self._port is not None: return adsSyncReadStateReqEx(self._port, self._adr) return None
[docs] def write_control( self, ads_state: int, device_state: int, data: Any, plc_datatype: Type ) -> None: """Change the ADS state and the machine-state of the ADS-server. :param int ads_state: new ADS-state, according to ADSTATE constants :param int device_state: new machine-state :param data: additional data :param int plc_datatype: datatype, according to PLCTYPE constants :note: Despite changing the ADS-state and the machine-state it is possible to send additional data to the ADS-server. For current ADS-devices additional data is not progressed. Every ADS-device is able to communicate its current state to other devices. There is a difference between the device-state and the state of the ADS-interface (AdsState). The possible states of an ADS-interface are defined in the ADS-specification. """ if self._port is not None: return adsSyncWriteControlReqEx( self._port, self._adr, ads_state, device_state, data, plc_datatype )
[docs] def read_device_info(self) -> Optional[Tuple[str, AdsVersion]]: """Read the name and the version number of the ADS-server. :rtype: string, AdsVersion :return: device name, version """ if self._port is not None: return adsSyncReadDeviceInfoReqEx(self._port, self._adr) return None
[docs] def write( self, index_group: int, index_offset: int, value: Any, plc_datatype: Type["PLCDataType"] ) -> None: """Send data synchronous to an ADS-device. :param int index_group: PLC storage area, according to the INDEXGROUP constants :param int index_offset: PLC storage address :param Any value: value to write to the storage address of the PLC :param Type["PLCDataType"] plc_datatype: type of the data given to the PLC, according to PLCTYPE constants """ if self._port is not None: return adsSyncWriteReqEx( self._port, self._adr, index_group, index_offset, value, plc_datatype )
[docs] def read_write( self, index_group: int, index_offset: int, plc_read_datatype: Optional[Type["PLCDataType"]], value: Any, plc_write_datatype: Optional[Type["PLCDataType"]], return_ctypes: bool = False, check_length: bool = True, ) -> Any: """Read and write data synchronous from/to an ADS-device. :param int index_group: PLC storage area, according to the INDEXGROUP constants :param int index_offset: PLC storage address :param Type["PLCDataType"] plc_read_datatype: type of the data given to the PLC to respond to, according to PLCTYPE constants, or None to not read anything :param value: value to write to the storage address of the PLC :param Type["PLCDataType"] plc_write_datatype: type of the data given to the PLC, according to PLCTYPE constants, or None to not write anything :param bool return_ctypes: return ctypes instead of python types if True (default: False) :param bool check_length: check whether the amount of bytes read matches the size of the read data type (default: True) :return: value: **value** """ if self._port is not None: return adsSyncReadWriteReqEx2( self._port, self._adr, index_group, index_offset, plc_read_datatype, value, plc_write_datatype, return_ctypes, check_length, ) return None
[docs] def read( self, index_group: int, index_offset: int, plc_datatype: Type["PLCDataType"], return_ctypes: bool = False, check_length: bool = True, ) -> Any: """Read data synchronous from an ADS-device. :param int index_group: PLC storage area, according to the INDEXGROUP constants :param int index_offset: PLC storage address :param Type["PLCDataType"] plc_datatype: type of the data given to the PLC, according to PLCTYPE constants :param bool return_ctypes: return ctypes instead of python types if True (default: False) :param bool check_length: check whether the amount of bytes read matches the size of the read data type (default: True) :return: value """ if index_group is None or not isinstance(index_group, int): raise TypeError('index_group: integer is required') if index_offset is None or not isinstance(index_offset, int): raise TypeError('index_offset: integer is required') if self._port is not None: return adsSyncReadReqEx2( self._port, self._adr, index_group, index_offset, plc_datatype, return_ctypes, check_length, ) return None
[docs] def get_symbol( self, name: Optional[str] = None, index_group: Optional[int] = None, index_offset: Optional[int] = None, plc_datatype: Optional[Union[Type["PLCDataType"], str]] = None, comment: Optional[str] = None, auto_update: bool = False, structure_def: Optional["StructureDef"] = None, array_size: Optional[int] = 1, ) -> AdsSymbol: """Create a symbol instance Specify either the variable name or the index_group **and** index_offset so the symbol can be located. If the name was specified but not all other attributes were, the other attributes will be looked up from the connection. `data_type` can be a PLCTYPE constant or a string representing a PLC type (e.g. 'LREAL'). :param str name: :param Optional[int] index_group: :param Optional[int] index_offset: :param plc_datatype: type of the PLC variable, according to PLCTYPE constants :param str comment: comment :param bool auto_update: Create notification to update buffer (same as `set_auto_update(True)`) :param Optional["StructureDef"] structure_def: special tuple defining the structure and types contained within it according to PLCTYPE constants, must match the structure defined in the PLC, PLC structure must be defined with {attribute 'pack_mode' := '1'} :param Optional[int] array_size: size of array if reading array of structure, defaults to 1 Expected input example for structure_def: .. code:: python structure_def = ( ('rVar', pyads.PLCTYPE_LREAL, 1), ('sVar', pyads.PLCTYPE_STRING, 2, 35), ('SVar1', pyads.PLCTYPE_STRING, 1), ('rVar1', pyads.PLCTYPE_REAL, 1), ('iVar', pyads.PLCTYPE_DINT, 1), ('iVar1', pyads.PLCTYPE_INT, 3), ) # i.e ('Variable Name', variable type, arr size (1 if not array), # length of string (if defined in PLC)) """ return AdsSymbol(self, name, index_group, index_offset, plc_datatype, comment, auto_update=auto_update, structure_def=structure_def, array_size=array_size)
[docs] def get_all_symbols(self) -> List[AdsSymbol]: """Read all symbols from an ADS-device. :return: List of AdsSymbols """ symbols = [] if self._port is not None: symbol_size_msg = self.read( ADSIGRP_SYM_UPLOADINFO2, ADSIOFFS_DEVDATA_ADSSTATE, PLCTYPE_STRING, return_ctypes=True, ) sym_count = struct.unpack("I", symbol_size_msg[0:4])[0] sym_list_length = struct.unpack("I", symbol_size_msg[4:8])[0] data_type_creation_fn: Type = cast("Type", partial(create_string_buffer, sym_list_length)) symbol_list_msg = self.read( ADSIGRP_SYM_UPLOAD, ADSIOFFS_DEVDATA_ADSSTATE, data_type_creation_fn, return_ctypes=True, ) ptr = 0 for idx in range(sym_count): read_length, index_group, index_offset = struct.unpack( "III", symbol_list_msg[ptr + 0: ptr + 12] ) name_length, type_length, comment_length = struct.unpack( "HHH", symbol_list_msg[ptr + 24: ptr + 30] ) name_start_ptr = ptr + 30 name_end_ptr = name_start_ptr + name_length type_start_ptr = name_end_ptr + 1 type_end_ptr = type_start_ptr + type_length comment_start_ptr = type_end_ptr + 1 comment_end_ptr = comment_start_ptr + comment_length name = decode_ads(symbol_list_msg[name_start_ptr:name_end_ptr]) symbol_type = decode_ads(symbol_list_msg[type_start_ptr:type_end_ptr]) comment = decode_ads(symbol_list_msg[comment_start_ptr:comment_end_ptr]) ptr = ptr + read_length symbol = AdsSymbol(plc=self, name=name, index_group=index_group, index_offset=index_offset, symbol_type=symbol_type, comment=comment) symbols.append(symbol) return symbols
@overload def get_object( self, object_name: str, method_separator: str = "#", method_prefixes: Tuple[str, ...] = ("", "m_"), method_return_types: Optional[Dict[str, Type["PLCDataType"]]] = None, method_parameters: Optional[ Dict[str, Sequence[Type["PLCDataType"]]] ] = None, ) -> RpcObject: ... @overload def get_object( self, object_name: Type[RpcInterfaceT], method_separator: str = "#", method_prefixes: Tuple[str, ...] = ("", "m_"), method_return_types: Optional[Dict[str, Type["PLCDataType"]]] = None, method_parameters: Optional[ Dict[str, Sequence[Type["PLCDataType"]]] ] = None, ) -> RpcInterfaceT: ...
[docs] def get_object( self, object_name: Union[str, Type[RpcInterfaceT]], method_separator: str = "#", method_prefixes: Tuple[str, ...] = ("", "m_"), method_return_types: Optional[Dict[str, Type["PLCDataType"]]] = None, method_parameters: Optional[ Dict[str, Sequence[Type["PLCDataType"]]] ] = None, ) -> Union[RpcObject, RpcInterfaceT]: """Create a PLC object proxy for native RPC method calls or typed interfaces. Example: rpc = plc.get_object( "GVL.fbTestRemoteMethodCall", method_return_types={"m_iSum": pyads.PLCTYPE_INT}, method_parameters={"m_iSum": [pyads.PLCTYPE_INT, pyads.PLCTYPE_INT]}, ) result = rpc.m_iSum(5, 5) Method names are resolved in order using ``method_prefixes``. The default supports both ``doCalc`` and ``m_doCalc`` style RPC names. """ interface_class: Optional[Type[RpcInterfaceT]] = ( object_name if inspect.isclass(object_name) else None ) inferred_returns: Optional[Dict[str, Type["PLCDataType"]]] = None inferred_parameters: Optional[Dict[str, Tuple[Type["PLCDataType"], ...]]] = None if isinstance(object_name, str): resolved_object_name = object_name elif interface_class is not None: interface_definition = resolve_rpc_interface_definition(interface_class) if interface_definition.async_interface: raise TypeError( f"{interface_class.__name__} is decorated with @ads_async_path('...') " "and must be used with AsyncConnection.get_async_object()." ) if interface_definition.stepchain_methods: raise TypeError( f"{interface_class.__name__} declares @stepchain_start methods " "and must be used with AsyncConnection.get_async_object()." ) resolved_object_name = interface_definition.object_name if interface_definition.method_return_types: inferred_returns = dict(interface_definition.method_return_types) if interface_definition.method_parameters: inferred_parameters = dict(interface_definition.method_parameters) else: raise TypeError("object_name must be a string or an ads_path-decorated class.") final_return_types: Optional[Dict[str, Type["PLCDataType"]]] = None if inferred_returns: final_return_types = inferred_returns if method_return_types: final_return_types = {**(final_return_types or {}), **method_return_types} normalized_parameters: Optional[Dict[str, Tuple[Type["PLCDataType"], ...]]] = None if method_parameters: normalized_parameters = { name: tuple(values) for name, values in method_parameters.items() } if inferred_parameters: normalized_parameters = { **(inferred_parameters or {}), **(normalized_parameters or {}), } rpc_object = RpcObject( connection=self, object_name=resolved_object_name, method_separator=method_separator, method_prefixes=method_prefixes, method_return_types=final_return_types, method_parameters=normalized_parameters, ) if interface_class is not None: return cast(RpcInterfaceT, rpc_object) return rpc_object
[docs] def call_rpc_method( self, method_name: str, return_type: Optional[Type["PLCDataType"]] = None, write_value: Any = None, write_type: Optional[Type["PLCDataType"]] = None, ) -> Any: """Call a PLC RPC method by ADS handle and release the handle afterwards.""" handle = self.get_handle(method_name) if handle is None: return None try: return self.read_write( ADSIGRP_SYM_VALBYHND, handle, return_type, write_value, write_type, ) finally: self.release_handle(handle)
[docs] def get_handle(self, data_name: str) -> Optional[int]: """Get the handle of the PLC-variable, handles obtained using this method should be released using method 'release_handle'. :param string data_name: data name :rtype: int :return: int: PLC-variable handle """ if self._port is not None: return adsGetHandle(self._port, self._adr, data_name) return None
[docs] def release_handle(self, handle: int) -> None: """ Release handle of a PLC-variable. :param int handle: handle of PLC-variable to be released """ if self._port is not None: adsReleaseHandle(self._port, self._adr, handle)
[docs] def read_by_name( self, data_name: str, plc_datatype: Optional[Type["PLCDataType"]] = None, return_ctypes: bool = False, handle: Optional[int] = None, check_length: bool = True, cache_symbol_info: bool = True, ) -> Any: """Read data synchronous from an ADS-device from data name. :param string data_name: data name, can be empty string if handle is used :param Optional[Type["PLCDataType"]] plc_datatype: type of the data given to the PLC, according to PLCTYPE constants, if None the datatype will be read from the target with adsGetSymbolInfo (default: None) :param bool return_ctypes: return ctypes instead of python types if True (default: False) :param int handle: PLC-variable handle, pass in handle if previously obtained to speed up reading (default: None) :param bool check_length: check whether the amount of bytes read matches the size of the read data type (default: True) :param bool cache_symbol_info: when True, symbol info will be cached for future reading, only relevant if plc_datatype is None (default: True) :return: value: **value** """ if not self._port: return if plc_datatype is None: plc_datatype = self._query_plc_datatype_from_name(data_name, cache_symbol_info) return adsSyncReadByNameEx( self._port, self._adr, data_name, plc_datatype, return_ctypes=return_ctypes, handle=handle, check_length=check_length, )
[docs] def read_list_by_name( self, data_names: List[str], cache_symbol_info: bool = True, ads_sub_commands: int = MAX_ADS_SUB_COMMANDS, structure_defs: Optional[Dict[str, StructureDef]] = None, ) -> Dict[str, Any]: """Read a list of variables. Will split the read into multiple ADS calls in chunks of ads_sub_commands by default. MAX_ADS_SUB_COMMANDS comes from Beckhoff recommendation: https://infosys.beckhoff.com/english.php?content=../content/1033/tc3_adsdll2/9007199379576075.html&id=9180083787138954512 :param List[str] data_names: list of variable names to be read :param bool cache_symbol_info: when True, symbol info will be cached for future reading :param int ads_sub_commands: Max number of ADS-Sub commands used to read the variables in a single ADS call. A larger number can be used but may jitter the PLC execution! :param Optional[Dict[str, StructureDef]] structure_defs: for structured variables, optional mapping of data name to special tuple defining the structure and types contained within it according to PLCTYPE constants :return adsSumRead: A dictionary containing variable names from data_names as keys and values read from PLC for each variable :rtype: Dict[str, Any] """ if structure_defs is None: structure_defs = {} if cache_symbol_info: new_items = [i for i in data_names if i not in self._symbol_info_cache] new_cache = { i: adsGetSymbolInfo(self._port, self._adr, i) for i in new_items } self._symbol_info_cache.update(new_cache) data_symbols = {i: self._symbol_info_cache[i] for i in data_names} else: data_symbols = { i: adsGetSymbolInfo(self._port, self._adr, i) for i in data_names } def sum_read(port: int, adr: AmsAddr, data_names: List[str], data_symbols: Dict) -> Dict[str, str]: result = adsSumRead(port, adr, data_names, data_symbols, list(structure_defs.keys())) # type: ignore for data_name, structure_def in structure_defs.items(): # type: ignore if data_name in result: result[data_name] = dict_from_bytes(result[data_name], structure_def) return result if len(data_names) <= ads_sub_commands: return sum_read(self._port, self._adr, data_names, data_symbols) return_data: Dict[str, Any] = {} for data_names_slice in _list_slice_generator(data_names, ads_sub_commands): return_data.update( sum_read(self._port, self._adr, data_names_slice, data_symbols) ) return return_data
[docs] def read_structure_by_name( self, data_name: str, structure_def: StructureDef, array_size: Optional[int] = 1, structure_size: Optional[int] = None, handle: Optional[int] = None, ) -> Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]: """Read a structure of multiple types. :param string data_name: data name :param tuple structure_def: special tuple defining the structure and types contained within it according to PLCTYPE constants, must match the structure defined in the PLC, PLC structure must be defined with {attribute 'pack_mode' := '1'} :param Optional[int] array_size: size of array if reading array of structure, defaults to 1 :param Optional[int] structure_size: size of structure if known by previous use of size_of_structure, defaults to None :param Optional[int] handle: PLC-variable handle, pass in handle if previously obtained to speed up reading, defaults to None :return: values_dict: ordered dictionary of all values corresponding to the structure definition Expected input example for structure_def: .. code:: python structure_def = ( ('rVar', pyads.PLCTYPE_LREAL, 1), ('sVar', pyads.PLCTYPE_STRING, 2, 35), ('SVar1', pyads.PLCTYPE_STRING, 1), ('rVar1', pyads.PLCTYPE_REAL, 1), ('iVar', pyads.PLCTYPE_DINT, 1), ('iVar1', pyads.PLCTYPE_INT, 3), ) # i.e ('Variable Name', variable type, arr size (1 if not array), # length of string (if defined in PLC)) """ if structure_size is None: structure_size = size_of_structure(structure_def * array_size) values = self.read_by_name(data_name, c_ubyte * structure_size, handle=handle) if values is not None: return dict_from_bytes(values, structure_def, array_size=array_size) return None
[docs] def write_by_name( self, data_name: str, value: Any, plc_datatype: Optional[Type["PLCDataType"]] = None, handle: Optional[int] = None, cache_symbol_info: bool = True, ) -> None: """Send data synchronous to an ADS-device from data name. :param string data_name: data name, can be empty string if handle is used :param value: value to write to the storage address of the PLC :param int plc_datatype: type of the data given to the PLC, according to PLCTYPE constants, if None the datatype will be read from the target with adsGetSymbolInfo (default: None) :param int handle: PLC-variable handle, pass in handle if previously obtained to speed up writing (default: None) :param bool cache_symbol_info: when True, symbol info will be cached for future reading, only relevant if plc_datatype is None (default: True) """ if not self._port: return if plc_datatype is None: plc_datatype = self._query_plc_datatype_from_name(data_name, cache_symbol_info) return adsSyncWriteByNameEx( self._port, self._adr, data_name, value, plc_datatype, handle=handle )
[docs] def write_list_by_name( self, data_names_and_values: Dict[str, Any], cache_symbol_info: bool = True, ads_sub_commands: int = MAX_ADS_SUB_COMMANDS, structure_defs: Optional[Dict[str, StructureDef]] = None, ) -> Dict[str, str]: """Write a list of variables. Will split the write into multiple ADS calls in chunks of ads_sub_commands by default. MAX_ADS_SUB_COMMANDS comes from Beckhoff recommendation: https://infosys.beckhoff.com/english.php?content=../content/1033/tc3_adsdll2/9007199379576075.html&id=9180083787138954512 :param data_names_and_values: dictionary of variable names and their values to be written :type data_names_and_values: dict[str, Any] :param bool cache_symbol_info: when True, symbol info will be cached for future reading :param int ads_sub_commands: Max number of ADS-Sub commands used to write the variables in a single ADS call. A larger number can be used but may jitter the PLC execution! :param dict structure_defs: for structured variables, optional mapping of data name to special tuple defining the structure and types contained within it according to PLCTYPE constants :return adsSumWrite: A dictionary containing variable names from data_names as keys and values return codes for each write operation from the PLC :rtype: dict(str, str) """ if cache_symbol_info: new_items = [ i for i in data_names_and_values.keys() if i not in self._symbol_info_cache ] new_cache = { i: adsGetSymbolInfo(self._port, self._adr, i) for i in new_items } self._symbol_info_cache.update(new_cache) data_symbols = { i: self._symbol_info_cache[i] for i in data_names_and_values } else: data_symbols = { i: adsGetSymbolInfo(self._port, self._adr, i) for i in data_names_and_values.keys() } if structure_defs is None: structure_defs = {} else: data_names_and_values = data_names_and_values.copy() # copy so the original does not get modified for name, structure_def in structure_defs.items(): data_names_and_values[name] = bytes_from_dict(data_names_and_values[name], structure_def) structured_data_names = list(structure_defs.keys()) if len(data_names_and_values) <= ads_sub_commands: return adsSumWrite( self._port, self._adr, data_names_and_values, data_symbols, structured_data_names ) return_data: Dict[str, str] = {} for data_names_slice in _dict_slice_generator(data_names_and_values, ads_sub_commands): return_data.update( adsSumWrite(self._port, self._adr, data_names_slice, data_symbols, structured_data_names) ) return return_data
[docs] def write_structure_by_name( self, data_name: str, value: Union[Dict[str, Any], List[Dict[str, Any]]], structure_def: StructureDef, array_size: Optional[int] = 1, structure_size: Optional[int] = None, handle: Optional[int] = None, ) -> None: """Write a structure of multiple types. :param str data_name: data name :param Union[Dict[str, Any], List[Dict[str, Any]]] value: value to write to the storage address of the PLC :param StructureDef structure_def: special tuple defining the structure and types contained within it according to PLCTYPE constants, must match the structure defined in the PLC, PLC structure must be defined with {attribute 'pack_mode' := '1'} :param Optional[int] array_size: size of array if writing array of structure, defaults to 1 :param Optional[int] structure_size: size of structure if known by previous use of size_of_structure, defaults to None :param Optional[int] handle: PLC-variable handle, pass in handle if previously obtained to speed up reading, defaults to None Expected input example for structure_def: .. code:: python structure_def = ( ('rVar', pyads.PLCTYPE_LREAL, 1), ('sVar', pyads.PLCTYPE_STRING, 2, 35), ('sVar', pyads.PLCTYPE_STRING, 1), ('rVar1', pyads.PLCTYPE_REAL, 1), ('iVar', pyads.PLCTYPE_DINT, 1), ) # i.e ('Variable Name', variable type, arr size (1 if not array), # length of string (if defined in PLC)) """ byte_values = bytes_from_dict(value, structure_def) if structure_size is None: structure_size = size_of_structure(structure_def * array_size) return self.write_by_name( data_name, byte_values, c_ubyte * structure_size, handle=handle )
[docs] def add_device_notification( self, data: Union[str, Tuple[int, int]], attr: NotificationAttrib, callback: Callable, user_handle: Optional[int] = None, ) -> Optional[Tuple[int, int]]: """Add a device notification. :param Union[str, Tuple[int, int] data: PLC storage address as string or Tuple with index group and offset :param pyads.structs.NotificationAttrib attr: object that contains all the attributes for the definition of a notification :param callback: callback function that gets executed in the event of a notification :param user_handle: optional user handle :rtype: (int, int) :returns: notification handle, user handle Save the notification handle and the user handle on creating a notification if you want to be able to remove the notification later in your code. **Usage**: >>> import pyads >>> from ctypes import sizeof >>> >>> # Connect to the local TwinCAT PLC >>> plc = pyads.Connection('127.0.0.1.1.1', 851) >>> >>> # Create callback function that prints the value >>> def mycallback(notification, data): >>> contents = notification.contents >>> value = next( >>> map(int, >>> bytearray(contents.data)[0:contents.cbSampleSize]) >>> ) >>> print(value) >>> >>> with plc: >>> # Add notification with default settings >>> atr = pyads.NotificationAttrib(sizeof(pyads.PLCTYPE_INT)) >>> handles = plc.add_device_notification("GVL.myvalue", atr, mycallback) >>> >>> # Remove notification >>> plc.del_device_notification(handles) Note: the `user_handle` (passed or returned) is the same as the handle returned from :meth:`Connection.get_handle()`. """ if self._port is not None: notification_handle, user_handle = adsSyncAddDeviceNotificationReqEx( self._port, self._adr, data, attr, callback, user_handle ) return notification_handle, user_handle return None
[docs] def del_device_notification( self, notification_handle: int, user_handle: int ) -> None: """Remove a device notification. :param notification_handle: address of the variable that contains the handle of the notification :param user_handle: user handle """ if self._port is not None: adsSyncDelDeviceNotificationReqEx( self._port, self._adr, notification_handle, user_handle )
@property def is_open(self) -> bool: """Show the current connection state. :return: True if connection is open """ return self._open
[docs] def set_timeout(self, ms: int) -> None: """Set Timeout.""" if self._port is not None: adsSyncSetTimeoutEx(self._port, ms)
[docs] def notification( self, plc_datatype: Optional[Type] = None, timestamp_as_filetime: bool = False ) -> Callable: """Decorate a callback function. **Decorator**. A decorator that can be used for callback functions in order to convert the data of the NotificationHeader into the fitting Python type. :param plc_datatype: The PLC datatype that needs to be converted. This can be any basic PLC datatype or a `ctypes.Structure`. :param timestamp_as_filetime: Whether the notification timestamp should be returned as `datetime.datetime` (False) or Windows `FILETIME` as originally transmitted via ADS (True). Be aware that the precision of `datetime.datetime` is limited to microseconds, while FILETIME allows for 100 ns. This may be relevant when using task cycle times such as 62.5 µs. Default: False. The callback functions need to be of the following type: >>> def callback(handle, name, timestamp, value) * `handle`: the notification handle * `name`: the variable name * `timestamp`: the timestamp as datetime value * `value`: the converted value of the variable **Usage**: >>> import pyads >>> >>> plc = pyads.Connection('172.18.3.25.1.1', 851) >>> >>> >>> @plc.notification(pyads.PLCTYPE_STRING) >>> def callback(handle, name, timestamp, value): >>> print(handle, name, timestamp, value) >>> >>> >>> with plc: >>> attr = pyads.NotificationAttrib(20, >>> pyads.ADSTRANS_SERVERCYCLE) >>> handles = plc.add_device_notification('GVL.test', attr, >>> callback) >>> while True: >>> pass """ def notification_decorator( func: Callable[[int, str, Union[datetime, int], Any], None] ) -> Callable[[Any, str], None]: def func_wrapper(notification: Any, data_name: str) -> None: h_notification, timestamp, value = self.parse_notification( notification, plc_datatype, timestamp_as_filetime ) return func(h_notification, data_name, timestamp, value) return func_wrapper return notification_decorator
# noinspection PyMethodMayBeStatic
[docs] def parse_notification( self, notification: Any, plc_datatype: Optional[Type], timestamp_as_filetime: bool = False, ) -> Tuple[int, Union[datetime, int], Any]: # noinspection PyTypeChecker """Parse a notification. Convert the data of the NotificationHeader into the fitting Python type. :param notification: The notification we recieve from PLC datatype to be converted. This can be any basic PLC datatype or a `ctypes.Structure`. :param plc_datatype: The PLC datatype that needs to be converted. This can be any basic PLC datatype or a `ctypes.Structure`. :param timestamp_as_filetime: Whether the notification timestamp should be returned as `datetime.datetime` (False) or Windows `FILETIME` as originally transmitted via ADS (True). Be aware that the precision of `datetime.datetime` is limited to microseconds, while FILETIME allows for 100 ns. This may be relevant when using task cycle times such as 62.5 µs. Default: False. :rtype: (int, int, Any) :returns: notification handle, timestamp, value **Usage**: >>> import pyads >>> from ctypes import sizeof >>> >>> # Connect to the local TwinCAT PLC >>> plc = pyads.Connection('127.0.0.1.1.1', 851) >>> tag = {"GVL.myvalue": pyads.PLCTYPE_INT} >>> >>> # Create callback function that prints the value >>> def mycallback(notification: SAdsNotificationHeader, data: str) -> None: >>> data_type = tag[data] >>> handle, timestamp, value = plc.parse_notification(notification, data_type) >>> print(value) >>> >>> with plc: >>> # Add notification with default settings >>> attr = pyads.NotificationAttrib(sizeof(pyads.PLCTYPE_INT)) >>> >>> handles = plc.add_device_notification("GVL.myvalue", attr, mycallback) >>> >>> # Remove notification >>> plc.del_device_notification(handles) """ contents = notification.contents data_size = contents.cbSampleSize # Get dynamically sized data array data = (c_ubyte * data_size).from_address( addressof(contents) + SAdsNotificationHeader.data.offset ) value: Any if plc_datatype == PLCTYPE_STRING: # read only until null-termination character value = bytearray(data).split(b"\0", 1)[0].decode("utf-8") elif plc_datatype is not None and issubclass(plc_datatype, Structure): value = plc_datatype() fit_size = min(data_size, sizeof(value)) memmove(addressof(value), addressof(data), fit_size) elif plc_datatype is not None and issubclass(plc_datatype, Array): if data_size == sizeof(plc_datatype): value = list(plc_datatype.from_buffer_copy(bytes(data))) else: # invalid size value = None elif plc_datatype not in DATATYPE_MAP: value = bytearray(data) else: value = struct.unpack(DATATYPE_MAP[plc_datatype], bytearray(data))[0] if timestamp_as_filetime: timestamp = contents.nTimeStamp else: timestamp = filetime_to_dt(contents.nTimeStamp) return contents.hNotification, timestamp, value