Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
GitHub Mirror
mastodon
Mastodon
Commits
0ba524b1
Commit
0ba524b1
authored
May 17, 2022
by
Yamagishi Kazutoshi
Committed by
GitHub
May 17, 2022
Browse files
Use ESM with Streaming API Server
parent
d25015fc
Changes
5
Hide whitespace changes
Inline
Side-by-side
streaming/constants.mjs
0 → 100644
View file @
0ba524b1
// @ts-check
import
*
as
os
from
'
node:os
'
;
/** @type {boolean} */
export
const
alwaysRequireAuth
=
process
.
env
.
LIMITED_FEDERATION_MODE
===
'
true
'
||
process
.
env
.
WHITELIST_MODE
===
'
true
'
||
process
.
env
.
AUTHORIZED_FETCH
===
'
true
'
;
/** @type {string} */
export
const
env
=
process
.
env
.
NODE_ENV
||
'
development
'
;
/** @type {string} */
export
const
logLevel
=
process
.
env
.
LOG_LEVEL
||
'
verbose
'
;
/** @type {number} */
export
const
numWorkers
=
+
process
.
env
.
STREAMING_CLUSTER_NUM
||
(
env
===
'
development
'
?
1
:
Math
.
max
(
os
.
cpus
().
length
-
1
,
1
));
/** @type {string} */
export
const
redisNamespace
=
process
.
env
.
REDIS_NAMESPACE
||
null
;
export
const
trustedProxyIp
=
process
.
env
.
TRUSTED_PROXY_IP
?
process
.
env
.
TRUSTED_PROXY_IP
.
split
(
/
(?:\s
*,
\s
*|
\s
+
)
/
)
:
'
loopback,uniquelocal
'
;
streaming/index.js
→
streaming/index.
m
js
View file @
0ba524b1
// @ts-check
const
os
=
require
(
'
os
'
);
const
throng
=
require
(
'
throng
'
);
const
dotenv
=
require
(
'
dotenv
'
);
const
express
=
require
(
'
express
'
);
const
http
=
require
(
'
http
'
);
const
redis
=
require
(
'
redis
'
);
const
pg
=
require
(
'
pg
'
);
const
log
=
require
(
'
npmlog
'
);
const
url
=
require
(
'
url
'
);
const
uuid
=
require
(
'
uuid
'
);
const
fs
=
require
(
'
fs
'
);
const
WebSocket
=
require
(
'
ws
'
);
const
env
=
process
.
env
.
NODE_ENV
||
'
development
'
;
const
alwaysRequireAuth
=
process
.
env
.
LIMITED_FEDERATION_MODE
===
'
true
'
||
process
.
env
.
WHITELIST_MODE
===
'
true
'
||
process
.
env
.
AUTHORIZED_FETCH
===
'
true
'
;
import
*
as
http
from
'
node:http
'
;
import
*
as
url
from
'
node:url
'
;
import
*
as
dotenv
from
'
dotenv
'
;
import
express
from
'
express
'
;
import
log
from
'
npmlog
'
;
import
*
as
pg
from
'
pg
'
;
import
throng
from
'
throng
'
;
import
*
as
uuid
from
'
uuid
'
;
import
WebSocket
from
'
ws
'
;
import
{
alwaysRequireAuth
,
env
,
logLevel
,
numWorkers
,
redisNamespace
,
trustedProxyIp
,
}
from
'
./constants.mjs
'
;
import
{
allowCrossDomain
,
authenticationMiddleware
,
errorMiddleware
,
setRemoteAddress
,
setRequestId
,
}
from
'
./middlewares.mjs
'
;
import
Subscriber
from
'
./subscriber.mjs
'
;
import
{
attachServerWithConfig
,
checkScopes
,
dbUrlToConfig
,
firstParam
,
httpNotFound
,
isInScope
,
isTruthy
,
onPortAvailable
,
parseJSON
,
placeholders
,
redisUrlToClient
,
}
from
'
./utils.mjs
'
;
dotenv
.
config
({
path
:
env
===
'
production
'
?
'
.env.production
'
:
'
.env
'
,
});
log
.
level
=
process
.
env
.
LOG_LEVEL
||
'
verbose
'
;
/**
* @param {string} dbUrl
* @return {Object.<string, any>}
*/
const
dbUrlToConfig
=
(
dbUrl
)
=>
{
if
(
!
dbUrl
)
{
return
{};
}
const
params
=
url
.
parse
(
dbUrl
,
true
);
const
config
=
{};
if
(
params
.
auth
)
{
[
config
.
user
,
config
.
password
]
=
params
.
auth
.
split
(
'
:
'
);
}
if
(
params
.
hostname
)
{
config
.
host
=
params
.
hostname
;
}
if
(
params
.
port
)
{
config
.
port
=
params
.
port
;
}
if
(
params
.
pathname
)
{
config
.
database
=
params
.
pathname
.
split
(
'
/
'
)[
1
];
}
const
ssl
=
params
.
query
&&
params
.
query
.
ssl
;
if
(
ssl
&&
ssl
===
'
true
'
||
ssl
===
'
1
'
)
{
config
.
ssl
=
true
;
}
return
config
;
};
/**
* @param {Object.<string, any>} defaultConfig
* @param {string} redisUrl
*/
const
redisUrlToClient
=
async
(
defaultConfig
,
redisUrl
)
=>
{
const
config
=
defaultConfig
;
let
client
;
if
(
!
redisUrl
)
{
client
=
redis
.
createClient
(
config
);
}
else
if
(
redisUrl
.
startsWith
(
'
unix://
'
))
{
client
=
redis
.
createClient
(
Object
.
assign
(
config
,
{
socket
:
{
path
:
redisUrl
.
slice
(
7
),
},
}));
}
else
{
client
=
redis
.
createClient
(
Object
.
assign
(
config
,
{
url
:
redisUrl
,
}));
}
client
.
on
(
'
error
'
,
(
err
)
=>
log
.
error
(
'
Redis Client Error!
'
,
err
));
await
client
.
connect
();
return
client
;
};
const
numWorkers
=
+
process
.
env
.
STREAMING_CLUSTER_NUM
||
(
env
===
'
development
'
?
1
:
Math
.
max
(
os
.
cpus
().
length
-
1
,
1
));
/**
* @param {string} json
* @param {any} req
* @return {Object.<string, any>|null}
*/
const
parseJSON
=
(
json
,
req
)
=>
{
try
{
return
JSON
.
parse
(
json
);
}
catch
(
err
)
{
if
(
req
.
accountId
)
{
log
.
warn
(
req
.
requestId
,
`Error parsing message from user
${
req
.
accountId
}
:
${
err
}
`
);
}
else
{
log
.
silly
(
req
.
requestId
,
`Error parsing message from
${
req
.
remoteAddress
}
:
${
err
}
`
);
}
return
null
;
}
};
log
.
level
=
logLevel
;
const
startMaster
=
()
=>
{
if
(
!
process
.
env
.
SOCKET
&&
process
.
env
.
PORT
&&
isNaN
(
+
process
.
env
.
PORT
))
{
...
...
@@ -119,13 +58,14 @@ const startMaster = () => {
const
startWorker
=
async
(
workerId
)
=>
{
log
.
warn
(
`Starting worker
${
workerId
}
`
);
/** @type {Record<string, pg.PoolConfig>} */
const
pgConfigs
=
{
development
:
{
user
:
process
.
env
.
DB_USER
||
pg
.
defaults
.
user
,
password
:
process
.
env
.
DB_PASS
||
pg
.
defaults
.
password
,
database
:
process
.
env
.
DB_NAME
||
'
mastodon_development
'
,
host
:
process
.
env
.
DB_HOST
||
pg
.
defaults
.
host
,
port
:
process
.
env
.
DB_PORT
||
pg
.
defaults
.
port
,
port
:
process
.
env
.
DB_PORT
?
+
process
.
env
.
DB_PORT
:
pg
.
defaults
.
port
,
max
:
10
,
},
...
...
@@ -134,7 +74,7 @@ const startWorker = async (workerId) => {
password
:
process
.
env
.
DB_PASS
||
''
,
database
:
process
.
env
.
DB_NAME
||
'
mastodon_production
'
,
host
:
process
.
env
.
DB_HOST
||
'
localhost
'
,
port
:
process
.
env
.
DB_PORT
||
5432
,
port
:
process
.
env
.
DB_PORT
?
+
process
.
env
.
DB_PORT
:
5432
,
max
:
10
,
},
};
...
...
@@ -146,34 +86,31 @@ const startWorker = async (workerId) => {
const
app
=
express
();
app
.
set
(
'
trust proxy
'
,
process
.
env
.
TRUSTED_PROXY_IP
?
process
.
env
.
TRUSTED_PROXY_IP
.
split
(
/
(?:\s
*,
\s
*|
\s
+
)
/
)
:
'
loopback,uniquelocal
'
);
app
.
set
(
'
trust proxy
'
,
trustedProxyIp
);
const
pgPool
=
new
pg
.
Pool
(
Object
.
assign
(
pgConfigs
[
env
],
dbUrlToConfig
(
process
.
env
.
DATABASE_URL
)));
const
pgPool
=
new
pg
.
Pool
({
...
pgConfigs
[
env
],
...
dbUrlToConfig
(
process
.
env
.
DATABASE_URL
),
});
const
server
=
http
.
createServer
(
app
);
const
redisNamespace
=
process
.
env
.
REDIS_NAMESPACE
||
null
;
/** @type {import('redis').RedisClientOptions} */
const
redisParams
=
{
socket
:
{
host
:
process
.
env
.
REDIS_HOST
||
'
127.0.0.1
'
,
port
:
process
.
env
.
REDIS_PORT
||
6379
,
port
:
process
.
env
.
REDIS_PORT
?
+
process
.
env
.
REDIS_PORT
:
6379
,
},
database
:
process
.
env
.
REDIS_DB
||
0
,
database
:
process
.
env
.
REDIS_DB
?
+
process
.
env
.
REDIS_DB
:
0
,
password
:
process
.
env
.
REDIS_PASSWORD
||
undefined
,
};
if
(
redisNamespace
)
{
redisParams
.
namespace
=
redisNamespace
;
}
const
redisPrefix
=
redisNamespace
?
`
${
redisNamespace
}
:`
:
''
;
/**
* @type {Object.<string, Array.<function(string): void>>}
*/
const
subs
=
{};
const
redisSubscribeClient
=
await
redisUrlToClient
(
redisParams
,
process
.
env
.
REDIS_URL
);
const
redisClient
=
await
redisUrlToClient
(
redisParams
,
process
.
env
.
REDIS_URL
);
const
subscriber
=
new
Subscriber
({
redisClient
:
redisSubscribeClient
,
});
/**
* @param {string[]} channels
...
...
@@ -195,121 +132,6 @@ const startWorker = async (workerId) => {
};
};
/**
* @param {string} message
* @param {string} channel
*/
const
onRedisMessage
=
(
message
,
channel
)
=>
{
const
callbacks
=
subs
[
channel
];
log
.
silly
(
`New message on channel
${
channel
}
`
);
if
(
!
callbacks
)
{
return
;
}
callbacks
.
forEach
(
callback
=>
callback
(
message
));
};
/**
* @param {string} channel
* @param {function(string): void} callback
*/
const
subscribe
=
(
channel
,
callback
)
=>
{
log
.
silly
(
`Adding listener for
${
channel
}
`
);
subs
[
channel
]
=
subs
[
channel
]
||
[];
if
(
subs
[
channel
].
length
===
0
)
{
log
.
verbose
(
`Subscribe
${
channel
}
`
);
redisSubscribeClient
.
subscribe
(
channel
,
onRedisMessage
);
}
subs
[
channel
].
push
(
callback
);
};
/**
* @param {string} channel
*/
const
unsubscribe
=
(
channel
,
callback
)
=>
{
log
.
silly
(
`Removing listener for
${
channel
}
`
);
if
(
!
subs
[
channel
])
{
return
;
}
subs
[
channel
]
=
subs
[
channel
].
filter
(
item
=>
item
!==
callback
);
if
(
subs
[
channel
].
length
===
0
)
{
log
.
verbose
(
`Unsubscribe
${
channel
}
`
);
redisSubscribeClient
.
unsubscribe
(
channel
);
delete
subs
[
channel
];
}
};
const
FALSE_VALUES
=
[
false
,
0
,
'
0
'
,
'
f
'
,
'
F
'
,
'
false
'
,
'
FALSE
'
,
'
off
'
,
'
OFF
'
,
];
/**
* @param {any} value
* @return {boolean}
*/
const
isTruthy
=
value
=>
value
&&
!
FALSE_VALUES
.
includes
(
value
);
/**
* @param {any} req
* @param {any} res
* @param {function(Error=): void}
*/
const
allowCrossDomain
=
(
req
,
res
,
next
)
=>
{
res
.
header
(
'
Access-Control-Allow-Origin
'
,
'
*
'
);
res
.
header
(
'
Access-Control-Allow-Headers
'
,
'
Authorization, Accept, Cache-Control
'
);
res
.
header
(
'
Access-Control-Allow-Methods
'
,
'
GET, OPTIONS
'
);
next
();
};
/**
* @param {any} req
* @param {any} res
* @param {function(Error=): void}
*/
const
setRequestId
=
(
req
,
res
,
next
)
=>
{
req
.
requestId
=
uuid
.
v4
();
res
.
header
(
'
X-Request-Id
'
,
req
.
requestId
);
next
();
};
/**
* @param {any} req
* @param {any} res
* @param {function(Error=): void}
*/
const
setRemoteAddress
=
(
req
,
res
,
next
)
=>
{
req
.
remoteAddress
=
req
.
connection
.
remoteAddress
;
next
();
};
/**
* @param {any} req
* @param {string[]} necessaryScopes
* @return {boolean}
*/
const
isInScope
=
(
req
,
necessaryScopes
)
=>
req
.
scopes
.
some
(
scope
=>
necessaryScopes
.
includes
(
scope
));
/**
* @param {string} token
* @param {any} req
...
...
@@ -409,58 +231,6 @@ const startWorker = async (workerId) => {
}
};
const
PUBLIC_CHANNELS
=
[
'
public
'
,
'
public:media
'
,
'
public:local
'
,
'
public:local:media
'
,
'
public:remote
'
,
'
public:remote:media
'
,
'
hashtag
'
,
'
hashtag:local
'
,
];
/**
* @param {any} req
* @param {string} channelName
* @return {Promise.<void>}
*/
const
checkScopes
=
(
req
,
channelName
)
=>
new
Promise
((
resolve
,
reject
)
=>
{
log
.
silly
(
req
.
requestId
,
`Checking OAuth scopes for
${
channelName
}
`
);
// When accessing public channels, no scopes are needed
if
(
PUBLIC_CHANNELS
.
includes
(
channelName
))
{
resolve
();
return
;
}
// The `read` scope has the highest priority, if the token has it
// then it can access all streams
const
requiredScopes
=
[
'
read
'
];
// When accessing specifically the notifications stream,
// we need a read:notifications, while in all other cases,
// we can allow access with read:statuses. Mind that the
// user stream will not contain notifications unless
// the token has either read or read:notifications scope
// as well, this is handled separately.
if
(
channelName
===
'
user:notification
'
)
{
requiredScopes
.
push
(
'
read:notifications
'
);
}
else
{
requiredScopes
.
push
(
'
read:statuses
'
);
}
if
(
req
.
scopes
&&
requiredScopes
.
some
(
requiredScope
=>
req
.
scopes
.
includes
(
requiredScope
)))
{
resolve
();
return
;
}
const
err
=
new
Error
(
'
Access token does not cover required scopes
'
);
err
.
status
=
401
;
reject
(
err
);
});
/**
* @param {any} info
* @param {function(boolean, number, string): void} callback
...
...
@@ -507,73 +277,6 @@ const startWorker = async (workerId) => {
};
};
/**
* @param {any} req
* @param {any} res
*/
const
subscribeHttpToSystemChannel
=
(
req
,
res
)
=>
{
const
systemChannelId
=
`timeline:access_token:
${
req
.
accessTokenId
}
`
;
const
listener
=
createSystemMessageListener
(
req
,
{
onKill
()
{
res
.
end
();
},
});
res
.
on
(
'
close
'
,
()
=>
{
unsubscribe
(
`
${
redisPrefix
}${
systemChannelId
}
`
,
listener
);
});
subscribe
(
`
${
redisPrefix
}${
systemChannelId
}
`
,
listener
);
};
/**
* @param {any} req
* @param {any} res
* @param {function(Error=): void} next
*/
const
authenticationMiddleware
=
(
req
,
res
,
next
)
=>
{
if
(
req
.
method
===
'
OPTIONS
'
)
{
next
();
return
;
}
accountFromRequest
(
req
,
alwaysRequireAuth
).
then
(()
=>
checkScopes
(
req
,
channelNameFromPath
(
req
))).
then
(()
=>
{
subscribeHttpToSystemChannel
(
req
,
res
);
}).
then
(()
=>
{
next
();
}).
catch
(
err
=>
{
next
(
err
);
});
};
/**
* @param {Error} err
* @param {any} req
* @param {any} res
* @param {function(Error=): void} next
*/
const
errorMiddleware
=
(
err
,
req
,
res
,
next
)
=>
{
log
.
error
(
req
.
requestId
,
err
.
toString
());
if
(
res
.
headersSent
)
{
next
(
err
);
return
;
}
res
.
writeHead
(
err
.
status
||
500
,
{
'
Content-Type
'
:
'
application/json
'
});
res
.
end
(
JSON
.
stringify
({
error
:
err
.
status
?
err
.
toString
()
:
'
An unexpected error occurred
'
}));
};
/**
* @param {array} arr
* @param {number=} shift
* @return {string}
*/
const
placeholders
=
(
arr
,
shift
=
0
)
=>
arr
.
map
((
_
,
i
)
=>
`$
${
i
+
1
+
shift
}
`
).
join
(
'
,
'
);
/**
* @param {string} listId
* @param {any} req
...
...
@@ -690,7 +393,7 @@ const startWorker = async (workerId) => {
};
ids
.
forEach
(
id
=>
{
subscribe
(
`
${
redisPrefix
}${
id
}
`
,
listener
);
subscribe
r
.
register
(
`
${
redisPrefix
}${
id
}
`
,
listener
);
});
if
(
attachCloseHandler
)
{
...
...
@@ -735,7 +438,7 @@ const startWorker = async (workerId) => {
const
streamHttpEnd
=
(
req
,
closeHandler
=
undefined
)
=>
(
ids
)
=>
{
req
.
on
(
'
close
'
,
()
=>
{
ids
.
forEach
(
id
=>
{
un
subscribe
(
id
);
subscribe
r
.
unregister
(
id
);
});
if
(
closeHandler
)
{
...
...
@@ -759,14 +462,6 @@ const startWorker = async (workerId) => {
ws
.
send
(
JSON
.
stringify
({
stream
:
streamName
,
event
,
payload
}));
};
/**
* @param {any} res
*/
const
httpNotFound
=
res
=>
{
res
.
writeHead
(
404
,
{
'
Content-Type
'
:
'
application/json
'
});
res
.
end
(
JSON
.
stringify
({
error
:
'
Not found
'
}));
};
app
.
use
(
setRequestId
);
app
.
use
(
setRemoteAddress
);
app
.
use
(
allowCrossDomain
);
...
...
@@ -780,15 +475,16 @@ const startWorker = async (workerId) => {
app
.
use
(
errorMiddleware
);
app
.
get
(
'
/api/v1/streaming/*
'
,
(
req
,
res
)
=>
{
channelNameToIds
(
req
,
channelNameFromPath
(
req
),
req
.
query
).
then
(({
channelIds
,
options
})
=>
{
const
onSend
=
streamToHttp
(
req
,
res
);
const
onEnd
=
streamHttpEnd
(
req
,
subscriptionHeartbeat
(
channelIds
));
streamFrom
(
channelIds
,
req
,
onSend
,
onEnd
,
options
.
needsFiltering
);
}).
catch
(
err
=>
{
log
.
verbose
(
req
.
requestId
,
'
Subscription error:
'
,
err
.
toString
());
httpNotFound
(
res
);
});
channelNameToIds
(
req
,
channelNameFromPath
(
req
),
req
.
query
)
.
then
(({
channelIds
,
options
})
=>
{
const
onSend
=
streamToHttp
(
req
,
res
);
const
onEnd
=
streamHttpEnd
(
req
,
subscriptionHeartbeat
(
channelIds
));
streamFrom
(
channelIds
,
req
,
onSend
,
onEnd
,
options
.
needsFiltering
);
}).
catch
(
err
=>
{
log
.
verbose
(
req
.
requestId
,
'
Subscription error:
'
,
err
.
toString
());
httpNotFound
(
res
);
});
});
const
wss
=
new
WebSocket
.
Server
({
server
,
verifyClient
:
wsVerifyClient
});
...
...
@@ -994,7 +690,7 @@ const startWorker = async (workerId) => {
const
{
listener
,
stopHeartbeat
}
=
subscription
;
channelIds
.
forEach
(
channelId
=>
{
un
subscribe
(
`
${
redisPrefix
}${
channelId
}
`
,
listener
);
subscribe
r
.
unregister
(
`
${
redisPrefix
}${
channelId
}
`
,
listener
);
});
stopHeartbeat
();
...
...
@@ -1019,7 +715,7 @@ const startWorker = async (workerId) => {
});
subscribe
(
`
${
redisPrefix
}${
systemChannelId
}
`
,
listener
);
subscribe
r
.
register
(
`
${
redisPrefix
}${
systemChannelId
}
`
,
listener
);
subscriptions
[
systemChannelId
]
=
{
listener
,
...
...
@@ -1028,18 +724,6 @@ const startWorker = async (workerId) => {
};
};
/**
* @param {string|string[]} arrayOrString
* @return {string}
*/
const
firstParam
=
arrayOrString
=>
{
if
(
Array
.
isArray
(
arrayOrString
))
{
return
arrayOrString
[
0
];
}
else
{
return
arrayOrString
;
}
};
wss
.
on
(
'
connection
'
,
(
ws
,
req
)
=>
{
const
location
=
url
.
parse
(
req
.
url
,
true
);
...
...
@@ -1068,7 +752,7 @@ const startWorker = async (workerId) => {
const
{
listener
,
stopHeartbeat
}
=
session
.
subscriptions
[
channelIds
];
channelIds
.
split
(
'
;
'
).
forEach
(
channelId
=>
{
un
subscribe
(
`
${
redisPrefix
}${
channelId
}
`
,
listener
);
subscribe
r
.
unregister
(
`
${
redisPrefix
}${
channelId
}
`
,
listener
);
});
stopHeartbeat
();
...
...
@@ -1135,45 +819,6 @@ const startWorker = async (workerId) => {
process
.
on
(
'
uncaughtException
'
,
onError
);
};
/**
* @param {any} server
* @param {function(string): void} [onSuccess]
*/
const
attachServerWithConfig
=
(
server
,
onSuccess
)
=>
{
if
(
process
.
env
.
SOCKET
||
process
.
env
.
PORT
&&
isNaN
(
+
process
.
env
.
PORT
))
{
server
.
listen
(
process
.
env
.
SOCKET
||
process
.
env
.
PORT
,
()
=>
{
if
(
onSuccess
)
{
fs
.
chmodSync
(
server
.
address
(),
0o666
);
onSuccess
(
server
.
address
());
}