
A decent Elasticsearch search engine implementation

April 9, 2017
0 comments Python, Web development, Django

The title is a bit of an understatement because I think it's pretty good. It's not perfect and it's not guaranteed to scale, but it works pretty well. Especially on search term typos.

This, my own blog, now has a search engine built with Elasticsearch using the Python library elasticsearch-dsl. The algorithm (if you can call it that) is my own afternoon-hack invention. Before I explain how it works, try out a couple of searches:

(each search appends &debug-search for extended output)

  • corn - finds Cornwall, cron, Crontabber, crontab, corp etc.
  • crown - finds crown, Crowne, crowded, crowds, crowd etc.
  • react - finds create-react-app, React app, etc.
  • jugg - finds Jung, juggling, judging, judged etc.
  • pythn - finds Python, python2.4, python2.5 etc.

Also, by default it uses Elasticsearch's match_phrase so when you search for a multi-word thing, it requires a match on each term. E.g. date format which finds Date formatting, date formats etc.

But if you search for something where the whole phrase can't match, it splits up the search and uses a match operator instead (minus any stop words).

Typo-focussed

This solution is very much focussed on typos. One thing I really dislike in non-Google search engines is when you make a search and nothing is found and it says "Did you mean ...?". Quite likely I did, but why do I have to click it? Can't it just be clicked for me?

Also, if there's ambiguity, there might be some results based on exactly what you typed plus multiple potential "Did you mean...?" alternatives. Why not just blend them all together like Google does? Here is my attempt to solve that. Come with me...

Figuring Out ALL Search Terms

So if you type "Firefix" (not "Firefox"; also scroll to the bottom to see the debug table) then maybe that's an actual word that's in the database. Then, using Elasticsearch's Suggesters, it figures out alternative spellings based on frequency distributions within the indexed content. This lookup is actually really fast. In this case it figures out three alternative ways to spell the term:

  • firefox (score 0.9, 1 character different)
  • firefli (score 0.7, 2 characters different)
  • firfox (score 0.7, 2 characters different)

And, very arbitrarily, I pick a score for the default term the user typed in. Let's pick 1.1. It doesn't matter greatly and it's up for future tuning. The initial goal is simply to not bury the original spelling too far back.

Here's how to run the suggester for every defined doc type and generate a list of (score, term) tuples for the other search terms (minimum score >= 0.6):


# `q` is the search phrase the user typed in; 1.1 is the arbitrary score
# given to the original spelling.
search_terms = [(1.1, q)]
_search_terms = set([q])
# Which fields to run the term suggester against, per doc type.
doc_type_keys = (
    (BlogItemDoc, ('title', 'text')),
    (BlogCommentDoc, ('comment',)),
)
for doc_type, keys in doc_type_keys:
    suggester = doc_type.search()
    for key in keys:
        suggester = suggester.suggest('sugg', q, term={'field': key})
    suggestions = suggester.execute_suggest()
    for each in suggestions.sugg:
        if each.options:
            for option in each.options:
                if option.score >= 0.6:
                    # Swap the original term for the suggested spelling.
                    better = q.replace(each['text'], option['text'])
                    if better not in _search_terms:
                        search_terms.append((
                            option['score'],
                            better,
                        ))
                        _search_terms.add(better)

Eventually we get a list (once sorted) that looks like this:


search_terms = [(1.1, 'firefix'), (0.9, 'firefox'), (0.7, 'firefli'), (0.7, 'firfox')]

The only reason the code sorts this by the score is in case there are crazy-many search terms. Then we might want to chop off some and only use the 5 highest scoring spelling alternatives.
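
To make that concrete, the sort-and-chop step might look something like this (just a sketch; the cap of 5 is as arbitrary as the scores above):

# A sketch of the sort-and-chop step; the cap of 5 is arbitrary.
search_terms = [(1.1, 'firefix'), (0.9, 'firefox'), (0.7, 'firefli'), (0.7, 'firfox')]
search_terms.sort(reverse=True)  # highest scoring spelling first
search_terms = search_terms[:5]  # keep only the best few alternatives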

Building The Boosted OR-query

In this scenario, we're searching amongst blog posts. The title is likely to be a better match than the body. If the title mentions it we probably want to favor that over those where it's only mentioned in the body.

So to build up the OR-query we'll boost the title more than the body ("text" in this example) and we'll build it up using all possible search terms and boost them based on their score. Here's the complete query.


strategy = 'match_phrase'
if original_q:
    # original_q is only set on the fallback pass
    # (see "Match Phrase or Match" below)
    strategy = 'match'
search_term_boosts = {}
matcher = None
for i, (score, word) in enumerate(search_terms):
    # meaning the first search_term should be boosted most
    j = len(search_terms) - i
    boost = 1 * j * score
    boost_title = 2 * boost
    search_term_boosts[word] = (boost_title, boost)
    match = Q(strategy, title={
        'query': word,
        'boost': boost_title,
    }) | Q(strategy, text={
        'query': word,
        'boost': boost,
    })
    if matcher is None:
        matcher = match
    else:
        matcher |= match

search_query = search_query.query(matcher)

The core is that it does Q('match_phrase', title='firefix', boost=2X) | Q('match_phrase', text='firefix', boost=X).

Here's another arbitrary number. The number 2. It means that the "title" is 2 times more important than the "text".

And that's it! Now every match is scored based on the suggester's score and whether it matched on the "title" or the "text" (or both). Elasticsearch takes care of everything else. The default is to sort by the _score as ultimately dictated by Lucene.

Match Phrase or Match

In this implementation it first tries to match using a match_phrase query, which basically only finds matches where every word in the query matches.

The cheap solution here is to basically keep the whole search function as is, but if absolutely nothing is found with a match_phrase, and there were multiple words, then just recurse one more time and do it with a match query instead.

This could probably be improved by doing the match_phrase first with a higher boost and doing the match too, but with a lower boost. All in one big query.
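
A hedged sketch of what that combined, single query could look like with elasticsearch-dsl (the boost values here are illustrative, not the ones the site actually uses):

from elasticsearch_dsl import Q


def build_matcher(word):
    # Exact phrase matches score highest...
    phrase = Q('match_phrase', title={'query': word, 'boost': 4}) | \
             Q('match_phrase', text={'query': word, 'boost': 2})
    # ...but looser per-word matches still count, with lower boosts.
    loose = Q('match', title={'query': word, 'boost': 2}) | \
            Q('match', text={'query': word, 'boost': 1})
    return phrase | loose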

Want A Copy?

Note, this code is quite a mess! It's a personal side-project which is an excuse for experimentation and goofing around.

The full search function is here.

Please don't judge me for the scrappiness of the code, but please do share your thoughts on whether this is a decent application of Elasticsearch for smallish datasets like a blog.

Fastest cache backend possible for Django

April 7, 2017
11 comments Python, Linux, Web development

tl;dr; Redis is twice as fast as memcached as a Django cache backend when installed using AWS ElastiCache. Only tested for reads.

Django has a wonderful caching framework. I think I say "wonderful" because it's so simple. Not because it has a hundred different bells and whistles. Each cache gets a name (e.g. "mymemcache" or "redis append only"). The only configuration you generally have to worry about is 1) what backend and 2) what location.

For example, to set up a memcached backend:


# this in settings.py
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'KEY_PREFIX': 'myapp',
        'LOCATION': config('MEMCACHED_LOCATION', '127.0.0.1:11211'),
    },
}

With that in play you can now do:


>>> from django.core.cache import caches
>>> caches['default'].set('key', 'value', 60)  # 60 seconds
>>> caches['default'].get('key')
'value'

Django comes with a built-in backend called django.core.cache.backends.locmem.LocMemCache which is basically a simple Python object in memory with no persistence between Python processes. This one is of course super fast because it involves no network (local or remote) beyond the process itself. But if you care about performance (which you probably do if you're here because of the blog post title) it's not really useful, because it can't be shared amongst processes.
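
For completeness, such a locmem backend is configured the same way as the memcached example above; a minimal sketch:

# settings.py -- a minimal sketch of a local-memory cache backend
CACHES = {
    'locmem': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
        # LOCATION just namespaces the in-process store and is optional
        'LOCATION': 'benchmarking',
    },
}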

Anyway, the most common backends to use are:

  • Memcached
  • Redis

These are semi-persistent and built for extremely fast key lookups. They can both be reached over TCP or via a socket.
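
For example, assuming the django-redis package, a Redis cache is configured much like the memcached example above (a sketch; the exact LOCATION depends on your setup):

# settings.py -- a sketch assuming the django-redis package is installed
CACHES = {
    'redis': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'KEY_PREFIX': 'myapp',
        'LOCATION': 'redis://127.0.0.1:6379/0',  # or a unix socket URL
    },
}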

What I wanted to see is which one is fastest.

The Experiment

First of all, in this blog post I'm only measuring the read times of the various cache backends.

Here's the Django view function that is the experiment:


import random
import time

from django import http
from django.conf import settings
from django.core.cache import caches


def run(request, cache_name):
    if cache_name == 'random':
        cache_name = random.choice(settings.CACHE_NAMES)
    cache = caches[cache_name]
    t0 = time.time()
    data = cache.get('benchmarking', [])
    t1 = time.time()
    if random.random() < settings.WRITE_CHANCE:
        data.append(t1 - t0)
        cache.set('benchmarking', data, 60)
    if data:
        avg = 1000 * sum(data) / len(data)
    else:
        avg = 'notyet'
    # print(cache_name, '#', len(data), 'avg:', avg, ' size:', len(str(data)))
    return http.HttpResponse('{}\n'.format(avg))

It records the time it takes to make a cache.get read and, depending on settings.WRITE_CHANCE, it also does a write (but doesn't record that).
What it records is a list of floats. The content of that piece of data stored in the cache looks something like this:

  1. [0.0007331371307373047]
  2. [0.0007331371307373047, 0.0002570152282714844]
  3. [0.0007331371307373047, 0.0002570152282714844, 0.0002200603485107422]

So the data grows from being really small to something really large. If you run this 1,000 times with settings.WRITE_CHANCE of 1.0, the last time it has to fetch a list of 999 floats out of the cache backend.

You can either test it with 1 specific backend in mind and see how fast Django can do, say, 10,000 of these. Here's one such example:

$ wrk -t10 -c400 -d10s http://127.0.0.1:8000/default
Running 10s test @ http://127.0.0.1:8000/default
  10 threads and 400 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    76.28ms  155.26ms   1.41s    92.70%
    Req/Sec   349.92    193.36     1.51k    79.30%
  34107 requests in 10.10s, 2.56MB read
  Socket errors: connect 0, read 0, write 0, timeout 59
Requests/sec:   3378.26
Transfer/sec:    259.78KB

$ wrk -t10 -c400 -d10s http://127.0.0.1:8000/memcached
Running 10s test @ http://127.0.0.1:8000/memcached
  10 threads and 400 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    96.87ms  183.16ms   1.81s    95.10%
    Req/Sec   213.42     82.47     0.91k    76.08%
  21315 requests in 10.09s, 1.57MB read
  Socket errors: connect 0, read 0, write 0, timeout 32
Requests/sec:   2111.68
Transfer/sec:    159.27KB

$ wrk -t10 -c400 -d10s http://127.0.0.1:8000/redis
Running 10s test @ http://127.0.0.1:8000/redis
  10 threads and 400 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    84.93ms  148.62ms   1.66s    92.20%
    Req/Sec   262.96    138.72     1.10k    81.20%
  25271 requests in 10.09s, 1.87MB read
  Socket errors: connect 0, read 0, write 0, timeout 15
Requests/sec:   2503.55
Transfer/sec:    189.96KB

But an immediate disadvantage with this is that the "total final rate" (i.e. requests/sec) is likely to include so many other factors. However, you can see that LocMemcache got 3378.26 req/s, MemcachedCache got 2111.68 req/s and RedisCache got 2503.55 req/s.

The code for the experiment is available here: https://github.com/peterbe/django-fastest-cache

The Infra Setup

I created an AWS m3.xlarge EC2 Ubuntu node and two nodes in AWS ElastiCache. One 2-node memcached cluster based on cache.m3.xlarge and one 2-node 1-replica Redis cluster also based on cache.m3.xlarge.

The Django server was run with uWSGI like this:

uwsgi --http :8000 --wsgi-file fastestcache/wsgi.py  --master --processes 6 --threads 10

The Results

Instead of hitting one backend repeatedly and reporting the "requests per second", I hit the "random" endpoint for 30 seconds and let it randomly select a cache backend each time. Once that's done, I read each cache and look at the final massive list of timings it took to make all the reads. I run it like this:

wrk -t10 -c400 -d30s http://127.0.0.1:8000/random && curl http://127.0.0.1:8000/summary
...wrk output redacted...

                         TIMES        AVERAGE         MEDIAN         STDDEV
memcached                 5738        7.523ms        4.828ms        8.195ms
default                   3362        0.305ms        0.187ms        1.204ms
redis                     4958        3.502ms        1.707ms        5.591ms

Best Averages (shorter better)
###############################################################################
█████████████████████████████████████████████████████████████  7.523  memcached
██                                                             0.305  default
████████████████████████████                                   3.502  redis

Things to note:

  • Redis is twice as fast as memcached.
  • Pure Python LocMemcache is 10 times faster than Redis.
  • The table reports average and median. The ASCII bar chart shows only the averages.
  • All three backends report huge standard deviations. The median is very different from the average.
  • The average is probably the more interesting number since it better reflects the ups and downs of reality.
  • If you compare the medians, Redis is 3 times faster than memcached.
  • It's by chance that Redis got fewer datapoints than memcached (4958 vs 5738) but it's as expected that the LocMemCache backend only gets 3362 because the uWSGI server that is used spreads the requests across multiple processes.

Other Things To Test

Perhaps pylibmc is faster than python-memcached.

TIMES        AVERAGE         MEDIAN         STDDEV
pylibmc                   2893        8.803ms        6.080ms        7.844ms
default                   3456        0.315ms        0.181ms        1.656ms
redis                     4754        3.697ms        1.786ms        5.784ms

Best Averages (shorter better)
###############################################################################
██████████████████████████████████████████████████████████████   8.803  pylibmc
██                                                               0.315  default
██████████████████████████                                       3.697  redis

Using pylibmc didn't make things much faster. What if we pit memcached against pylibmc?:

TIMES        AVERAGE         MEDIAN         STDDEV
pylibmc                   3005        8.653ms        5.734ms        8.339ms
memcached                 2868        8.465ms        5.367ms        9.065ms

Best Averages (shorter better)
###############################################################################
█████████████████████████████████████████████████████████████  8.653  pylibmc
███████████████████████████████████████████████████████████    8.465  memcached

What about that fancy hiredis Redis Python driver that's supposedly faster?

TIMES        AVERAGE         MEDIAN         STDDEV
redis                     4074        5.628ms        2.262ms        8.300ms
hiredis                   4057        5.566ms        2.296ms        8.471ms

Best Averages (shorter better)
###############################################################################
███████████████████████████████████████████████████████████████  5.628  redis
██████████████████████████████████████████████████████████████   5.566  hiredis

These last two results are both surprising and suspicious. Perhaps the whole setup is wrong. Why wouldn't the C-based libraries be faster? Is it so incredibly dwarfed by the network I/O in the time between my EC2 node and the ElastiCache nodes?

In Conclusion

I personally like Redis. It's not as stable as memcached. On a personal server I've run for years, the Redis server sometimes just dies due to corrupt memory and I've come to accept that. I don't think I've ever seen memcached do that.

But there are other benefits with Redis as a cache backend. With the django-redis library you have really easy access to the raw Redis connection and you can do much more advanced data structures. You can also cache certain things indefinitely. Redis also supports storing much larger strings than memcached (1MB for memcached and 512MB for Redis).
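
For example, with django-redis you can get hold of the underlying redis-py client and use Redis data structures directly; a small sketch, assuming the cache named "default" is the django-redis one:

# Sketch: grab the raw redis-py client behind the django-redis cache backend.
from django_redis import get_redis_connection

con = get_redis_connection('default')
con.lpush('recent-searches', 'firefox')      # a real Redis list, not a pickled blob
print(con.lrange('recent-searches', 0, 9))   # the ten most recent entries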

The conclusion is that Redis is faster than memcached by a factor of 2. Considering the other feature benefits you can get out of having a Redis server available, it's probably a good choice for your next Django project.

Bonus Feature

In big setups you most likely have a whole slew of web heads, i.e. servers that do nothing but handle web requests. And these are configured to talk to databases and caches over the near network. However, many of us have cheap servers on DigitalOcean or Linode where we run web servers, relational databases and cache servers all on the same machine. (I do. This blog is one of those, with Nginx, Redis, memcached and PostgreSQL all on one 4GB DigitalOcean SSD Ubuntu machine.)

So here's one last test where I installed a local Redis and a local memcached on the EC2 node itself:

$ cat .env | grep 127.0.0.1
MEMCACHED_LOCATION="127.0.0.1:11211"
REDIS_LOCATION="redis://127.0.0.1:6379/0"

Here are the results:

TIMES        AVERAGE         MEDIAN         STDDEV
memcached                 7366        3.456ms        1.380ms        5.678ms
default                   3716        0.263ms        0.189ms        1.002ms
redis                     5582        2.334ms        0.639ms        4.965ms

Best Averages (shorter better)
###############################################################################
█████████████████████████████████████████████████████████████  3.456  memcached
████                                                           0.263  default
█████████████████████████████████████████                      2.334  redis

The conclusion of that last benchmark is that Redis is still faster and it's roughly 1.8x faster to run these backends on the web head than to use ElastiCache. Perhaps that just goes to show how amazingly fast the AWS inter-datacenter fiber network is!

Fastest way to download a file from S3

March 29, 2017
5 comments Python

tl;dr; You can download files from S3 with requests.get() (whole or in stream) or use the boto3 library. Although there are slight differences in speed, the network I/O dictates more than the relative implementation of how you do it.

I'm working on an application that needs to download relatively large objects from S3. Some files are gzipped and their size hovers around 1MB to 20MB (compressed).

So what's the fastest way to download them? In chunks, all in one go, or with the boto3 library? I should warn, if the object we're downloading is not publicly exposed I actually don't know how to download it other than using the boto3 library. In this experiment I'm only concerned with publicly available objects.

The Functions

f1()

The simplest one first. Note that in a real application you would do something more with r.content and not just return its size. In fact you might want to get the text out instead (r.text), since r.content is the raw, still-encoded bytes.


import requests


def f1(url):
    r = requests.get(url)
    return len(r.content)

f2()

If you stream it you can minimize memory bloat in your application since you can re-use the chunks of memory if you're able to do something with the buffered content. In this case, the buffer is just piled on in memory, 512 bytes at a time.


import io

import requests


def f2(url):
    r = requests.get(url, stream=True)
    buffer = io.BytesIO()
    for chunk in r.iter_content(chunk_size=512):
        if chunk:
            buffer.write(chunk)
    return len(buffer.getvalue())

I did put a counter into that for-loop to see how many times it writes and if you multiply that by 512 or 1024 respectively it does add up.

f3()

Same as f2() but with twice as large chunks.


def f3(url):  # same as f2 but bigger chunk size
    r = requests.get(url, stream=True)
    buffer = io.BytesIO()
    for chunk in r.iter_content(chunk_size=1024):
        if chunk:
            buffer.write(chunk)
    return len(buffer.getvalue())

f4()

I'm actually quite new to boto3 (the cool thing used to be boto) and from some StackOverflow-surfing I found this solution to support downloading of gzipped or non-gzipped objects into a buffer:


import io
from gzip import GzipFile
from urllib.parse import urlparse

import boto3

# the `s3` object is a module-level boto3 S3 resource
s3 = boto3.resource('s3')


def f4(url):
    _, bucket_name, key = urlparse(url).path.split('/', 2)
    obj = s3.Object(
        bucket_name=bucket_name,
        key=key
    )
    buffer = io.BytesIO(obj.get()["Body"].read())
    try:
        got_text = GzipFile(None, 'rb', fileobj=buffer).read()
    except OSError:
        buffer.seek(0)
        got_text = buffer.read()
    return len(got_text)

Note how it doesn't try to find out if the buffer is gzipped; instead it assumes it is and falls back if the decompression raises an exception. This feels clunky, around the "gunzipping", but it's probably quite representative of a final solution.
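
One less clunky alternative (not what the benchmark above used) would be to peek at the first two bytes, since gzip streams always start with the magic bytes 0x1f 0x8b; a sketch:

# Sketch: decide whether to gunzip by checking the gzip magic bytes
# instead of relying on a raised exception.
import io
from gzip import GzipFile


def maybe_gunzip(buffer):
    head = buffer.read(2)
    buffer.seek(0)
    if head == b'\x1f\x8b':  # gzip magic number
        return GzipFile(None, 'rb', fileobj=buffer).read()
    return buffer.read()


print(len(maybe_gunzip(io.BytesIO(b'plain, not gzipped'))))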

Complete experiment code here

The Results

At first I ran this on my laptop, on my decent home broadband, whilst having lunch. The results were very similar to what I later found on EC2, just 7-10 times slower. So let's focus on the results from within an EC2 node in us-west-1c.

The raw numbers are as follows (showing median values):

Function    18MB file    Std Dev    1MB file    Std Dev
f1          1.053s       0.492s     0.395s      0.104s
f2          1.742s       0.314s     0.398s      0.064s
f3          1.393s       0.727s     0.388s      0.08s
f4          1.135s       0.09s      0.264s      0.079s

I ran each function 20 times. It's interesting, but not totally surprising that the function that was fastest for the large file wasn't necessarily the fastest for the smaller file.

The winners are f1() and f4() both with one gold and one silver each. Makes sense because it's often faster to do big things, over the network, all at once.

Or, are there winners at all?

With a tiny margin, f1() and f4() are slightly faster but they are not as convenient because they're not streams. In f2() and f3() you have the ability to do something constructive with the stream. As a matter of fact, in my application I want to download the S3 object and parse it line by line so I can use response.iter_lines() which makes this super convenient.
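
Something along these lines, in other words (a sketch of the streaming, line-by-line variant rather than any of the benchmark functions above):

# Sketch: stream an S3 object over HTTP and handle it one line at a time.
import requests


def count_lines(url):
    response = requests.get(url, stream=True)
    count = 0
    for line in response.iter_lines():
        if line:  # iter_lines can yield empty keep-alive chunks
            count += 1
    return count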

But most importantly, I think we can conclude that it doesn't matter much how you do it. Network I/O is still king.

Lastly, that boto3 solution has the advantage that with credentials set right it can download objects from a private S3 bucket.

Bonus Thought!

This experiment was conducted on a m3.xlarge in us-west-1c. That 18MB file is a compressed file that, when unpacked, is 81MB. This little Python code basically managed to download 81MB in about 1 second. Yay!! The future is here and it's awesome.

Don't forget your sets in Python!

March 10, 2017
4 comments Python

I had this piece of code:


new_all_ids = set(
    x for x in all_ids if x not in to_process_ids
)

The all_ids is a list object of 1.1 million IDs, some repeated. And to_process_ids is a list of 1,000 randomly selected IDs from that list, all unique. The objective of the code was to first extract 1,000 IDs, do something with them, then remove those 1,000 from the original list, i.e. update all_ids to be everything except what's in to_process_ids.

Only problem was that I noticed that this operation took 30+ seconds! How can it take 30 seconds to do a little bit of list comprehension? The explanation is the lack of index lookups.

What the code actually does is this:


new_all_ids = set()
for x in all_ids:
    found = False
    for y in to_process_ids:
        if y == x:
            found = True
            break
    if not found:
        new_all_ids.add(x)

Can you see it? It's doing a nested loop. Or, O(n * m) in computer lingo. That's up to 1.1 million * 1,000 iterations of that inner comparison.

What sets do, on the other hand, is convert the keys in the set to a hash table. Kinda similar to how a dict works, except it doesn't point to a value. That means that asking a set if it contains a certain key is an O(1) operation.

You might have spotted the solution already.
If you instead do:


to_process_ids_set = set(to_process_ids)
new_all_ids = set(
    x for x in all_ids if x not in to_process_ids_set
)

Now, in my particular example, instead of taking 30+ seconds this implementation only takes 0.026 seconds. 1,000 times faster.

You might also have noticed that there's a much more convenient way to do this very same thing, namely set operations! The desired new_all_ids is going to become a set anyway. And if we first convert all_ids to a set too, we can use a plain set difference and avoid looping over repeats as we do the checking.

Final solution:


new_all_ids = set(all_ids) - set(to_process_ids)

You can try it yourself with this little benchmark:


"""
Demonstrate three ways how to reduce a non-unique list of integers
by 1,000 randomly selected unique ones.
Demonstrates the huge difference between lists and set lookups.
"""
import time
import random


# Range numbers chosen quite arbitrarily to get a benchmark that lasts
# a couple of seconds with a realistic proportion of unique and 
# repeated integers.
items = 200000
all_ids = [random.randint(1, items * 2) for _ in range(items)]

print('About', 100. * len(set(all_ids)) / len(all_ids), '% unique')

to_process_ids = random.sample(set(all_ids), 1000)


def f1(to_process_ids):
    return set(x for x in all_ids if x not in to_process_ids)


def f2(to_process_ids):
    to_process_ids_set = set(to_process_ids)
    return set(x for x in all_ids if x not in to_process_ids_set)


def f3(to_process_ids):
    return set(all_ids) - set(to_process_ids)


functions = [f1, f2, f3]
results = {f.__name__: [] for f in functions}
for i in range(10):
    random.shuffle(functions)
    for f in functions:
        t0 = time.time()
        f(to_process_ids)
        t1 = time.time()
        results[f.__name__].append(t1 - t0)

for function in sorted(results):
    print(function, sum(results[function])/ len(results[function]))

When I run that with my Python 3.5.1 I get:

About 78.616 % unique
f1 3.9494598865509034
f2 0.041156983375549315
f3 0.02245485782623291

Seems to match expectations.

crontabber now supports locking, both high- and low-level

March 4, 2017
0 comments Python, Mozilla, PostgreSQL

tl;dr; In other words, you can now have multiple servers with crontabber, all talking to a central PostgreSQL for its state, and not have to worry about jobs being started more than exactly once. This will be super useful if your crontabber apps are such that they kick off stored procedures that would freak out if run more than once with the same parameters.

crontabber is an advanced Python program to run cron-like applications in a predictable way. Every app is a Python class with a run method. Example here. Until version 0.18 you had to do locking outside, but now the locking has been "internalized". Meaning, if you open two terminals and run python crontabber.py --admin.conf=myconfig.ini in both, you don't have to worry about it starting the same apps in parallel.
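
For context, a crontabber app is roughly a class like this (a sketch pieced together from the project's examples; the exact import path and options may differ):

# A rough sketch of a crontabber app; the import path is assumed from the
# crontabber project's own examples.
from crontabber.base import BaseCronApp


class CleanupCronApp(BaseCronApp):
    app_name = 'cleanup'  # the name stored in the crontabber state table
    app_description = 'Deletes stale rows once a day'

    def run(self):
        # the actual business logic goes here
        print('cleaning up...')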

General, business logic locking

Every app has a state. It's stored in PostgreSQL. It looks like this:

# \d crontabber
              Table "public.crontabber"
    Column    |           Type           | Modifiers
--------------+--------------------------+-----------
 app_name     | text                     | not null
 next_run     | timestamp with time zone |
 first_run    | timestamp with time zone |
 last_run     | timestamp with time zone |
 last_success | timestamp with time zone |
 error_count  | integer                  | default 0
 depends_on   | text[]                   |
 last_error   | json                     |
 ongoing      | timestamp with time zone |
Indexes:
    "crontabber_unique_app_name_idx" UNIQUE, btree (app_name)

The last column, ongoing, used to be there just out of curiosity. For example, in Socorro we used it to display a flashing message about which jobs are ongoing right now.

As of version 0.18, this ongoing column is actually used to NOT run apps again. Basically, when started, crontabber figures out which app to run next (assuming it's time to run it) and now the first thing it does is look up whether it's ongoing already; if it is, the whole crontabber application exits with an error code of 3.

Sub-second locking

What might happen is that two separate servers with almost perfectly synchronized clocks have cron run crontabber at the "exact" same time. Or rather, only a few milliseconds apart. But the database is central, so what might happen is that two distinct PostgreSQL connections try to send an... UPDATE crontabber SET ongoing=now() WHERE app_name='some-app-name' at the very same time.

So how is this solved? The answer is row-level locking. The magic sauce is here. You make a SELECT, by app_name, with a suffix of FOR UPDATE NOWAIT. Imagine two distinct PostgreSQL connections sending this:


BEGIN;
SELECT ongoing FROM crontabber WHERE app_name = 'my-app-name'
FOR UPDATE NOWAIT;

-- do some other stuff in Python

UPDATE crontabber SET ongoing = now() WHERE app_name = 'my-app-name';
COMMIT;

One of them will succeed; the other will raise an error. Now all you need to do is catch that raised error and check that it's a row-level locking error and not some other general error. Instead of worrying about the raised error you just accept it and exit the program early.

This can be demonstrated with a test.sql script run from two terminals lined up side by side: two distinct terminals send the UPDATE to psql, I start one and quickly switch and start the other, and one of them errors.

Another way to demonstrate this is to use psycopg2 in a little script:


import threading
import psycopg2


def updater():
    connection = psycopg2.connect('dbname=crontabber_exampleapp')
    cursor = connection.cursor()
    cursor.execute("""
    SELECT ongoing FROM crontabber WHERE app_name = 'bar'
    FOR UPDATE NOWAIT
    """)
    cursor.execute("""
    UPDATE crontabber SET ongoing = now() WHERE app_name = 'bar'
    """)
    print("JOB CAN START!")
    connection.commit()


# Use threads to simulate starting two connections virtually 
# simultaneously.
threads = [
    threading.Thread(target=updater),
    threading.Thread(target=updater),
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

The output of this is:

▶ python /tmp/test.py
JOB CAN START!
Exception in thread Thread-1:
Traceback (most recent call last):
...
OperationalError: could not obtain lock on row in relation "crontabber"

With threads, you never know exactly which one will work and which one will not. In this case it was Thread-1 that sent its SQL a couple of nanoseconds too late.
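
To turn that error into the graceful early exit described above, you catch it and check that it really is the row-level lock failing and nothing else. A sketch (crontabber's actual handling lives in its own source):

# Sketch: bail out early if another crontabber instance holds the row lock.
import sys

import psycopg2
from psycopg2 import errorcodes


def claim_job(connection, app_name):
    cursor = connection.cursor()
    try:
        cursor.execute("""
        SELECT ongoing FROM crontabber WHERE app_name = %s
        FOR UPDATE NOWAIT
        """, (app_name,))
    except psycopg2.OperationalError as exception:
        if exception.pgcode == errorcodes.LOCK_NOT_AVAILABLE:
            sys.exit(3)  # someone else got there first
        raise
    cursor.execute(
        "UPDATE crontabber SET ongoing = now() WHERE app_name = %s",
        (app_name,)
    )
    connection.commit()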

In conclusion...

As of version 0.18 of crontabber, all locking is now dealt with inside crontabber. You still kick off crontabber from cron or crontab but if your cron does kick it off whilst it's still in the midst of running a job, it will simply exit with an error code of 2 or 3.

In other words, you can now have multiple servers with crontabber, all talking to a central PostgreSQL for its state, and not have to worry about jobs being started more than exactly once. This will be super useful if your crontabber apps are such that they kick off stored procedures that would freak out if run more than once with the same parameters.

Podcasttime.io - How Much Time Do Your Podcasts Take To Listen To?

February 13, 2017
3 comments Python, Web development, Django, JavaScript, React

tl;dr; It's a web app where you search and find the podcasts you listen to. It then gives you a breakdown of how much time that requires to keep up, per day, per week and per month. Podcasttime.io

Podcasttime.io on Firefox iOS
First I wrote some scripts to scrape various sources of podcasts. Each is basically an RSS feed URL from which you can fetch the name and an image. And with some cron jobs you can download and parse each podcast feed and build up an index of how many episodes they have and how long each episode is. Together with each episode's publish date you can easily figure out an average of how much content each podcast puts out over time.
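
The arithmetic behind that is nothing fancy: sum the episode durations and divide by the number of days the episodes span. A sketch with made-up numbers:

# Sketch: average listening time per day for one podcast, with made-up data.
from datetime import date

episodes = [
    (date(2017, 1, 2), 31 * 60),   # (publish date, duration in seconds)
    (date(2017, 1, 16), 42 * 60),
    (date(2017, 2, 1), 35 * 60),
]
total_seconds = sum(duration for _, duration in episodes)
days = (max(d for d, _ in episodes) - min(d for d, _ in episodes)).days
print('{:.1f} minutes per day'.format(total_seconds / 60 / days))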

Suppose you listen to JavaScript Air, Talk Python To Me and Google Cloud Platform Podcast for example, that means you need to listen to podcasts for about 8 minutes per day to keep up.

The Back End

The technology is exciting. The backend is a Django 1.10 server. It manages a PostgreSQL database of all the podcasts, episodes, cron jobs etc. Through Django ORM signals it packages up each podcast with its metadata and stores it in an Elasticsearch database. All the communication between Django and Elasticsearch is done with Elasticsearch DSL.

Also, all the downloading and parsing of feeds is done as background tasks in Celery. This got really interesting/challenging because sooo many podcasts are poorly marked up, and many a time the only way to find out how long an episode is, is to use ffmpeg to probe it, and that takes time.

Another biggish challenge is the fact that often things simply don't work because networks are what they are: unreliable. So you have to re-attempt network calls without accidentally getting caught in infinite loops of putting a bad/broken RSS feed back into the background queue again and again and again.

The Front End

Actually, the first prototype of this app was written with Django as the front end plus some jQuery to tie things together. On a plane ride, and as an excuse to learn it, I re-wrote the whole thing in React with Redux. To be honest, I never really enjoyed that and it felt like everything was hard and I had to do more jumping-around-files than actual coding. In particular, Redux is nice but when you have a lot of AJAX both inside components and upon mounting it gets quite messy in my humble opinion.

So, on another plane ride (to Hawaii, so I had more time) I re-wrote it from scratch but this time using three beautiful pieces of front end technology: create-react-app, Mobx and mobx-router. Suddenly it became fun again. Mobx (or Redux or something "fluxy") is necessary if you want fancy pushState URLs AND a central (aka global) state management.

To be perfectly honest, I never actually tried combining Mobx with something like react-router or if it's even possible. But with mobx-router it's quite neat. You write a "views route map" (see example) where you can kick off AJAX before entering (and leaving) routes. Then you use that to populate a global store and now all components can be almost entirely about simply rendering the store. There is some AJAX within the mounted components (e.g. the search and autocomplete).

Plotly graph
On the home page, there's a chart that rather unscientifically plots episode durations over time as a line chart. I'm trying a library called Plotly, which is actually an online app for building charts, but they offer a free JavaScript library too for generating graphs. Not entirely sure how I feel about it yet but, apart from looking a bit crowded on mobile, it's working really well.

A Killer Feature

This is a pattern I've wanted to build but never managed to get right. The way to get data about a podcast (and its episodes) is to do an Elasticsearch search. From the homepage you basically call /find?q=Planet%20money when you search. That gives you almost all the information you need. So you store that in the global store. Then, if the user clicks on that particular podcast to go to its "perma page" you can simply load that podcast's individual route and you don't need to do something like /find?id=727 because you already have everything you need. If the user then opens that page in a new tab or reloads you now have to fetch just the one podcast, so you simply call /find?id=727. In other words, subsequent page loads load instantly! (Basically, it updates the store's podcast object upon clicking any of the podcasts iterated over from the listing. Code here)

And to top that - and this is where a good router shines - if you make a search or something, click something and click back since you have a global store of state, you can simply reuse that without needing another AJAX query.

The State of the Future

First of all, this is a fun little side project and it's probably buggy. My goal is not to make money on it but to build up a graph. Every time someone uses the site and finds the podcasts they listen to, that slowly builds up connections. If you listen to "The Economist", "Planet Money" and "Freakonomics", that ties those together loosely. It's hard to programmatically know that those three podcasts are "related" but they are related by "people's taste".

The ultimate goal of this is that now I can recommend other podcasts based on a given set. It's a little bit like how LastFM used to work. Using Audioscrobbler, LastFM was able to build up a graph based on what people preferred to listen to, and using that network of knowledge it could recommend things you have not listened to but probably would appreciate.

At the moment, there's a simple Picks listing of "lists" (aka "picks") that people have chosen. With enough time and traffic I'll try to use Elasticsearch's X-Pack Graph capabilities to develop a search engine based on this.

At the time of writing, I've indexed 4,669 podcasts, spanning 611,025 episodes which equates to 549,722 hours of podcast content.

The Code

The front end code is available on github.com/peterbe/podcasttime2 and is relatively neat and tidy. The most interesting piece is probably the views/index.js which is the "controller" of things. That's where it decides which component to render, does the AJAX queries and manages the global store.

The back end code is a bit messier. It's done as an "app" as part of this very blog. The way the Elasticsearch indexing is configured is here and the hotch potch code for scraping and parsing RSS feeds is here.

Please try it out and show me your selection. You can drop feedback here.

Autocompeter is Dead. Long live Autocompeter!

January 9, 2017
0 comments Python, Web development, Go

About 2 years ago I launched Autocompeter.com. It was two parts:

1) An autocompeter.js pure-JavaScript solution to add autocomplete to a search input field.
2) A REST API where you can submit titles (authenticated with an HTTP header key), plus a fancy autocomplete search.

Only Rewrote the Go + Redis part

The second part has now been completely re-written. The server was originally written in Go and used Redis. Now it's Django and ElasticSearch.

The ultimate reason for this was that Redis was, by far, the biggest memory consumer on my shared DigitalOcean server. The way it worked was that every prefix of every word in every title was indexed as a key. For example, the words p, pe, pet, pete, peter and peter$ are all keys and they point to an array of IDs that you then look up to get the distinct set of titles and their URLs. This makes it really, really fast, but since Redis doesn't support namespaces or multiple columns, every prefix also needs a prefix of its own for the domain it belongs to. So the hash for www.peterbe.com is eb9f747, and the strings to store are instead eb9f747p, eb9f747pe, eb9f747pet, eb9f747pete, eb9f747peter and eb9f747peter$.
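
In other words, indexing a single title meant generating keys roughly like this (a Python sketch of the scheme described above; the original was Go, and since the post doesn't say how the domain hash was computed, md5 truncated to 7 characters is just an assumption):

# Sketch of the old Redis key scheme; the domain hash function is an assumption.
import hashlib


def prefix_keys(domain, title, item_id):
    domain_hash = hashlib.md5(domain.encode('utf-8')).hexdigest()[:7]
    for word in title.lower().split():
        word += '$'  # '$' marks the end of a complete word
        for i in range(1, len(word) + 1):
            yield domain_hash + word[:i], item_id


for key, _ in prefix_keys('www.peterbe.com', 'Peter', 123):
    print(key)  # keys like eb9f747p, eb9f747pe, ..., eb9f747peter$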

ElasticSearch on the other hand has ALL of this built in deep in Lucene. AND you can filter. So the way it's queried now instead is something like this:


search = TitleDoc.search()
search = search.filter('term', domain=domain.name)
search = search.query(Q('match_phrase', title=request.GET['q']))
search = search.sort('-popularity', '_score')
search = search[:size]
response = search.execute()
...

And here's how the mapping is defined:


from elasticsearch_dsl import (
    DocType,
    Float,
    Text,
    Index,
    analyzer,
    Keyword,
    token_filter,
)


edge_ngram_analyzer = analyzer(
    'edge_ngram_analyzer',
    type='custom',
    tokenizer='standard',
    filter=[
        'lowercase',
        token_filter(
            'edge_ngram_filter', type='edgeNGram',
            min_gram=1, max_gram=20
        )
    ]
)


class TitleDoc(DocType):
    id = Keyword()
    domain = Keyword(required=True)
    url = Keyword(required=True, index=False)
    title = Text(
        required=True,
        analyzer=edge_ngram_analyzer,
        search_analyzer='standard'
    )
    popularity = Float()
    group = Keyword()

I'm learning ElasticSearch rapidly but I still feel like I have so much to learn. This solution I have here is quite good and I'm pretty happy with the results but I bet there's a lot of things I can learn to make it even better.

Why Ditch Go?

I actually had a lot of fun building the first server version of Autocompeter in Go but Django is just so many times more convenient. It's got management commands, ORM, authentication system, CSRF protection, awesome error reporting, etc. All built in! With Go I had to build everything from scratch.

Also, I felt like the important thing here is the JavaScript client and the database. Now that I've proven this to work with Django and elasticsearch-dsl I think it wouldn't be too hard to re-write the critical query API in Go or in something like Sanic for maximum performance.

All Dockerized

Oh, one of the reasons I wanted to do this new server in Python is because I want to learn Docker better and in particular Docker with Python projects.

The project is now entirely contained in Docker so you can start PostgreSQL, ElasticSearch 5.1.1 and Django with docker-compose up. There might be a couple of things I've forgotten to document for how to configure things, but this is actually the first time I've developed something entirely in Docker.

ElasticSearch 5 in Travis-CI

January 6, 2017
0 comments Python, Linux, Web development

tl;dr; Here's a working .travis.yml file that works with ElasticSearch 5.1.1

I had to jump through hoops to get Travis-CI to run with ElasticSearch 5.1.1 and I thought I'd share. If you just do:

services:
  - elasticsearch

This is from the Travis-CI documentation but this installs ElasticSearch 1.4. Not good enough. The instructions on the same page for using higher versions did not work for me.

To get a specific version you need to download it yourself and install it with dpkg -i but the problem is that if you want to use ElasticSearch version 5, you need to have Java 1.8. The short answer is that this is how you install Java 1.8:

addons:
  apt:
    packages:
      - oracle-java8-set-default

But now you need to sudo so you need to add sudo: true in your .travis.yml. Bummer, because it makes the build a bit slower. However, a necessary evil.

The critical line I use to install it is this:

curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.1.deb && \
sudo dpkg -i --force-confnew elasticsearch-5.1.1.deb && \
sudo service elasticsearch start

I thought I could "upgrade" the existing install, but that breaks things. In other words, you have to remove the services: - elasticsearch line or else it can't upgrade.

Now, during debugging I was not getting errors on the line:

sudo service elasticsearch start

So I added this to be sure the right version got installed:

#!/bin/bash
curl -v http://localhost:9200/

and then I can see that the right version was installed. It should look something like this:

* About to connect() to localhost port 9200 (#0)
*   Trying 127.0.0.1... connected
> GET / HTTP/1.1
> User-Agent: curl/7.22.0 (x86_64-pc-linux-gnu) libcurl/7.22.0 OpenSSL/1.0.1 zlib/1.2.3.4 libidn/1.23 librtmp/2.3
> Host: localhost:9200
> Accept: */*
> 
< HTTP/1.1 200 OK
< content-type: application/json; charset=UTF-8
< content-length: 327
< 
{
  "name" : "m_acpqT",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "b4_KnK6KQmSx64C9o-81Ug",
  "version" : {
    "number" : "5.1.1",
    "build_hash" : "5395e21",
    "build_date" : "2016-12-06T12:36:15.409Z",
    "build_snapshot" : false,
    "lucene_version" : "6.3.0"
  },
  "tagline" : "You Know, for Search"
}
* Connection #0 to host localhost left intact
* Closing connection #0

Note the line that says "number" : "5.1.1",.

So, yay! Hopefully this will help someone else because it took me quite a while to get right.

Using Fanout.io in Django

December 13, 2016
1 comment Python, Web development, Django, Mozilla, JavaScript

Earlier this year we started using Fanout.io in Air Mozilla to enhance the experience for users awaiting content updates. Here I hope to flesh out its details a bit to inspire others to deploy a similar solution.

What It Is

First of all, Fanout.io is basically a service that handles your WebSockets. You put some of Fanout's JavaScript into your site that handles a persistent WebSocket connection between your site and Fanout.io. And to push messages to your users you basically send them to Fanout.io from the server and it "forwards" them over the WebSocket.

The HTML page looks like this:


<html>
<body>

  <h1>Web Page</h1>

<!-- replace the FANOUT_REALM_ID with the ID you get in the Fanout.io admin page -->
<script 
  src="https://{{ FANOUT_REALM_ID }}.fanoutcdn.com/bayeux/static/faye-browser-1.1.2-fanout1-min.js"
></script>
<script src="fanout.js"></script>
</body>
</html>

And the fanout.js script looks like this:


window.onload = function() {
  // replace the FANOUT_REALM_ID with the ID you get in the Fanout.io admin page
  var client = new Faye.Client('https://{{ FANOUT_REALM_ID }}.fanoutcdn.com/bayeux')
  client.subscribe('/mycomments', function(data) {  
     console.log('Incoming updated data from the server:', data);
  })
};

And on the server it looks something like this:


from django import http
from django.conf import settings
import fanout

fanout.realm = settings.FANOUT_REALM_ID
fanout.key = settings.FANOUT_REALM_KEY


def post_comment(request):
    """A django view function that saves the posted comment"""
    text = request.POST['comment']
    saved_comment = Comment.objects.create(text=text, user=request.user)
    fanout.publish('mycomments', {'new_comment': saved_comment.id})
    return http.JsonResponse({'comment_posted': True})

Note that, in the client-side code, there's no security since there's no authentication. Any client can connect to any channel. So it's important that you don't send anything sensitive. In fact, you should think of this pattern simply as a hint that something has changed. For example, here's a slightly more fleshed out example of how you'd use the subscription.


window.onload = function() {
  // replace the FANOUT_REALM_ID with the ID you get in the Fanout.io admin page
  var client = new Faye.Client('https://{{ FANOUT_REALM_ID }}.fanoutcdn.com/bayeux')
  client.subscribe('/mycomments', function(data) {  
    if (data.new_comment) {
      // server says a new comment has been posted in the server
      $.json('/comments', function(response) {
        $('#comments .comment').remove();
        $.each(response.comments, function(comment) {        
          $('<div class="comment">')
          .append($('<p>').text(comment.text))
          .append($('<span>').text('By: ' + comment.user.name))
          .appendTo('#comments');
        });
      });
    }
  })
};

Yes, I know jQuery isn't hip but it demonstrates the pattern well. Also, in the real world you might not want to ask the server for all comments (and re-render) but instead do an AJAX query to get all new comments since some parameter or something.

Why It's Awesome

It's awesome because you can have a simple page that updates near instantly when the server's database is updated. The alternative would be to do a setInterval loop that frequently does an AJAX query to see if there's new content to update. This is cumbersome because it requires much heavier AJAX queries. You might want to make it secure, so you engage sessions that need to be looked up each time. Or, since you're going to request it often, you have to write a very optimized server-side endpoint that is cheap to query often.

And last but not least, if you rely on an AJAX loop interval, you have to pick a frequency that your server can cope with and it's likely to be in the range of several seconds or else it might overload the server. That means that updates are quite delayed.

But maybe most important, you don't need to worry about running a WebSocket server. It's not terribly hard to do one yourself on your laptop with a bit of Node Express or Tornado but now you have yet another server to maintain and it, internally, needs to be connected to a "pub-sub framework" like Redis or a full blown message queue.

Alternatives

Fanout.io is not the only service that offers this. The decision to use Fanout.io was taken about a year ago and one of the attractive things it offers is that it's got a freemium option which is ideal for doing local testing. The honest truth is that I can't remember the other justifications used to choose Fanout.io over its competitors but here are some alternatives that popped up on a quick search:

It seems they all (including Fanout.io) have freemium plans, support authentication, and offer REST APIs (for sending and for querying connected clients' stats).

There are also some more advanced feature packed solutions like Meteor, Firebase and GunDB that act more like databases that are connected via WebSockets or alike. For example, you can have a database as a "conduit" for pushing data to a client. Meaning, instead of sending the data from the server directly you save it in a database which syncs to the connected clients.

Lastly, I've heard that Heroku has a really neat solution that does something similar whereby it sets up something similar as an extension.

Let's Get Realistic

The solution sketched out above is very simplistic. There are a lot more fine-grained details that you'd probably want to zoom in to if you're going to do this properly.

Throttling

In Air Mozilla, we call fanout.publish(channel, message) from a post_save ORM signal. If you have a lot of saves for some reason, you might be sending too many messages to the client. A throttling solution, per channel, simply makes sure your "callback" gets called only once per channel per small time frame. Here's the solution we employed:


window.Fanout = (function() {
  var _locks = {};
  return {
    subscribe: function subscribe(channel, callback) {
      // _client is the Faye.Client instance created earlier
      _client.subscribe(channel, function(data) {
          if (_locks[channel]) {
              // throttled
              return;
          }
          _locks[channel] = true;
          callback(data);
          setTimeout(function() {
              _locks[channel] = false;
          }, 500);
      });
    }
  };
})();

Subresource Integrity

Subresource integrity is an important web security technique where you know in advance a hash of the remote JavaScript you include. That means that if someone hacks the result of loading https://cdn.example.com/somelib.js the browser compares the hash of that with a hash mentioned in the <script> tag and refuses to load it if the hash doesn't match.

In the example of Fanout.io it actually looks like this:


<script 
  src="https://{{ FANOUT_REALM_ID }}.fanoutcdn.com/bayeux/static/faye-browser-1.1.2-fanout1-min.js"
  crossOrigin="anonymous"
  integrity="sha384-/9uLm3UDnP3tBHstjgZiqLa7fopVRjYmFinSBjz+FPS/ibb2C4aowhIttvYIGGt9"
></script>

The SHA you get from the Fanout.io documentation. It requires, and implies, that you need to use an exact version of the library. You can't use it like this: <script src="https://cdn.example/somelib.latest.min.js" ....

WebSockets vs. Long-polling

Fanout.io's JavaScript client follows a pattern that makes it compatible with clients that don't support WebSockets. The first technique it uses is called long-polling. With this, the server basically relies on standard HTTP techniques but the responses are long-lasting instead. It means the request simply takes a very long time to respond and when it does, that's when data can be passed.

This is not a problem for modern browsers. They almost all support WebSocket but you might have an application that isn't a modern browser.

Anyway, what Fanout.io does internally is that it first creates a long-polling connection but then shortly after tries to "upgrade" to WebSockets if it's supported. However, the projects I work on only need to support modern browsers and there's a trick to tell Fanout to go straight to WebSockets:


var client = new Faye.Client('https://{{ FANOUT_REALM_ID }}.fanoutcdn.com/bayeux', {
    // What this means is that we're opting to have
    // Fanout start with fancy-pants WebSocket and
    // if that doesn't work it falls back on other
    // options, such as long-polling.
    // The default behaviour is that it starts with
    // long-polling and tries to "upgrade" itself
    // to WebSocket.
    transportMode: 'fallback'
});

Fallbacks

In the case of Air Mozilla, it already had a traditional solution whereby it does a setInterval loop that does an AJAX query frequently.

Because the networks can be flaky or because something might go wrong in the client, the way we use it is like this:


var RELOAD_INTERVAL = 5;  // seconds

if (typeof window.Fanout !== 'undefined') {
    Fanout.subscribe('/' + container.data('subscription-channel-comments'), function(data) {
        // Supposedly the comments have changed.
        // For security, let's not trust the data but just take it
        // as a hint that it's worth doing an AJAX query
        // now.
        Comments.load(container, data);
    });
    // If Fanout doesn't work for some reason even though it
    // was made available, still use the regular old
    // interval. Just not as frequently.
    RELOAD_INTERVAL = 60 * 5;
}
setInterval(function() {
    Comments.reload_loop(container);
}, RELOAD_INTERVAL * 1000);

Use Fanout Selectively/Progressively

In the case of Air Mozilla, there are lots of pages. Some don't ever need a WebSocket connection. For example, it might be a simple CRUD (Create, Read, Update, Delete) page. So, for that I made the whole Fanout functionality "lazy" and it only gets set up if the page has some JavaScript that knows it needs it.

This also has the benefit that the Fanout resource loading etc. is slightly delayed until more pressing things have loaded and the DOM is ready.

You can see the whole solution here. And the way you use it here.

Have Many Channels

You can have as many channels as you like. Don't create a channel called comments when you can have a channel called comments-123 where 123 is the ID of the page you're on for example.

In the case of Air Mozilla, there's a channel for every single page. If you're sitting on a page with a commenting widget, it doesn't get WebSocket messages about newly posted comments on other pages.
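
On the server side that just means baking the page ID into the channel name when publishing; a sketch building on the fanout example from earlier (fanout.realm and fanout.key configured as shown above):

# Sketch: publish to a page-specific channel so only that page's
# subscribers get the hint.
import fanout


def notify_new_comment(page_id, comment_id):
    channel = 'comments-{}'.format(page_id)
    fanout.publish(channel, {'new_comment': comment_id})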

Conclusion

We've now used Fanout for almost a year in our little Django + jQuery app and it's been great. The management pages in Air Mozilla use AngularJS and the integration looks like this in the event manager page:


window.Fanout.subscribe('/events', function(data) {
    $scope.$apply(lookForModifiedEvents);
});

Fanout.io's been great to us. Really responsive support and very reliable. But if I were to start a fresh new project that needs a solution like this I'd try to spend a little time to investigate the competitors to see if there are some neat features I'd enjoy.

UPDATE

Fanout reached out to help explain more what's great about Fanout.io

"One of Fanout's biggest differentiators is that we use and promote open technologies/standards. For example, our service supports the open Bayeux protocol, and you can connect to it with any compatible client library, such as Faye. Nearly all competing services have proprietary protocols. This "open" aspect of Fanout aligns pretty well with Mozilla's values, and in fact you'd have a hard time finding any alternative that works the same way."

Cope with JSONDecodeError in requests.get().json() in Python 2 and 3

November 16, 2016
6 comments Python

UPDATE June 2020

Thanks to commenters, I've been made aware that requests will use simplejson, if it's installed, to handle the deserialization of the JSON. So if you have simplejson in your requirements.txt or pyproject.toml, you have to change to this:


import requests
import simplejson

response = requests.get(url)
try:
    print(response.json())
except simplejson.errors.JSONDecodeError:
    print("N'est pas JSON")

Or, make your app more solid by doing the same trick that requests does:


import requests
try:
    from simplejson.errors import JSONDecodeError
except ImportError:
    from json.decoder import JSONDecodeError

response = requests.get(url)
try:
    print(response.json())
except JSONDecodeError:
    print("N'est pas JSON")

Now, back to the old blog post...


Suppose you don't know with a hundred percent certainty that an API will respond with a JSON payload; then you need to protect yourself.

This is how you do it in Python 3:


import json
import requests

response = requests.get(url)
try:
    print(response.json())
except json.decoder.JSONDecodeError:
    print("N'est pas JSON")

This is how you do it in Python 2:


import requests

response = requests.get(url)
try:
    print response.json()
except ValueError:
    print "N'est pas JSON"

Here's how you make the code work across both:


import json
import requests

try:
    from json.decoder import JSONDecodeError
except ImportError:
    JSONDecodeError = ValueError

response = requests.get(url)
try:
    print(response.json())
except JSONDecodeError:
    print("N'est pas JSON")