Initial commit — Elden Ring RL agent
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
20
.gitignore
vendored
Normal file
20
.gitignore
vendored
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
# Python venv contents
|
||||||
|
bin/
|
||||||
|
include/
|
||||||
|
lib/
|
||||||
|
lib64
|
||||||
|
share/
|
||||||
|
pyvenv.cfg
|
||||||
|
__pycache__/
|
||||||
|
*.py[cod]
|
||||||
|
|
||||||
|
# Trained model artifacts
|
||||||
|
elden_ai_model*
|
||||||
|
*.zip
|
||||||
|
|
||||||
|
# Named pipe for the vision pipeline
|
||||||
|
/tmp/elden_pipe
|
||||||
|
|
||||||
|
# Misc
|
||||||
|
permission_test.txt
|
||||||
|
.claude/
|
||||||
95
elden_env.py
Normal file
95
elden_env.py
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
import gymnasium as gym
|
||||||
|
from gymnasium import spaces
|
||||||
|
import numpy as np
|
||||||
|
from vision_agent import EldenVision
|
||||||
|
from input_driver import EldenController
|
||||||
|
import time
|
||||||
|
from evdev import ecodes as e
|
||||||
|
|
||||||
|
class EldenRingEnv(gym.Env):
|
||||||
|
def __init__(self):
|
||||||
|
super(EldenRingEnv, self).__init__()
|
||||||
|
self.vision = EldenVision(stack_size=4)
|
||||||
|
self.controller = EldenController()
|
||||||
|
self.observation_space = spaces.Box(low=0, high=255, shape=(640, 640, 12), dtype=np.uint8)
|
||||||
|
self.action_space = spaces.MultiDiscrete([4, 5, 3, 3, 2])
|
||||||
|
|
||||||
|
self.last_p_hp = 100.0
|
||||||
|
self.last_b_hp = 100.0
|
||||||
|
self.frame_skip = 4
|
||||||
|
|
||||||
|
def step(self, actions):
|
||||||
|
combat, move, cam_x, cam_y, switch = actions
|
||||||
|
if switch == 1: self.controller.switch_item()
|
||||||
|
if combat == 1: self.controller.dodge()
|
||||||
|
elif combat == 2: self.controller.attack()
|
||||||
|
elif combat == 3: self.controller.use_item()
|
||||||
|
self.controller.move(move)
|
||||||
|
|
||||||
|
mx = -50 if cam_x == 1 else (50 if cam_x == 2 else 0)
|
||||||
|
my = -50 if cam_y == 1 else (50 if cam_y == 2 else 0)
|
||||||
|
if mx != 0 or my != 0: self.controller.look(mx, my)
|
||||||
|
|
||||||
|
for _ in range(self.frame_skip):
|
||||||
|
obs, hp, fp, sp, boss = self.vision.get_state()
|
||||||
|
|
||||||
|
reward = 0.01
|
||||||
|
if combat == 3 and hp > 90: reward -= 5.0
|
||||||
|
if hp < self.last_p_hp: reward -= 20.0
|
||||||
|
if boss < self.last_b_hp: reward += 10.0 * ((self.last_b_hp - boss) / 5.0)
|
||||||
|
|
||||||
|
self.last_p_hp, self.last_b_hp = hp, boss
|
||||||
|
done = hp <= 0 or (boss <= 0 and self.last_b_hp > 10)
|
||||||
|
return obs, reward, done, False, {}
|
||||||
|
|
||||||
|
def reset(self, seed=None, options=None):
|
||||||
|
super().reset(seed=seed)
|
||||||
|
print(f"\n[NAV] DEATH DETECTED. Waiting 25s for Respawn...")
|
||||||
|
time.sleep(25)
|
||||||
|
|
||||||
|
print("[NAV] Running to Maliketh Fog Gate...")
|
||||||
|
|
||||||
|
# 1. Exit Grace Room
|
||||||
|
self.controller.move(1) # W
|
||||||
|
time.sleep(2.0)
|
||||||
|
|
||||||
|
# 2. Turn Right to stairs
|
||||||
|
self.controller.look(1800, 0)
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# 3. Run up the stairs
|
||||||
|
self.controller.move(1)
|
||||||
|
time.sleep(4.5)
|
||||||
|
|
||||||
|
# 4. Turn Left to bridge
|
||||||
|
self.controller.look(-1600, 0)
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# 5. Sprint across bridge and look for fog
|
||||||
|
print("[NAV] Sprinting across bridge...")
|
||||||
|
start_nav = time.time()
|
||||||
|
found_fog = False
|
||||||
|
|
||||||
|
while time.time() - start_nav < 25: # 25s timeout for navigation
|
||||||
|
with self.vision.lock:
|
||||||
|
hsv = self.vision.frame_hsv.copy()
|
||||||
|
|
||||||
|
if self.vision.detect_fog(hsv):
|
||||||
|
print("[NAV] FOG DETECTED! Entering Arena...")
|
||||||
|
self.controller.move(0)
|
||||||
|
time.sleep(0.2)
|
||||||
|
self.controller.press(e.KEY_E) # Interact
|
||||||
|
time.sleep(5.0) # Extra safety for entry animation
|
||||||
|
found_fog = True
|
||||||
|
break
|
||||||
|
|
||||||
|
self.controller.move(1) # Keep running forward
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
if not found_fog:
|
||||||
|
print("[NAV] Warning: Fog not found. Sentinel might have blocked us.")
|
||||||
|
|
||||||
|
self.controller.move(0)
|
||||||
|
obs, hp, fp, sp, boss = self.vision.get_state()
|
||||||
|
self.last_p_hp, self.last_b_hp = hp, boss
|
||||||
|
return obs, {}
|
||||||
87
find_ui_elements.py
Normal file
87
find_ui_elements.py
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
import re
|
||||||
|
from vision_agent import EldenVision
|
||||||
|
|
||||||
|
class UISelector:
|
||||||
|
def __init__(self):
|
||||||
|
self.vision = EldenVision(0)
|
||||||
|
self.window_name = "FLUID CALIBRATOR: TAB=Switch, R=Mode, S=SAVE, Q=Quit"
|
||||||
|
cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL)
|
||||||
|
cv2.resizeWindow(self.window_name, 1280, 1280)
|
||||||
|
cv2.setMouseCallback(self.window_name, self.on_mouse)
|
||||||
|
|
||||||
|
self.rois = [
|
||||||
|
list(self.vision.player_hp_roi),
|
||||||
|
list(self.vision.player_fp_roi),
|
||||||
|
list(self.vision.player_sp_roi),
|
||||||
|
list(self.vision.boss_hp_roi)
|
||||||
|
]
|
||||||
|
self.names = ["HP (Red)", "FP (Blue)", "SP (Green)", "BOSS"]
|
||||||
|
self.colors = [(255, 0, 0), (255, 255, 0), (0, 255, 255), (0, 255, 0)]
|
||||||
|
self.types = ['red', 'blue', 'green', 'red']
|
||||||
|
|
||||||
|
self.active_idx = 0
|
||||||
|
self.mode = 'MOVE'
|
||||||
|
self.dragging = False
|
||||||
|
self.start_mouse = (0, 0)
|
||||||
|
self.start_roi = []
|
||||||
|
|
||||||
|
def on_mouse(self, event, x, y, flags, param):
|
||||||
|
if event == cv2.EVENT_LBUTTONDOWN:
|
||||||
|
self.dragging = True
|
||||||
|
self.start_mouse = (x, y)
|
||||||
|
self.start_roi = list(self.rois[self.active_idx])
|
||||||
|
elif event == cv2.EVENT_MOUSEMOVE:
|
||||||
|
if not self.dragging: return
|
||||||
|
dx, dy = x - self.start_mouse[0], y - self.start_mouse[1]
|
||||||
|
roi = self.rois[self.active_idx]
|
||||||
|
if self.mode == 'MOVE':
|
||||||
|
roi[0], roi[1], roi[2], roi[3] = self.start_roi[0]+dy, self.start_roi[1]+dy, self.start_roi[2]+dx, self.start_roi[3]+dx
|
||||||
|
else:
|
||||||
|
roi[1], roi[3] = self.start_roi[1]+dy, self.start_roi[3]+dx
|
||||||
|
elif event == cv2.EVENT_LBUTTONUP:
|
||||||
|
self.dragging = False
|
||||||
|
|
||||||
|
def save_to_code(self):
|
||||||
|
file_path = "vision_agent.py"
|
||||||
|
with open(file_path, 'r') as f: content = f.read()
|
||||||
|
# Regex update to preserve logic but change coordinates
|
||||||
|
content = re.sub(r'self.player_hp_roi = \(.*?\)', f'self.player_hp_roi = {tuple(self.rois[0])}', content)
|
||||||
|
content = re.sub(r'self.player_fp_roi = \(.*?\)', f'self.player_fp_roi = {tuple(self.rois[1])}', content)
|
||||||
|
content = re.sub(r'self.player_sp_roi = \(.*?\)', f'self.player_sp_roi = {tuple(self.rois[2])}', content)
|
||||||
|
content = re.sub(r'self.boss_hp_roi = \(.*?\)', f'self.boss_hp_roi = {tuple(self.rois[3])}', content)
|
||||||
|
with open(file_path, 'w') as f: f.write(content)
|
||||||
|
print(f"\n[!!!] COORDINATES UPDATED in {file_path}")
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while True:
|
||||||
|
obs, hp, fp, sp, boss = self.vision.get_state()
|
||||||
|
# AI sees RGB, but OpenCV needs BGR for imshow
|
||||||
|
frame_rgb = obs[:, :, -3:].copy()
|
||||||
|
frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
|
||||||
|
|
||||||
|
# HUD
|
||||||
|
draw = frame_bgr
|
||||||
|
cv2.putText(draw, f"ACTIVE: {self.names[self.active_idx]} ({self.mode})", (10, 30), 0, 0.7, self.colors[self.active_idx], 2)
|
||||||
|
|
||||||
|
for i, roi in enumerate(self.rois):
|
||||||
|
cv2.rectangle(draw, (roi[2], roi[0]), (roi[3], roi[1]), self.colors[i], 4 if i==self.active_idx else 1)
|
||||||
|
# We can use the RGB frame for percent check, but color_type expects BGR-style logic or we adjust it
|
||||||
|
# Actually, vision_agent.get_bar_percent uses HSV, which we pre-calculate.
|
||||||
|
# For the tool, we just show the numbers returned by get_state.
|
||||||
|
|
||||||
|
cv2.putText(draw, f"HP: {hp:.0f}% FP: {fp:.0f}% SP: {sp:.0f}%", (10, 610), 0, 0.6, (255, 255, 255), 2)
|
||||||
|
|
||||||
|
cv2.imshow(self.window_name, draw)
|
||||||
|
key = cv2.waitKey(1) & 0xFF
|
||||||
|
if key == ord('q'): break
|
||||||
|
elif key == ord('r'): self.mode = 'RESIZE' if self.mode == 'MOVE' else 'MOVE'
|
||||||
|
elif key == ord('\t'): self.active_idx = (self.active_idx + 1) % 4
|
||||||
|
elif key == ord('s'): self.save_to_code()
|
||||||
|
|
||||||
|
self.vision.stop()
|
||||||
|
cv2.destroyAllWindows()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
UISelector().run()
|
||||||
40
input_driver.py
Normal file
40
input_driver.py
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
from evdev import UInput, ecodes as e
|
||||||
|
import time
|
||||||
|
|
||||||
|
class EldenController:
|
||||||
|
def __init__(self):
|
||||||
|
self.cap = {
|
||||||
|
e.EV_KEY: [
|
||||||
|
e.KEY_W, e.KEY_A, e.KEY_S, e.KEY_D,
|
||||||
|
e.KEY_SPACE, e.KEY_E, e.KEY_R, e.KEY_Q, e.KEY_DOWN,
|
||||||
|
e.BTN_LEFT, e.BTN_RIGHT
|
||||||
|
],
|
||||||
|
e.EV_REL: [e.REL_X, e.REL_Y]
|
||||||
|
}
|
||||||
|
self.ui = UInput(self.cap, name='EldenAI-DualController')
|
||||||
|
|
||||||
|
def move(self, direction):
|
||||||
|
for key in [e.KEY_W, e.KEY_A, e.KEY_S, e.KEY_D]:
|
||||||
|
self.ui.write(e.EV_KEY, key, 0)
|
||||||
|
if direction == 1: self.ui.write(e.EV_KEY, e.KEY_W, 1)
|
||||||
|
elif direction == 2: self.ui.write(e.EV_KEY, e.KEY_S, 1)
|
||||||
|
elif direction == 3: self.ui.write(e.EV_KEY, e.KEY_A, 1)
|
||||||
|
elif direction == 4: self.ui.write(e.EV_KEY, e.KEY_D, 1)
|
||||||
|
self.ui.syn()
|
||||||
|
|
||||||
|
def look(self, x, y):
|
||||||
|
self.ui.write(e.EV_REL, e.REL_X, int(x))
|
||||||
|
self.ui.write(e.EV_REL, e.REL_Y, int(y))
|
||||||
|
self.ui.syn()
|
||||||
|
|
||||||
|
def dodge(self): self.press(e.KEY_SPACE)
|
||||||
|
def attack(self): self.press(e.BTN_LEFT)
|
||||||
|
def use_item(self): self.press(e.KEY_R)
|
||||||
|
def switch_item(self): self.press(e.KEY_DOWN)
|
||||||
|
|
||||||
|
def press(self, key, duration=0.05):
|
||||||
|
self.ui.write(e.EV_KEY, key, 1)
|
||||||
|
self.ui.syn()
|
||||||
|
time.sleep(duration)
|
||||||
|
self.ui.write(e.EV_KEY, key, 0)
|
||||||
|
self.ui.syn()
|
||||||
62
install_dependencies.sh
Executable file
62
install_dependencies.sh
Executable file
@@ -0,0 +1,62 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
set -e
|
||||||
|
|
||||||
|
echo "=== Elden Ring AI — Dependency Installer ==="
|
||||||
|
|
||||||
|
# ── 1. System packages ────────────────────────────────────────────────────────
|
||||||
|
echo "[1/4] Installing system packages..."
|
||||||
|
sudo pacman -S --needed --noconfirm \
|
||||||
|
base-devel \
|
||||||
|
linux-headers \
|
||||||
|
v4l2loopback-dkms \
|
||||||
|
gpu-screen-recorder \
|
||||||
|
python \
|
||||||
|
python-pip
|
||||||
|
|
||||||
|
# ── 2. Persistent v4l2loopback config ─────────────────────────────────────────
|
||||||
|
echo "[2/4] Configuring v4l2loopback..."
|
||||||
|
echo 'v4l2loopback' | sudo tee /etc/modules-load.d/v4l2loopback.conf > /dev/null
|
||||||
|
echo 'options v4l2loopback devices=1 video_nr=2 card_label="EldenVision" exclusive_caps=1' \
|
||||||
|
| sudo tee /etc/modprobe.d/v4l2loopback.conf > /dev/null
|
||||||
|
sudo modprobe v4l2loopback devices=1 video_nr=2 card_label="EldenVision" exclusive_caps=1 2>/dev/null || true
|
||||||
|
|
||||||
|
# Add user to input group (needed for evdev/uinput)
|
||||||
|
sudo usermod -aG input "$USER"
|
||||||
|
echo " NOTE: Log out and back in for the input group to take effect."
|
||||||
|
|
||||||
|
# ── 3. Python venv ────────────────────────────────────────────────────────────
|
||||||
|
echo "[3/4] Setting up Python virtual environment..."
|
||||||
|
if [ ! -f "bin/activate" ]; then
|
||||||
|
python -m venv .
|
||||||
|
fi
|
||||||
|
# shellcheck disable=SC1091
|
||||||
|
source bin/activate
|
||||||
|
|
||||||
|
# ── 4. Python packages ────────────────────────────────────────────────────────
|
||||||
|
echo "[4/4] Installing Python packages..."
|
||||||
|
|
||||||
|
# Detect CUDA version from driver and map to a PyTorch wheel tag
|
||||||
|
CUDA_RAW=$(nvidia-smi 2>/dev/null | grep -oP "CUDA Version: \K[\d.]+" || echo "0")
|
||||||
|
CUDA_MAJOR=$(echo "$CUDA_RAW" | cut -d. -f1)
|
||||||
|
CUDA_MINOR=$(echo "$CUDA_RAW" | cut -d. -f2)
|
||||||
|
CUDA_NUM=$((CUDA_MAJOR * 10 + CUDA_MINOR)) # e.g. 13.2 → 132
|
||||||
|
|
||||||
|
if [ "$CUDA_NUM" -ge 130 ]; then TORCH_CU="cu130"
|
||||||
|
elif [ "$CUDA_NUM" -ge 124 ]; then TORCH_CU="cu124"
|
||||||
|
elif [ "$CUDA_NUM" -ge 121 ]; then TORCH_CU="cu121"
|
||||||
|
elif [ "$CUDA_NUM" -ge 118 ]; then TORCH_CU="cu118"
|
||||||
|
else
|
||||||
|
echo " WARNING: No supported CUDA found, installing CPU-only PyTorch."
|
||||||
|
TORCH_CU="cpu"
|
||||||
|
fi
|
||||||
|
|
||||||
|
TORCH_INDEX="https://download.pytorch.org/whl/${TORCH_CU}"
|
||||||
|
echo " CUDA ${CUDA_RAW} detected → using index: ${TORCH_INDEX}"
|
||||||
|
|
||||||
|
pip install --upgrade pip
|
||||||
|
pip install torch torchvision --index-url "$TORCH_INDEX"
|
||||||
|
pip install -r requirements.txt
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "=== Done! Activate the venv with: source bin/activate ==="
|
||||||
|
echo "=== Then start training with: python train.py ==="
|
||||||
9
requirements.txt
Normal file
9
requirements.txt
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
# PyTorch is installed separately by install_dependencies.sh
|
||||||
|
# because it needs a CUDA-specific index URL.
|
||||||
|
|
||||||
|
gymnasium==1.2.3
|
||||||
|
stable-baselines3==2.8.0
|
||||||
|
opencv-python==4.13.0.92
|
||||||
|
evdev==1.9.3
|
||||||
|
numpy==2.4.4
|
||||||
|
shimmy==2.0.0
|
||||||
70
train.py
Normal file
70
train.py
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
import torch
|
||||||
|
from stable_baselines3 import PPO
|
||||||
|
from elden_env import EldenRingEnv
|
||||||
|
|
||||||
|
|
||||||
|
def get_system_info():
|
||||||
|
with open('/proc/meminfo') as f:
|
||||||
|
for line in f:
|
||||||
|
if line.startswith('MemTotal'):
|
||||||
|
ram_gb = int(line.split()[1]) / (1024 ** 2)
|
||||||
|
break
|
||||||
|
|
||||||
|
if torch.cuda.is_available():
|
||||||
|
props = torch.cuda.get_device_properties(0)
|
||||||
|
vram_gb = props.total_memory / (1024 ** 3)
|
||||||
|
device = "cuda"
|
||||||
|
gpu_name = props.name
|
||||||
|
else:
|
||||||
|
vram_gb = 0
|
||||||
|
device = "cpu"
|
||||||
|
gpu_name = "None"
|
||||||
|
|
||||||
|
return ram_gb, vram_gb, device, gpu_name
|
||||||
|
|
||||||
|
|
||||||
|
def get_hyperparams(ram_gb, vram_gb):
|
||||||
|
# n_steps: rollout buffer size, lives in RAM.
|
||||||
|
# Each obs is (640,640,12) float32 = ~18.75MB. Use at most 20% of RAM.
|
||||||
|
if ram_gb >= 64:
|
||||||
|
n_steps = 2048
|
||||||
|
elif ram_gb >= 32:
|
||||||
|
n_steps = 512
|
||||||
|
elif ram_gb >= 16:
|
||||||
|
n_steps = 256
|
||||||
|
else:
|
||||||
|
n_steps = 128
|
||||||
|
|
||||||
|
# batch_size: minibatch for the gradient update, lives in VRAM.
|
||||||
|
if vram_gb >= 16:
|
||||||
|
batch_size = 128
|
||||||
|
elif vram_gb >= 8:
|
||||||
|
batch_size = 32
|
||||||
|
elif vram_gb >= 4:
|
||||||
|
batch_size = 16
|
||||||
|
else:
|
||||||
|
batch_size = 8
|
||||||
|
|
||||||
|
return n_steps, batch_size
|
||||||
|
|
||||||
|
|
||||||
|
def train():
|
||||||
|
ram_gb, vram_gb, device, gpu_name = get_system_info()
|
||||||
|
n_steps, batch_size = get_hyperparams(ram_gb, vram_gb)
|
||||||
|
|
||||||
|
print(f"[HW] RAM: {ram_gb:.1f} GB | VRAM: {vram_gb:.1f} GB ({gpu_name}) | Device: {device}")
|
||||||
|
print(f"[HW] n_steps={n_steps}, batch_size={batch_size}")
|
||||||
|
|
||||||
|
env = EldenRingEnv()
|
||||||
|
model = PPO("CnnPolicy", env, verbose=1,
|
||||||
|
device=device,
|
||||||
|
n_steps=n_steps,
|
||||||
|
batch_size=batch_size)
|
||||||
|
|
||||||
|
print("Starting Training...")
|
||||||
|
model.learn(total_timesteps=100000)
|
||||||
|
model.save("elden_ai_model")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
train()
|
||||||
78
vision_agent.py
Normal file
78
vision_agent.py
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
from collections import deque
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
class EldenVision:
|
||||||
|
def __init__(self, device_index=2, stack_size=4):
|
||||||
|
self.cap = cv2.VideoCapture(device_index, cv2.CAP_V4L2)
|
||||||
|
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
||||||
|
|
||||||
|
self.frame_rgb = None
|
||||||
|
self.frame_hsv = None
|
||||||
|
self.stopped = False
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
|
||||||
|
# PRESERVED USER CONFIGS
|
||||||
|
self.player_hp_roi = (158, 164, 48, 329)
|
||||||
|
self.player_fp_roi = (163, 169, 49, 160)
|
||||||
|
self.player_sp_roi = (169, 176, 48, 216)
|
||||||
|
self.boss_hp_roi = (427, 437, 149, 488)
|
||||||
|
|
||||||
|
self.stack_size = stack_size
|
||||||
|
self.frame_stack = deque(maxlen=stack_size)
|
||||||
|
|
||||||
|
threading.Thread(target=self.update, args=(), daemon=True).start()
|
||||||
|
while self.frame_rgb is None: time.sleep(0.1)
|
||||||
|
|
||||||
|
def update(self):
|
||||||
|
while not self.stopped:
|
||||||
|
ret, frame = self.cap.read()
|
||||||
|
if ret and frame is not None:
|
||||||
|
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||||
|
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
|
||||||
|
with self.lock:
|
||||||
|
self.frame_rgb = rgb
|
||||||
|
self.frame_hsv = hsv
|
||||||
|
else:
|
||||||
|
time.sleep(0.001)
|
||||||
|
|
||||||
|
def detect_fog(self, frame_hsv=None):
|
||||||
|
"""Detects the bright white/gold glow of a fog gate in center screen."""
|
||||||
|
if frame_hsv is None:
|
||||||
|
with self.lock: frame_hsv = self.frame_hsv.copy()
|
||||||
|
|
||||||
|
# Center ROI where fog usually appears
|
||||||
|
center_crop = frame_hsv[250:450, 200:440]
|
||||||
|
# Fog is very bright (High Value) and low saturation (White)
|
||||||
|
white_mask = cv2.inRange(center_crop, np.array([0, 0, 200]), np.array([180, 50, 255]))
|
||||||
|
return np.sum(white_mask > 0) > 5000 # Threshold for 'significant' fog
|
||||||
|
|
||||||
|
def get_bar_percent(self, hsv_frame, roi, color_type='red'):
|
||||||
|
y1, y2, x1, x2 = roi
|
||||||
|
crop = hsv_frame[y1:y2, x1:x2]
|
||||||
|
if crop.size == 0: return 100.0
|
||||||
|
if color_type == 'red':
|
||||||
|
mask = cv2.inRange(crop, np.array([0, 120, 70]), np.array([10, 255, 255]))
|
||||||
|
elif color_type == 'blue':
|
||||||
|
mask = cv2.inRange(crop, np.array([100, 120, 70]), np.array([130, 255, 255]))
|
||||||
|
elif color_type == 'green':
|
||||||
|
mask = cv2.inRange(crop, np.array([40, 100, 50]), np.array([80, 255, 255]))
|
||||||
|
count = np.sum(mask > 0)
|
||||||
|
if count < 5: return 100.0 if color_type != 'boss' else 0.0
|
||||||
|
return (count / mask.size) * 100
|
||||||
|
|
||||||
|
def get_state(self):
|
||||||
|
with self.lock:
|
||||||
|
fr_rgb = self.frame_rgb.copy()
|
||||||
|
fr_hsv = self.frame_hsv.copy()
|
||||||
|
p_hp = self.get_bar_percent(fr_hsv, self.player_hp_roi, 'red')
|
||||||
|
p_fp = self.get_bar_percent(fr_hsv, self.player_fp_roi, 'blue')
|
||||||
|
p_sp = self.get_bar_percent(fr_hsv, self.player_sp_roi, 'green')
|
||||||
|
b_hp = self.get_bar_percent(fr_hsv, self.boss_hp_roi, 'red')
|
||||||
|
self.frame_stack.append(fr_rgb)
|
||||||
|
while len(self.frame_stack) < self.stack_size: self.frame_stack.append(fr_rgb)
|
||||||
|
return np.concatenate(list(self.frame_stack), axis=-1), p_hp, p_fp, p_sp, b_hp
|
||||||
|
|
||||||
|
def stop(self): self.stopped = True
|
||||||
Reference in New Issue
Block a user