Exploring the Bluesky firehose
Programmatically observing public activity on Bluesky in real time is relatively straightforward
If you want to monitor various types of public activity on the social media platform Bluesky in real time, you can do this via the Bluesky firehose. The firehose is not merely a chronological history of all posts on Bluesky; it is a data stream of all public actions on the platform. This data stream includes actions such as post creation and deletion, likes, follows, unfollows, handle changes, and more, all with timestamps. In this article, we’ll look at a couple of example uses of the firehose: analysis of Bluesky posts containing external links, and tracking handle changes. The provided Python code, which uses the atproto module by MarshalX, should be easy to adapt to other use cases as desired.
```python
# Module paths follow the 2023-era atproto SDK (by MarshalX) used for this
# article; later releases of the SDK reorganized some of them.
import json
import os
import time

import atproto
import atproto.firehose as hose
from atproto.xrpc_client.models import get_or_create


def retry(method, params):
    # Call method(params); on failure wait 1s, 2s, 4s, ... and try again,
    # giving up (and returning None) after 5 attempts.
    retries = 5
    delay = 1
    while retries > 0:
        try:
            return method(params)
        except Exception as e:
            print(" error (" + str(e) + "), sleeping " + str(delay) + "s")
            time.sleep(delay)
            delay = delay * 2
            retries = retries - 1
    return None


def get_profiles(actors, client):
    # app.bsky.actor.getProfiles accepts at most 25 actors per call,
    # so look the profiles up in batches of 25.
    profiles = []
    for i in range(0, len(actors), 25):
        batch = actors[i:i + 25]
        r = retry(client.app.bsky.actor.get_profiles, {"actors": batch})
        if r is not None:  # skip batches that still failed after retrying
            profiles.extend(r.profiles)
    return profiles


def on_message(message, test_function, handler):
    # Decode a raw firehose frame; only commit messages are handled here.
    message = hose.parse_subscribe_repos_message(message)
    if isinstance(message, atproto.models.ComAtprotoSyncSubscribeRepos.Commit):
        # The commit carries its records as a CAR file keyed by CID.
        blocks = atproto.CAR.from_bytes(message.blocks).blocks
        for op in message.ops:
            uri = atproto.AtUri.from_str("at://" + message.repo + "/" + op.path)
            raw = blocks.get(op.cid)
            if raw:  # ops without a record block (such as deletes) are skipped
                record = get_or_create(raw, strict=False)
                if record.py_type is not None:
                    item = {
                        "repo": message.repo,
                        "revision": message.rev,
                        "sequence": message.seq,
                        "timestamp": message.time,
                        "action": op.action,
                        "cid": str(op.cid),
                        "path": op.path,
                        "collection": uri.collection,
                        "record": record.model_dump(),
                    }
                    if test_function(item):
                        handler(item)


def has_link(item):
    # Keep only newly created posts containing at least one http(s) link,
    # storing the distinct URLs in item["urls"].
    if item["collection"] != "app.bsky.feed.post" or item["action"] != "create":
        return False
    record = item["record"]
    urls = set()
    # Link cards (external embeds)
    embed = record.get("embed")
    if embed and embed.get("external"):
        uri = embed["external"].get("uri")
        if uri and uri.startswith("http"):
            urls.add(uri)
    # Links inside the post text (rich-text facets)
    for facet in record.get("facets") or []:
        for feature in facet.get("features") or []:
            uri = feature.get("uri")
            if uri and uri.startswith("http"):
                urls.add(uri)
    if not urls:
        return False
    item["urls"] = list(urls)
    return True


def dump_to_folder(item, queue, path, client, batch_size=50):
    # Buffer items; once batch_size have accumulated, attach each author's
    # profile and write the whole batch to a timestamped JSON file.
    queue.append(item)
    if len(queue) >= batch_size:
        dids = {queued["repo"] for queued in queue}
        profiles = {}
        for profile in get_profiles(list(dids), client):
            profiles[profile.did] = profile.model_dump()
        for queued in queue:
            if queued["repo"] in profiles:
                queued["user_profile"] = profiles[queued["repo"]]
            else:
                print("no profile: " + queued["repo"])
        os.makedirs(path, exist_ok=True)
        file = os.path.join(path, str(int(time.time() * 1000000)) + ".json")
        with open(file, "w") as f:
            json.dump(queue, f, default=str)  # str() anything JSON can't encode
        print(str(len(queue)) + " items written to " + file)
        queue.clear()


def monitor_bsky_firehose(test_function, handler):
    # Start the firehose client, passing every frame it receives to on_message.
    firehose = hose.FirehoseSubscribeReposClient()
    firehose.start(lambda message: on_message(message, test_function, handler))


client = atproto.Client()
client.login("******", "******")  # Bluesky handle and password
path = "bsky_hose_json/"
queue = []
monitor_bsky_firehose(has_link,
                      lambda item: dump_to_folder(item, queue, path, client))
```
To begin, we’ll take a look at what external websites are being shared on Bluesky by monitoring the firehose for the creation of new posts containing links. The firehose consists of a stream of messages of different types, the most common of which is the commit message. Commit messages represent actions that create, delete, or update a data item in a collection (e.g., posts, likes, or follow relationships). For the purpose of studying external websites linked on Bluesky, we are interested in `create` actions in the collection of posts (the `app.bsky.feed.post` collection). The `has_link` function in the above code checks posts for external links as they show up in the firehose. (Since links can appear under either the `embed` or the `facets` attribute of a post, we need to check both.) The `dump_to_folder` function looks up each author’s profile and stores the posts as JSON in text files in batches of 50, but the posts could just as easily be piped to a database or other storage mechanism.
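As a sketch of the database option, the hypothetical `make_sqlite_handler` below returns a handler that could be passed to `monitor_bsky_firehose` in place of the `dump_to_folder` lambda. It uses only Python’s built-in `sqlite3` module; the table name and columns are illustrative assumptions, and the full item is also kept as JSON text so nothing is lost. Because it isn’t certain which thread the firehose client uses for callbacks, the connection is opened with `check_same_thread=False` and guarded by a lock.

```python
import json
import sqlite3
import threading


def make_sqlite_handler(db_path):
    """Return (handler, connection); handler stores each firehose item as a row.

    The "items" table and its columns are illustrative choices, not part of
    the original script.
    """
    conn = sqlite3.connect(db_path, check_same_thread=False)
    lock = threading.Lock()
    conn.execute(
        "CREATE TABLE IF NOT EXISTS items ("
        " sequence INTEGER, repo TEXT, collection TEXT,"
        " action TEXT, event_time TEXT, body TEXT)"
    )
    conn.commit()

    def handler(item):
        # A few queryable columns plus the whole item serialized as JSON.
        row = (
            item.get("sequence"),
            item.get("repo"),
            item.get("collection"),
            item.get("action"),
            item.get("timestamp"),
            json.dumps(item, default=str),
        )
        with lock:
            conn.execute("INSERT INTO items VALUES (?, ?, ?, ?, ?, ?)", row)
            conn.commit()

    return handler, conn
```

For example, `handler, conn = make_sqlite_handler("bsky_links.db")` followed by `monitor_bsky_firehose(has_link, handler)`. Committing after every insert keeps the sketch simple but is slow at firehose volumes, so batching commits would be a natural refinement. Unlike `dump_to_folder`, this version does not look up author profiles.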
Monitoring the Bluesky firehose for posts containing links for 48 hours (September 15th-17th, starting and ending at midnight CDT) yielded 33,916 posts from 13,108 unique accounts. A variety of social media websites, such as YouTube, X/Twitter, TikTok, Instagram, and Reddit, turn up prominently, as do the blogging platforms WordPress and Substack. The most frequently linked mainstream news site (and the only news site in the top 20) is the Washington Post, which is tied with Substack at 144 links during the period studied.
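Tallies like these can be computed from the batch files that `dump_to_folder` writes. The sketch below assumes that file layout (each `.json` file holds a list of items with `repo` and `urls` keys) and counts each distinct URL in a post as one link to its domain. The article doesn’t say exactly how its counts were produced, so treat the hypothetical `summarize_links` as one plausible approach, not the method behind the figures above. A leading `www.` is stripped so that `www.youtube.com` and `youtube.com` are counted together.

```python
import glob
import json
import os
from collections import Counter
from urllib.parse import urlparse


def load_items(folder):
    """Yield every item from the JSON batch files written by dump_to_folder."""
    for name in sorted(glob.glob(os.path.join(folder, "*.json"))):
        with open(name) as f:
            yield from json.load(f)


def summarize_links(folder):
    """Return (post count, unique account count, Counter of linked domains)."""
    posts = 0
    accounts = set()
    domains = Counter()
    for item in load_items(folder):
        posts += 1
        accounts.add(item["repo"])
        # Each distinct URL in a post counts once toward its domain.
        for url in item.get("urls", []):
            host = (urlparse(url).hostname or "").removeprefix("www.")
            domains[host] += 1
    return posts, len(accounts), domains
```

For example, `posts, accounts, domains = summarize_links("bsky_hose_json/")` followed by `print(domains.most_common(20))` lists the 20 most-linked domains. Aliases such as `youtu.be` are not merged with `youtube.com` in this sketch. `str.removeprefix` requires Python 3.9 or later.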
The most frequently linked domain is X/Twitter’s URL shortener, t.co, which was linked 7509 times during the period in question. Almost all of these links are from a single automated account, news-feed.bsky.social, which aggregates content from popular news accounts on X/Twitter and posts it on Bluesky. This account is quite literally a bot (the word “bot” means “automated account” in the context of social media), and serves as a reminder that, contrary to popular usage of the term, not all bots are nefarious in nature. (That being said, Bluesky has already played host to at least one spambot network.)
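A concentration like the t.co one shows up when a domain’s links are attributed to the accounts that posted them. The hypothetical `top_accounts_for_domain` below again assumes the `dump_to_folder` file layout. It uses the `handle` field from the attached `user_profile` when the profile lookup succeeded and falls back to the DID otherwise.

```python
import glob
import json
import os
from collections import Counter
from urllib.parse import urlparse


def top_accounts_for_domain(folder, domain, n=5):
    """Return the n accounts that posted the most links to `domain`."""
    counts = Counter()
    for name in glob.glob(os.path.join(folder, "*.json")):
        with open(name) as f:
            for item in json.load(f):
                # Prefer the handle from the attached profile; fall back to the DID.
                profile = item.get("user_profile") or {}
                who = profile.get("handle", item["repo"])
                for url in item.get("urls", []):
                    if urlparse(url).hostname == domain:
                        counts[who] += 1
    return counts.most_common(n)
```

For example, `top_accounts_for_domain("bsky_hose_json/", "t.co")` returns `(account, link count)` pairs, most prolific first.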
```python
# Module paths follow the 2023-era atproto SDK used in the first script.
import json
import os
import time

import atproto
import atproto.firehose as hose
from atproto.xrpc_client.models import get_or_create


def retry(method, params):
    # Call method(params); on failure wait 1s, 2s, 4s, ... and try again,
    # giving up (and returning None) after 5 attempts.
    retries = 5
    delay = 1
    while retries > 0:
        try:
            return method(params)
        except Exception as e:
            print(" error (" + str(e) + "), sleeping " + str(delay) + "s")
            time.sleep(delay)
            delay = delay * 2
            retries = retries - 1
    return None


def get_profiles(actors, client):
    # app.bsky.actor.getProfiles accepts at most 25 actors per call.
    profiles = []
    for i in range(0, len(actors), 25):
        batch = actors[i:i + 25]
        r = retry(client.app.bsky.actor.get_profiles, {"actors": batch})
        if r is not None:
            profiles.extend(r.profiles)
    return profiles


def on_message(message, test_function, handler):
    # Decode a raw firehose frame; handle both commit and handle messages.
    message = hose.parse_subscribe_repos_message(message)
    if isinstance(message, atproto.models.ComAtprotoSyncSubscribeRepos.Commit):
        blocks = atproto.CAR.from_bytes(message.blocks).blocks
        for op in message.ops:
            uri = atproto.AtUri.from_str("at://" + message.repo + "/" + op.path)
            raw = blocks.get(op.cid)
            if raw:
                record = get_or_create(raw, strict=False)
                if record.py_type is not None:
                    item = {
                        "repo": message.repo,
                        "revision": message.rev,
                        "sequence": message.seq,
                        "timestamp": message.time,
                        "action": op.action,
                        "cid": str(op.cid),
                        "path": op.path,
                        "collection": uri.collection,
                        "record": record.model_dump(),
                        "type": "commit",
                    }
                    if test_function(item):
                        handler(item)
    elif isinstance(message, atproto.models.ComAtprotoSyncSubscribeRepos.Handle):
        # Handle messages carry the account's DID and its new handle.
        item = {
            "repo": message.did,
            "sequence": message.seq,
            "timestamp": message.time,
            "handle": message.handle,
            "type": "handle",
        }
        if test_function(item):
            handler(item)


def is_handle_change(item):
    return item["type"] == "handle"


def dump_to_folder(item, queue, path, client, batch_size=50):
    # Buffer items, attach author profiles, and write full batches to JSON.
    queue.append(item)
    if len(queue) >= batch_size:
        dids = {queued["repo"] for queued in queue}
        profiles = {}
        for profile in get_profiles(list(dids), client):
            profiles[profile.did] = profile.model_dump()
        for queued in queue:
            if queued["repo"] in profiles:
                queued["user_profile"] = profiles[queued["repo"]]
            else:
                print("no profile: " + queued["repo"])
        os.makedirs(path, exist_ok=True)
        file = os.path.join(path, str(int(time.time() * 1000000)) + ".json")
        with open(file, "w") as f:
            json.dump(queue, f, default=str)
        print(str(len(queue)) + " items written to " + file)
        queue.clear()


def monitor_bsky_firehose(test_function, handler):
    firehose = hose.FirehoseSubscribeReposClient()
    firehose.start(lambda message: on_message(message, test_function, handler))


client = atproto.Client()
client.login("******", "******")  # Bluesky handle and password
path = "bsky_hose_handle_changes/"
queue = []
monitor_bsky_firehose(is_handle_change,
                      lambda item: dump_to_folder(item, queue, path, client,
                                                  batch_size=10))
```
What if we want to watch for handle changes? This requires monitoring the Bluesky firehose for an additional message type, the handle message. This message is simpler than the commit message: it contains the new handle, the account’s immutable identifier (DID), the time at which the handle was changed, and a sequence number. (Note that this method also appears to detect the initial handle selection at the time of account creation.) The firehose provides a more reliable way to detect handle changes than most other platforms offer, which can be quite valuable when researching the behavior of a suspicious account.
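Once handle messages are being written to disk, building a per-account history takes only a few lines. The sketch below assumes the files written by the handle-change script (items with `repo`, `sequence`, `timestamp`, and `handle` keys); `handle_histories` and `accounts_with_multiple_handles` are hypothetical helper names. Events are ordered by their firehose sequence numbers, which increase as the stream progresses.

```python
import glob
import json
import os
from collections import defaultdict


def handle_histories(folder):
    """Build {did: [(timestamp, handle), ...]} from dumped handle messages."""
    events = []
    for name in glob.glob(os.path.join(folder, "*.json")):
        with open(name) as f:
            events.extend(json.load(f))
    # Sort by sequence number so each account's handles appear in stream order.
    events.sort(key=lambda e: e["sequence"])
    history = defaultdict(list)
    for e in events:
        history[e["repo"]].append((e["timestamp"], e["handle"]))
    return dict(history)


def accounts_with_multiple_handles(history):
    """Keep only the DIDs that have been seen under more than one handle."""
    return {did: h for did, h in history.items()
            if len({handle for _, handle in h}) > 1}
```

Because the DID never changes, keying the history on it links every handle an account has used, which is what makes handle changes traceable.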
The utility of the Bluesky firehose for research purposes does come with some caveats for users. Because handle changes are so easy to monitor, changing your handle does nothing to protect your identity. (To be fair, this is also true of other social media platforms that allow handle changes, but Bluesky makes tracking handle changes particularly easy.) Similarly, deleting a post on Bluesky doesn’t remove it from the public internet even if no one has screenshotted or archived it, since both the `create` and `delete` messages for the post were broadcast on the firehose, and anyone monitoring the stream when the post was created may have recorded it. This is worth keeping in mind when using social media platforms built on open protocols: although sites like Bluesky may look similar to traditional platforms such as X/Twitter or Facebook, the underlying architecture is different, and that changes some user expectations, including the notion that public content can be permanently deleted or changed.
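To make the point about deletions concrete, here is a sketch of a handler that remembers the text of posts as they are created and reports any that are later deleted. It assumes items shaped like those built by `on_message`. Delete operations carry no record, so `on_message` as written skips them; a modified version would need to emit delete items with at least `repo`, `path`, and `action` keys. `make_deletion_tracker` is a hypothetical name.

```python
def make_deletion_tracker():
    """Return (handler, deleted): deleted collects posts removed after being seen.

    Assumes create items carry the post record (with a "text" field) and
    delete items carry at least "repo", "path", and "action".
    """
    seen = {}     # (repo, path) -> post text captured from the create op
    deleted = []  # (repo, path, text) for posts deleted after we saw them

    def handler(item):
        key = (item["repo"], item["path"])
        if item["action"] == "create":
            seen[key] = (item.get("record") or {}).get("text", "")
        elif item["action"] == "delete" and key in seen:
            deleted.append((item["repo"], item["path"], seen.pop(key)))

    return handler, deleted
```

The `delete` message identifies the post only by repo and path, but that is enough to match it with a `create` captured earlier, so the deleted text survives in any listener’s records.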