Showcase Hub

King Corp Technology

Projects, downloads, code, and modular showcase data.

Code Samples

10 files
.py (6) .ino (3) .html (1)

Grouped by Language

Arduino

3 files
drone.ino
.ino - 16.69 KB
Task: Arduino and Embedded Control
loadcell_brakepedal.ino
.ino - 4.15 KB
Task: Arduino and Embedded Control
networknode.ino
.ino - 17.16 KB
Task: Arduino and Embedded Control

Python

6 files
corpus_trainer.py
.py - 6.45 KB
Task: Data Parsing and Analytics
cursorcheck.py
.py - 336 B
Task: Desktop UI and Utilities
detokenizer.py
.py - 5.24 KB
Task: Data Parsing and Analytics
mojibake_tokenizer.py
.py - 1.52 KB
Task: General Purpose
postag.py
.py - 3.19 KB
Task: Automation and Scripting
tokenizer.py
.py - 8.02 KB
Task: Data Parsing and Analytics

HTML

1 file
mp4convertor.html
.html - 2.12 KB
Task: Web and API Services

All Code Samples

corpus_trainer.py
Python - Data Parsing and Analytics - 6.45 KB
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path

from tokenizer import Tokenizer

BASE_CONTENT_DIR = Path(__file__).resolve().parents[1]
DEFAULT_UPLOADS_DIR = BASE_CONTENT_DIR / "trainer_uploads"
DEFAULT_INCOMING_DIR = DEFAULT_UPLOADS_DIR / "incoming"
DEFAULT_CORPUS_PATH = DEFAULT_UPLOADS_DIR / "token_corpus.json"
DEFAULT_MANIFEST_PATH = DEFAULT_UPLOADS_DIR / "trainer_manifest.json"

TEXT_EXTENSIONS = {
    ".txt", ".md", ".json", ".csv", ".xml", ".log", ".rtf",
    ".py", ".ps1", ".bat", ".sh", ".ini", ".toml", ".yaml", ".yml",
}


def utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def sha256_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        while True:
            chunk = handle.read(chunk_size)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()


def ensure_upload_paths(root: Path) -> None:
    root.mkdir(parents=True, exist_ok=True)
    (root / "incoming").mkdir(parents=True, exist_ok=True)
    readme = root / "README.txt"
    if not readme.exists():
        readme.write_text(
            "Drop text files into the incoming folder.\n"
            "Run corpus_trainer.py to ingest them into token_corpus.json.\n",
            encoding="utf-8",
        )


def load_manifest(path: Path) -> dict[str, dict[str, str | int]]:
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
        if isinstance(data, dict):
            return data
    except Exception:
        pass
    return {}


def save_manifest(path: Path, data: dict[str, dict[str, str | int]]) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")


def iter_input_files(input_dir: Path, recursive: bool = True) -> list[Path]:
    pattern = "**/*" if recursive else "*"
    items: list[Path] = []
    for path in sorted(input_dir.glob(pattern)):
        if not path.is_file():
            continue
        if path.suffix.lower() not in TEXT_EXTENSIONS:
            continue
        items.append(path)
    return items


def train_file(tokenizer: Tokenizer, path: Path, include_pos: bool, chunk_chars: int) -> int:
    seen_chars = 0
    buffer: list[str] = []
    buffer_size = 0

    with path.open("r", encoding="utf-8", errors="replace") as handle:
        for line in handle:
            buffer.append(line)
            buffer_size += len(line)
            seen_chars += len(line)
            if buffer_size >= chunk_chars:
                tokenizer.encode("".join(buffer), include_pos=include_pos)
                buffer.clear()
                buffer_size = 0

    if buffer:
        tokenizer.encode("".join(buffer), include_pos=include_pos)

    return seen_chars


def train_corpus(
    uploads_dir: Path,
    incoming_dir: Path,
    corpus_path: Path,
    manifest_path: Path,
    include_pos: bool,
    force: bool,
    recursive: bool,
    limit: int,
    chunk_chars: int,
) -> dict[str, int]:
    ensure_upload_paths(uploads_dir)
    incoming_dir.mkdir(parents=True, exist_ok=True)

    tokenizer = Tokenizer(corpus_path=corpus_path)
    manifest = load_manifest(manifest_path)

    files = iter_input_files(incoming_dir, recursive=recursive)
    if limit > 0:
        files = files[:limit]

    processed = 0
    skipped = 0
    total_chars = 0

    for path in files:
        rel = path.relative_to(uploads_dir).as_posix()
        digest = sha256_file(path)
        size = path.stat().st_size

        previous = manifest.get(rel, {})
        if not force and previous.get("sha256") == digest and int(previous.get("size", -1)) == size:
            skipped += 1
            continue

        chars = train_file(tokenizer, path, include_pos=include_pos, chunk_chars=chunk_chars)
        total_chars += chars
        processed += 1

        manifest[rel] = {
            "sha256": digest,
            "size": size,
            "trained_at": utc_now_iso(),
        }

    tokenizer.save_corpus(corpus_path)
    save_manifest(manifest_path, manifest)

    return {
        "processed": processed,
        "skipped": skipped,
        "files_seen": len(files),
        "chars": total_chars,
        "vocab_size": len(tokenizer.token_to_id),
    }


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Train tokenizer corpus from uploaded text files.")
    parser.add_argument("--uploads-dir", default=str(DEFAULT_UPLOADS_DIR))
    parser.add_argument("--incoming-dir", default=str(DEFAULT_INCOMING_DIR))
    parser.add_argument("--corpus", default=str(DEFAULT_CORPUS_PATH))
    parser.add_argument("--manifest", default=str(DEFAULT_MANIFEST_PATH))
    parser.add_argument("--include-pos", action="store_true", help="Emit POS tokens while training.")
    parser.add_argument("--force", action="store_true", help="Retrain all files even if unchanged.")
    parser.add_argument("--no-recursive", action="store_true", help="Only read files in incoming root.")
    parser.add_argument("--limit", type=int, default=0, help="Max number of files to process (0 = all).")
    parser.add_argument("--chunk-chars", type=int, default=200000, help="Text chunk size per encode pass.")
    return parser.parse_args()


def main() -> int:
    args = parse_args()

    uploads_dir = Path(args.uploads_dir).resolve()
    incoming_dir = Path(args.incoming_dir).resolve()
    corpus_path = Path(args.corpus).resolve()
    manifest_path = Path(args.manifest).resolve()

    summary = train_corpus(
        uploads_dir=uploads_dir,
        incoming_dir=incoming_dir,
        corpus_path=corpus_path,
        manifest_path=manifest_path,
        include_pos=bool(args.include_pos),
        force=bool(args.force),
        recursive=not bool(args.no_recursive),
        limit=max(0, int(args.limit)),
        chunk_chars=max(10000, int(args.chunk_chars)),
    )

    print("Corpus training complete")
    print(f"uploads_dir: {uploads_dir}")
    print(f"incoming_dir: {incoming_dir}")
    print(f"corpus: {corpus_path}")
    print(f"manifest: {manifest_path}")
    print(f"files_seen: {summary['files_seen']}")
    print(f"processed: {summary['processed']}")
    print(f"skipped: {summary['skipped']}")
    print(f"chars: {summary['chars']}")
    print(f"vocab_size: {summary['vocab_size']}")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())
cursorcheck.py
Python - Desktop UI and Utilities - 336 B
import pyautogui
import time

print("Cursor Position Tracker")
print("Press Ctrl+C to stop")
print("X, Y coordinates:")

try:
    while True:
        x, y = pyautogui.position()
        print(f"\rX: {x:4d}, Y: {y:4d}", end="", flush=True)
        time.sleep(0.1)  
except KeyboardInterrupt:
    print("\n\nTracker stopped")
detokenizer.py
Python - Data Parsing and Analytics - 5.24 KB
#!/usr/bin/env python3
from __future__ import annotations

import argparse
from pathlib import Path
from typing import Any, Iterable

SPACE_TOKEN = "space"
NEWLINE_TOKEN = "newline"
TAB_TOKEN = "tab"
POS_TOKEN_PREFIX = "__POS_"
POS_TOKEN_SUFFIX = "__"


class Detokenizer:
    """Rebuild plain text from tokenizer outputs.

    Handles token streams that include:
    - chars + full words
    - optional POS tokens
    - whitespace marker tokens
    - ids, tuples, or dict token rows
    """

    def __init__(
        self,
        tokenizer: Any | None = None,
        *,
        id_to_token: dict[int, str] | None = None,
        space_token: str = SPACE_TOKEN,
    ) -> None:
        self.space_token = (space_token or SPACE_TOKEN).lower()
        self.id_to_token: dict[int, str] = {}

        if tokenizer is not None and hasattr(tokenizer, "id_to_token"):
            for token_id, token_text in getattr(tokenizer, "id_to_token", {}).items():
                self.id_to_token[int(token_id)] = str(token_text)

        if id_to_token:
            for token_id, token_text in id_to_token.items():
                self.id_to_token[int(token_id)] = str(token_text)

    @classmethod
    def from_corpus(cls, corpus_path: str | Path) -> "Detokenizer":
        path = Path(corpus_path)
        if not path.exists():
            return cls()

        try:
            import json

            payload = json.loads(path.read_text(encoding="utf-8"))
            loaded = payload.get("token_to_id", {}) if isinstance(payload, dict) else {}
            id_to_token = {int(token_id): str(token) for token, token_id in (loaded or {}).items()}
            return cls(id_to_token=id_to_token)
        except Exception:
            return cls()

    def _resolve_token(self, raw: Any) -> str:
        if raw is None:
            return ""

        if isinstance(raw, str):
            return raw

        if isinstance(raw, int):
            return self.id_to_token.get(raw, "")

        if hasattr(raw, "token"):
            return str(getattr(raw, "token"))

        if isinstance(raw, dict):
            if "token" in raw:
                return str(raw.get("token") or "")
            if "id" in raw:
                token_id = int(raw.get("id") or 0)
                return self.id_to_token.get(token_id, "")
            if "token_id" in raw:
                token_id = int(raw.get("token_id") or 0)
                return self.id_to_token.get(token_id, "")
            return ""

        if isinstance(raw, (tuple, list)):
            if not raw:
                return ""
            first = raw[0]
            if isinstance(first, str):
                return first
            if isinstance(first, int):
                return self.id_to_token.get(first, "")
            return str(first or "")

        return str(raw)

    def _flush_chars(self, chars: list[str], output: list[str]) -> None:
        if chars:
            output.append("".join(chars))
            chars.clear()

    def detokenize(self, encoded_tokens: Iterable[Any]) -> str:
        output: list[str] = []
        chars: list[str] = []

        for raw in encoded_tokens:
            token = self._resolve_token(raw)
            if not token:
                continue

            lowered = token.lower()

            if lowered.startswith(POS_TOKEN_PREFIX.lower()) and lowered.endswith(POS_TOKEN_SUFFIX.lower()):
                continue

            if lowered == self.space_token:
                self._flush_chars(chars, output)
                output.append(" ")
                continue

            if lowered == NEWLINE_TOKEN:
                self._flush_chars(chars, output)
                output.append("\n")
                continue

            if lowered == TAB_TOKEN:
                self._flush_chars(chars, output)
                output.append("\t")
                continue

            if len(token) == 1 and token.isalnum():
                chars.append(token)
                continue

            if token.isalnum() and len(token) > 1:
                candidate = "".join(chars)
                if candidate and candidate.lower() == lowered:
                    output.append(token)
                    chars.clear()
                else:
                    self._flush_chars(chars, output)
                    output.append(token)
                continue

            if len(token) == 1 and not token.isalnum():
                self._flush_chars(chars, output)
                output.append(token)
                continue

            self._flush_chars(chars, output)
            output.append(token)

        self._flush_chars(chars, output)
        return "".join(output)


def detokenize_tokens(tokens: Iterable[Any], detokenizer: Detokenizer | None = None) -> str:
    engine = detokenizer or Detokenizer()
    return engine.detokenize(tokens)


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Detokenize token streams.")
    parser.add_argument("--tokens", nargs="*", default=[])
    parser.add_argument("--corpus", default="")
    return parser.parse_args()


def main() -> int:
    args = _parse_args()
    detok = Detokenizer.from_corpus(args.corpus) if args.corpus else Detokenizer()
    if args.tokens:
        print(detok.detokenize(args.tokens))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
drone.ino
Arduino - Arduino and Embedded Control - 16.69 KB
#include <WiFi.h>
#include <Wire.h>
#include <Adafruit_MPU6050.h>
#include <Adafruit_Sensor.h>
#include <ArduinoJson.h>

// ------------------------------------------------------------------
// WiFi Credentials
// ------------------------------------------------------------------
const char* ssid = "That shitty BMW";
const char* password = "weewoowee";

// ------------------------------------------------------------------
// Motor Pins
// ------------------------------------------------------------------
const int M1F = 5;   // Front Left
const int M2F = 6;   // Front Right
const int M3F = 7;   // Back Left
const int M4F = 10;  // Back Right

// ------------------------------------------------------------------
// Basic Constants
// ------------------------------------------------------------------
const int   MIN_MOTOR      = 20;    // Minimal PWM
const int   MAX_MOTOR      = 255;   // Max PWM
const float START_THROTTLE = 0.05f; // 15%
const float THROTTLE_STEP  = 0.05f; // Raise/Lower increments
const float OFFSET_SCALE   = 40.0f; // Strength for manual pitch/roll/yaw

// ------------------------------------------------------------------
// PID Settings for Stabilization
// ------------------------------------------------------------------
struct PID {
  float kp, ki, kd;
  float integral;
  float lastError;
  unsigned long lastT;
};

PID pitchPID = {0.7f, 0.0f, 0.25f, 0, 0, 0};
PID rollPID  = {0.7f, 0.0f, 0.25f, 0, 0, 0};
PID altPID   = {3.0f, 0.0f, 0.5f,  0, 0, 0};  // altitude

// Each axis tries to hold 0 except altitude tries to hold targetAltitude
float targetPitch     = 0.0f;
float targetRoll      = 0.0f;
float targetAltitude  = 0.0f;

// ------------------------------------------------------------------
// Globals
// ------------------------------------------------------------------
WiFiServer server(80);
Adafruit_MPU6050 mpu;

// Manual overrides: 0..1 for throttle, -1..1 for pitch/roll/yaw
float throttleVal   = 0.0f;
float pitchManual   = 0.0f;
float rollManual    = 0.0f;
float yawManual     = 0.0f;

// The final “commanded” pitch/roll/yaw after PID
float pitchCmd      = 0.0f;
float rollCmd       = 0.0f;
float yawCmd        = 0.0f;

// If motors are running or not
bool motorsRunning        = false;
// If we should do self-leveling + alt hold
bool stabilizationEnabled = false;

// Altitude variable (placeholder—needs real sensor)
float currentAltitude     = 0.0f; 

