package main import ( "fmt" "net" "os" "time" "git.smesh.lol/iskra" "git.smesh.lol/iskradb/lattice" "moxie" ) const ( defaultTCP = ":4847" defaultUnix = "/tmp/iskra.sock" flushEvery = 64 ) func main() { if len(os.Args) < 2 { fmt.Fprintf(os.Stderr(), "usage: iskra-server [tcp-addr] [unix-path]\n") os.Exit(1) } path := os.Args[1] tcpAddr := defaultTCP unixPath := defaultUnix if len(os.Args) > 2 { tcpAddr = os.Args[2] } if len(os.Args) > 3 { unixPath = os.Args[3] } tree, err := loadTree(path) if err != nil { fmt.Fprintf(os.Stderr(), "load: %v\n", err) os.Exit(1) } fmt.Fprintf(os.Stderr(), "loaded: %d records, %d bigram entries\n", tree.EntryCount(), len(tree.BigramIdx)) // Spawn network domains - one per listener. // Each handles connections sequentially (one active at a time per listener). // Channels carry raw frame bytes across the spawn boundary. tcpReq := chan moxie.Bytes{} tcpResp := chan moxie.Bytes{} unixReq := chan moxie.Bytes{} unixResp := chan moxie.Bytes{} spawn(netDomain, moxie.Bytes([]byte(tcpAddr)), moxie.Bytes([]byte("tcp")), tcpReq, tcpResp) spawn(netDomain, moxie.Bytes([]byte(unixPath)), moxie.Bytes([]byte("unix")), unixReq, unixResp) fmt.Fprintf(os.Stderr(), "listening: tcp=%s unix=%s\n", tcpAddr, unixPath) writesSinceFlush := 0 lastFlush := time.Now() for { select { case frame := <-tcpReq: resp := dispatch(tree, []byte(frame), &writesSinceFlush) tcpResp <- moxie.Bytes(resp) case frame := <-unixReq: resp := dispatch(tree, []byte(frame), &writesSinceFlush) unixResp <- moxie.Bytes(resp) } if writesSinceFlush >= flushEvery || (writesSinceFlush > 0 && time.Since(lastFlush) > 5*time.Second) { flushTree(tree, path) writesSinceFlush = 0 lastFlush = time.Now() } } } // netDomain is spawned per listener. It owns the socket, accepts connections, // and services them sequentially. Each request frame is sent to the main domain // via the req channel; the response comes back on resp. func netDomain(addr, network moxie.Bytes, req chan moxie.Bytes, resp chan moxie.Bytes) { addrStr := string([]byte(addr)) netStr := string([]byte(network)) if netStr == "unix" { os.Remove(addrStr) } ln, err := net.Listen(netStr, addrStr) if err != nil { fmt.Fprintf(os.Stderr(), "net: listen %s %s: %v\n", netStr, addrStr, err) return } for { conn, err := ln.Accept() if err != nil { continue } serviceConn(conn, req, resp) conn.Close() } } // serviceConn reads request frames from conn, forwards to main domain via req/resp // channels, and writes responses back. Returns when the connection errors or closes. func serviceConn(conn net.Conn, req chan moxie.Bytes, resp chan moxie.Bytes) { for { frame, err := iskra.ReadFrame(conn) if err != nil { return } if len(frame) == 0 { return } req <- moxie.Bytes(frame) response := <-resp if err := iskra.WriteFrame(conn, []byte(response)); err != nil { return } } } func dispatch(tree *iskra.Tree, frame []byte, writes *int32) []byte { if len(frame) < 1 { return iskra.EncodeResponse(iskra.StatusError, []byte("empty")) } op := frame[0] switch op { case iskra.OpBigramWeight: return handleBigramWeight(tree, frame[1:]) case iskra.OpBigramWeightRelaxed: return handleBigramWeightRelaxed(tree, frame[1:]) case iskra.OpIngestBigram: *writes++ return handleIngestBigram(tree, frame[1:]) case iskra.OpIngestText: *writes++ return handleIngestText(tree, frame[1:]) case iskra.OpWalkStep: return handleWalkStep(tree, frame[1:]) case iskra.OpTranslate: return handleTranslate(tree, frame[1:]) case iskra.OpBeamRun: return handleBeamRun(tree, frame[1:]) case iskra.OpFlush: *writes = 0 return iskra.EncodeResponse(iskra.StatusOK, nil) } return iskra.EncodeResponse(iskra.StatusError, []byte("unknown op")) } func handleBigramWeight(tree *iskra.Tree, p []byte) []byte { if len(p) < 9 { return iskra.EncodeResponse(iskra.StatusError, []byte("short")) } domain := p[0] coord, _ := iskra.DecodeUint64(p, 1) prev, n1 := iskra.DecodeString(p, 9) next, _ := iskra.DecodeString(p, 9+n1) w := iskra.BigramWeight(tree, domain, coord, prev, next) return iskra.EncodeUint32Response(w) } func handleBigramWeightRelaxed(tree *iskra.Tree, p []byte) []byte { if len(p) < 9 { return iskra.EncodeResponse(iskra.StatusError, []byte("short")) } domain := p[0] coord, _ := iskra.DecodeUint64(p, 1) prev, n1 := iskra.DecodeString(p, 9) next, _ := iskra.DecodeString(p, 9+n1) w, matched := iskra.BigramWeightRelaxed(tree, domain, coord, prev, next) return iskra.EncodeWeightRelaxedResponse(w, matched) } func handleIngestBigram(tree *iskra.Tree, p []byte) []byte { if len(p) < 9 { return iskra.EncodeResponse(iskra.StatusError, []byte("short")) } domain := p[0] coord, _ := iskra.DecodeUint64(p, 1) prev, n1 := iskra.DecodeString(p, 9) next, _ := iskra.DecodeString(p, 9+n1) ri := iskra.IngestBigram(tree, domain, coord, prev, next) return iskra.EncodeUint32Response(ri) } func handleIngestText(tree *iskra.Tree, p []byte) []byte { if len(p) < 11 { return iskra.EncodeResponse(iskra.StatusError, []byte("short")) } domain := p[0] coord, _ := iskra.DecodeUint64(p, 1) count := int32(uint16(p[9]) | uint16(p[10])<<8) off := 11 tokens := []string{:0:count} for i := 0; i < count; i++ { s, n := iskra.DecodeString(p, off) if n == 0 { break } tokens = append(tokens, s) off += n } iskra.IngestText(tree, domain, coord, tokens) return iskra.EncodeResponse(iskra.StatusOK, nil) } func handleWalkStep(tree *iskra.Tree, p []byte) []byte { if len(p) < 9 { return iskra.EncodeResponse(iskra.StatusError, []byte("short")) } domain := p[0] coord, _ := iskra.DecodeUint64(p, 1) word, n1 := iskra.DecodeString(p, 9) off := 9 + n1 maxCandidates := 10 if off+2 <= len(p) { maxCandidates = int32(uint16(p[off]) | uint16(p[off+1])<<8) } state := iskra.WalkState{ Domain: domain, Coord: coord, Word: word, } candidates := iskra.WalkStep(tree, state, maxCandidates) return iskra.EncodeWalkCandidatesResponse(candidates) } func handleTranslate(tree *iskra.Tree, p []byte) []byte { if len(p) < 10 { return iskra.EncodeResponse(iskra.StatusError, []byte("short")) } srcDomain := p[0] dstDomain := p[1] coord, _ := iskra.DecodeUint64(p, 2) word, _ := iskra.DecodeString(p, 10) src := iskra.NewSubLattice(tree.DB(), tree.Pool(), srcDomain, tree.Reg) dst := iskra.NewSubLattice(tree.DB(), tree.Pool(), dstDomain, tree.Reg) result := iskra.Translate(src, dst, word, coord) if result == "" { return iskra.EncodeResponse(iskra.StatusNotFound, nil) } return iskra.EncodeStringResponse(result) } func handleBeamRun(tree *iskra.Tree, p []byte) []byte { if len(p) < 9 { return iskra.EncodeResponse(iskra.StatusError, []byte("short")) } domain := p[0] coord, _ := iskra.DecodeUint64(p, 1) word, n1 := iskra.DecodeString(p, 9) off := 9 + n1 width := 4 maxSteps := 5 kind := iskra.WalkTranslate if off+5 <= len(p) { width = int32(uint16(p[off]) | uint16(p[off+1])<<8) maxSteps = int32(uint16(p[off+2]) | uint16(p[off+3])<<8) kind = iskra.WalkKind(p[off+4]) } initial := iskra.WalkState{ Domain: domain, Coord: coord, Word: word, } beam := iskra.NewBeam(initial, width, kind) result := beam.Run(tree, maxSteps) return iskra.EncodeBeamResponse(result) } func loadTree(path string) (*iskra.Tree, error) { tree, err := iskra.StorageOpen(path) if err != nil { db := lattice.NewTree(1024) tree = iskra.NewTree(db) } return tree, nil } func flushTree(tree *iskra.Tree, path string) { err := iskra.StorageFlushPath(tree, path) if err != nil { fmt.Fprintf(os.Stderr(), "flush: %v\n", err) } }