main.mx raw
1 package main
2
3 import (
4 "fmt"
5 "net"
6 "os"
7 "time"
8
9 "git.smesh.lol/iskra"
10 "git.smesh.lol/iskradb/lattice"
11 "moxie"
12 )
13
14 const (
15 defaultTCP = ":4847"
16 defaultUnix = "/tmp/iskra.sock"
17 flushEvery = 64
18 )
19
20 func main() {
21 if len(os.Args) < 2 {
22 fmt.Fprintf(os.Stderr(), "usage: iskra-server <lattice-path> [tcp-addr] [unix-path]\n")
23 os.Exit(1)
24 }
25 path := os.Args[1]
26 tcpAddr := defaultTCP
27 unixPath := defaultUnix
28 if len(os.Args) > 2 {
29 tcpAddr = os.Args[2]
30 }
31 if len(os.Args) > 3 {
32 unixPath = os.Args[3]
33 }
34
35 tree, err := loadTree(path)
36 if err != nil {
37 fmt.Fprintf(os.Stderr(), "load: %v\n", err)
38 os.Exit(1)
39 }
40 fmt.Fprintf(os.Stderr(), "loaded: %d records, %d bigram entries\n",
41 tree.EntryCount(), len(tree.BigramIdx))
42
43 // Spawn network domains - one per listener.
44 // Each handles connections sequentially (one active at a time per listener).
45 // Channels carry raw frame bytes across the spawn boundary.
46 tcpReq := chan moxie.Bytes{}
47 tcpResp := chan moxie.Bytes{}
48 unixReq := chan moxie.Bytes{}
49 unixResp := chan moxie.Bytes{}
50
51 spawn(netDomain, moxie.Bytes([]byte(tcpAddr)), moxie.Bytes([]byte("tcp")), tcpReq, tcpResp)
52 spawn(netDomain, moxie.Bytes([]byte(unixPath)), moxie.Bytes([]byte("unix")), unixReq, unixResp)
53
54 fmt.Fprintf(os.Stderr(), "listening: tcp=%s unix=%s\n", tcpAddr, unixPath)
55
56 writesSinceFlush := 0
57 lastFlush := time.Now()
58
59 for {
60 select {
61 case frame := <-tcpReq:
62 resp := dispatch(tree, []byte(frame), &writesSinceFlush)
63 tcpResp <- moxie.Bytes(resp)
64 case frame := <-unixReq:
65 resp := dispatch(tree, []byte(frame), &writesSinceFlush)
66 unixResp <- moxie.Bytes(resp)
67 }
68 if writesSinceFlush >= flushEvery || (writesSinceFlush > 0 && time.Since(lastFlush) > 5*time.Second) {
69 flushTree(tree, path)
70 writesSinceFlush = 0
71 lastFlush = time.Now()
72 }
73 }
74 }
75
76 // netDomain is spawned per listener. It owns the socket, accepts connections,
77 // and services them sequentially. Each request frame is sent to the main domain
78 // via the req channel; the response comes back on resp.
79 func netDomain(addr, network moxie.Bytes, req chan moxie.Bytes, resp chan moxie.Bytes) {
80 addrStr := string([]byte(addr))
81 netStr := string([]byte(network))
82
83 if netStr == "unix" {
84 os.Remove(addrStr)
85 }
86
87 ln, err := net.Listen(netStr, addrStr)
88 if err != nil {
89 fmt.Fprintf(os.Stderr(), "net: listen %s %s: %v\n", netStr, addrStr, err)
90 return
91 }
92
93 for {
94 conn, err := ln.Accept()
95 if err != nil {
96 continue
97 }
98 serviceConn(conn, req, resp)
99 conn.Close()
100 }
101 }
102
103 // serviceConn reads request frames from conn, forwards to main domain via req/resp
104 // channels, and writes responses back. Returns when the connection errors or closes.
105 func serviceConn(conn net.Conn, req chan moxie.Bytes, resp chan moxie.Bytes) {
106 for {
107 frame, err := iskra.ReadFrame(conn)
108 if err != nil {
109 return
110 }
111 if len(frame) == 0 {
112 return
113 }
114 req <- moxie.Bytes(frame)
115 response := <-resp
116 if err := iskra.WriteFrame(conn, []byte(response)); err != nil {
117 return
118 }
119 }
120 }
121
122 func dispatch(tree *iskra.Tree, frame []byte, writes *int32) []byte {
123 if len(frame) < 1 {
124 return iskra.EncodeResponse(iskra.StatusError, []byte("empty"))
125 }
126 op := frame[0]
127 switch op {
128 case iskra.OpBigramWeight:
129 return handleBigramWeight(tree, frame[1:])
130 case iskra.OpBigramWeightRelaxed:
131 return handleBigramWeightRelaxed(tree, frame[1:])
132 case iskra.OpIngestBigram:
133 *writes++
134 return handleIngestBigram(tree, frame[1:])
135 case iskra.OpIngestText:
136 *writes++
137 return handleIngestText(tree, frame[1:])
138 case iskra.OpWalkStep:
139 return handleWalkStep(tree, frame[1:])
140 case iskra.OpTranslate:
141 return handleTranslate(tree, frame[1:])
142 case iskra.OpBeamRun:
143 return handleBeamRun(tree, frame[1:])
144 case iskra.OpFlush:
145 *writes = 0
146 return iskra.EncodeResponse(iskra.StatusOK, nil)
147 }
148 return iskra.EncodeResponse(iskra.StatusError, []byte("unknown op"))
149 }
150
151 func handleBigramWeight(tree *iskra.Tree, p []byte) []byte {
152 if len(p) < 9 {
153 return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
154 }
155 domain := p[0]
156 coord, _ := iskra.DecodeUint64(p, 1)
157 prev, n1 := iskra.DecodeString(p, 9)
158 next, _ := iskra.DecodeString(p, 9+n1)
159 w := iskra.BigramWeight(tree, domain, coord, prev, next)
160 return iskra.EncodeUint32Response(w)
161 }
162
163 func handleBigramWeightRelaxed(tree *iskra.Tree, p []byte) []byte {
164 if len(p) < 9 {
165 return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
166 }
167 domain := p[0]
168 coord, _ := iskra.DecodeUint64(p, 1)
169 prev, n1 := iskra.DecodeString(p, 9)
170 next, _ := iskra.DecodeString(p, 9+n1)
171 w, matched := iskra.BigramWeightRelaxed(tree, domain, coord, prev, next)
172 return iskra.EncodeWeightRelaxedResponse(w, matched)
173 }
174
175 func handleIngestBigram(tree *iskra.Tree, p []byte) []byte {
176 if len(p) < 9 {
177 return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
178 }
179 domain := p[0]
180 coord, _ := iskra.DecodeUint64(p, 1)
181 prev, n1 := iskra.DecodeString(p, 9)
182 next, _ := iskra.DecodeString(p, 9+n1)
183 ri := iskra.IngestBigram(tree, domain, coord, prev, next)
184 return iskra.EncodeUint32Response(ri)
185 }
186
187 func handleIngestText(tree *iskra.Tree, p []byte) []byte {
188 if len(p) < 11 {
189 return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
190 }
191 domain := p[0]
192 coord, _ := iskra.DecodeUint64(p, 1)
193 count := int32(uint16(p[9]) | uint16(p[10])<<8)
194 off := 11
195 tokens := []string{:0:count}
196 for i := 0; i < count; i++ {
197 s, n := iskra.DecodeString(p, off)
198 if n == 0 {
199 break
200 }
201 tokens = append(tokens, s)
202 off += n
203 }
204 iskra.IngestText(tree, domain, coord, tokens)
205 return iskra.EncodeResponse(iskra.StatusOK, nil)
206 }
207
208 func handleWalkStep(tree *iskra.Tree, p []byte) []byte {
209 if len(p) < 9 {
210 return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
211 }
212 domain := p[0]
213 coord, _ := iskra.DecodeUint64(p, 1)
214 word, n1 := iskra.DecodeString(p, 9)
215 off := 9 + n1
216 maxCandidates := 10
217 if off+2 <= len(p) {
218 maxCandidates = int32(uint16(p[off]) | uint16(p[off+1])<<8)
219 }
220 state := iskra.WalkState{
221 Domain: domain,
222 Coord: coord,
223 Word: word,
224 }
225 candidates := iskra.WalkStep(tree, state, maxCandidates)
226 return iskra.EncodeWalkCandidatesResponse(candidates)
227 }
228
229 func handleTranslate(tree *iskra.Tree, p []byte) []byte {
230 if len(p) < 10 {
231 return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
232 }
233 srcDomain := p[0]
234 dstDomain := p[1]
235 coord, _ := iskra.DecodeUint64(p, 2)
236 word, _ := iskra.DecodeString(p, 10)
237 src := iskra.NewSubLattice(tree.DB(), tree.Pool(), srcDomain, tree.Reg)
238 dst := iskra.NewSubLattice(tree.DB(), tree.Pool(), dstDomain, tree.Reg)
239 result := iskra.Translate(src, dst, word, coord)
240 if result == "" {
241 return iskra.EncodeResponse(iskra.StatusNotFound, nil)
242 }
243 return iskra.EncodeStringResponse(result)
244 }
245
246 func handleBeamRun(tree *iskra.Tree, p []byte) []byte {
247 if len(p) < 9 {
248 return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
249 }
250 domain := p[0]
251 coord, _ := iskra.DecodeUint64(p, 1)
252 word, n1 := iskra.DecodeString(p, 9)
253 off := 9 + n1
254 width := 4
255 maxSteps := 5
256 kind := iskra.WalkTranslate
257 if off+5 <= len(p) {
258 width = int32(uint16(p[off]) | uint16(p[off+1])<<8)
259 maxSteps = int32(uint16(p[off+2]) | uint16(p[off+3])<<8)
260 kind = iskra.WalkKind(p[off+4])
261 }
262 initial := iskra.WalkState{
263 Domain: domain,
264 Coord: coord,
265 Word: word,
266 }
267 beam := iskra.NewBeam(initial, width, kind)
268 result := beam.Run(tree, maxSteps)
269 return iskra.EncodeBeamResponse(result)
270 }
271
272 func loadTree(path string) (*iskra.Tree, error) {
273 tree, err := iskra.StorageOpen(path)
274 if err != nil {
275 db := lattice.NewTree(1024)
276 tree = iskra.NewTree(db)
277 }
278 return tree, nil
279 }
280
281 func flushTree(tree *iskra.Tree, path string) {
282 err := iskra.StorageFlushPath(tree, path)
283 if err != nil {
284 fmt.Fprintf(os.Stderr(), "flush: %v\n", err)
285 }
286 }
287