events.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. # -*- coding: utf-8 -*-
  2. """
  3. events plugin for Pelican
  4. =========================
  5. This plugin looks for and parses an "events" directory and generates
  6. blog posts with a user-defined event date. (typically in the future)
  7. It also generates an ICalendar v2.0 calendar file.
  8. https://en.wikipedia.org/wiki/ICalendar
  9. Author: Federico Ceratto <federico.ceratto@gmail.com>
  10. Released under AGPLv3+ license, see LICENSE
  11. """
  12. from datetime import datetime, timedelta
  13. from pelican import signals, utils
  14. from collections import namedtuple, defaultdict
  15. import icalendar
  16. import logging
  17. import os.path
  18. import pytz
  19. log = logging.getLogger(__name__)
  20. TIME_MULTIPLIERS = {
  21. 'w': 'weeks',
  22. 'd': 'days',
  23. 'h': 'hours',
  24. 'm': 'minutes',
  25. 's': 'seconds'
  26. }
  27. events = []
  28. localized_events = defaultdict(list)
  29. Event = namedtuple("Event", "dtstart dtend metadata")
  30. def parse_tstamp(ev, field_name):
  31. """Parse a timestamp string in format "YYYY-MM-DD HH:MM"
  32. :returns: datetime
  33. """
  34. try:
  35. return datetime.strptime(ev[field_name], '%Y-%m-%d %H:%M')
  36. except Exception as e:
  37. log.error("Unable to parse the '%s' field in the event named '%s': %s" \
  38. % (field_name, ev['title'], e))
  39. raise
  40. def parse_timedelta(ev):
  41. """Parse a timedelta string in format [<num><multiplier> ]*
  42. e.g. 2h 30m
  43. :returns: timedelta
  44. """
  45. chunks = ev['event-duration'].split()
  46. tdargs = {}
  47. for c in chunks:
  48. try:
  49. m = TIME_MULTIPLIERS[c[-1]]
  50. val = int(c[:-1])
  51. tdargs[m] = val
  52. except KeyError:
  53. log.error("""Unknown time multiplier '%s' value in the \
  54. 'event-duration' field in the '%s' event. Supported multipliers \
  55. are: '%s'.""" % (c, ev['title'], ' '.join(TIME_MULTIPLIERS)))
  56. raise RuntimeError("Unknown time multiplier '%s'" % c)
  57. except ValueError:
  58. log.error("""Unable to parse '%s' value in the 'event-duration' \
  59. field in the '%s' event.""" % (c, ev['title']))
  60. raise ValueError("Unable to parse '%s'" % c)
  61. return timedelta(**tdargs)
  62. def parse_article(generator, metadata):
  63. """Collect articles metadata to be used for building the event calendar
  64. :returns: None
  65. """
  66. if 'event-start' not in metadata:
  67. return
  68. dtstart = parse_tstamp(metadata, 'event-start')
  69. if 'event-end' in metadata:
  70. dtend = parse_tstamp(metadata, 'event-end')
  71. elif 'event-duration' in metadata:
  72. dtdelta = parse_timedelta(metadata)
  73. dtend = dtstart + dtdelta
  74. else:
  75. msg = "Either 'event-end' or 'event-duration' must be" + \
  76. " speciefied in the event named '%s'" % metadata['title']
  77. log.error(msg)
  78. raise ValueError(msg)
  79. events.append(Event(dtstart, dtend, metadata))
  80. def generate_ical_file(generator):
  81. """Generate an iCalendar file
  82. """
  83. global events
  84. ics_fname = generator.settings['PLUGIN_EVENTS']['ics_fname']
  85. if not ics_fname:
  86. return
  87. ics_fname = os.path.join(generator.settings['OUTPUT_PATH'], ics_fname)
  88. log.debug("Generating calendar at %s with %d events" % (ics_fname, len(events)))
  89. tz = generator.settings.get('TIMEZONE', 'UTC')
  90. tz = pytz.timezone(tz)
  91. ical = icalendar.Calendar()
  92. ical.add('prodid', '-//My calendar product//mxm.dk//')
  93. ical.add('version', '2.0')
  94. DEFAULT_LANG = generator.settings['DEFAULT_LANG']
  95. curr_events = events if not localized_events else localized_events[DEFAULT_LANG]
  96. for e in curr_events:
  97. ie = icalendar.Event(
  98. summary=e.metadata['summary'],
  99. dtstart=e.dtstart,
  100. dtend=e.dtend,
  101. dtstamp=e.metadata['date'],
  102. priority=5,
  103. uid=e.metadata['title'] + e.metadata['summary'],
  104. )
  105. if 'event-location' in e.metadata:
  106. ie.add('location', e.metadata['event-location'])
  107. ical.add_component(ie)
  108. with open(ics_fname, 'wb') as f:
  109. f.write(ical.to_ical())
  110. def generate_localized_events(generator):
  111. """ Generates localized events dict if i18n_subsites plugin is active """
  112. if "i18n_subsites" in generator.settings["PLUGINS"]:
  113. if not os.path.exists(generator.settings['OUTPUT_PATH']):
  114. os.makedirs(generator.settings['OUTPUT_PATH'])
  115. for e in events:
  116. if "lang" in e.metadata:
  117. localized_events[e.metadata["lang"]].append(e)
  118. else:
  119. log.debug("event %s contains no lang attribute" % (e.metadata["title"],))
  120. def generate_events_list(generator):
  121. """Populate the event_list variable to be used in jinja templates"""
  122. if not localized_events:
  123. generator.context['events_list'] = sorted(events, reverse = True,
  124. key=lambda ev: (ev.dtstart, ev.dtend))
  125. else:
  126. generator.context['events_list'] = {k: sorted(v, reverse = True,
  127. key=lambda ev: (ev.dtstart, ev.dtend))
  128. for k, v in localized_events.items()}
  129. def initialize_events(article_generator):
  130. """
  131. Clears the events list before generating articles to properly support plugins with
  132. multiple generation passes like i18n_subsites
  133. """
  134. del events[:]
  135. localized_events.clear()
  136. def register():
  137. signals.article_generator_init.connect(initialize_events)
  138. signals.article_generator_context.connect(parse_article)
  139. signals.article_generator_finalized.connect(generate_localized_events)
  140. signals.article_generator_finalized.connect(generate_ical_file)
  141. signals.article_generator_finalized.connect(generate_events_list)