diff --git a/Dockerfile b/Dockerfile
index cac11d8c24eb9f4289e9cd383e75a0630c9603be..bb69bbdca0c06a4fecfa88e34d3cdb871830e6c2 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -22,7 +22,7 @@ WORKDIR /build
 RUN true \
     && apk add --no-cache --virtual .build-deps alpine-sdk libffi-dev \
     && apk add --no-cache libffi \
-    && pip3 install --no-cache-dir .[sqlite,redis,uvloop] \
+    && pip3 install --no-cache-dir .[sqlite,redis,postgres,uvloop] \
     && mkdir /var/lib/mta-sts \
     && chown -R "$USER:$USER" /build /var/lib/mta-sts \
    && apk del .build-deps \
diff --git a/README.md b/README.md
index 976a81eff6fd5245f6b337ff6cabaf4b5345f2af..af01d40b6874e00190c726c0967bfb6badae76c0 100644
--- a/README.md
+++ b/README.md
@@ -37,7 +37,7 @@ All dependency packages installed automatically if this package is installed via
 Run:
 
 ```bash
-sudo python3 -m pip install postfix-mta-sts-resolver[redis,sqlite]
+sudo python3 -m pip install postfix-mta-sts-resolver[redis,sqlite,postgres]
 ```
 
-If you don't need `redis` or `sqlite` support, you may omit one of them in square brackets. If you don't need any of them and you plan to use internal cache without persistence, you should also omit square brackets.
+If you don't need `redis`, `sqlite` or `postgres` support, you may omit any of them in the square brackets. If you don't need any of them and you plan to use the internal cache without persistence, you should omit the square brackets entirely.
@@ -53,10 +53,10 @@ All pip invocations can be run with `--user` option of `pip` installer. In this
 Run in project directory:
 
 ```bash
-sudo python3 -m pip install .[redis,sqlite]
+sudo python3 -m pip install .[redis,sqlite,postgres]
 ```
 
-If you don't need `redis` or `sqlite` support, you may omit one of them in square brackets. If you don't need any of them and you plan to use internal cache without persistence, you should also omit square brackets.
+If you don't need `redis`, `sqlite` or `postgres` support, you may omit any of them in the square brackets. If you don't need any of them and you plan to use the internal cache without persistence, you should omit the square brackets entirely.
 
 Package scripts shall be available in standard executable locations upon completion.
 
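Reviewer note: the new `postgres` extra resolves to the `asyncpg` dependency declared in `setup.py` further down. A minimal, hypothetical post-install check, assuming the package was installed with the extra as shown in the README:

```python
# Hypothetical sanity check after `pip install postfix-mta-sts-resolver[redis,sqlite,postgres]`:
# the postgres extra should have pulled in asyncpg (>= 0.27 per setup.py).
import asyncpg

print(asyncpg.__version__)
```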
diff --git a/config_examples/mta-sts-daemon.yml.postgres b/config_examples/mta-sts-daemon.yml.postgres
new file mode 100644
index 0000000000000000000000000000000000000000..3f9b1928bc934932d7ffc4489e3312ae33a6bd0d
--- /dev/null
+++ b/config_examples/mta-sts-daemon.yml.postgres
@@ -0,0 +1,15 @@
+host: 127.0.0.1
+port: 8461
+reuse_port: true
+shutdown_timeout: 20
+cache:
+  type: postgres
+  options:
+    dsn: postgres://user@%2Frun%2Fpostgresql/user
+default_zone:
+  strict_testing: false
+  timeout: 4
+zones:
+  myzone:
+    strict_testing: false
+    timeout: 4
diff --git a/postfix_mta_sts_resolver/defaults.py b/postfix_mta_sts_resolver/defaults.py
index 0e35e97ca40c3855bc3fc0fcd332fbe5cdc7b8d7..5936214c034fa27a584c64931482f6ecd2c95916 100644
--- a/postfix_mta_sts_resolver/defaults.py
+++ b/postfix_mta_sts_resolver/defaults.py
@@ -11,6 +11,7 @@ CACHE_BACKEND = "internal"
 INTERNAL_CACHE_SIZE = 10000
 SQLITE_THREADS = cpu_count()
 SQLITE_TIMEOUT = 5
+POSTGRES_TIMEOUT = 5
 REDIS_CONNECT_TIMEOUT = 5
 REDIS_TIMEOUT = 5
 CACHE_GRACE = 60
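A note on the example DSN: the `%2F` sequences percent-encode a Unix-socket directory in the host slot, which asyncpg accepts as the directory containing the server socket. A minimal sketch of the decoding, assuming the socket lives in `/run/postgresql`:

```python
# The %2F escapes in the DSN host decode to a filesystem path; a path-like
# host tells asyncpg to connect via the Unix socket in that directory.
from urllib.parse import unquote

host = "%2Frun%2Fpostgresql"
assert unquote(host) == "/run/postgresql"
```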
diff --git a/postfix_mta_sts_resolver/postgres_cache.py b/postfix_mta_sts_resolver/postgres_cache.py
new file mode 100644
index 0000000000000000000000000000000000000000..c70b9d7f18d3520725dae21f8856d427ec178c7f
--- /dev/null
+++ b/postfix_mta_sts_resolver/postgres_cache.py
@@ -0,0 +1,110 @@
+# pylint: disable=invalid-name,protected-access
+
+import json
+import logging
+
+import asyncpg
+
+from .defaults import POSTGRES_TIMEOUT
+from .base_cache import BaseCache, CacheEntry
+
+
+class PostgresCache(BaseCache):
+    def __init__(self, *, timeout=POSTGRES_TIMEOUT, **kwargs):
+        self._last_proactive_fetch_ts_id = 1
+        asyncpglogger = logging.getLogger("asyncpg")
+        if not asyncpglogger.hasHandlers():  # pragma: no cover
+            asyncpglogger.addHandler(logging.NullHandler())
+        self._timeout = timeout
+        self._pool = None
+        self.kwargs = kwargs
+
+    async def setup(self):
+        queries = [
+            "CREATE TABLE IF NOT EXISTS proactive_fetch_ts "
+            "(id serial primary key, last_fetch_ts integer)",
+            "CREATE TABLE IF NOT EXISTS sts_policy_cache "
+            "(id serial primary key, domain text, ts integer, pol_id text, pol_body jsonb)",
+            "CREATE UNIQUE INDEX IF NOT EXISTS sts_policy_domain ON sts_policy_cache (domain)",
+            "CREATE INDEX IF NOT EXISTS sts_policy_domain_ts ON sts_policy_cache (domain, ts)",
+        ]
+
+        # Register a codec so jsonb columns transparently round-trip dicts.
+        async def set_type_codec(conn):
+            await conn.set_type_codec(
+                'jsonb',
+                encoder=json.dumps,
+                decoder=json.loads,
+                schema='pg_catalog',
+            )
+
+        self._pool = await asyncpg.create_pool(init=set_type_codec, **self.kwargs)
+        async with self._pool.acquire(timeout=self._timeout) as conn:
+            async with conn.transaction():
+                for q in queries:
+                    await conn.execute(q)
+
+    async def get_proactive_fetch_ts(self):
+        async with self._pool.acquire(timeout=self._timeout) as conn, conn.transaction():
+            cur = await conn.cursor('SELECT last_fetch_ts FROM '
+                                    'proactive_fetch_ts WHERE id = $1',
+                                    self._last_proactive_fetch_ts_id)
+            res = await cur.fetchrow()
+            return int(res[0]) if res is not None else 0
+
+    async def set_proactive_fetch_ts(self, timestamp):
+        async with self._pool.acquire(timeout=self._timeout) as conn, conn.transaction():
+            await conn.execute("""
+                INSERT INTO proactive_fetch_ts (last_fetch_ts, id)
+                VALUES ($1, $2)
+                ON CONFLICT (id) DO UPDATE SET last_fetch_ts = EXCLUDED.last_fetch_ts
+                """,
+                int(timestamp), self._last_proactive_fetch_ts_id,
+            )
+
+    async def get(self, key):
+        async with self._pool.acquire(timeout=self._timeout) as conn, conn.transaction():
+            cur = await conn.cursor('SELECT ts, pol_id, pol_body FROM '
+                                    'sts_policy_cache WHERE domain = $1',
+                                    key)
+            res = await cur.fetchrow()
+            if res is not None:
+                ts, pol_id, pol_body = res
+                return CacheEntry(int(ts), pol_id, pol_body)
+            return None
+
+    async def set(self, key, value):
+        ts, pol_id, pol_body = value
+        async with self._pool.acquire(timeout=self._timeout) as conn, conn.transaction():
+            # Upsert, but never replace a fresher policy with a staler one.
+            await conn.execute("""
+                INSERT INTO sts_policy_cache (domain, ts, pol_id, pol_body)
+                VALUES ($1, $2, $3, $4)
+                ON CONFLICT (domain) DO UPDATE
+                SET ts = EXCLUDED.ts, pol_id = EXCLUDED.pol_id, pol_body = EXCLUDED.pol_body
+                WHERE sts_policy_cache.ts < EXCLUDED.ts
+                """, key, int(ts), pol_id, pol_body)
+
+    async def scan(self, token, amount_hint):
+        if token is None:
+            token = 1
+
+        async with self._pool.acquire(timeout=self._timeout) as conn, conn.transaction():
+            # ORDER BY id keeps the id-based pagination token correct.
+            res = await conn.fetch('SELECT id, ts, pol_id, pol_body, domain FROM '
+                                   'sts_policy_cache WHERE id >= $1 '
+                                   'ORDER BY id LIMIT $2',
+                                   token, amount_hint)
+        if res:
+            result = []
+            new_token = token
+            for row in res:
+                rowid, ts, pol_id, pol_body, domain = row
+                new_token = max(new_token, int(rowid))
+                result.append((domain, CacheEntry(int(ts), pol_id, pol_body)))
+            new_token += 1
+            return new_token, result
+        return None, []
+
+    async def teardown(self):
+        await self._pool.close()
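A quick usage sketch of the new backend, for review purposes. It assumes a reachable PostgreSQL instance behind the DSN from the shipped config example, and that `CacheEntry` is the `(ts, pol_id, pol_body)` tuple imported above:

```python
import asyncio

from postfix_mta_sts_resolver.postgres_cache import PostgresCache
from postfix_mta_sts_resolver.base_cache import CacheEntry

async def demo():
    # Keyword arguments are forwarded verbatim to asyncpg.create_pool();
    # the DSN below is an assumption matching the config example.
    cache = PostgresCache(dsn="postgres://user@%2Frun%2Fpostgresql/user")
    await cache.setup()
    await cache.set("example.com", CacheEntry(100, "20230101", {"mode": "testing"}))
    # A staler write (lower ts) is ignored thanks to the WHERE guard in the upsert.
    await cache.set("example.com", CacheEntry(50, "older", {"mode": "none"}))
    ts, pol_id, pol_body = await cache.get("example.com")
    assert (ts, pol_id) == (100, "20230101")
    await cache.teardown()

asyncio.run(demo())
```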
diff --git a/postfix_mta_sts_resolver/utils.py b/postfix_mta_sts_resolver/utils.py
index 761ea34239be27bc1d3c58fd72ea6acfa31a2b4a..3cc712e95b0f3719119bec43b508e09688e4c9c7 100644
--- a/postfix_mta_sts_resolver/utils.py
+++ b/postfix_mta_sts_resolver/utils.py
@@ -231,6 +231,10 @@ def create_cache(cache_type, options):
         # pylint: disable=import-outside-toplevel
         from . import redis_cache
         cache = redis_cache.RedisSentinelCache(**options)
+    elif cache_type == "postgres":
+        # pylint: disable=import-outside-toplevel
+        from . import postgres_cache
+        cache = postgres_cache.PostgresCache(**options)
     else:
         raise NotImplementedError("Unsupported cache type!")
     return cache
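With the dispatch above, the daemon's cache config maps straight to a backend instance; the `options` mapping is passed through as keyword arguments. A short sketch (DSN assumed, matching the config example):

```python
from postfix_mta_sts_resolver.utils import create_cache

# "type" and "options" come from the cache section of mta-sts-daemon.yml;
# for postgres, options go to PostgresCache and on to asyncpg.create_pool().
cache = create_cache("postgres", {"dsn": "postgres://user@%2Frun%2Fpostgresql/user"})
```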
"postgres://postgres@%2Frun%2Fpostgresql/postgres"}, 3, 1), + ("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, 3, 2), + ("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, 3, 3), + ("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, 3, 4), + ("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, 0, 4), + ("postgres", {"dsn": "postgres://postgres@%2Frun%2Fpostgresql/postgres"}, constants.DOMAIN_QUEUE_LIMIT*2, constants.DOMAIN_QUEUE_LIMIT), ]) @pytest.mark.timeout(10) @pytest.mark.asyncio diff --git a/tests/test_utils.py b/tests/test_utils.py index c03d15c4f60a7ffaba427bd5822aa383b798cf04..83bf90349a12f229a82d52453785434454d7a057 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -27,7 +27,7 @@ def test_populate_cfg_defaults(cfg): assert isinstance(res['proactive_policy_fetching']['concurrency_limit'], int) assert isinstance(res['proactive_policy_fetching']['grace_ratio'], (int, float)) assert isinstance(res['cache'], collections.abc.Mapping) - assert res['cache']['type'] in ('redis', 'sqlite', 'internal') + assert res['cache']['type'] in ('redis', 'sqlite', 'postgres', 'internal') assert isinstance(res['default_zone'], collections.abc.Mapping) assert isinstance(res['zones'], collections.abc.Mapping) for zone in list(res['zones'].values()) + [res['default_zone']]: