Gitlab - Argos ALM by PALO IT

feat: added py-stream

A Python PaloIT library that provides a common interface to different message brokers such as PubSub, Kafka, RabbitMQ. It abstracts the complexity of using these brokers' APIs directly, allowing developers to focus on their application logic.
parent 7a3457f1
# Created by https://www.toptal.com/developers/gitignore/api/python,visualstudiocode,pycharm
# Edit at https://www.toptal.com/developers/gitignore?templates=python,visualstudiocode,pycharm
### PyCharm ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# SonarLint plugin
.idea/sonarlint/
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
### PyCharm Patch ###
# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721
# *.iml
# modules.xml
# .idea/misc.xml
# *.ipr
# Sonarlint plugin
# https://plugins.jetbrains.com/plugin/7973-sonarlint
.idea/**/sonarlint/
# SonarQube Plugin
# https://plugins.jetbrains.com/plugin/7238-sonarqube-community-plugin
.idea/**/sonarIssues.xml
# Markdown Navigator plugin
# https://plugins.jetbrains.com/plugin/7896-markdown-navigator-enhanced
.idea/**/markdown-navigator.xml
.idea/**/markdown-navigator-enh.xml
.idea/**/markdown-navigator/
# Cache file creation bug
# See https://youtrack.jetbrains.com/issue/JBR-2257
.idea/$CACHE_FILE$
# CodeStream plugin
# https://plugins.jetbrains.com/plugin/12206-codestream
.idea/codestream.xml
# Azure Toolkit for IntelliJ plugin
# https://plugins.jetbrains.com/plugin/8053-azure-toolkit-for-intellij
.idea/**/azureSettings.xml
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
### Python Patch ###
# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
poetry.toml
# ruff
.ruff_cache/
# LSP config files
pyrightconfig.json
### VisualStudioCode ###
.vscode/
# Local History for Visual Studio Code
.history/
# Built Visual Studio Code Extensions
*.vsix
### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide
# End of https://www.toptal.com/developers/gitignore/api/python,visualstudiocode,pycharm
\ No newline at end of file
# py-stream
A Python library that provides a common interface to different message brokers such as PubSub, Kafka, RabbitMQ. It abstracts the complexity of using these brokers' APIs directly, allowing developers to focus on their application logic.
**Currently, in version 1.0.0, only PubSub is supported.** The library includes features like EventRouter and EventNotification for efficient message routing and notification.
## Features
- **Abstraction over different brokers**: This library provides a common interface to different brokers such as PubSub, Kafka, RabbitMQ. This abstraction simplifies the complexity of using the APIs of these brokers directly.
- **EventRouter**: It allows you to decorate functions that will work as consumers.
- **EventNotification.notify**: It publishes an event by looking up the topic name within `EventConfig.bindings[str,str]`.
- **Topic name abstraction**: The reason for not using the topic name directly in the code and using `EventConfig.bindings` is to allow changing the topic name without altering the code. The real name is the value of the dictionary `EventConfig.bindings`.
## Quick Start
1. Install the library using pip:
```bash
poetry install
```
2. Import the necessary classes from the library:
```python
from py_stream.event_notification import EventNotification
from py_stream.settings import EventConfig
```
3. Create an instance of `EventConfig` with your topic bindings:
```python
event_config = EventConfig(bindings={"my-topic-alias": "job-created"})
```
4. Create an instance of `EventNotification` with your PubSub client, project name, and event config:
```python
event_notification = EventPubSubNotification(pubsub_client, "gcp-project-name", event_config)
```
5. Now you can use `event_notification.notify("my-topic-alias", { "name":"Dave","last_name":"Jhones" })` to publish messages to your topic.
## Usage
To use this library, you need to create an instance of `EventNotification` and `EventConfig`. `EventConfig` holds the bindings for your topics. These bindings allow you to change the topic name without altering the code.
Here is an example of how to publish an event:
```python
from py_stream.event_notification import EventPubSubNotification
from py_stream.settings import EventConfig
from google.cloud import pubsub_v1
# Create an instance of EventConfig with your topic bindings
event_config = EventConfig(bindings={"my-topic-alias": "real.topic.name"})
# Create an instance of EventNotification with your PubSub client, project name, and event config
event_notification = EventPubSubNotification(pubsub_v1.PublisherClient(), "gcp-project-name", event_config)
# Now you can use event_notification.notify() to publish messages to your topic
event_notification.notify("my-topic", { "name":"Dave","last_name":"Jhones" })
```
Here is an example of how to consume an event:
```python
from google.cloud import pubsub_v1
from py_stream.event_handler import EventRouter, PubSubEventHandler
from py_stream.settings import EventConfig
# Create an instance of EventConfig with your topic bindings
event_config = EventConfig(bindings={"my-subscription-alias": "real-subscription-name"})
event_router = EventRouter()
# Decorate your function with event_router.subscribe() to consume messages from your topic
@event_router.event("my-subscription-alias")
def my_function(data):
print(data)
event_handler:PubSubEventHandler = PubSubEventHandler(
pubsub_v1.SubscriberClient(),
pubsub_v1.PublisherClient(),
"gcp-project-name",
event_config)
event_handler.add_router(event_router)
# Start the event router
event_handler.start_listening()
while True:
pass
```
## Notes
- Event Handler needs something to keep running the script to keep listening to the messages. In the example above, we used a `while True: pass` loop. In a real application, you might want to use a more sophisticated mechanism to keep the script running.
- The `EventRouter` class is responsible for routing messages to the correct function. It uses the `EventConfig` class to look up the subscription name.
- This library is compatible to FastAPI, is recommended start the PubSubEventHandler inside the lifespan function. See [Lifespan Events](https://fastapi.tiangolo.com/advanced/events/)
\ No newline at end of file
This diff is collapsed.
[tool.poetry]
name = "py-stream"
version = "0.1.0"
description = ""
authors = ["Miguel Galindo Rodriguez <mgalindo@palo-it.com>"]
readme = "README.md"
packages = [{include = "py_stream", from = "src"}]
[tool.poetry.dependencies]
python = "^3.11"
pydantic = "^2.7.0"
google-cloud-pubsub = "^2.21.1"
[tool.poetry.group.test.dependencies]
black = "^24.4.0"
pytest = "^8.1.1"
pytest-cov = "^5.0.0"
pytest-asyncio = "^0.23.6"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
testpaths = "tests"
filterwarnings = ["error", "ignore:The 'app' shortcut is now deprecated"]
log_cli = false
log_cli_level = "INFO"
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
log_cli_date_format = "%Y-%m-%d %H:%M:%S"
[tool.coverage.run]
branch = true
source = ['src']
[tool.coverage.report]
skip_empty = true
exclude_also = ["def __repr__", "raise AssertionError", "raise NotImplementedError", "@(abc\\.)?abstractmethod", "pass"]
import logging
from dataclasses import dataclass, field
from google.cloud import pubsub_v1
from py_stream.settings import EventConfig
logger = logging.getLogger(__name__)
class EventRouter:
def __init__(self):
self.callbacks = {}
def event(self, binding: str):
def decorator(func):
self.callbacks[binding] = func
return func
return decorator
@dataclass(repr=False, eq=False)
class PubSubEventHandler:
subscriber: pubsub_v1.SubscriberClient
publisher: pubsub_v1.PublisherClient
project_name: str
event_config: EventConfig
routers: list = field(init=False, default_factory=list)
def add_router(self, router: EventRouter):
self.routers.append(router)
def start_listening(self):
for router in self.routers:
for binding, callback in router.callbacks.items():
subscription_name = self.event_config.bindings.get(binding)
subscription_path = self.subscriber.subscription_path(
self.project_name, subscription_name
)
subscription_exist = self._is_subscription_exist(subscription_path)
if subscription_exist:
self.subscriber.subscribe(subscription_path, callback=callback)
else:
name = getattr(callback, "__name__", repr(callback))
logger.error(
"The subscription [%s] does not exist, the callback [%s] will not be executed",
subscription_name,
name,
)
def _is_subscription_exist(self, subscription_path):
try:
self.subscriber.get_subscription(
request={"subscription": subscription_path}
)
return True
except Exception as e:
logger.warning(
"The subscription [%s] does not exist, reason: [%s]",
subscription_path,
str(e),
)
return False
import abc
import json
import logging
from dataclasses import dataclass
from py_stream.settings import EventConfig
from google.cloud import pubsub_v1
logger = logging.getLogger(__name__)
class EventNotification(metaclass=abc.ABCMeta):
@abc.abstractmethod
def notify(self, binding: str, message: dict | str, headers: dict = None):
raise NotImplementedError
@dataclass(repr=False, eq=False)
class EventPubSubNotification(EventNotification):
client: pubsub_v1.PublisherClient
project_name: str
event_config: EventConfig
def notify(self, binding: str, message: dict | str, headers: dict = None):
logger.debug("Notify [%s] => [%s]", message, binding)
topic = self.event_config.bindings.get(binding)
topic_path = self.client.topic_path(self.project_name, topic)
message_str = (
message if isinstance(message, str) else json.dumps(message, default=str)
)
data = message_str.encode("utf-8")
if headers is not None:
self.client.publish(topic_path, data, **headers)
else:
self.client.publish(topic_path, data)
from typing import Literal
from pydantic import BaseModel, Field
class EventPubSubConfig(BaseModel):
project_name: str = Field(default="proj", description="GCP Project Name")
class EventConfig(BaseModel):
type: Literal["pubsub"] = Field(description="Messaging provider", default="pubsub")
pubsub: EventPubSubConfig | None = None
bindings: dict[str, str] = Field(
description="Map(str,str), key is the binding name used in code to be an alias of a topic or subscription,"
"this is used to get the name of the topic or subscription contained in the value",
default={},
)
from unittest.mock import Mock
from google.cloud import pubsub_v1
from pytest import fixture
from py_stream.event_handler import EventRouter, PubSubEventHandler
from py_stream.settings import EventConfig
@fixture
def router() -> EventRouter:
router = EventRouter()
@router.event("tes_event")
def test_event(event):
pass
return router
@fixture
def pubsub_event_config() -> EventConfig:
return EventConfig(bindings={"test_event": "test_event"})
@fixture
def pub_sub_event_handler(router, pubsub_event_config) -> PubSubEventHandler:
return PubSubEventHandler(
subscriber=Mock(spec_set=pubsub_v1.SubscriberClient),
publisher=Mock(spec_set=f),
project_name="test_project",
event_config=pubsub_event_config,
)
def test_add_router(pub_sub_event_handler, router):
# Act
pub_sub_event_handler.add_router(router)
# Assert
assert router in pub_sub_event_handler.routers
def test_start_listening(pub_sub_event_handler, router):
# Arrange
pub_sub_event_handler.add_router(router)
# Act
pub_sub_event_handler.start_listening()
# Assert
pub_sub_event_handler.subscriber.subscribe.assert_called_once()
pub_sub_event_handler.subscriber.subscription_path.assert_called_once()
pub_sub_event_handler.subscriber.get_subscription.assert_called_once()
def test_start_listening_subscription_not_exist(pub_sub_event_handler, router):
# Arrange
pub_sub_event_handler.add_router(router)
pub_sub_event_handler.subscriber.get_subscription.side_effect = Exception
# Act
pub_sub_event_handler.start_listening()
# Assert
pub_sub_event_handler.subscriber.subscribe.assert_not_called()
import json
from unittest.mock import MagicMock
import pytest
from py_stream.event_notification import EventPubSubNotification
from py_stream.settings import EventConfig
@pytest.fixture
def pubsub_client():
return MagicMock()
@pytest.fixture
def event_config():
return EventConfig(bindings={"my-topic": "job-created"})
@pytest.fixture
def event_notification(pubsub_client, event_config):
project_name = "my-project"
return EventPubSubNotification(pubsub_client, project_name, event_config)
def test_notify_with_string_message(pubsub_client, event_notification):
# Arrange
topic = "my-topic"
message = "Hello, world!"
headers = {"header1": "value1", "header2": "value2"}
# Act
event_notification.notify(topic, message, headers)
# Assert
topic_path = pubsub_client.topic_path.return_value
pubsub_client.publish.assert_called_once_with(
topic_path, message.encode("utf-8"), **headers
)
def test_notify_with_string_message_no_headers(pubsub_client, event_notification):
# Arrange
topic = "my-topic"
message = "Hello, world!"
# Act
event_notification.notify(topic, message)
# Assert
topic_path = pubsub_client.topic_path.return_value
pubsub_client.publish.assert_called_once_with(
topic_path, message.encode("utf-8")
)
def test_notify_with_dict_message(pubsub_client, event_notification):
# Arrange
topic = "my-topic"
message = {"key": "value"}
headers = {"header1": "value1", "header2": "value2"}
# Act
event_notification.notify(topic, message, headers)
# Assert
topic_path = pubsub_client.topic_path.return_value
message_str = json.dumps(message, default=str)
pubsub_client.publish.assert_called_once_with(
topic_path, message_str.encode("utf-8"), **headers
)
def test_topic_not_exists__rise_error(
pubsub_client, event_notification: EventPubSubNotification
):
# Arrange
topic = "my-topic"
message = "Hello, world!"
headers = {"header1": "value1", "header2": "value2"}
pubsub_client.publish.side_effect = RuntimeError("Topic not found")
with pytest.raises(RuntimeError, match="Topic not found"):
event_notification.notify(topic, message, headers)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment