"""Client for accessing the NI Data Moniker Service."""
from __future__ import annotations
import logging
import sys
import threading
from types import TracebackType
from typing import Iterator, TYPE_CHECKING
import grpc
from ni.datamonikers.v1 import data_moniker_pb2
from ni.datamonikers.v1 import data_moniker_pb2_grpc
from ni_grpc_extensions.channelpool import GrpcChannelPool
if TYPE_CHECKING:
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
# Module-level logger, named after this module via ``__name__``.
_logger = logging.getLogger(__name__)
class MonikerClient:
    """Client for accessing the NI Data Moniker Service.

    The client wraps a ``MonikerServiceStub``. When it is constructed with a
    ``grpc_channel``, the stub is created immediately from that channel. Otherwise
    the stub is created on first use from a channel that a ``GrpcChannelPool``
    returns for ``service_location``; if no pool was supplied, the client creates
    its own pool at that point and closes it again in :meth:`close`.

    The client can be used as a context manager; leaving the ``with`` block calls
    :meth:`close`.
    """

    __slots__ = (
        "_lock",
        "_owns_grpc_channel_pool",
        "_service_location",
        "_grpc_channel_pool",
        "_stub",
    )

    # Guards lazy stub creation in _get_stub() and teardown in close().
    _lock: threading.Lock
    # True only when this client created the channel pool itself.
    _owns_grpc_channel_pool: bool
    # Target passed to the channel pool when the stub is created lazily.
    _service_location: str | None
    # Pool used to obtain a channel; either supplied by the caller or created lazily.
    _grpc_channel_pool: GrpcChannelPool | None
    # Cached service stub; None until first use (or after close()).
    _stub: data_moniker_pb2_grpc.MonikerServiceStub | None

    def __init__(
        self,
        *,
        service_location: str | None = None,
        grpc_channel: grpc.Channel | None = None,
        grpc_channel_pool: GrpcChannelPool | None = None,
    ) -> None:
        """Initialize the Moniker Client.

        Args:
            service_location: The address of the data moniker service location (recommended).
            grpc_channel: A data moniker gRPC channel (optional).
            grpc_channel_pool: A gRPC channel pool (recommended). It is used to obtain a
                channel for ``service_location`` when no ``grpc_channel`` is given. If it
                is omitted, the client creates its own pool on first use.

        Either `service_location` or `grpc_channel` must be provided. If both are provided,
        `grpc_channel` takes precedence.

        Raises:
            ValueError: If neither ``service_location`` nor ``grpc_channel`` is provided.
        """
        if service_location is None and grpc_channel is None:
            raise ValueError("Either 'service_location' or 'grpc_channel' must be provided.")

        self._lock = threading.Lock()
        self._owns_grpc_channel_pool = False
        self._service_location = service_location
        self._grpc_channel_pool = grpc_channel_pool
        # An explicit channel wins: build the stub now. Otherwise defer to _get_stub().
        self._stub = (
            data_moniker_pb2_grpc.MonikerServiceStub(grpc_channel)
            if grpc_channel is not None
            else None
        )

    def __enter__(self) -> Self:
        """Enter the runtime context of the MonikerClient.

        Returns:
            This client instance.
        """
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Exit the runtime context of the MonikerClient.

        Calls :meth:`close`. Returns ``None``, so an exception raised inside the
        ``with`` block is not suppressed.
        """
        self.close()

    def close(self) -> None:
        """Close the client and clean up resources that it owns.

        The cached stub is dropped. The channel pool is closed only when this client
        created it; a pool supplied by the caller is not closed.
        """
        with self._lock:
            self._stub = None

            if self._owns_grpc_channel_pool and self._grpc_channel_pool:
                self._grpc_channel_pool.close()
                self._grpc_channel_pool = None
                self._owns_grpc_channel_pool = False

    def _get_stub(self) -> data_moniker_pb2_grpc.MonikerServiceStub:
        """Return the service stub, creating it (and a channel pool) on first use.

        The stub attribute is read into a local variable and that local is what gets
        returned, so a concurrent :meth:`close` that clears ``self._stub`` after the
        check cannot turn the return value into ``None``.

        Returns:
            The cached stub, or a newly created one bound to a channel obtained from
            the channel pool for ``service_location``.
        """
        stub = self._stub
        if stub is None:
            with self._lock:
                if self._grpc_channel_pool is None:
                    _logger.debug("Creating unshared GrpcChannelPool.")
                    self._grpc_channel_pool = GrpcChannelPool()
                    self._owns_grpc_channel_pool = True

                # Re-check under the lock: another thread may have created the stub
                # while this one was waiting.
                stub = self._stub
                if stub is None:
                    channel = self._grpc_channel_pool.get_channel(self._service_location)  # type: ignore
                    stub = data_moniker_pb2_grpc.MonikerServiceStub(channel)
                    self._stub = stub
        return stub

    def begin_sideband_stream(
        self, request: data_moniker_pb2.BeginMonikerSidebandStreamRequest
    ) -> data_moniker_pb2.BeginMonikerSidebandStreamResponse:
        """Begin a sideband stream.

        Args:
            request: The request message passed to the stub's ``BeginSidebandStream`` call.

        Returns:
            The result of the stub's ``BeginSidebandStream`` call.
        """
        return self._get_stub().BeginSidebandStream(request)

    def stream_read(
        self, moniker_list: data_moniker_pb2.MonikerList
    ) -> Iterator[data_moniker_pb2.MonikerReadResult]:
        """Stream read data from monikers.

        Args:
            moniker_list: The moniker list passed to the stub's ``StreamRead`` call.

        Returns:
            The result of the stub's ``StreamRead`` call.
        """
        return self._get_stub().StreamRead(moniker_list)

    def stream_write(
        self, requests: Iterator[data_moniker_pb2.MonikerWriteRequest]
    ) -> Iterator[data_moniker_pb2.StreamWriteResponse]:
        """Stream write data to monikers.

        Args:
            requests: The write requests passed to the stub's ``StreamWrite`` call.

        Returns:
            The result of the stub's ``StreamWrite`` call.
        """
        return self._get_stub().StreamWrite(requests)

    def stream_read_write(
        self, requests: Iterator[data_moniker_pb2.MonikerWriteRequest]
    ) -> Iterator[data_moniker_pb2.MonikerReadResult]:
        """Stream read and write data with monikers.

        Args:
            requests: The write requests passed to the stub's ``StreamReadWrite`` call.

        Returns:
            The result of the stub's ``StreamReadWrite`` call.
        """
        return self._get_stub().StreamReadWrite(requests)

    def read_from_moniker(
        self, moniker: data_moniker_pb2.Moniker
    ) -> data_moniker_pb2.ReadFromMonikerResult:
        """Read data from a moniker.

        Args:
            moniker: The moniker passed to the stub's ``ReadFromMoniker`` call.

        Returns:
            The result of the stub's ``ReadFromMoniker`` call.
        """
        return self._get_stub().ReadFromMoniker(moniker)

    def write_to_moniker(
        self, request: data_moniker_pb2.WriteToMonikerRequest
    ) -> data_moniker_pb2.WriteToMonikerResponse:
        """Write data to a moniker.

        Args:
            request: The request message passed to the stub's ``WriteToMoniker`` call.

        Returns:
            The result of the stub's ``WriteToMoniker`` call.
        """
        return self._get_stub().WriteToMoniker(request)