Gitlab - Argos ALM by PALO IT

refactor: Change root package to pit.lib

parent 03f7a8e1
......@@ -18,9 +18,10 @@ A Python library that provides a common interface to different message brokers s
poetry install
```
2. Import the necessary classes from the library:
```python
from py_stream.event_notification import EventNotification
from py_stream.settings import EventConfig
from pit.lib.py_stream import EventNotification
from pit.lib.py_stream.settings import EventConfig
```
3. Create an instance of `EventConfig` with your topic bindings:
```python
......@@ -39,8 +40,8 @@ To use this library, you need to create an instance of `EventNotification` and `
Here is an example of how to publish an event:
```python
from py_stream.event_notification import EventPubSubNotification
from py_stream.settings import EventConfig
from pit.lib.py_stream import EventPubSubNotification
from pit.lib.py_stream.settings import EventConfig
from google.cloud import pubsub_v1
# Create an instance of EventConfig with your topic bindings
......@@ -50,7 +51,7 @@ event_config = EventConfig(bindings={"my-topic-alias": "real.topic.name"})
event_notification = EventPubSubNotification(pubsub_v1.PublisherClient(), "gcp-project-name", event_config)
# Now you can use event_notification.notify() to publish messages to your topic
event_notification.notify("my-topic", { "name":"Dave","last_name":"Jhones" })
event_notification.notify("my-topic", {"name": "Dave", "last_name": "Jhones"})
```
Here is an example of how to consume an event:
......@@ -58,8 +59,8 @@ Here is an example of how to consume an event:
```python
from google.cloud import pubsub_v1
from py_stream.event_handler import EventRouter, PubSubEventHandler
from py_stream.settings import EventConfig
from pit.lib.py_stream.event_handler import EventRouter, PubSubEventHandler
from pit.lib.py_stream.settings import EventConfig
# Create an instance of EventConfig with your topic bindings
event_config = EventConfig(bindings={"my-subscription-alias": "real-subscription-name"})
......@@ -72,7 +73,7 @@ def my_function(data):
print(data)
event_handler:PubSubEventHandler = PubSubEventHandler(
event_handler: PubSubEventHandler = PubSubEventHandler(
pubsub_v1.SubscriberClient(),
pubsub_v1.PublisherClient(),
"gcp-project-name",
......
[tool.poetry]
name = "py-stream"
version = "0.1.0"
version = "2.0.0"
description = ""
authors = ["Miguel Galindo Rodriguez <mgalindo@palo-it.com>"]
readme = "README.md"
......
import logging
from dataclasses import dataclass, field
from google.cloud import pubsub_v1
from py_stream.settings import EventConfig
logger = logging.getLogger(__name__)
class EventRouter:
def __init__(self):
self.callbacks = {}
def event(self, binding: str):
def decorator(func):
self.callbacks[binding] = func
return func
return decorator
@dataclass(repr=False, eq=False)
class PubSubEventHandler:
subscriber: pubsub_v1.SubscriberClient
publisher: pubsub_v1.PublisherClient
project_name: str
event_config: EventConfig
routers: list = field(init=False, default_factory=list)
def add_router(self, router: EventRouter):
self.routers.append(router)
def start_listening(self):
for router in self.routers:
for binding, callback in router.callbacks.items():
subscription_name = self.event_config.bindings.get(binding)
subscription_path = self.subscriber.subscription_path(
self.project_name, subscription_name
)
subscription_exist = self._is_subscription_exist(subscription_path)
if subscription_exist:
self.subscriber.subscribe(subscription_path, callback=callback)
else:
name = getattr(callback, "__name__", repr(callback))
logger.error(
"The subscription [%s] does not exist, the callback [%s] will not be executed",
subscription_name,
name,
)
def _is_subscription_exist(self, subscription_path):
try:
self.subscriber.get_subscription(
request={"subscription": subscription_path}
)
return True
except Exception as e:
logger.warning(
"The subscription [%s] does not exist, reason: [%s]",
subscription_path,
str(e),
)
return False
import abc
import json
import logging
from dataclasses import dataclass
from py_stream.settings import EventConfig
from google.cloud import pubsub_v1
logger = logging.getLogger(__name__)
class EventNotification(metaclass=abc.ABCMeta):
@abc.abstractmethod
def notify(self, binding: str, message: dict | str, headers: dict = None):
raise NotImplementedError
@dataclass(repr=False, eq=False)
class EventPubSubNotification(EventNotification):
client: pubsub_v1.PublisherClient
project_name: str
event_config: EventConfig
def notify(self, binding: str, message: dict | str, headers: dict = None):
logger.debug("Notify [%s] => [%s]", message, binding)
topic = self.event_config.bindings.get(binding)
topic_path = self.client.topic_path(self.project_name, topic)
message_str = (
message if isinstance(message, str) else json.dumps(message, default=str)
)
data = message_str.encode("utf-8")
if headers is not None:
self.client.publish(topic_path, data, **headers)
else:
self.client.publish(topic_path, data)
from typing import Literal
from pydantic import BaseModel, Field
class EventPubSubConfig(BaseModel):
project_name: str = Field(default="proj", description="GCP Project Name")
class EventConfig(BaseModel):
type: Literal["pubsub"] = Field(description="Messaging provider", default="pubsub")
pubsub: EventPubSubConfig | None = None
bindings: dict[str, str] = Field(
description="Map(str,str), key is the binding name used in code to be an alias of a topic or subscription,"
"this is used to get the name of the topic or subscription contained in the value",
default={},
)
from unittest.mock import Mock
from google.cloud import pubsub_v1
from pytest import fixture
from py_stream.event_handler import EventRouter, PubSubEventHandler
from py_stream.settings import EventConfig
@fixture
def router() -> EventRouter:
router = EventRouter()
@router.event("tes_event")
def test_event(event):
pass
return router
@fixture
def pubsub_event_config() -> EventConfig:
return EventConfig(bindings={"test_event": "test_event"})
@fixture
def pub_sub_event_handler(router, pubsub_event_config) -> PubSubEventHandler:
return PubSubEventHandler(
subscriber=Mock(spec_set=pubsub_v1.SubscriberClient),
publisher=Mock(spec_set=f),
project_name="test_project",
event_config=pubsub_event_config,
)
def test_add_router(pub_sub_event_handler, router):
# Act
pub_sub_event_handler.add_router(router)
# Assert
assert router in pub_sub_event_handler.routers
def test_start_listening(pub_sub_event_handler, router):
# Arrange
pub_sub_event_handler.add_router(router)
# Act
pub_sub_event_handler.start_listening()
# Assert
pub_sub_event_handler.subscriber.subscribe.assert_called_once()
pub_sub_event_handler.subscriber.subscription_path.assert_called_once()
pub_sub_event_handler.subscriber.get_subscription.assert_called_once()
def test_start_listening_subscription_not_exist(pub_sub_event_handler, router):
# Arrange
pub_sub_event_handler.add_router(router)
pub_sub_event_handler.subscriber.get_subscription.side_effect = Exception
# Act
pub_sub_event_handler.start_listening()
# Assert
pub_sub_event_handler.subscriber.subscribe.assert_not_called()
import json
from unittest.mock import MagicMock
import pytest
from py_stream.event_notification import EventPubSubNotification
from py_stream.settings import EventConfig
@pytest.fixture
def pubsub_client():
return MagicMock()
@pytest.fixture
def event_config():
return EventConfig(bindings={"my-topic": "job-created"})
@pytest.fixture
def event_notification(pubsub_client, event_config):
project_name = "my-project"
return EventPubSubNotification(pubsub_client, project_name, event_config)
def test_notify_with_string_message(pubsub_client, event_notification):
# Arrange
topic = "my-topic"
message = "Hello, world!"
headers = {"header1": "value1", "header2": "value2"}
# Act
event_notification.notify(topic, message, headers)
# Assert
topic_path = pubsub_client.topic_path.return_value
pubsub_client.publish.assert_called_once_with(
topic_path, message.encode("utf-8"), **headers
)
def test_notify_with_string_message_no_headers(pubsub_client, event_notification):
# Arrange
topic = "my-topic"
message = "Hello, world!"
# Act
event_notification.notify(topic, message)
# Assert
topic_path = pubsub_client.topic_path.return_value
pubsub_client.publish.assert_called_once_with(
topic_path, message.encode("utf-8")
)
def test_notify_with_dict_message(pubsub_client, event_notification):
# Arrange
topic = "my-topic"
message = {"key": "value"}
headers = {"header1": "value1", "header2": "value2"}
# Act
event_notification.notify(topic, message, headers)
# Assert
topic_path = pubsub_client.topic_path.return_value
message_str = json.dumps(message, default=str)
pubsub_client.publish.assert_called_once_with(
topic_path, message_str.encode("utf-8"), **headers
)
def test_topic_not_exists__rise_error(
pubsub_client, event_notification: EventPubSubNotification
):
# Arrange
topic = "my-topic"
message = "Hello, world!"
headers = {"header1": "value1", "header2": "value2"}
pubsub_client.publish.side_effect = RuntimeError("Topic not found")
with pytest.raises(RuntimeError, match="Topic not found"):
event_notification.notify(topic, message, headers)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment