List deletions

By leonardo maffi
V.1.0, May 13 2009
keywords: programming, D language, Python, Psyco, C language, benchmarks

[Go back to the article index]

Once in a while in my programs I need to keep an ordered sequence of items, process them all iteratively, and during each iteration remove a few of them that aren't good anymore. So in each iteration only a small number of items is removed, and such removals happen at random positions.

To focus on a basic version of such a problem, the items can be random positive integers, the operation can decrease them by 1, and they can be removed when they become 0. The iterations stop when there are no items left in the sequence. For example [2, 0, 3, 1] becomes [1, 2], then [1], then empty.

Here you can find all the code shown below (the zip may contain code more up to date than what is shown on this page, so use the zip as the reference):
http://www.fantascienza.net/leonardo/js/list_deletions.zip

If you don't need to keep the order of the items then a good solution is just to keep them in a fixed-size array, and replace each removed item with a good item taken from the end of the array (you need to keep track of the used length of the array).
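
A minimal sketch of that unordered scheme (this helper isn't part of the code in the zip; the name one_iteration and the used_len variable are mine):
# sketch: one pass of unordered removal over a fixed-size array (hypothetical, not in the zip)
def one_iteration(items, used_len):
    i = 0
    while i < used_len:
        items[i] -= 1
        if items[i] <= 0:
            # overwrite the bad item with the last good one and shrink the used length
            used_len -= 1
            items[i] = items[used_len]
            # don't advance i: the item just moved here still has to be processed
        else:
            i += 1
    return used_len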

In Python, if you need to keep the items ordered, you can keep them in a Python list (that is, a dynamic array), and in each iteration create a new array of the good items:

# Python program #1
from random import randrange
from timeit import default_timer as clock

def generate(nitems, k):
    return [randrange(k) for i in xrange(nitems)]

def map_filter(items):
    return [x-1 for x in items if x > 1]

def main(nitems, k):
    items = generate(nitems, k)
    t = clock()
    while items:
        items = map_filter(items)
        if nitems <= 300:
            print items, "\n"
    print round(clock() - t, 2)

import psyco; psyco.full()
main(400000, 400)

I have used Psyco (available for Python 2.6 too) to speed up the program.

You may also think about removing items in-place, like in the following function, but this is much too slow:
# unused Python function
def map_filter(items):
    i = 0
    while i < len(items):
        if items[i] > 0:
            items[i] -= 1
            i += 1
        else:
            del items[i:i+1]


A much better strategy is to perform the operation on the items, and create a new shorter array only when there are too many zeros. This decreases the frequency of list creations, almost doubling the running speed (see the timings at the bottom):
# from Python program #2
def map_filter(items):
    nzeros = 0
    for i in xrange(len(items)):
        if items[i] > 0:
            items[i] -= 1
        else:
            nzeros += 1

    if nzeros > 0:
        if (float(nzeros) / len(items)) > 0.1:
            items = filter(None, items)
    return items

The constant 0.1 comes from experiments.
In most situations you don't need something more efficient than the second Python version.
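
The full list_deletions2_py is in the zip; its driver is presumably the same loop as in program #1, something like this sketch (reusing generate() and clock() from above):
# sketch of a driver for the second version (see the zip for the real list_deletions2_py)
def main(nitems, k):
    items = generate(nitems, k)
    t = clock()
    # the list becomes empty only after a compacting pass removes the last zeros
    while items:
        items = map_filter(items)
    print round(clock() - t, 2)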

Now some solutions in the D language, which allows a lower-level style.
In D appending to dynamic arrays is very slow, so I have used my ArrayBuilder to create a first version similar to the first Python one:
// program D #1
import std.c.stdio: printf;
// dlibs: http://www.fantascienza.net/leonardo/so/libs_d.zip
import d.random: fastRandInt;
import d.time: clock;
import d.builders: ArrayBuilder;

int[] generate(int nitems, int k) {
    auto result = new int[nitems];
    for (int i; i < nitems; i++)
        result[i] = fastRandInt(k-1);
    return result;
}

bool map_filter(ref int[] items) {
    ArrayBuilder!(int) temp;
    foreach (item; items)
        if (item > 1)
            temp ~= item - 1;
    items = temp.toarray;
    return items.length != 0;
}

void main() {
    auto data = generate(400_000, 400);
    auto t = clock();
    while (map_filter(data)) {}
    printf("%f\n", clock() - t);
}

Thanks to the ArrayBuilder, that simple code is fast enough in many situations.
Older programmers may think of an obvious way to speed up the code: managing the items with a singly linked list:
// program D #2
import std.c.stdio: printf;
import d.random: randInt;
import d.time: clock;

struct Node {
    int data;
    Node* next;
}

Node* generate(int nnodes, int k) {
    auto nodes = new Node[nnodes];
    for (int i = 0; i < nnodes - 1; i++)
        nodes[i] = Node(randInt(k-1), &(nodes[i+1]));
    nodes[$-1] = Node(randInt(k-1), null);
    return nodes.ptr;
}

void show_list(Node* nodes) {
    if (nodes !is null)
        for (; nodes; nodes = nodes.next)
            printf("%d ", nodes.data);
    printf("\n");
}

Node* map_filter(Node* nodes) {
    Node* result;
    while (nodes != null) {
        if (nodes.data > 0) {
            nodes.data--;
            Node* temp = nodes.next;
            nodes.next = result;
            result = nodes;
            nodes = temp;
        } else {
            nodes = nodes.next;
        }
    }

    return result;
}

void main() {
    auto data = generate(400_000, 400);
    //show_list(data);

    auto t = clock();
    do {
        data = map_filter(data);
        //show_list(data);
    } while (data != null);
    printf("%f\n", clock() - t);
}

The node allocations are done with a single 'new' of an array of nodes, that are then linked.

This second map-filter code is faster than the first one, but not by much. A C version of this program is about equally fast, see list_deletions_c.

This program creates a second list and moves all the good items to it, decreasing their values. In most iterations most items are good (often >99%), so removing only the bad items and leaving the good ones untouched can improve performance, but it requires more complex code (see version D#6 below).

So I have improved the array-based version (the inner functions are static for performance). This is more complex than the second Python version, but uses the same general strategy (a similar version can be created for Psyco too, but it's worse than the second Python version). Most iterations are done by ignoring_update(), which just decrements the positive items in place without changing the length of the array.

// program D #3
import d.random: fastRandInt;
import d.time: clock;
import d.builders: ArrayBuilder;
import std.stdio: writefln;

int[] generate(int nitems, int k) {
    auto result = new int[nitems];
    for (int i; i < nitems; i++)
        result[i] = fastRandInt(k-1);
    return result;
}

bool map_filter(ref int[] items) {
    static bool ignoring_update(int[] items) {
        bool modified = false;
        int i = 0;
        for (; i < items.length; i++)
            if (items[i] > 0) {
                modified = true;
                items[i]--;
                break;
            }

        for (; i < items.length; i++)
            if (items[i] > 0)
                items[i]--;

        return modified;
    }

    static bool compacting_update(ref int[] items) {
        ArrayBuilder!(int) temp;
        foreach (item; items)
            if (item > 1)
                temp ~= item - 1;
        items = temp.toarray;
        return items.length != 0;
    }

    if (items.length < 300)
        return ignoring_update(items);

    // compute approximate stats
    int nprobes = items.length / 20;
    nprobes = nprobes > 5000 ? 5000 : nprobes;
    int nempty;
    for (int i; i < nprobes; i++)
        if (items[i] <= 0)
            nempty++;

    if ((cast(double)nempty / cast(double)nprobes) < 0.07)
        // less than 7% are bad, found experimentally. This is a
        // very small percentage. Maybe a higher percentage disturbs
        // too much the instructions the CPU executes ahead of time
        return ignoring_update(items);
    else
        return compacting_update(items);
}

/// compute full stats
int stats(int[] items) {
    int nempty;
    foreach (el; items)
        if (el <= 0)
            nempty++;
    return nempty;
}

void main() {
    auto data = generate(400_000, 400);
    auto t = clock();
    while (map_filter(data)) {
        //writefln(data, "\n");
        //int empty = stats(data);
        //writefln("len, empty: ", data.length, " ", empty);
    }
    writefln(clock() - t);
}


ignoring_update() decreases items bigger than zero, but doesn't change the length of the array.
compacting_update() is like the map_filter() of the first D version: it creates a new compacted array.

The function to call is chosen by sampling the first part of the array and counting how many zeros it contains. If the zeros aren't many, ignoring_update() is called, otherwise compacting_update().
I have found the best threshold value experimentally; it's very small, about 7%.
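
As said above, a similar version can be created for Psyco too; the following is only a rough sketch of that sampling logic, reusing the constants of the D code (it's not the code from the zip):
# hypothetical Python sketch of the sampling strategy of program D #3
def map_filter_sampled(items):
    # returns (items, more_work_left)
    n = len(items)
    if n >= 300:
        # estimate the fraction of zeros by probing the first part of the list
        nprobes = min(n // 20, 5000)
        nempty = 0
        for i in xrange(nprobes):
            if items[i] <= 0:
                nempty += 1
        if float(nempty) / nprobes >= 0.07:
            # compacting update: drop the bad items, decrement the good ones
            items = [x - 1 for x in items if x > 1]
            return items, len(items) != 0
    # ignoring update: decrement the positive items in place, keep the zeros
    modified = False
    for i in xrange(n):
        if items[i] > 0:
            items[i] -= 1
            modified = True
    return items, modified

The driver keeps calling it until the second returned value becomes False, as in the D code.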

In D programs a lot of time is often spent allocating memory. We can remove most memory allocations from the first D program by keeping two arrays, moving the items from one to the other and then back to the first:
// program D #4
import std.stdio: writefln;
// dlibs: http://www.fantascienza.net/leonardo/so/libs_d.zip
import d.random: fastRandInt;
import d.time: clock;

int[] generate(int nitems, int k) {
    auto result = new int[nitems];
    for (int i; i < nitems; i++)
        result[i] = fastRandInt(k-1);
    return result;
}

bool map_filter(ref int[] items, ref int[] aux) {
    aux.length = items.length;
    int pos;
    foreach (item; items)
        if (item > 1)
            aux[pos++] = item - 1;
    aux.length = pos;

    // swap
    int[] temp = items;
    items = aux;
    aux = temp;

    return pos != 0;
}

void main() {
    const int n = 400_000, k = 400;
    auto data = generate(n, k);
    int[] aux;
    auto t = clock();
    while (map_filter(data, aux))
        if (n <= 400)
            writefln(data, "\n");
    writefln(clock() - t);
}

The key to understanding the efficiency of this 4th program is that the D GC doesn't perform any allocation if the new length specified for an array is equal to or less than the current one.

This 4th version is almost as fast as the 3rd one, but it's much simpler, so where the basic functional strategy isn't fast enough, this strategy is good (and it doesn't even need an ArrayBuilder).

A similar strategy can be used in Python too, keeping the used lengths of the two fixed-size lists in extra variables, with good enough results:
# Python program #3
from random import randrange
from timeit import default_timer as clock

def generate(nitems, k):
    return [randrange(k) for i in xrange(nitems)]

def map_filter(items, len_items, aux):
    pos = 0
    for i in xrange(len_items):
        if items[i] > 1:
            aux[pos] = items[i] - 1
            pos += 1
    items, aux = aux, items
    return items, pos, aux

def main(nitems, k):
    items = generate(nitems, k)
    aux = [None] * len(items)
    len_items = nitems
    if nitems <= 300: print items, "\n"

    t = clock()
    while len_items:
        items, len_items, aux = map_filter(items, len_items, aux)
        if nitems <= 300: print len_items, items, "\n"
    print round(clock() - t, 2)

import psyco; psyco.full()
main(400000, 400)


In D we can combine the same idea with the earlier idea of not compacting when there are few zeros:
// program D #5
import std.stdio: writefln;
// dlibs: http://www.fantascienza.net/leonardo/so/libs_d.zip
import d.random: fastRandInt;
import d.time: clock;

int[] generate(int nitems, int k) {
    auto result = new int[nitems];
    for (int i; i < nitems; i++)
        result[i] = fastRandInt(k-1);
    return result;
}

bool map_filter(ref int[] items, ref int[] aux) {
    static bool ignoring_update(int[] items) {
        bool modified = false;
        int i = 0;
        for (; i < items.length; i++)
            if (items[i] > 0) {
                modified = true;
                items[i]--;
                break;
            }

        for (; i < items.length; i++)
            if (items[i] > 0)
                items[i]--;

        return modified;
    }

    if (items.length < 200)
        return ignoring_update(items);

    // compute approximate stats
    int nprobes = items.length / 20;
    nprobes = nprobes > 5000 ? 5000 : nprobes;
    int nempty;
    for (int i; i < nprobes; i++)
        if (items[i] <= 0)
            nempty++;

    if ((cast(double)nempty / cast(double)nprobes) < 0.04) {
        // 4% of the items are zero, or less
        return ignoring_update(items);
    } else {
        // compacting update
        aux.length = items.length;
        int pos;
        foreach (item; items)
            if (item > 1)
                aux[pos++] = item - 1;
        aux.length = pos;

        // swap
        int[] temp = items;
        items = aux;
        aux = temp;

        return pos != 0;
    }
}

void main() {
    const int n = 400_000, k = 400;
    auto data = generate(n, k);
    if (n <= 400) writefln(data, "\n");
    int[] aux;

    auto t = clock();
    while (map_filter(data, aux))
        if (n <= 400) writefln(data, "\n");
    writefln(clock() - t);
}

This is a bit faster in running time than the 3rd D version, and much faster in total running time, because at the end the GC has much less memory to free.

Python is well known for being useful to prototype code. But probably fewer people appreciate how useful it is to optimize code in two languages at the same time. The frequent shift in viewpoint lets you see the problem from more than one angle, helping you develop and test ideas better.

As you can see, the optimal threshold percentage of D#5 is 4%, compared to the 7% of D#3, probably because in D#5 the compacting update is faster, since it usually (except the first time) doesn't allocate new memory, so the optimal balance is a bit different.

I have translated D #5 into C (list_deletions2_c, not shown here), and with GCC the performance is about the same (with llvm-gcc the performance is lower), so for this program the D compiler is good.

In theory one of the fastest versions keeps a list of only the good items, removing just the bad ones and leaving the good ones untouched. You can do this with the linked list. First I scan the list to skip the leading bad items until I find a good one. The current pointer is now the new list head to be used in the next iteration. Then a cursor is kept one node before the node currently being processed, so deletions can be done in a very simple way. This makes map_filter() a short function.

Unfortunately the performance is even worse than the D#2 (and D#1) versions (a C version of D#6 shows the same timings); I don't know why, maybe the double indirections require too much time.

The generally low performance of the linked-list solutions is quite probably caused by cache misses: the nodes stay fixed at their positions in the array allocated by generate(), so the size of the whole structure doesn't change, and a lot of memory has to be accessed in every iteration, generating many cache misses.

To solve this cache problem, once in a while such a linked list has to be compacted in memory. In practice I think there's no hope of going faster than the D#5 version that uses arrays. Regarding memory used, D#5 uses two arrays, but each of them is half the size of the array of data+pointer nodes allocated by the generate() of D#6, so on 32 bit operating systems/CPUs the memory used is the same (about 34 MB for nitems=4_000_000). On 64 bit operating systems/CPUs the memory used by D#6 is bigger because the pointers are bigger (while int is still 32 bits in D).
// program D #6
import std.c.stdio: printf;
import d.random: randInt;
import d.time: clock;

struct Node {
    int data;
    Node* next;
}

Node* generate(int nnodes, int k) {
    auto nodes = new Node[nnodes];
    for (int i = 0; i < nnodes - 1; i++)
        nodes[i] = Node(randInt(k-1), &(nodes[i+1]));
    nodes[$-1] = Node(randInt(k-1), null);
    return nodes.ptr;
}

void show_list(Node* nodes) {
    if (nodes !is null)
        for (; nodes; nodes = nodes.next)
            printf("%d ", nodes.data);
    printf("\n\n");
}

Node* map_filter(Node* nodes) {
    // skip the first empty items
    while (nodes != null && nodes.data < 2)
        nodes = nodes.next;

    if (nodes != null) {
        nodes.data--; // process the current item
        Node* cursor = nodes; // one node before the current item
        while (cursor.next != null) {
            if (cursor.next.data < 2)
                cursor.next = cursor.next.next; // remove item
            else {
                cursor.next.data--; // process item
                cursor = cursor.next; // step forward
            }
        }
    }

    return nodes;
}

void main() {
    auto data = generate(400_000, 400);
    //show_list(data);

    auto t = clock();
    do {
        data = map_filter(data);
        //show_list(data);
    } while (data != null);
    printf("%f\n", clock() - t);
}


Timings nitems=400_000, k=400, seconds, best of 3:
  list_deletions3_py: 18.73 (no Psyco)
  list_deletions2_py: 15.93 (no Psyco)
  list_deletions1_py: 10.43 (no Psyco)   (57.9 X)
  list_deletions1_py:  5.37              (30   X)
  list_deletions3_py:  2.36
  list_deletions2_py:  2.30              (12.7 X)
  list_deletions6_d:   0.75 (0.86 total)
  list_deletions1_d:   0.73
  list_deletions2_d:   0.60
  list_deletions1_c:   0.59
  list_deletions4_d:   0.26
  list_deletions3_d:   0.22 (0.43 total)
  list_deletions5_d:   0.18 (0.25 total) ( 1 X)
  list_deletions2_c:   0.17 (0.24 total)

Timings nitems=4_000_000, k=1500, seconds, best of 3:
  list_deletions3_d:   8.36 (8.58 total)
  list_deletions2_c:   7.80 (7.95 total)
  list_deletions5_d:   7.79 (8.08 total)


CPU: Intel Core 2, 2 GHz (2 cores), WinXP SP2.

D code compiled with:
DMD v1.042 and D 2.029
dmd -O -release -inline

Python 2.6.2 (r262:71605, Apr 14 2009, 22:40:02)
[MSC v.1500 32 bit (Intel)] on win32

Psyco for Python 2.6, V.1.6.0 final 0

[Go back to the article index]