Nathan Hoad

Adventures in C++: symmetric_filter Example

November 26, 2020

Nov 27 update: In the interest of simplifying the code as much as possible, I’ve removed the templating completely.

TL;DR: If you want a minimal and simple example of a boost::iostreams::symmetric_filter, you can find one below.

This week I found myself wanting to decrypt and decompress a stream of data in C++. The internet’s preferred solution to the decompression portion is to use a boost::iostreams::filtering_stream with the stream filter of your choice, e.g. boost::iostreams::gzip_decompressor or boost::iostreams::zstd_decompressor. This is great because it makes it trivial to decompres or compress streams of data - a single line of code.

I found myself stuck for the decryption portion, however. Boost.Iostreams (completely reasonably) doesn’t provide an implementation of any encryption or decryption filters. So I knew I’d have to write my own. From the documentation I figured I wanted a symmetric_filter, because it’s useful for defining filters based on C language APIs (my encryption/decryption uses OpenSSL).

The problem, however, is that the only uses of symmetric_filter I could find were the implementations of the zstd/bzip2/gzip filters within the library itself, which I found difficult to understand, because they’re doing something pretty complex. I also don’t have a lot of experience using templates in C++, so all of the templating code and macros melted my brain.

So what follows is my attempt to fill this gap in human knowledge. Below is a simple example of how to implement a symmetric filter capable of processing data, and some notes on tweaking buffer sizes, because naturally that always comes up.

Firstly, a primer in boost::iostreams::filtering_stream

Say you have a file that’s been compressed with gzip, and you want to decompress it in C++. You have a couple of options.

  1. Read the entire file in, and decompress the string using the C api
  2. Read the file in a chunk at a time, and decompress each chunk using the C api
  3. Use boost::iostreams::filtering_stream with boost::iostreams::gzip_decompressor

Option 1 is not friendly to memory. What if you have a 3GB file? Consuming 3GB of memory to decompress a string into something significantly larger has a decent chance of exhausting all of your RAM. Option 2 is fine and all, but then you have to switch between char* and std::string a bunch, and it’s all very manual. Option 3 is the best option, because it lets us transparently read from the file as though it was never compressed in the first place, without blowing out memory or having a lot of messy code. Here’s an example of how to read a compressed file up from disk, decompressing it along the way:

#include <iostream>
#include <fstream>

#include <boost/iostreams/copy.hpp>
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/device/null.hpp>
#include <boost/iostreams/filter/gzip.hpp>

int main()
{
    std::ifstream in("output.dat.gz", std::ios::in | std::ios::binary);
    
    // make a filter object
    boost::iostreams::filtering_istream filter;
    
    // push a decompressor onto the stack of filters
    filter.push(boost::iostreams::gzip_decompressor());
    
    // push a source, i.e. the file we're reading in
    filter.push(in);
    
    // for this example we'll just trash the output
    boost::iostreams::null_sink out;
    
    // actually perform the copy
    auto bytes_written = boost::iostreams::copy(filter, out);
    
    std::cout << bytes_written << " bytes written" << std::endl;

    return 0;
}

Sample compilation and run:

$ g++ filtering.cpp -std=c++11 -lboost_iostreams && time ./a.out
3145728000 bytes written
./a.out  3.69s user 0.01s system 98% cpu 3.767 total

Using three lines of code above, we’ve got completely transparent gzip decompression going on. That’s pretty great! filtering_istream is a subclass of std::istream, so anything that takes an istream can take a filtering_istream.

Okay, but what about symmetric_filter?

As I said, I also wanted to decrypt data as it was coming through. Similar to above, I didn’t want to read the entire stream into memory. Given I’d just discovered filtering_stream, I didn’t want to give that up as well, so I figured I should be able to write a filter to decrypt the data as it comes through. And here you go:

#include <memory>
#include <iostream>
#include <sstream>
#include <fstream>

#include <boost/iostreams/copy.hpp>
#include <boost/iostreams/device/null.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/iostreams/filter/symmetric.hpp>
#include <boost/iostreams/filtering_stream.hpp>

// the actual filter gets implemented here
class symmetric_filter_example_impl {
public:
    // you can pass one argument in when constructing your filter. If you need more,
    // consider using a struct.
    explicit symmetric_filter_example_impl(int param_one) :
        param_one(param_one) {};

    // see implementation for a full explanation
    bool filter(const char*& begin_in, const char* end_in,
                char*& begin_out, char* end_out, bool flush);
    // reset the filter state, if you have any
    void close();

    typedef char char_type;

private:
    int param_one;
};


// this is the class you actually interact with, but it's basically a proxy/child class
// to fullfill the API contract of symmetric_filter for your
// symmetric_filter_example_impl class above. You can basically copy paste this and
// change the naming however you want.
class symmetric_filter_example:
    public boost::iostreams::symmetric_filter<symmetric_filter_example_impl>
{
private:
    typedef symmetric_filter_example_impl impl_type;
    typedef boost::iostreams::symmetric_filter<impl_type>    base_type;
public:
    typedef typename base_type::char_type               char_type;
    typedef typename base_type::category                category;

    symmetric_filter_example(
        int param_one, std::streamsize buffer_size = boost::iostreams::default_device_buffer_size) :
            base_type(buffer_size, param_one) {};
};

bool symmetric_filter_example_impl::filter(
    const char*& begin_in, const char* end_in,
    char*& begin_out, char* end_out, bool flush)
{
    // begin_in is the inclusive beginning of your input buffer
    // end_in is the exclusive end of your input buffer
    // begin_out is the inclusive beginning of your output buffer
    // end_out is the exclusive end of your output buffer
    // flush=true indicates that there's no more input data coming
    // flush=false indicates that there's still more data coming
    size_t in_length = end_in - begin_in;

    // the size of our output buffer
    size_t out_length = end_out - begin_out;

    // we can't copy to a smaller buffer than we have
    size_t copy_length = std::min(out_length, in_length);

    std::cout << "filter(" << param_one << ") in=" << in_length
              << " out=" <<  out_length << " max-transfer=" << copy_length
              << " flush=" << flush << std::endl;

    // let's perform our operation, which is just a straight copy for our example.
    // You might like to call EVP_DecryptUpdate or something.
    memcpy(begin_out, begin_in, copy_length);

    // to tell the caller that you've consumed some data, you need to modify
    // begin_in to point to the next unconsumed byte (and so, it follows that consuming all
    // of the input means that begin_in == begin_in).
    begin_in += copy_length;

    // similar to begin_in, to tell the caller that you've written some data, you need
    // to modify begin_out to point to the next unwritten byte.
    begin_out += copy_length;

    // return true to indicate you have more data to write out before receiving some
    // more data
    // and so returning false means you're ready for more input data
    return false;
}

void symmetric_filter_example_impl::close()
{
    // if you had any kind of state in your filter, you'd reset it here
    std::cout << "closing" << std::endl;
}


// more abstract object for storing both your source stream and your filter as generic
// objects
struct MyStorageObject {
    // whatever other random data you might want to store...
    int header;

    // this is the filtered stream
    std::shared_ptr<std::istream> stream;

private:
    // this is the actual stream. We need this to be stored as member on top of the
    // stream filter as source types are stored by reference in a filtering_stream.
    std::shared_ptr<std::istream> raw_stream;

    friend MyStorageObject build_filter();
};

MyStorageObject build_filter()
{
    MyStorageObject f;

    f.raw_stream = std::make_shared<std::ifstream>("testfile.txt", std::ios::binary|std::ios::in);

    auto stream_filter = std::make_shared<boost::iostreams::filtering_istream>();

    // you set the buffer size for each filter, not for one giant buffer or anything.
    // The argument to symmetric_filter_example is the input buffer size, the argument
    // to push is the output buffer size
    stream_filter->push(symmetric_filter_example(1, 1*1024), 2*1024);
    // another filter with a different param, so you can see how data is streamed
    // between filters
    stream_filter->push(symmetric_filter_example(2, 3*1024), 4*1024);
    // setting your read size in your source object to match that if your input buffer
    // makes sense to reduce the number of syscalls you'll make to fill a buffer
    stream_filter->push(*f.raw_stream, 5*1024);

    f.header = 1234;
    f.stream = stream_filter;

    return f;
}

int main()
{
    std::stringstream out;

    MyStorageObject f = build_filter();
    boost::iostreams::copy(*f.stream, out);

    return 0;
}

The MyStorageObject struct isn’t necessary at all, it’s just an example of how you might want to store your source and filter objects. It’s important that you store the source in some way such that it doesn’t outlive your filter, because filtering_stream takes references to source objects.

If my explanation on the filter function doesn’t clarify enough, you should read the explanation at the top of the source.

Aside from that though, I’m not going to talk much about this. The comments should do the talking.

Some Stuff to Watch Out For

Buffer sizes massively impact performance

This is C++, so you know, you’re always optimizing stuff. Take the gzip decompression example above, for example. It takes 10 seconds to run with the default parameters for me, but if I change the output buffer size on the decompressor like so:

filter.push(boost::iostreams::gzip_decompressor(), 16*1024);

Then suddenly it goes from 10 seconds down to 4, at the expense of 16KB of memory. You can also tweak the input buffer by passing the desired size to your filter object, e.g.:

filter.push(
    boost::iostreams::gzip_decompressor(
        boost::iostreams::gzip::default_window_bits, 16*1024),
        16*1024
    );

This sort of thing makes a lot of sense in this case. Your output buffer should always be larger than your input buffer when you’re decompressing something. Note that the default buffer size for a filter is 128 bytes, so you should almost certainly pick something larger unless you have very little memory.

The biggest thing to note from above is that I’m showing you how to set both the input and output buffer sizes. It took me quite a while to figure out how to set the input buffer size.

Filters are copy constructed, sources are by reference

In case you missed it in the above example, you have to keep your source object around, because filtering_istream::push takes a reference to it. If you start getting segfaults and you can’t figure out why, check for this first.

Conclusions

That’s about all I have to say about this. boost::iostreams::filtering_istream is awesome and more people should use it, and implementing filters isn’t that hard once you know what you’re doing.