// ------------------------------------------------------------------
// HTML + JavaScript
// ------------------------------------------------------------------
const char mainPage[] PROGMEM = R"rawliteral(
<!DOCTYPE html>
<html>
<head>
  <meta name="viewport" content="width=device-width,initial-scale=1.0">
  <title>Drone Control w/ Basic Stabilization</title>
  <style>
    body {
      font-family: Arial; text-align: center;
      background: #f0f0f0; margin:0; padding:20px;
    }
    button {
      font-size:1.2em; padding:12px 24px; margin:5px;
      border:none; border-radius:5px; background:#2196F3; color:white;
      cursor:pointer; min-width:120px; user-select:none;
    }
    button:active { background:#1976D2; }
    .armed   { background:#f44336; }
    .stab-on { background:#4CAF50; }
    .status  {
      margin:15px; padding:10px; background:white; border-radius:5px;
    }
    .controls {
      display:grid; grid-template-columns:repeat(3,1fr);
      gap:10px; max-width:400px; margin:20px auto;
    }
    .throttle-display {
      font-size:1.2em; font-weight:bold; margin:10px 0;
    }
  </style>
</head>
<body>
  <h1>Drone Control w/ Basic Stabilization</h1>
  <div>
    <button id="armBtn" onclick="toggleMotors()">Start Motors</button>
    <button id="stabBtn" onclick="toggleStab()">Enable Stabilization</button>
  </div>
  <div class="status">
    <div id="motorStatus">Motors: Stopped</div>
    <div id="stabStatus">Stabilization: Off</div>
    <div id="throttleStatus" class="throttle-display">Throttle: 0%</div>
    <div id="altitudeStatus">Alt: 0m</div>
  </div>
  <div>
    <button onmousedown="setThrottle(1)" onmouseup="stopThrottle()">Raise ▲</button>
    <button onmousedown="setThrottle(-1)" onmouseup="stopThrottle()">Lower ▼</button>
  </div>
  <div class="controls">
    <div></div>
    <button onmousedown="setControl('pitch',1)" onmouseup="stopControl()">Forward</button>
    <div></div>

    <button onmousedown="setControl('roll',-1)" onmouseup="stopControl()">Left</button>
    <button onmousedown="hover()" onmouseup="stopControl()">Hover</button>
    <button onmousedown="setControl('roll',1)" onmouseup="stopControl()">Right</button>

    <div></div>
    <button onmousedown="setControl('pitch',-1)" onmouseup="stopControl()">Back</button>
    <div></div>
  </div>
  <div>
    <button onmousedown="setControl('yaw',-1)" onmouseup="stopControl()">Rotate Left</button>
    <button onmousedown="setControl('yaw',1)" onmouseup="stopControl()">Rotate Right</button>
  </div>

  <script>
    let motorsRunning=false, stabEnabled=false, updateTimer=null;
    const state = { throttle:0.0, pitch:0.0, roll:0.0, yaw:0.0 };

    async function toggleMotors(){
      const r=await fetch('/motors');
      const d=await r.json();
      motorsRunning = d.running;
      updateUI();
    }
    async function toggleStab(){
      const r=await fetch('/stabilize');
      const d=await r.json();
      stabEnabled = d.enabled;
      updateUI();
    }
    function updateUI(){
      document.getElementById('motorStatus').textContent =
        `Motors: ${motorsRunning?'Running':'Stopped'}`;
      document.getElementById('stabStatus').textContent =
        `Stabilization: ${stabEnabled?'On':'Off'}`;
      document.getElementById('throttleStatus').textContent =
        `Throttle: ${Math.round(state.throttle*100)}%`;
      document.getElementById('armBtn').textContent =
        motorsRunning?'Stop Motors':'Start Motors';
      document.getElementById('armBtn').className =
        motorsRunning?'armed':'';
      document.getElementById('stabBtn').textContent =
        stabEnabled?'Disable Stabilization':'Enable Stabilization';
      document.getElementById('stabBtn').className =
        stabEnabled?'stab-on':'';
    }
    function setThrottle(dir){
      if(!motorsRunning) return;
      if(updateTimer) clearInterval(updateTimer);
      updateTimer=setInterval(()=>{
        state.throttle=Math.max(0,Math.min(1,state.throttle+dir*0.1));  // Make it 10%

        updateUI(); sendControls();
      },10);
    }
    function stopThrottle(){
      if(updateTimer){clearInterval(updateTimer);updateTimer=null;}
    }
    function setControl(axis,val){
      if(!motorsRunning) return;
      state[axis]=val; // pitch/roll/yaw in [-1..1]
      sendControls();
    }
    function stopControl(){
      if(!motorsRunning) return;
      state.pitch=0; state.roll=0; state.yaw=0;
      sendControls();
    }
    function hover(){
      if(!motorsRunning) return;
      // Keep throttle, zero out pitch/roll/yaw
      state.pitch=0; state.roll=0; state.yaw=0;
      sendControls();
    }
    async function sendControls(){
      try {
        await fetch('/control',{
          method:'POST',
          headers:{'Content-Type':'application/json'},
          body:JSON.stringify(state)
        });
      }catch(e){console.error("Control error:",e);}
    }
    updateUI();
  </script>
</body>
</html>
)rawliteral";

// --------------------------------------------------------------------------------
// A tiny helper to map [0..1] to [MIN_MOTOR..MAX_MOTOR]
// If you have ESCs that require a different mapping, change it here.
// --------------------------------------------------------------------------------
static int mapFloatToPWM(float x) {
  if(x<=0.0f) return 0;
  if(x>=1.0f) return MAX_MOTOR;
  return (int)(MIN_MOTOR + x*(MAX_MOTOR - MIN_MOTOR));
}

// --------------------------------------------------------------------------------
// Setup
// --------------------------------------------------------------------------------
void setup() {
  Serial.begin(115200);
  Serial.println("Starting initialization...");
  
  WiFi.begin(ssid, password);
  while(WiFi.status()!=WL_CONNECTED){
    delay(500);
    Serial.print(".");
  }
  Serial.println("\nWiFi connected, IP: " + WiFi.localIP().toString());
  server.begin();

  // Initialize MPU
  Wire.begin(21,20);
  if(!mpu.begin()){
    Serial.println("MPU6050 not found! Continuing anyway...");
  } else {
    mpu.setAccelerometerRange(MPU6050_RANGE_8_G);
    mpu.setGyroRange(MPU6050_RANGE_500_DEG);
    mpu.setFilterBandwidth(MPU6050_BAND_21_HZ);
  }

  // Motor pins
  pinMode(M1F, OUTPUT);
  pinMode(M2F, OUTPUT);
  pinMode(M3F, OUTPUT);
  pinMode(M4F, OUTPUT);

  // Ensure motors off
  analogWrite(M1F, 0);
  analogWrite(M2F, 0);
  analogWrite(M3F, 0);
  analogWrite(M4F, 0);

  // Initialize altitude to 0, or your starting altitude
  currentAltitude = 0;
  targetAltitude  = 0;

  Serial.println("Initialization complete!");
}

// --------------------------------------------------------------------------------
// A simple PID compute function
// --------------------------------------------------------------------------------
float pidCompute(PID &pid, float setpoint, float measured) {
  unsigned long now = millis();
  float dt = (pid.lastT==0)? 0.01f : (now - pid.lastT)/1000.0f;
  pid.lastT = now;

  float error = setpoint - measured;
  pid.integral += error*dt;
  float derivative = (error - pid.lastError)/dt;
  pid.lastError = error;

  // naive PID
  float output = pid.kp*error + pid.ki*pid.integral + pid.kd*derivative;
  return output;
}

// --------------------------------------------------------------------------------
// readIMU: compute approximate pitch/roll angles (naive!)
// For a real drone, you'd do sensor fusion. This is a big simplification.
// Also do a placeholder altitude measurement here.
// --------------------------------------------------------------------------------
void readIMU() {
  sensors_event_t accel, gyro, temp;
  if(!mpu.getEvent(&accel, &gyro, &temp)) return;

  // Approx pitch angle from X/Z, roll angle from Y/Z
  // Usually: pitch = atan2(Ay, Az), roll = atan2(-Ax, Az)
  float pitchAngle = atan2(accel.acceleration.y,
                           accel.acceleration.z)*180.0f/PI;
  float rollAngle  = atan2(-accel.acceleration.x,
                            accel.acceleration.z)*180.0f/PI;

  // A placeholder altitude read
  // e.g. if you had a barometer or range sensor,
  // you'd store it in currentAltitude
  // For now, we'll just pretend it's stable or do something naive:
  // currentAltitude += something...

  if(stabilizationEnabled){
    // ~~~~~~~~~~~~~~~~~ PITCH/ROLL STABILIZATION ~~~~~~~~~~~~~~~~~
    // Compute how much we want to change pitchCmd / rollCmd
    // so the drone tries to hold 0 angle.
    // This will "compete" with manual pitchManual & rollManual
    float pitchCorrection = pidCompute(pitchPID, targetPitch, pitchAngle);
    float rollCorrection  = pidCompute(rollPID,  targetRoll,  rollAngle);

    // Merge manual input with auto correction:
    // e.g. final pitchCmd = manual + correction
    pitchCmd = constrain(pitchManual + pitchCorrection, -1.0f, 1.0f);
    rollCmd  = constrain(rollManual  + rollCorrection,  -1.0f, 1.0f);

    // ~~~~~~~~~~~~~~~~~ ALTITUDE HOLD ~~~~~~~~~~~~~~~~~
    float altCorrection   = pidCompute(altPID, targetAltitude, currentAltitude);

    // If altCorrection is positive => we want to climb slightly,
    // negative => descend. So we apply that to throttleVal:
    float newThrottle = throttleVal + altCorrection;
    // clamp 0..1
    throttleVal = constrain(newThrottle, 0.0f, 1.0f);

  } else {
    // If stabilization is off, just pass manual commands directly
    pitchCmd = pitchManual;
    rollCmd  = rollManual;
  }
}

// --------------------------------------------------------------------------------
// driveMotors: merges throttle with pitchCmd, rollCmd, yawManual
// --------------------------------------------------------------------------------
void driveMotors() {
  if(!motorsRunning){
    analogWrite(M1F,0); analogWrite(M2F,0);
    analogWrite(M3F,0); analogWrite(M4F,0);
    return;
  }

  // Use throttle directly without minimum enforcement
  float effectiveThrottle = throttleVal;
  
  int basePWM = mapFloatToPWM(effectiveThrottle);

  // We combine pitchCmd, rollCmd, yawManual
  float pTerm = pitchCmd * OFFSET_SCALE * effectiveThrottle;
  float rTerm = rollCmd  * OFFSET_SCALE * effectiveThrottle;
  float yTerm = yawManual* OFFSET_SCALE * effectiveThrottle;

  int m1 = basePWM + (int)( pTerm + rTerm + yTerm); // front-left
  int m2 = basePWM + (int)( pTerm - rTerm - yTerm); // front-right
  int m3 = basePWM + (int)(-pTerm + rTerm - yTerm); // back-left
  int m4 = basePWM + (int)(-pTerm - rTerm + yTerm); // back-right

  // Only apply MIN_MOTOR if the calculated value is above 0
  if(m1>0) m1=max(m1,MIN_MOTOR);
  if(m2>0) m2=max(m2,MIN_MOTOR);
  if(m3>0) m3=max(m3,MIN_MOTOR);
  if(m4>0) m4=max(m4,MIN_MOTOR);

  m1=constrain(m1,0,MAX_MOTOR);
  m2=constrain(m2,0,MAX_MOTOR);
  m3=constrain(m3,0,MAX_MOTOR);
  m4=constrain(m4,0,MAX_MOTOR);

  analogWrite(M1F,m1);
  analogWrite(M2F,m2);
  analogWrite(M3F,m3);
  analogWrite(M4F,m4);

  static unsigned long lastDbg=0;
  if(millis()-lastDbg>500){
    Serial.printf("driveMotors => T=%.2f pitchCmd=%.2f rollCmd=%.2f yaw=%.2f alt=%.2f => M1=%d M2=%d M3=%d M4=%d\n",
      throttleVal, pitchCmd, rollCmd, yawManual, currentAltitude,
      m1,m2,m3,m4);
    lastDbg=millis();
  }
}

// --------------------------------------------------------------------------------
// handleWebClient: read HTTP requests
//  - /control => parse {throttle,pitch,roll,yaw}
//  - /motors => toggle motors
//  - /stabilize => toggle stabilization
//  - / => serve mainPage
// --------------------------------------------------------------------------------
void handleWebClient() {
  WiFiClient client=server.available();
  if(!client) return;

  String reqLine=client.readStringUntil('\r');
  client.readStringUntil('\n'); // discard leftover

  if(reqLine.indexOf("POST /control")>=0){
    while(true){
      String l=client.readStringUntil('\n');
      if(l.length()<=1) break;
    }
    String body;
    while(client.available()) body+=(char)client.read();

    StaticJsonDocument<256> doc;
    DeserializationError err = deserializeJson(doc, body);
    if(!err){
      // Add debug printing
      Serial.println("Received control update:");
      
      // More explicit control value handling
      if(doc.containsKey("throttle")) {
        throttleVal = doc["throttle"].as<float>();
        Serial.printf("  New throttle: %.2f\n", throttleVal);
      }
      if(doc.containsKey("pitch")) {
        pitchManual = doc["pitch"].as<float>();
        Serial.printf("  New pitch: %.2f\n", pitchManual);
      }
      if(doc.containsKey("roll")) {
        rollManual = doc["roll"].as<float>();
        Serial.printf("  New roll: %.2f\n", rollManual);
      }
      if(doc.containsKey("yaw")) {
        yawManual = doc["yaw"].as<float>();
        Serial.printf("  New yaw: %.2f\n", yawManual);
      }
    } else {
      Serial.println("JSON parse error in /control");
    }
    client.println("HTTP/1.1 200 OK\r\nConnection:close\r\n\r\n");
  }
  else if(reqLine.indexOf("GET /motors")>=0){
    motorsRunning=!motorsRunning;
    if(!motorsRunning){
      throttleVal=0;
      pitchManual=0; rollManual=0; yawManual=0;
      // and reset PIDs
      pitchPID.integral=0; pitchPID.lastError=0; pitchPID.lastT=0;
      rollPID.integral=0;  rollPID.lastError=0;  rollPID.lastT=0;
      altPID.integral=0;   altPID.lastError=0;   altPID.lastT=0;
      analogWrite(M1F,0); analogWrite(M2F,0);
      analogWrite(M3F,0); analogWrite(M4F,0);
    } else {
      throttleVal = 0.0f;  // Start at 0 instead of START_THROTTLE
    }
    StaticJsonDocument<64> doc;
    doc["running"]=motorsRunning;
    String s; 
    serializeJson(doc,s);
    client.println("HTTP/1.1 200 OK\r\nContent-Type:application/json\r\nConnection:close\r\n\r\n"+s);
  }
  else if(reqLine.indexOf("GET /stabilize")>=0){
    stabilizationEnabled=!stabilizationEnabled;
    StaticJsonDocument<64> doc;
    doc["enabled"]=stabilizationEnabled;
    String s; 
    serializeJson(doc,s);
    client.println("HTTP/1.1 200 OK\r\nContent-Type:application/json\r\nConnection:close\r\n\r\n"+s);

    // If enabling, set targetAltitude to currentAltitude, etc.
    if(stabilizationEnabled){
      targetAltitude = currentAltitude; 
    }
  }
  else {
    // serve the main HTML
    client.println("HTTP/1.1 200 OK\r\nContent-Type:text/html\r\nConnection:close\r\n\r\n");
    client.println(mainPage);
  }
  client.stop();
}

// --------------------------------------------------------------------------------
// Main Loop
// --------------------------------------------------------------------------------
void loop() {
  handleWebClient();

  // 1) read IMU => compute angles => run PID if stab enabled
  readIMU();

  // 2) drive motors with final pitchCmd/rollCmd/yawManual + throttleVal
  driveMotors();
  
  delay(10);
}
loadcell_brakepedal.ino
Arduino - Arduino and Embedded Control - 4.15 KB
#include "HX711.h"
#include <math.h>
#define HX_DOUT 32
#define HX_SCK  33
#define INVERT_BUTTON_PIN 27
#define BTN_RANGE_UP    14 
#define BTN_RANGE_DOWN  26 
#define DAC_PIN 25 
#define LED_PIN 2 
const int SAMPLES_PER_READ  = 1;
const int ZERO_CAL_READS    = 20;
const long MIN_FULL_RANGE   = 50000;      
const long MAX_FULL_RANGE   = 2000000;    
const long STEP_SMALL       = 1000;
const long STEP_FAST        = 75000; 
const unsigned long HOLD_DELAY_MS   = 500;   
const unsigned long HOLD_REPEAT_MS  = 100;   
float CURVE_EXP  = 1.5f;
HX711 scale;
long zeroOffset = 0;
bool invertBrake = false;
int  lastButtonStateInvert = HIGH;
long fullRange = MAX_FULL_RANGE;
int lastStateUp   = HIGH;
int lastStateDown = HIGH;
unsigned long pressStartUp    = 0;
unsigned long pressStartDown  = 0;
unsigned long lastRepeatUp    = 0;
unsigned long lastRepeatDown  = 0;
long readFastRaw() {
  while (!scale.is_ready()) {
  }
  static bool ledState = false;
  ledState = !ledState;
  digitalWrite(LED_PIN, ledState);
  return scale.read();
}
void clampFullRange() {
  if (fullRange < MIN_FULL_RANGE) fullRange = MIN_FULL_RANGE;
  if (fullRange > MAX_FULL_RANGE) fullRange = MAX_FULL_RANGE;
}
void adjustFullRange(long delta) {
  fullRange += delta;
  clampFullRange();
  Serial.print("FULL_RANGE adjusted to: ");
  Serial.println(fullRange);
}
void setup() {
  Serial.begin(115200);
  delay(300);
  Serial.println("FAST HX711 LOAD CELL BRAKE (80Hz, curved, adjustable FULL_RANGE + LED)");
  scale.begin(HX_DOUT, HX_SCK);
  scale.set_gain(128);
  delay(300);
  pinMode(INVERT_BUTTON_PIN, INPUT_PULLUP);
  pinMode(BTN_RANGE_UP,      INPUT_PULLUP);
  pinMode(BTN_RANGE_DOWN,    INPUT_PULLUP);
  pinMode(LED_PIN, OUTPUT);
  digitalWrite(LED_PIN, LOW);
  Serial.println("Zeroing... release pedal.");
  long sum = 0;
  for (int i = 0; i < ZERO_CAL_READS; i++) {
    sum += readFastRaw();
  }
  zeroOffset = sum / ZERO_CAL_READS;
  Serial.print("Zero offset = ");
  Serial.println(zeroOffset);
  fullRange = MAX_FULL_RANGE;
  Serial.print("Initial FULL_RANGE = ");
  Serial.println(fullRange);
}
void handleInvertButton() {
  int btn = digitalRead(INVERT_BUTTON_PIN);
  if (lastButtonStateInvert == HIGH && btn == LOW) {
    invertBrake = !invertBrake;
    Serial.print("Invert: ");
    Serial.println(invertBrake ? "ON" : "OFF");
  }
  lastButtonStateInvert = btn;
}
void handleRangeButtons() {
  unsigned long now = millis();
  int stateUp = digitalRead(BTN_RANGE_UP);
  if (lastStateUp == HIGH && stateUp == LOW) {
    pressStartUp = now;
    lastRepeatUp = now;
    adjustFullRange(STEP_SMALL); 
  }
  if (stateUp == LOW) {
    if (now - pressStartUp >= HOLD_DELAY_MS) {
      if (now - lastRepeatUp >= HOLD_REPEAT_MS) {
        adjustFullRange(STEP_FAST);
        lastRepeatUp = now;
      }
    }
  }
  lastStateUp = stateUp;
  int stateDown = digitalRead(BTN_RANGE_DOWN);
  if (lastStateDown == HIGH && stateDown == LOW) {
    pressStartDown = now;
    lastRepeatDown = now;
    adjustFullRange(-STEP_SMALL);
  }
  if (stateDown == LOW) {
    if (now - pressStartDown >= HOLD_DELAY_MS) {
      if (now - lastRepeatDown >= HOLD_REPEAT_MS) {
        adjustFullRange(-STEP_FAST);
        lastRepeatDown = now;
      }
    }
  }
  lastStateDown = stateDown;
}
void loop() {
  handleInvertButton();
  handleRangeButtons();
  long raw = readFastRaw();
  long diff = raw - zeroOffset;
  long rawRel = diff < 0 ? -diff : diff;
  float norm = (float)rawRel / (float)fullRange;
  if (norm < 0) norm = 0;
  if (norm > 1) norm = 1;
  if (CURVE_EXP != 1.0f) {
    norm = pow(norm, CURVE_EXP);
  }
  if (invertBrake) {
    norm = 1.0f - norm;
  }
  int brakePct = (int)(norm * 100.0f + 0.5f);
  int dacValue = (int)(norm * 255.0f);
  if (dacValue < 0) dacValue = 0;
  if (dacValue > 255) dacValue = 255;
  dacWrite(DAC_PIN, dacValue);
  float outV = 3.3f * ((float)dacValue / 255.0f);
  Serial.print("raw=");
  Serial.print(raw);
  Serial.print("  diff=");
  Serial.print(diff);
  Serial.print("  rawRel=");
  Serial.print(rawRel);
  Serial.print("  brake=");
  Serial.print(brakePct);
  Serial.print("%  FR=");
  Serial.print(fullRange);
  Serial.print("  Vout≈");
  Serial.print(outV, 3);
  Serial.print("V  invert=");
  Serial.println(invertBrake ? "ON" : "OFF");
}
mojibake_tokenizer.py
Python - General Purpose - 1.52 KB
#!/usr/bin/env python3
from __future__ import annotations

import re

MOJIBAKE_TOKEN_RE = re.compile(r"\u00ef\u00bb\u00bf|\s+|\w+|[^\w\s]", re.UNICODE)

MOJIBAKE_TOKEN_MARKERS = (
    "\u00ef\u00bb\u00bf",
    "\u00c3",
    "\u00c2",
    "\u00e2\u20ac",
    "\u00e2\u20ac\u2122",
    "\u00e2\u20ac\u0153",
    "\u00e2\u20ac\u009d",
    "\u00e2\u20ac\u201c",
    "\u00e2\u20ac\u201d",
    "\u00e2\u20ac\u00a2",
    "\u00e2\u201e\u00a2",
    "\u00e2\u201a\u00ac",
    "\u00c3\u0192",
    "\u00c6\u2019",
    "\u00c5\u00a1",
    "\u00f0\u0178",
    "\ufffd",
)

SINGLE_BAD_TOKENS = {
    "\u201a",
    "\u201e",
    "\u2026",
    "\u2020",
    "\u2021",
    "\u02c6",
    "\u2030",
    "\u2039",
    "\u203a",
    "\u0153",
    "\u017e",
    "\u0178",
}


def tokenize_mojibake(text: str) -> list[str]:
    return MOJIBAKE_TOKEN_RE.findall(text)


def is_mojibake_token(token: str) -> bool:
    if not token or token.isspace():
        return False
    if token in SINGLE_BAD_TOKENS:
        return True
    if any(marker in token for marker in MOJIBAKE_TOKEN_MARKERS):
        return True
    odd = sum(1 for ch in token if ch in "\u00c3\u00c2\u00e2\u00f0\u00c5\u00c6")
    if odd >= 2 and odd >= (len(token) // 2):
        return True
    return False


def remove_mojibake_tokens(text: str) -> tuple[str, int]:
    tokens = tokenize_mojibake(text)
    kept: list[str] = []
    removed = 0
    for token in tokens:
        if is_mojibake_token(token):
            removed += 1
            continue
        kept.append(token)
    return "".join(kept), removed
mp4convertor.html
HTML - Web and API Services - 2.12 KB
<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <title>MP4 → MP3</title>
</head>
<body>

<input type="file" accept="video/mp4">
<button>Convert to MP3</button>
<p id="status"></p>

<script src="https://cdn.jsdelivr.net/npm/lamejs@1.2.0/lame.min.js"></script>
<script>
const input = document.querySelector("input");
const button = document.querySelector("button");
const status = document.getElementById("status");

button.onclick = async () => {
  const file = input.files[0];
  if (!file) return;

  status.textContent = "Processing...";

  const arrayBuffer = await file.arrayBuffer();
  const audioCtx = new AudioContext();
  const decodedData = await audioCtx.decodeAudioData(arrayBuffer);

  const mp3Blob = encodeMP3(decodedData);
  download(mp3Blob, file.name.replace(".mp4", ".mp3"));

  status.textContent = "Conversion complete! File downloaded.";
};

function encodeMP3(audioBuffer) {
  const samples = audioBuffer.getChannelData(0); // Mono only
  const sampleRate = audioBuffer.sampleRate;
  const mp3encoder = new lamejs.Mp3Encoder(1, sampleRate, 128); // Mono, 128kbps
  const blockSize = 1152;
  const mp3Data = [];

  for (let i = 0; i < samples.length; i += blockSize) {
    const chunk = samples.subarray(i, i + blockSize);
    const buffer = floatTo16(chunk);
    const mp3buf = mp3encoder.encodeBuffer(buffer);
    if (mp3buf.length > 0) {
      mp3Data.push(mp3buf);
    }
  }

  const end = mp3encoder.flush();
  if (end.length > 0) {
    mp3Data.push(end);
  }

  return new Blob(mp3Data, { type: "audio/mp3" });
}

function floatTo16(float32Array) {
  const output = new Int16Array(float32Array.length);
  for (let i = 0; i < float32Array.length; i++) {
    const s = Math.max(-1, Math.min(1, float32Array[i]));
    output[i] = s < 0 ? s * 0x8000 : s * 0x7FFF;
  }
  return output;
}

function download(blob, filename) {
  const a = document.createElement("a");
  a.href = URL.createObjectURL(blob);
  a.download = filename;
  a.style.display = "none";
  document.body.appendChild(a);
  a.click();
  document.body.removeChild(a);
}
</script>

</body>
</html>
networknode.ino
Arduino - Arduino and Embedded Control - 17.16 KB
#include <Arduino.h>
#include <Wire.h>
#include <SPI.h>
#include <RadioLib.h>
#include <map>
#include <driver/gpio.h>
#define DEFAULT_LORA_MOSI 10
#define DEFAULT_LORA_MISO 11
#define DEFAULT_LORA_SCK 9
#define DEFAULT_LORA_NSS 8
#define DEFAULT_LORA_DIO1 14
#define DEFAULT_LORA_NRST 12
#define DEFAULT_LORA_BUSY 13
#define LOW_BATTERY_THRESHOLD 1.328 // Low battery voltage threshold (20%)
#define FULL_BATTERY_THRESHOLD 1.378 // Fully charged battery voltage threshold
#define CHARGING_VOLTAGE_THRESHOLD 1.369 // Voltage detected from solar panel
#define LEGAL_TX_POWER 22 // Set legal transmission power limit (example: 14 dBm)
#define SOLAR_CHARGE_LED_PIN 19  // Replace with the actual pin number for solar charging LED
#define SOLAR_DONE_LED_PIN 20    // Replace with the actual pin number for solar done LED
#define BATTERY_LED_1_PIN 21     // Replace with the actual pin number for battery level 1 LED
#define BATTERY_LED_2_PIN 26     // Replace with the actual pin number for battery level 2 LED
#define BATTERY_LED_3_PIN 48     // Replace with the actual pin number for battery level 3 LED
#define BATTERY_LED_4_PIN 47 

// LED pin definitions
#define GREEN_LED 6
#define YELLOW_LED 5
#define RED_LED 7

// Heltec Vext control and ADC control pins
#define Vext 21
#define ADC_CTRL 37
#define Read_VBAT_Voltage 1
#define ADC_READ_STABILIZE 10
#define NUM_BATTERY_READINGS 3
float batteryVoltages[NUM_BATTERY_READINGS];
int currentBatteryReadingIndex = 0;
// LoRa pin structure
struct LoRaPins {
  int mosi;
  int miso;
  int sck;
  int nss;
  int dio1;
  int nrst;
  int busy;
};

// Default LoRa pin settings
LoRaPins loRaPins = {
  .mosi = DEFAULT_LORA_MOSI,
  .miso = DEFAULT_LORA_MISO,
  .sck = DEFAULT_LORA_SCK,
  .nss = DEFAULT_LORA_NSS,
  .dio1 = DEFAULT_LORA_DIO1,
  .nrst = DEFAULT_LORA_NRST,
  .busy = DEFAULT_LORA_BUSY
};

uint8_t calculateChecksum(const String& message) {
  uint8_t checksum = 0;
  for (size_t i = 0; i < message.length(); i++) {
    checksum ^= message[i];
  }
  return checksum;
}

SPIClass* hspi = new SPIClass(HSPI);
Module* module = new Module(loRaPins.nss, loRaPins.dio1, loRaPins.nrst, loRaPins.busy);
SX1262 radio = SX1262(module);

String nodeID = "Node " + String(random(0xFFFF), HEX); // Unique node ID based on random value
String masterNodeID = ""; // Stores the current master node ID
bool isMaster = false; // Flag to check if this node is the master
unsigned long lastHeartbeatReceived = 0; // Unique node ID
String codeword = "1457101457"; // Shared codeword
unsigned long lastMessageSend = 0;
unsigned long lastHeartbeatSend = 0;
unsigned long lastNodeCheck = 0;
unsigned long lastRetry = 0;
float lastRSSI = 0;
bool isTransmitter = true; // Switch between transmitter and receiver
int retryCount = 0;
const int maxRetries = 3;

std::map<String, unsigned long> knownNodes; // Map to keep track of known nodes and their last communication time

// Data structure for sensor data
static const int sensorDataPayloadSize = 12;
union SensorDataPayload {
  uint8_t payloadByte[sensorDataPayloadSize];
  struct payloadContent {
    int16_t temperature;
    uint16_t pressure;
    uint16_t humidity;
    int16_t altitude;
    uint16_t light;
  } recorded;
};

void VextON(void) {
  pinMode(Vext, OUTPUT);
  digitalWrite(Vext, LOW);
}

void updateBatteryVoltages(float voltage) {
  batteryVoltages[currentBatteryReadingIndex] = voltage;
  currentBatteryReadingIndex = (currentBatteryReadingIndex + 1) % NUM_BATTERY_READINGS;
}
float readBatteryVoltage() {
  pinMode(ADC_CTRL, OUTPUT);
  digitalWrite(ADC_CTRL, LOW);
  delay(ADC_READ_STABILIZE); // let GPIO stabilize
  pinMode(Read_VBAT_Voltage, INPUT); // Set the analog pin as input
  int analogValue = analogRead(Read_VBAT_Voltage);
  float voltage = 0.00403532794741887 * analogValue;

  // Apply secondary scaling to map 4.2V to 1.6V
  float scaledVoltage = (voltage / 4.2) * 1.7;

  return scaledVoltage;
}

int getBatteryLevel() {
  int batteryLevel = 0;
  if (gpio_get_level((gpio_num_t)BATTERY_LED_1_PIN) == LOW) batteryLevel += 25;
  if (gpio_get_level((gpio_num_t)BATTERY_LED_2_PIN) == LOW) batteryLevel += 25;
  if (gpio_get_level((gpio_num_t)BATTERY_LED_3_PIN) == LOW) batteryLevel += 25;
  if (gpio_get_level((gpio_num_t)BATTERY_LED_4_PIN) == LOW) batteryLevel += 25;
  return batteryLevel;
}

float getAverageBatteryVoltage() {
  float sum = 0;
  for (int i = 0; i < NUM_BATTERY_READINGS; i++) {
    sum += batteryVoltages[i];
  }
  return sum / NUM_BATTERY_READINGS;
}

void updateKnownNodes(const String& sender) {
  unsigned long currentTime = millis();

  // Add or update the sender in the knownNodes map
  knownNodes[sender] = currentTime;

  // Remove nodes that have been inactive for more than 2 minutes (120000 milliseconds)
  for (auto it = knownNodes.begin(); it != knownNodes.end();) {
    if (currentTime - it->second > 120000) {
      it = knownNodes.erase(it);
    } else {
      ++it;
    }
  }
}

void sendMessage(const String& messageType, const String& target = "", const String& message = "", float voltage = -1) {
  String fullMessage = codeword + " " + nodeID + " " + messageType;
  if (!target.isEmpty()) {
    fullMessage += " " + target; // Append the target node ID if specified
  }
  if (!message.isEmpty()) {
    fullMessage += " " + message; // Append the message
  }
  if (voltage >= 0) {
    char voltageStr[10];
    snprintf(voltageStr, sizeof(voltageStr), "%.3f", voltage); // Format voltage to 3 decimal places
    fullMessage += " Voltage: " + String(voltageStr); // Append the formatted voltage
  }
  uint8_t checksum = calculateChecksum(fullMessage);
  fullMessage += " " + String(checksum, HEX); // Append the checksum

  int state = radio.transmit(fullMessage);
  if (state == RADIOLIB_ERR_NONE) {
    Serial.print(F("Message ("));
    Serial.print(messageType);
    Serial.println(F(") sent successfully"));
    digitalWrite(GREEN_LED, HIGH); // Turn on yellow LED for message sending
    delay(100); // Keep yellow LED on for 0.1 second
    digitalWrite(GREEN_LED, LOW); // Turn off yellow LED
    retryCount = 0; // Reset retry count after successful transmission
  } else {
    Serial.print(F("Failed to send message, code "));
    Serial.println(state);
    digitalWrite(RED_LED, HIGH); // Turn on red LED for error
    delay(100); // Keep red LED on for 1 second
    digitalWrite(RED_LED, LOW); // Turn off red LED
    retryCount++;
    if (retryCount < maxRetries) {
      lastRetry = millis();
    } else {
      Serial.println(F("Max retries reached. Abandoning message."));
      retryCount = 0;
    }
  }
}

void manageBattery() {
  // Average the last three voltage readings
  float voltageSum = 0;
  for (int i = 0; i < 3; i++) {
    voltageSum += readBatteryVoltage();
  }
  float averageVoltage = voltageSum / 3;

  Serial.print(F("Average Battery Voltage: "));
  Serial.println(averageVoltage, 3); // Print with 3 decimal places

  // Check the current battery level
  int batteryLevel = getBatteryLevel();

  Serial.print(F("Battery Level: "));
  Serial.print(batteryLevel);
  Serial.println(F("%"));

  if (batteryLevel <= 25) {
    Serial.println(F("Battery is low, preparing to enter light sleep..."));
    digitalWrite(RED_LED, HIGH); // Turn on red LED for low battery
    delay(100); // Keep red LED on for 1 second
    digitalWrite(RED_LED, LOW); // Turn off red LED

    String message = "low battery: " + String(averageVoltage, 3);
    sendMessage("Status", "", message, averageVoltage); // Send low battery message to other nodes with voltage

    // Small delay to stabilize the system before light sleep
    delay(1000);

    while (true) {
      // Enter light sleep to allow solar charging
      esp_sleep_enable_timer_wakeup(30 * 1000000); // Sleep for 30 seconds
      esp_light_sleep_start();

      // After waking up, check battery voltage again
      averageVoltage = 0;
      for (int i = 0; i < 3; i++) {
        averageVoltage += readBatteryVoltage();
      }
      averageVoltage /= 3;

      Serial.print(F("Waking up, Average Battery Voltage: "));
      Serial.println(averageVoltage, 3); // Print with 3 decimal places

      // Send heartbeat message before sleeping again
      sendMessage("Heartbeat", "", message, averageVoltage);

      // Check if charging voltage is detected
      bool solarChargeState = gpio_get_level((gpio_num_t)SOLAR_CHARGE_LED_PIN) == LOW;
      if (solarChargeState) {
        Serial.println(F("Charging voltage detected, entering light sleep until fully charged..."));

        // Small delay to stabilize the system before light sleep
        delay(1000);

        // Enter light sleep until battery is fully charged
        while (true) {
          esp_sleep_enable_timer_wakeup(30 * 1000000); // Sleep for 30 seconds
          esp_light_sleep_start();

          // After waking up, check battery voltage again
          averageVoltage = 0;
          for (int i = 0; i < 3; i++) {
            averageVoltage += readBatteryVoltage();
          }
          averageVoltage /= 3;

          Serial.print(F("Waking up, Average Battery Voltage: "));
          Serial.println(averageVoltage, 3); // Print with 3 decimal places

          // Send heartbeat message before sleeping again
          sendMessage("Heartbeat", "", message, averageVoltage);

          if (averageVoltage >= FULL_BATTERY_THRESHOLD) {
            Serial.println(F("Battery is fully charged, exiting light sleep..."));
            return; // Exit the light sleep loop
          }
        }
      }

      if (batteryLevel > 25) {
        break; // Exit the loop if battery level is sufficient
      }
    }
  }

  if (averageVoltage >= FULL_BATTERY_THRESHOLD) {
    Serial.println(F("Battery is fully charged, continuing normal operation..."));
  }

  bool solarDoneState = gpio_get_level((gpio_num_t)SOLAR_DONE_LED_PIN) == LOW;
  if (solarDoneState) {
    Serial.println(F("Charging detected, managing power..."));
  }
}

void logNetworkStatistics() {
  Serial.println(F("Network Statistics:"));
  Serial.print(F("Known nodes: "));
  Serial.println(knownNodes.size());
  Serial.print(F("Last RSSI: "));
  Serial.print(lastRSSI);
  Serial.println(F(" dBm"));
}

bool validateChecksum(const String& message, uint8_t receivedChecksum) {
  return calculateChecksum(message) == receivedChecksum;
}

void checkIncomingTransmission() {
  Serial.println(F("[SX1262] Preparing to wait for incoming transmission ..."));
  String str;
  manageBattery();
  unsigned long start = millis();
  unsigned long waitTime = random(100, 1000); // Listen for a random interval between 100 and 500 milliseconds
  bool messageReceived = false;

  while (millis() - start < waitTime) {
    int state = radio.receive(str, waitTime); // Check for message every 0.5 second
    if (state == RADIOLIB_ERR_NONE) {
      messageReceived = true;
      break;
    } else if (state == RADIOLIB_ERR_CRC_MISMATCH) {
      Serial.println(F("CRC error!"));
    }
    //delay(100); // Small delay to avoid excessive CPU usage
  }

  unsigned long end = millis();
  Serial.print(F("[SX1262] Time spent waiting: "));
  Serial.print(end - start);
  Serial.println(F(" ms"));

  if (messageReceived) {
    Serial.println(F("Message received successfully!"));
    Serial.print(F("[SX1262] Data: "));
    Serial.println(str);

    lastRSSI = radio.getRSSI();
    Serial.print(F("[SX1262] RSSI: "));
    Serial.print(lastRSSI);
    Serial.println(F(" dBm"));

    Serial.print(F("[SX1262] SNR: "));
    Serial.print(radio.getSNR());
    Serial.println(F(" dB"));

    digitalWrite(YELLOW_LED, HIGH); // Turn on yellow LED for successful reception
    delay(100); // Keep yellow LED on for 0.1 second
    digitalWrite(YELLOW_LED, LOW); // Turn off yellow LED

    // Parsing the received message
    int firstSpace = str.indexOf(' ');
    int secondSpace = str.indexOf(' ', firstSpace + 1);
    int thirdSpace = str.indexOf(' ', secondSpace + 1);
    int fourthSpace = str.indexOf(' ', thirdSpace + 1);
    int fifthSpace = str.lastIndexOf(' ');

    if (firstSpace != -1 && secondSpace != -1 && thirdSpace != -1 && fourthSpace != -1 && fifthSpace != -1) {
      String sender = str.substring(firstSpace + 1, secondSpace);
      String messageType = str.substring(secondSpace + 1, thirdSpace);
      String messageContent = str.substring(thirdSpace + 1, fourthSpace);
      String voltageStr = str.substring(fourthSpace + 1, fifthSpace);
      uint8_t receivedChecksum = str.substring(fifthSpace + 1).toInt();

      Serial.print(F("Received from: "));
      Serial.println(sender);
      Serial.print(F("Message Type: "));
      Serial.println(messageType);
      Serial.print(F("Message Content: "));
      Serial.println(messageContent);
      Serial.print(F("Voltage: "));
      Serial.println(voltageStr);

      // Validate checksum
      if (validateChecksum(codeword + " " + sender + " " + messageType + " " + messageContent + " " + voltageStr, receivedChecksum)) {
        // Add or update the sender in the knownNodes map
        knownNodes[sender] = millis();

        if (messageType == "Ping") {
          // Respond to Ping with Pong
          sendMessage("Pong", sender);
        } else if (messageType == "Pong") {
          // Respond to Pong with Ping
          sendMessage("Ping", sender);
        } else if (messageType == "Hello") {
          if (masterNodeID == "") {
            masterNodeID = sender;
            isMaster = false;
            sendMessage("Master", sender); // Announce self as master
          }
        } else if (messageType == "Master") {
          masterNodeID = sender;
          isMaster = false;
        } else if (messageType == "Heartbeat") {
          if (sender == masterNodeID) {
            lastHeartbeatReceived = millis();
          }
        } else if (messageType.startsWith("Master:")) {
          int targetStart = messageType.indexOf(' ', 7) + 1;
          String target = messageType.substring(targetStart, messageType.indexOf(' ', targetStart));
          String data = messageType.substring(messageType.indexOf(' ', targetStart) + 1);

          if (target != nodeID) {
            sendMessage("Master: " + data, target);
          } else {
            Serial.print(F("Data received for this node: "));
            Serial.println(data);
          }
        } else {
          Serial.print(F("Message: "));
          Serial.println(messageType);
        }
      } else {
        Serial.print(F("Invalid checksum: "));
        Serial.println(str.substring(fifthSpace + 1));
      }
    } else {
      Serial.print(F("Malformed message: "));
      Serial.println(str);
    }
  } else {
    Serial.println(F("timeout!"));
  }
}

void setup() {
  Serial.begin(9600);

  // Initialize LED pins
  pinMode(GREEN_LED, OUTPUT);
  pinMode(YELLOW_LED, OUTPUT);
  pinMode(RED_LED, OUTPUT);

  // Turn on Vext
  VextON();

  // Configure battery LED pins as inputs with pull-up resistors
  pinMode(BATTERY_LED_1_PIN, INPUT_PULLUP);
  pinMode(BATTERY_LED_2_PIN, INPUT_PULLUP);
  pinMode(BATTERY_LED_3_PIN, INPUT_PULLUP);
  pinMode(BATTERY_LED_4_PIN, INPUT_PULLUP);
  
  // Configure solar LED pins as inputs
  pinMode(SOLAR_CHARGE_LED_PIN, INPUT_PULLUP);
  pinMode(SOLAR_DONE_LED_PIN, INPUT_PULLUP);

  Serial.print(F("[SX1262] Initializing ... "));

  // Initialize LoRa pins
  hspi->begin(loRaPins.sck, loRaPins.miso, loRaPins.mosi, loRaPins.nss);

  int state = radio.begin();
  if (state == RADIOLIB_ERR_NONE) {
    Serial.println(F("success!"));
    radio.setOutputPower(LEGAL_TX_POWER);

    nodeID += String(random(0xFFFF), HEX); // Unique node ID based on random value
    Serial.print(F("Node ID: "));
    Serial.println(nodeID);
    Serial.print(F("Codeword: "));
    Serial.println(codeword);

    digitalWrite(GREEN_LED, HIGH); // Turn on green LED for successful initialization
    delay(100); // Keep green LED on for 1 second
    digitalWrite(GREEN_LED, LOW); // Turn off green LED
  } else {
    Serial.print(F("failed, code "));
    Serial.println(state);
    digitalWrite(RED_LED, HIGH);
    digitalWrite(RED_LED, LOW); // Turn on red LED for initialization failure
    while (true);
  }
}


void loop() {
  unsigned long currentTime = millis();

  if (retryCount > 0 && currentTime - lastRetry > 2000) {
    float batteryVoltage = readBatteryVoltage();
    sendMessage("Ping", "", "", batteryVoltage);
    return;
  }

  if (isTransmitter) {
    if (currentTime - lastMessageSend > 100) {
      lastMessageSend = currentTime;
      float batteryVoltage = readBatteryVoltage();
      sendMessage("Ping", "", "", batteryVoltage); // Include the battery voltage
      isTransmitter = false; // Switch to receiver mode
    }
  } else {
    checkIncomingTransmission();
    isTransmitter = true; // Switch to transmitter mode
  }

  if (currentTime - lastHeartbeatSend > 10000) { // Send heartbeat every 10 seconds
    lastHeartbeatSend = currentTime;
    float batteryVoltage = readBatteryVoltage();
    sendMessage("Heartbeat", "", "", batteryVoltage);
  }

  if (!isMaster && currentTime - lastHeartbeatReceived > 30000) { // If no heartbeat for 30 seconds
    masterNodeID = "";
    sendMessage("Hello", "", nodeID); // Reinitiate election
  }

  static unsigned long lastBatteryManagementTime = 0;
  if (currentTime - lastBatteryManagementTime > 30000) { // Manage battery every 30 seconds
    lastBatteryManagementTime = currentTime;
    manageBattery();
  }

  static unsigned long lastLogTime = 0;
  if (currentTime - lastLogTime > 60000) { // Log network statistics every 60 seconds
    lastLogTime = currentTime;
    logNetworkStatistics();
  }
}
postag.py
Python - Automation and Scripting - 3.19 KB
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import re
from typing import Iterable

WORD_PATTERN = re.compile(r"[A-Za-z0-9]+(?:'[A-Za-z0-9]+)?")
TOKEN_PATTERN = re.compile(r"[A-Za-z0-9]+(?:'[A-Za-z0-9]+)?|[^\w\s]", re.UNICODE)

DETERMINERS = {"a", "an", "the", "this", "that", "these", "those", "my", "your", "his", "her", "its", "our", "their"}
PRONOUNS = {"i", "you", "he", "she", "it", "we", "they", "me", "him", "her", "us", "them"}
PREPOSITIONS = {"in", "on", "at", "by", "for", "from", "to", "with", "without", "over", "under", "between", "through"}
CONJUNCTIONS = {"and", "or", "but", "nor", "so", "yet", "because", "while", "although"}
AUXILIARIES = {"am", "is", "are", "was", "were", "be", "been", "being", "do", "does", "did", "have", "has", "had", "can", "could", "will", "would", "should", "may", "might", "must"}
COMMON_VERBS = {"run", "walk", "write", "build", "make", "use", "read", "send", "load", "save", "open", "close", "parse", "tokenize", "detokenize", "test"}
COMMON_ADJECTIVES = {"fast", "slow", "large", "small", "strong", "clean", "simple", "complex", "new", "old"}


class POSTagger:
    """Lightweight rule-based POS tagger for fast local workflows."""

    def tag_word(self, word: str) -> str:
        raw = (word or "").strip()
        if not raw:
            return "X"

        if re.fullmatch(r"[^\w\s]", raw):
            return "PUNCT"

        lower = raw.lower()

        if lower.isdigit():
            return "NUM"
        if lower in DETERMINERS:
            return "DET"
        if lower in PRONOUNS:
            return "PRON"
        if lower in PREPOSITIONS:
            return "ADP"
        if lower in CONJUNCTIONS:
            return "CONJ"
        if lower in AUXILIARIES:
            return "AUX"
        if lower in COMMON_VERBS:
            return "VERB"
        if lower in COMMON_ADJECTIVES:
            return "ADJ"

        if lower.endswith("ly"):
            return "ADV"
        if lower.endswith(("ing", "ed", "ize", "ise")):
            return "VERB"
        if lower.endswith(("ous", "ful", "able", "ible", "ive", "al", "ic", "ary")):
            return "ADJ"
        if lower.endswith(("ness", "ment", "tion", "sion", "ity", "ship", "age", "ism")):
            return "NOUN"

        return "NOUN"

    def tag_words(self, words: Iterable[str]) -> list[tuple[str, str]]:
        return [(word, self.tag_word(word)) for word in words]

    def tag_text(self, text: str) -> list[tuple[str, str]]:
        rows: list[tuple[str, str]] = []
        for token in TOKEN_PATTERN.findall(text or ""):
            if WORD_PATTERN.fullmatch(token):
                rows.append((token, self.tag_word(token)))
            else:
                rows.append((token, "PUNCT"))
        return rows


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Simple POS tagger.")
    parser.add_argument("--text", default="")
    return parser.parse_args()


def main() -> int:
    args = _parse_args()
    if not args.text:
        print("Provide --text to POS tag.")
        return 0

    tagger = POSTagger()
    for token, pos in tagger.tag_text(args.text):
        print(f"{token}\t{pos}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
tokenizer.py
Python - Data Parsing and Analytics - 8.02 KB
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import re
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable

from postag import POSTagger

SPACE_TOKEN = "space"
NEWLINE_TOKEN = "newline"
TAB_TOKEN = "tab"
POS_TOKEN_PREFIX = "__POS_"
POS_TOKEN_SUFFIX = "__"

WORD_PATTERN = re.compile(r"[A-Za-z0-9]+(?:'[A-Za-z0-9]+)?")
PIECE_PATTERN = re.compile(r"[A-Za-z0-9]+(?:'[A-Za-z0-9]+)?|[^\w\s]", re.UNICODE)
SEGMENT_PATTERN = re.compile(r"\S+|\s+", re.UNICODE)


@dataclass(frozen=True)
class EncodedToken:
    token: str
    token_id: int


class Tokenizer:
    """Character + whole-word tokenizer with optional POS tagging.

    Example for "hello":
    h -> 1, e -> 2, l -> 3, o -> 4, hello -> 5
    and then a space token can be 6 when a space is encountered.
    """

    def __init__(
        self,
        corpus_path: str | Path | None = None,
        *,
        lowercase: bool = True,
        include_word_token: bool = True,
        include_pos: bool = False,
        space_token: str = SPACE_TOKEN,
        postagger: POSTagger | None = None,
    ) -> None:
        self.lowercase = lowercase
        self.include_word_token = include_word_token
        self.include_pos = include_pos
        self.space_token = space_token
        self.postagger = postagger or POSTagger()

        self.token_to_id: dict[str, int] = {}
        self.id_to_token: dict[int, str] = {}
        self.token_counts: Counter[str] = Counter()
        self.next_id = 1

        self.corpus_path = Path(corpus_path) if corpus_path else None
        if self.corpus_path and self.corpus_path.exists():
            self.load_corpus(self.corpus_path)

    def _normalize(self, token: str) -> str:
        if not token:
            return ""
        return token.lower() if self.lowercase else token

    def _register_token(self, token: str) -> int:
        cleaned = self._normalize(token).strip()
        if not cleaned:
            return 0

        existing = self.token_to_id.get(cleaned)
        if existing is not None:
            self.token_counts[cleaned] += 1
            return existing

        token_id = self.next_id
        self.next_id += 1
        self.token_to_id[cleaned] = token_id
        self.id_to_token[token_id] = cleaned
        self.token_counts[cleaned] += 1
        return token_id

    def _emit(self, token: str, output: list[EncodedToken]) -> None:
        token_id = self._register_token(token)
        if token_id:
            output.append(EncodedToken(token=self._normalize(token), token_id=token_id))

    def _emit_whitespace(self, chunk: str, output: list[EncodedToken]) -> None:
        for char in chunk:
            if char == " ":
                self._emit(self.space_token, output)
            elif char == "\n":
                self._emit(NEWLINE_TOKEN, output)
            elif char == "\t":
                self._emit(TAB_TOKEN, output)
            else:
                self._emit(self.space_token, output)

    def _emit_word(self, word: str, output: list[EncodedToken], include_pos: bool) -> None:
        normalized_word = self._normalize(word)
        if not normalized_word:
            return

        for char in normalized_word:
            self._emit(char, output)

        if self.include_word_token:
            self._emit(normalized_word, output)

        if include_pos:
            pos_tag = self.postagger.tag_word(normalized_word)
            self._emit(f"{POS_TOKEN_PREFIX}{pos_tag}{POS_TOKEN_SUFFIX}", output)

    def encode(self, text: str, include_pos: bool | None = None) -> list[EncodedToken]:
        include_pos_value = self.include_pos if include_pos is None else include_pos
        output: list[EncodedToken] = []

        for segment in SEGMENT_PATTERN.findall(text or ""):
            if segment.isspace():
                self._emit_whitespace(segment, output)
                continue

            pieces = PIECE_PATTERN.findall(segment)
            if not pieces:
                continue

            for piece in pieces:
                if WORD_PATTERN.fullmatch(piece):
                    self._emit_word(piece, output, include_pos=include_pos_value)
                else:
                    self._emit(piece, output)

        return output

    def encode_words(self, words: Iterable[str], include_pos: bool | None = None) -> list[EncodedToken]:
        joined = " ".join(str(word) for word in words)
        return self.encode(joined, include_pos=include_pos)

    def decode(self, encoded_tokens: Iterable[str | int | EncodedToken | tuple[Any, ...] | dict[str, Any]]) -> str:
        from detokenizer import Detokenizer

        detok = Detokenizer(tokenizer=self)
        return detok.detokenize(encoded_tokens)

    def token_pairs(self) -> list[tuple[str, int, int]]:
        rows: list[tuple[str, int, int]] = []
        for token, token_id in sorted(self.token_to_id.items(), key=lambda row: row[1]):
            rows.append((token, token_id, int(self.token_counts.get(token, 0))))
        return rows

    def save_corpus(self, path: str | Path | None = None) -> Path:
        target = Path(path) if path else self.corpus_path
        if target is None:
            raise ValueError("No corpus path provided.")

        target.parent.mkdir(parents=True, exist_ok=True)
        payload = {
            "next_id": self.next_id,
            "token_to_id": self.token_to_id,
            "token_counts": dict(self.token_counts),
            "config": {
                "lowercase": self.lowercase,
                "include_word_token": self.include_word_token,
                "include_pos": self.include_pos,
                "space_token": self.space_token,
            },
        }
        target.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")
        self.corpus_path = target
        return target

    def load_corpus(self, path: str | Path | None = None) -> None:
        target = Path(path) if path else self.corpus_path
        if target is None or not target.exists():
            return

        payload = json.loads(target.read_text(encoding="utf-8"))
        loaded = payload.get("token_to_id", {})
        if not isinstance(loaded, dict):
            return

        self.token_to_id = {}
        self.id_to_token = {}
        for token, token_id in loaded.items():
            token_text = self._normalize(str(token))
            token_int = int(token_id)
            self.token_to_id[token_text] = token_int
            self.id_to_token[token_int] = token_text

        counts = payload.get("token_counts", {})
        if isinstance(counts, dict):
            self.token_counts = Counter({self._normalize(str(k)): int(v) for k, v in counts.items()})
        else:
            self.token_counts = Counter()

        self.next_id = int(payload.get("next_id", max(self.id_to_token.keys(), default=0) + 1))
        self.corpus_path = target


def encode_to_pairs(text: str, tokenizer: Tokenizer | None = None, include_pos: bool = False) -> list[tuple[str, int]]:
    tk = tokenizer or Tokenizer()
    return [(row.token, row.token_id) for row in tk.encode(text, include_pos=include_pos)]


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Character+word tokenizer with optional POS tagging.")
    parser.add_argument("--text", default="")
    parser.add_argument("--corpus", default="")
    parser.add_argument("--pos", action="store_true", help="Include POS tag tokens.")
    parser.add_argument("--save", action="store_true", help="Save updated corpus after encoding.")
    return parser.parse_args()


def main() -> int:
    args = _parse_args()
    corpus = Path(args.corpus) if args.corpus else None
    tokenizer = Tokenizer(corpus_path=corpus)

    if not args.text:
        print("Provide --text to tokenize.")
        return 0

    encoded = tokenizer.encode(args.text, include_pos=args.pos)
    for row in encoded:
        print(f"{row.token}\t{row.token_id}")

    if args.save and corpus:
        tokenizer.save_corpus(corpus)
        print(f"Saved corpus: {corpus}")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())