test-sprocket.py raw
1 #!/usr/bin/env python3
2 """
3 Test sprocket script that processes Nostr events via stdin/stdout JSONL protocol.
4 This script demonstrates various filtering criteria for testing purposes.
5 """
6
7 import json
8 import sys
9 import re
10 from datetime import datetime
11
12 def process_event(event_json):
13 """
14 Process a single event and return the appropriate response.
15
16 Args:
17 event_json (dict): The parsed event JSON
18
19 Returns:
20 dict: Response with id, action, and msg fields
21 """
22 event_id = event_json.get('id', '')
23 event_kind = event_json.get('kind', 0)
24 event_content = event_json.get('content', '')
25 event_pubkey = event_json.get('pubkey', '')
26 event_tags = event_json.get('tags', [])
27
28 # Test criteria 1: Reject events containing "spam" in content
29 if 'spam' in event_content.lower():
30 return {
31 'id': event_id,
32 'action': 'reject',
33 'msg': 'Content contains spam'
34 }
35
36 # Test criteria 2: Shadow reject events with kind 9999 (test kind)
37 if event_kind == 9999:
38 return {
39 'id': event_id,
40 'action': 'shadowReject',
41 'msg': ''
42 }
43
44 # Test criteria 3: Reject events with certain hashtags
45 for tag in event_tags:
46 if len(tag) >= 2 and tag[0] == 't': # hashtag
47 hashtag = tag[1].lower()
48 if hashtag in ['blocked', 'rejected', 'test-block']:
49 return {
50 'id': event_id,
51 'action': 'reject',
52 'msg': f'Hashtag "{hashtag}" is not allowed'
53 }
54
55 # Test criteria 4: Shadow reject events from specific pubkeys (first 8 chars)
56 blocked_prefixes = ['00000000', '11111111', '22222222'] # Test prefixes
57 pubkey_prefix = event_pubkey[:8] if len(event_pubkey) >= 8 else event_pubkey
58 if pubkey_prefix in blocked_prefixes:
59 return {
60 'id': event_id,
61 'action': 'shadowReject',
62 'msg': ''
63 }
64
65 # Test criteria 5: Reject events that are too long
66 if len(event_content) > 1000:
67 return {
68 'id': event_id,
69 'action': 'reject',
70 'msg': 'Content too long (max 1000 characters)'
71 }
72
73 # Test criteria 6: Reject events with invalid timestamps (too old or too new)
74 try:
75 event_time = event_json.get('created_at', 0)
76 current_time = int(datetime.now().timestamp())
77
78 # Reject events more than 1 hour old
79 if current_time - event_time > 3600:
80 return {
81 'id': event_id,
82 'action': 'reject',
83 'msg': 'Event timestamp too old'
84 }
85
86 # Reject events more than 5 minutes in the future
87 if event_time - current_time > 300:
88 return {
89 'id': event_id,
90 'action': 'reject',
91 'msg': 'Event timestamp too far in future'
92 }
93 except (ValueError, TypeError):
94 pass # Ignore timestamp errors
95
96 # Default: accept the event
97 return {
98 'id': event_id,
99 'action': 'accept',
100 'msg': ''
101 }
102
103 def main():
104 """Main function to process events from stdin."""
105 try:
106 # Read events from stdin
107 for line in sys.stdin:
108 line = line.strip()
109 if not line:
110 continue
111
112 try:
113 # Parse the event JSON
114 event = json.loads(line)
115
116 # Process the event
117 response = process_event(event)
118
119 # Output the response as JSONL
120 print(json.dumps(response), flush=True)
121
122 except json.JSONDecodeError as e:
123 # Log error to stderr but continue processing
124 print(f"Error parsing JSON: {e}", file=sys.stderr)
125 continue
126 except Exception as e:
127 # Log error to stderr but continue processing
128 print(f"Error processing event: {e}", file=sys.stderr)
129 continue
130
131 except KeyboardInterrupt:
132 # Graceful shutdown
133 sys.exit(0)
134 except Exception as e:
135 print(f"Fatal error: {e}", file=sys.stderr)
136 sys.exit(1)
137
138 if __name__ == '__main__':
139 main()
140