Gitlab - Argos ALM by PALO IT
Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
P
py-stream
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Analytics
Analytics
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Commits
Open sidebar
pit-libs
python
py-stream
Commits
7e4f0b23
Commit
7e4f0b23
authored
Apr 15, 2024
by
Miguel Galindo Rodriguez
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
refactor: Change root package to pit
parent
efdfac5a
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
282 additions
and
8 deletions
+282
-8
README.md
README.md
+6
-6
pyproject.toml
pyproject.toml
+2
-2
src/pit/py_stream/__init__.py
src/pit/py_stream/__init__.py
+0
-0
src/pit/py_stream/event_handler.py
src/pit/py_stream/event_handler.py
+68
-0
src/pit/py_stream/event_notification.py
src/pit/py_stream/event_notification.py
+38
-0
src/pit/py_stream/settings.py
src/pit/py_stream/settings.py
+17
-0
tests/pit/py_stream/__init__.py
tests/pit/py_stream/__init__.py
+0
-0
tests/pit/py_stream/test_event_handler.py
tests/pit/py_stream/test_event_handler.py
+66
-0
tests/pit/py_stream/test_event_notification.py
tests/pit/py_stream/test_event_notification.py
+85
-0
No files found.
README.md
View file @
7e4f0b23
...
@@ -20,8 +20,8 @@ poetry install
...
@@ -20,8 +20,8 @@ poetry install
2.
Import the necessary classes from the library:
2.
Import the necessary classes from the library:
```
python
```
python
from
pit.
lib.py_stream
import
EventNotification
from
pit.
py_stream.event_notification
import
EventNotification
from
pit.
lib.
py_stream.settings
import
EventConfig
from
pit.py_stream.settings
import
EventConfig
```
```
3.
Create an instance of
`EventConfig`
with your topic bindings:
3.
Create an instance of
`EventConfig`
with your topic bindings:
```
python
```
python
...
@@ -40,8 +40,8 @@ To use this library, you need to create an instance of `EventNotification` and `
...
@@ -40,8 +40,8 @@ To use this library, you need to create an instance of `EventNotification` and `
Here is an example of how to publish an event:
Here is an example of how to publish an event:
```
python
```
python
from
pit.
lib.py_stream
import
EventPubSubNotification
from
pit.
py_stream.event_notification
import
EventPubSubNotification
from
pit.
lib.
py_stream.settings
import
EventConfig
from
pit.py_stream.settings
import
EventConfig
from
google.cloud
import
pubsub_v1
from
google.cloud
import
pubsub_v1
# Create an instance of EventConfig with your topic bindings
# Create an instance of EventConfig with your topic bindings
...
@@ -59,8 +59,8 @@ Here is an example of how to consume an event:
...
@@ -59,8 +59,8 @@ Here is an example of how to consume an event:
```
python
```
python
from
google.cloud
import
pubsub_v1
from
google.cloud
import
pubsub_v1
from
pit.
lib.
py_stream.event_handler
import
EventRouter
,
PubSubEventHandler
from
pit.py_stream.event_handler
import
EventRouter
,
PubSubEventHandler
from
pit.
lib.
py_stream.settings
import
EventConfig
from
pit.py_stream.settings
import
EventConfig
# Create an instance of EventConfig with your topic bindings
# Create an instance of EventConfig with your topic bindings
event_config
=
EventConfig
(
bindings
=
{
"my-subscription-alias"
:
"real-subscription-name"
})
event_config
=
EventConfig
(
bindings
=
{
"my-subscription-alias"
:
"real-subscription-name"
})
...
...
pyproject.toml
View file @
7e4f0b23
[tool.poetry]
[tool.poetry]
name
=
"py-stream"
name
=
"py-stream"
version
=
"
2
.0.0"
version
=
"
3
.0.0"
description
=
""
description
=
""
authors
=
[
"Miguel Galindo Rodriguez <mgalindo@palo-it.com>"
]
authors
=
[
"Miguel Galindo Rodriguez <mgalindo@palo-it.com>"
]
readme
=
"README.md"
readme
=
"README.md"
packages
=
[
{include
=
"p
y_stream
"
,
from
=
"src"
}
]
packages
=
[
{include
=
"p
it
"
,
from
=
"src"
}
]
[tool.poetry.dependencies]
[tool.poetry.dependencies]
python
=
"^3.11"
python
=
"^3.11"
...
...
src/pit/py_stream/__init__.py
0 → 100644
View file @
7e4f0b23
src/pit/py_stream/event_handler.py
0 → 100644
View file @
7e4f0b23
import
logging
from
dataclasses
import
dataclass
,
field
from
google.cloud
import
pubsub_v1
from
pit.py_stream.settings
import
EventConfig
logger
=
logging
.
getLogger
(
__name__
)
class
EventRouter
:
def
__init__
(
self
):
self
.
callbacks
=
{}
def
event
(
self
,
binding
:
str
):
def
decorator
(
func
):
self
.
callbacks
[
binding
]
=
func
return
func
return
decorator
@
dataclass
(
repr
=
False
,
eq
=
False
)
class
PubSubEventHandler
:
subscriber
:
pubsub_v1
.
SubscriberClient
publisher
:
pubsub_v1
.
PublisherClient
project_name
:
str
event_config
:
EventConfig
routers
:
list
=
field
(
init
=
False
,
default_factory
=
list
)
def
add_router
(
self
,
router
:
EventRouter
):
self
.
routers
.
append
(
router
)
def
start_listening
(
self
):
for
router
in
self
.
routers
:
for
binding
,
callback
in
router
.
callbacks
.
items
():
subscription_name
=
self
.
event_config
.
bindings
.
get
(
binding
)
subscription_path
=
self
.
subscriber
.
subscription_path
(
self
.
project_name
,
subscription_name
)
subscription_exist
=
self
.
_is_subscription_exist
(
subscription_path
)
if
subscription_exist
:
self
.
subscriber
.
subscribe
(
subscription_path
,
callback
=
callback
)
else
:
name
=
getattr
(
callback
,
"__name__"
,
repr
(
callback
))
logger
.
error
(
"The subscription [%s] does not exist, the callback [%s] will not be executed"
,
subscription_name
,
name
,
)
def
_is_subscription_exist
(
self
,
subscription_path
):
try
:
self
.
subscriber
.
get_subscription
(
request
=
{
"subscription"
:
subscription_path
}
)
return
True
except
Exception
as
e
:
logger
.
warning
(
"The subscription [%s] does not exist, reason: [%s]"
,
subscription_path
,
str
(
e
),
)
return
False
src/pit/py_stream/event_notification.py
0 → 100644
View file @
7e4f0b23
import
abc
import
json
import
logging
from
dataclasses
import
dataclass
from
pit.py_stream.settings
import
EventConfig
from
google.cloud
import
pubsub_v1
logger
=
logging
.
getLogger
(
__name__
)
class
EventNotification
(
metaclass
=
abc
.
ABCMeta
):
@
abc
.
abstractmethod
def
notify
(
self
,
binding
:
str
,
message
:
dict
|
str
,
headers
:
dict
=
None
):
raise
NotImplementedError
@
dataclass
(
repr
=
False
,
eq
=
False
)
class
EventPubSubNotification
(
EventNotification
):
client
:
pubsub_v1
.
PublisherClient
project_name
:
str
event_config
:
EventConfig
def
notify
(
self
,
binding
:
str
,
message
:
dict
|
str
,
headers
:
dict
=
None
):
logger
.
debug
(
"Notify [%s] => [%s]"
,
message
,
binding
)
topic
=
self
.
event_config
.
bindings
.
get
(
binding
)
topic_path
=
self
.
client
.
topic_path
(
self
.
project_name
,
topic
)
message_str
=
(
message
if
isinstance
(
message
,
str
)
else
json
.
dumps
(
message
,
default
=
str
)
)
data
=
message_str
.
encode
(
"utf-8"
)
if
headers
is
not
None
:
self
.
client
.
publish
(
topic_path
,
data
,
**
headers
)
else
:
self
.
client
.
publish
(
topic_path
,
data
)
src/pit/py_stream/settings.py
0 → 100644
View file @
7e4f0b23
from
typing
import
Literal
from
pydantic
import
BaseModel
,
Field
class
EventPubSubConfig
(
BaseModel
):
project_name
:
str
=
Field
(
default
=
"proj"
,
description
=
"GCP Project Name"
)
class
EventConfig
(
BaseModel
):
type
:
Literal
[
"pubsub"
]
=
Field
(
description
=
"Messaging provider"
,
default
=
"pubsub"
)
pubsub
:
EventPubSubConfig
|
None
=
None
bindings
:
dict
[
str
,
str
]
=
Field
(
description
=
"Map(str,str), key is the binding name used in code to be an alias of a topic or subscription,"
"this is used to get the name of the topic or subscription contained in the value"
,
default
=
{},
)
tests/pit/py_stream/__init__.py
0 → 100644
View file @
7e4f0b23
tests/pit/py_stream/test_event_handler.py
0 → 100644
View file @
7e4f0b23
from
unittest.mock
import
Mock
from
google.cloud
import
pubsub_v1
from
pytest
import
fixture
from
pit.py_stream.event_handler
import
EventRouter
,
PubSubEventHandler
from
pit.py_stream.settings
import
EventConfig
@
fixture
def
router
()
->
EventRouter
:
router
=
EventRouter
()
@
router
.
event
(
"tes_event"
)
def
test_event
(
event
):
pass
return
router
@
fixture
def
pubsub_event_config
()
->
EventConfig
:
return
EventConfig
(
bindings
=
{
"test_event"
:
"test_event"
})
@
fixture
def
pub_sub_event_handler
(
router
,
pubsub_event_config
)
->
PubSubEventHandler
:
return
PubSubEventHandler
(
subscriber
=
Mock
(
spec_set
=
pubsub_v1
.
SubscriberClient
),
publisher
=
Mock
(
spec_set
=
pubsub_v1
.
PublisherClient
),
project_name
=
"test_project"
,
event_config
=
pubsub_event_config
,
)
def
test_add_router
(
pub_sub_event_handler
,
router
):
# Act
pub_sub_event_handler
.
add_router
(
router
)
# Assert
assert
router
in
pub_sub_event_handler
.
routers
def
test_start_listening
(
pub_sub_event_handler
,
router
):
# Arrange
pub_sub_event_handler
.
add_router
(
router
)
# Act
pub_sub_event_handler
.
start_listening
()
# Assert
pub_sub_event_handler
.
subscriber
.
subscribe
.
assert_called_once
()
pub_sub_event_handler
.
subscriber
.
subscription_path
.
assert_called_once
()
pub_sub_event_handler
.
subscriber
.
get_subscription
.
assert_called_once
()
def
test_start_listening_subscription_not_exist
(
pub_sub_event_handler
,
router
):
# Arrange
pub_sub_event_handler
.
add_router
(
router
)
pub_sub_event_handler
.
subscriber
.
get_subscription
.
side_effect
=
Exception
# Act
pub_sub_event_handler
.
start_listening
()
# Assert
pub_sub_event_handler
.
subscriber
.
subscribe
.
assert_not_called
()
tests/pit/py_stream/test_event_notification.py
0 → 100644
View file @
7e4f0b23
import
json
from
unittest.mock
import
MagicMock
import
pytest
from
pit.py_stream.event_notification
import
EventPubSubNotification
from
pit.py_stream.settings
import
EventConfig
@
pytest
.
fixture
def
pubsub_client
():
return
MagicMock
()
@
pytest
.
fixture
def
event_config
():
return
EventConfig
(
bindings
=
{
"my-topic"
:
"job-created"
})
@
pytest
.
fixture
def
event_notification
(
pubsub_client
,
event_config
):
project_name
=
"my-project"
return
EventPubSubNotification
(
pubsub_client
,
project_name
,
event_config
)
def
test_notify_with_string_message
(
pubsub_client
,
event_notification
):
# Arrange
topic
=
"my-topic"
message
=
"Hello, world!"
headers
=
{
"header1"
:
"value1"
,
"header2"
:
"value2"
}
# Act
event_notification
.
notify
(
topic
,
message
,
headers
)
# Assert
topic_path
=
pubsub_client
.
topic_path
.
return_value
pubsub_client
.
publish
.
assert_called_once_with
(
topic_path
,
message
.
encode
(
"utf-8"
),
**
headers
)
def
test_notify_with_string_message_no_headers
(
pubsub_client
,
event_notification
):
# Arrange
topic
=
"my-topic"
message
=
"Hello, world!"
# Act
event_notification
.
notify
(
topic
,
message
)
# Assert
topic_path
=
pubsub_client
.
topic_path
.
return_value
pubsub_client
.
publish
.
assert_called_once_with
(
topic_path
,
message
.
encode
(
"utf-8"
)
)
def
test_notify_with_dict_message
(
pubsub_client
,
event_notification
):
# Arrange
topic
=
"my-topic"
message
=
{
"key"
:
"value"
}
headers
=
{
"header1"
:
"value1"
,
"header2"
:
"value2"
}
# Act
event_notification
.
notify
(
topic
,
message
,
headers
)
# Assert
topic_path
=
pubsub_client
.
topic_path
.
return_value
message_str
=
json
.
dumps
(
message
,
default
=
str
)
pubsub_client
.
publish
.
assert_called_once_with
(
topic_path
,
message_str
.
encode
(
"utf-8"
),
**
headers
)
def
test_topic_not_exists__rise_error
(
pubsub_client
,
event_notification
:
EventPubSubNotification
):
# Arrange
topic
=
"my-topic"
message
=
"Hello, world!"
headers
=
{
"header1"
:
"value1"
,
"header2"
:
"value2"
}
pubsub_client
.
publish
.
side_effect
=
RuntimeError
(
"Topic not found"
)
with
pytest
.
raises
(
RuntimeError
,
match
=
"Topic not found"
):
event_notification
.
notify
(
topic
,
message
,
headers
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment