Source code for being.pubsub
"""Publish / subscribe."""
from typing import Iterable, Callable
[docs]class PubSub:
"""Publish / subscribe."""
def __init__(self, events: Iterable):
"""
Args:
events: Supported events.
"""
# Todo: Final args? defaultdict?
self.subscribers = {evt: set() for evt in events}
[docs] def subscribe(self, event, callback: Callable):
"""Subscribe callback to event."""
self.subscribers[event].add(callback)
[docs] def unsubscribe(self, event, callback: Callable):
"""Unsubscribe callback from event."""
self.subscribers[event].remove(callback)
[docs] def publish(self, event, *args, **kwargs):
"""Publish event."""
for func in self.subscribers[event]:
func(*args, **kwargs)
def __str__(self):
return '%s(events: %s)' % (type(self).__name__, list(self.subscribers))