Commit c534e7be authored by Thomas Mignot's avatar Thomas Mignot

Big improve in publications/subscriptions

parent cbd38350
File deleted
......@@ -38,6 +38,7 @@ INSTALLED_APPS = [
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.postgres',
'django.contrib.staticfiles',
'todos',
]
......@@ -85,8 +86,9 @@ CHANNEL_LAYERS = {
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': 'django_ddp_test_project',
'USER': 'tmignot'
}
}
......
default_app_config = 'ryzom.apps.RyzomConfig'
......@@ -3,3 +3,9 @@ from django.apps import AppConfig
class RyzomConfig(AppConfig):
name = 'ryzom'
def ready(self):
import ryzom.signals
from ryzom.pubsub import to_publish
for publication in to_publish:
publication.publish_all()
......@@ -12,6 +12,7 @@ class Component():
_id=None):
self._id = _id or uuid.uuid1().hex
self.parent = parent
self.position = 0
self.tag = 'HTML' if parent is None else tag
self.attr = {} if attr is None else attr
self.events = {} if events is None else events
......@@ -22,12 +23,14 @@ class Component():
def preparecontent(self):
# handle text node as content
if isinstance(self.content, list):
for c in self.content:
for i, c in enumerate(self.content):
c.parent = self._id
c.position = i
elif isinstance(self.content, str) and self.tag is not 'text':
self.content = [Text(self.content)]
def addchild(self, component):
component.position = len(self.content)
component.parent = self._id
self.content.append(component)
......@@ -50,6 +53,7 @@ class Component():
for c in self.content
] if self.tag != 'text' else self.content,
'parent': self.parent,
'position': self.position,
'events': self.events,
'attr': self.attr,
'subscriptions': getattr(self, 'subscriptions', [])
......
......@@ -7,7 +7,7 @@ from django.contrib.auth.models import User
from asgiref.sync import async_to_sync
from django.conf import settings
from ryzom.models import Clients, Subscriptions
from ryzom.models import Clients, Subscriptions, Publications
ddp_urlpatterns = importlib.import_module(settings.DDP_URLPATTERNS).urlpatterns
server_methods = importlib.import_module(settings.SERVER_METHODS).Methods
......@@ -109,7 +109,6 @@ class Consumer(JsonWebsocketConsumer, object):
'message': f'Method {params["name"]} not found'
}
})
self.send(json.dumps(to_send))
else:
ret = method(params['params'])
if ret:
......@@ -122,6 +121,7 @@ class Consumer(JsonWebsocketConsumer, object):
'type': 'Error',
'params': ret
})
self.send(json.dumps(to_send))
def insert_component(self, data, change=False):
self.send(json.dumps({
......@@ -156,7 +156,7 @@ class Consumer(JsonWebsocketConsumer, object):
params = data['params']
to_send = {'_id': data['_id']}
client = Clients.objects.get(channel=self.channel_name)
for key in ['name', '_id', 'template']:
for key in ['name', '_id']:
if key not in params:
to_send.update({
'type': 'Error',
......@@ -176,12 +176,13 @@ class Consumer(JsonWebsocketConsumer, object):
}
})
else:
sub = Subscriptions.objects.create(
name=params['name'],
pub = Publications.objects.get(name=params['name'])
sub = Subscriptions(
publication=pub,
parent=params['_id'],
template_module=params['template'][0],
template_class=params['template'][1],
client=client)
sub.init()
sub.save()
to_send.update({
'type': 'Success',
'params': {
......
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
def send_insert(sub, model, tmpl, _id):
tmpl_instance = tmpl(model.objects.get(id=_id))
tmpl_instance.parent = sub.parent
tmpl_instance.position = sub.queryset.index(_id)
data = {
'type': 'handle.ddp',
'params': {
'type': 'inserted',
'instance': tmpl_instance.to_obj()
}
}
channel = get_channel_layer()
async_to_sync(channel.send)(sub.client.channel, data)
def send_change(sub, model, tmpl, _id):
tmpl_instance = tmpl(model.objects.get(id=_id))
tmpl_instance.parent = sub.parent
tmpl_instance.position = sub.queryset.index(_id)
data = {
'type': 'handle.ddp',
'params': {
'type': 'changed',
'instance': tmpl_instance.to_obj()
}
}
channel = get_channel_layer()
async_to_sync(channel.send)(sub.client.channel, data)
def send_remove(sub, model, tmpl, _id):
tmp = model()
tmp.id = _id
tmpl_instance = tmpl(tmp)
data = {
'type': 'handle.ddp',
'params': {
'type': 'removed',
'_id': tmpl_instance._id,
'parent': sub.parent
}
}
channel = get_channel_layer()
async_to_sync(channel.send)(sub.client.channel, data)
# Generated by Django 2.1.7 on 2019-02-28 14:37
# Generated by Django 2.1.7 on 2019-03-07 15:42
from django.conf import settings
import django.contrib.postgres.fields
import django.contrib.postgres.fields.jsonb
from django.db import migrations, models
import django.db.models.deletion
......@@ -23,14 +25,25 @@ class Migration(migrations.Migration):
],
),
migrations.CreateModel(
name='Subscriptions',
name='Publications',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=255)),
('parent', models.CharField(max_length=255)),
('model_module', models.CharField(max_length=255)),
('model_class', models.CharField(max_length=255)),
('template_module', models.CharField(max_length=255)),
('template_class', models.CharField(max_length=255)),
('query', django.contrib.postgres.fields.jsonb.JSONField(blank=True, null=True)),
],
),
migrations.CreateModel(
name='Subscriptions',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('parent', models.CharField(max_length=255)),
('queryset', django.contrib.postgres.fields.ArrayField(base_field=models.IntegerField(), default=list, size=None)),
('client', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='ryzom.Clients')),
('publication', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='ryzom.Publications')),
],
),
]
......@@ -2,10 +2,9 @@ import importlib
from django.db import models
from django.contrib.auth.models import User
from django.db.models.signals import post_delete, post_save
from django.dispatch import receiver
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.contrib.postgres.fields import ArrayField, JSONField
from django.contrib.postgres.aggregates import ArrayAgg
from ryzom.ddp import send_insert
class Clients(models.Model):
......@@ -18,54 +17,45 @@ class Clients(models.Model):
)
class Subscriptions(models.Model):
name = models.CharField(max_length=255)
parent = models.CharField(max_length=255)
class Publications(models.Model):
name = models.CharField(max_length=255, unique=True)
model_module = models.CharField(max_length=255)
model_class = models.CharField(max_length=255)
template_module = models.CharField(max_length=255)
template_class = models.CharField(max_length=255)
client = models.ForeignKey(Clients, models.CASCADE)
query = JSONField(blank=True, null=True)
@receiver(post_save)
def ddp_insert_change(sender, **kwargs):
created = kwargs.pop('created')
instance = kwargs.pop('instance')
data = {
'type': 'handle.ddp',
'params': {
'type': 'inserted' if created else 'changed',
}
}
subs = Subscriptions.objects.filter(name=sender.__name__)
for sub in subs:
mfile, mpath = sub.template_module[::-1].split('.', 1)
tmpl_module = importlib.import_module(f'.{mfile[::-1]}', mpath[::-1])
tmpl_class = getattr(tmpl_module, sub.template_class)
tmpl_instance = tmpl_class(instance)
tmpl_instance.parent = sub.parent
data['params']['instance'] = tmpl_instance.to_obj()
client = sub.client
channel = get_channel_layer()
async_to_sync(channel.send)(client.channel, data)
@receiver(post_delete)
def ddp_delete(sender, **kwargs):
instance = kwargs.pop('instance')
data = {
'type': 'handle.ddp',
'params': {
'type': 'removed',
}
}
subs = Subscriptions.objects.filter(name=sender.__name__)
for sub in subs:
mfile, mpath = sub.template_module[::-1].split('.', 1)
tmpl_module = importlib.import_module(f'.{mfile[::-1]}', mpath[::-1])
tmpl_class = getattr(tmpl_module, sub.template_class)
tmpl_instance = tmpl_class(instance)
data['params']['parent'] = sub.parent
data['params']['_id'] = tmpl_instance._id
client = sub.client
channel = get_channel_layer()
async_to_sync(channel.send)(client.channel, data)
class Subscriptions(models.Model):
parent = models.CharField(max_length=255)
client = models.ForeignKey(Clients, models.CASCADE)
publication = models.ForeignKey(Publications, models.CASCADE)
queryset = ArrayField(models.IntegerField(), default=list)
def init(self):
pub = self.publication
model_mod = importlib.import_module(pub.model_module)
model_cls = getattr(model_mod, pub.model_class)
tmpl_mod = importlib.import_module(pub.template_module)
tmpl_cls = getattr(tmpl_mod, pub.template_class)
qs = self.exec_query(model_cls).aggregate(ids=ArrayAgg('id'))
self.queryset = qs['ids']
for _id in self.queryset:
send_insert(self, model_cls, tmpl_cls, _id)
def exec_query(self, model):
'''
limit, orderby, offset, fields(values), filter
'''
pub = self.publication
qs = model.objects.filter(**pub.query.get('filter', {}))
if 'order_by' in pub.query:
qs = qs.order_by(*pub.query.order_by)
if 'limit' in pub.query or 'offset' in pub.query:
limit = pub.query.get('limit')
offset = pub.query.get('offset', 0)
if limit:
qs = qs[offset:offset + limit]
else:
qs = qs[offset:]
return qs
from ryzom.models import Publications
to_publish = []
class Subscription():
def __init__(self, name, params):
pub = Publications.objects.get(name=name)
if pub:
self.name = name
self.params = params
self.publication = pub
class Publishable():
_published = False
_prepubs = {}
@classmethod
def publish(cls, name, template=None, query={}):
if not cls._published:
cls._prepubs[name] = (template, query)
if cls not in to_publish:
to_publish.append(cls)
else:
cls.do_publish(name, template, query)
@classmethod
def do_publish(cls, name, template, query):
tmpl_cls, tmpl_mod = template[::-1].split('.', 1)
tmpl_mod = tmpl_mod[::-1]
tmpl_cls = tmpl_cls[::-1]
kwargs = {
'name': name,
'model_module': cls.__module__,
'model_class': cls.__name__,
'template_module': tmpl_mod,
'template_class': tmpl_cls,
'query': query
}
Publications.objects.update_or_create(**kwargs)
@classmethod
def publish_all(cls):
cls._published = True
for k, v in cls._prepubs.items():
name, template, query = k, *v
cls.do_publish(name, template, query)
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from ryzom.components import Component
......
import importlib
from django.contrib.postgres.aggregates import ArrayAgg
from django.db.models.signals import post_delete, post_save
from django.dispatch import receiver
from ryzom.ddp import send_insert, send_change, send_remove
from ryzom.models import Publications, Subscriptions
from ryzom.pubsub import Publishable
@receiver(post_save)
def _ddp_insert_change(sender, **kwargs):
if Publishable not in sender.mro():
return
created = kwargs.pop('created')
instance = kwargs.pop('instance')
pubs = Publications.objects.filter(
model_class=sender.__name__,
model_module=sender.__module__)
for pub in pubs:
tmpl_module = importlib.import_module(pub.template_module)
tmpl_class = getattr(tmpl_module, pub.template_class)
subscriptions = Subscriptions.objects.filter(publication=pub)
for sub in subscriptions:
old_qs = sub.queryset
new_qs = sub.exec_query(sender).aggregate(ids=ArrayAgg('id'))
new_qs = new_qs['ids']
# update the queryset
sub.queryset = new_qs
sub.save()
diff = {
'inserted': set(new_qs).difference(set(old_qs)),
'removed': set(old_qs).difference(set(new_qs))
}
# if sets are the same
if not diff['inserted'] and not diff['removed']:
# if created and sets are the same,
# entry has been filtered and can't be there
if not created:
# changed and may have moved
# just send new instance and pos
send_change(sub, sender, tmpl_class, instance.id)
# if sets aren't the same, then considering that only one entry
# was added or has changed:
# - it could have been removed if newly filtered (changed)
# - it could have been added if no more filtered (changed)
# - it could have been added if created and not filtered
# - it cannot have just moved neither changed or the set
# would have been the same
# - it could have been added while not created, replacing
# another entry because of filters, so created or not,
# we have to handle both added and removed entries
else:
# using loops for now but shouldn't be usefull as we
# are handling only one entry, the queryset shouldn't
# move by more that one in and/or one out
for _id in diff['removed']:
send_remove(sub, sender, tmpl_class, _id)
for _id in diff['inserted']:
send_insert(sub, sender, tmpl_class, _id)
@receiver(post_delete)
def _ddp_delete(sender, **kwargs):
if Publishable not in sender.mro():
return
instance = kwargs.pop('instance')
pubs = Publications.objects.filter(
model_module=sender.__module__,
model_class=sender.__name__)
for pub in pubs:
tmpl_module = importlib.import_module(pub.template_module)
tmpl_class = getattr(tmpl_module, pub.template_class)
subscriptions = Subscriptions.objects.filter(publication=pub)
for sub in subscriptions:
old_qs = sub.queryset
# if instance not in queryset, no need to remove it
# or update the queryset
if instance.id in old_qs:
new_qs = sub.exec_query(sender).aggregate(ids=ArrayAgg('id'))
new_qs = new_qs['ids']
sub.queryset = new_qs
sub.save()
diff = {
'inserted': set(new_qs).difference(set(old_qs)),
'removed': set(old_qs).difference(set(new_qs))
}
for _id in diff['removed']:
send_remove(sub, sender, tmpl_class, _id)
for _id in diff['inserted']:
send_insert(sub, sender, tmpl_class, _id)
......@@ -54,8 +54,7 @@
ws_send({
type: 'subscribe',
params: {
name: sub[0],
template: sub[1],
name: sub,
_id: component._id
}
}, function(r, e) { if (e) { console.log(e); }});
......
from ryzom.components import Div, Ul, Li, Span, Input, Button
from todos.models import Tasks
class Task(Li):
......@@ -16,14 +15,8 @@ class Task(Li):
class Tasklist(Ul):
def __init__(self):
# Array of ('ModelName', ('template.module.file', 'TemplateClass'))
self.subscriptions = [('Tasks', ('todos.components.tasks', 'Task'))]
content = [
Task(t)
for t in Tasks.objects.filter()
]
super().__init__(content, {'class': 'list-group'}, _id='tasklist')
self.subscriptions = ['tasks']
super().__init__(attr={'class': 'list-group'}, _id='tasklist')
class Taskform(Div):
......
# Generated by Django 2.1.7 on 2019-02-28 14:37
# Generated by Django 2.1.7 on 2019-03-07 15:31
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import ryzom.pubsub
class Migration(migrations.Migration):
......@@ -21,5 +22,6 @@ class Migration(migrations.Migration):
('about', models.CharField(max_length=1024)),
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL)),
],
bases=(models.Model, ryzom.pubsub.Publishable),
),
]
from django.db import models
from django.contrib.auth.models import User
from ryzom.pubsub import Publishable
class Tasks(models.Model):
class Tasks(models.Model, Publishable):
user = models.ForeignKey(User, models.SET_NULL, blank=True, null=True)
about = models.CharField(max_length=1024)
Tasks.publish(
name='tasks',
template='todos.components.tasks.Task',
query={'limit': 5}
)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment