import threading
import time

import numpy as np

try:
    import nidaqmx
    from nidaqmx.constants import AcquisitionType
except ImportError:
    # The NI-DAQmx bindings are only needed to drive real hardware.  Keeping
    # the module importable without them lets the pure waveform helpers be
    # used (and tested) on machines that have no driver installed; start()
    # reports the missing dependency explicitly.
    nidaqmx = None
    AcquisitionType = None


class SignalGenerator:
    """Continuously output a periodic waveform on an NI-DAQmx analog output.

    One period of the selected waveform ("sine" or "pwm"; any other value
    yields a constant zero output) is computed when generation starts,
    written to the task buffer, and clocked out in continuous mode by a
    background daemon thread until stop() is called.

    Settings changed through configure() while the generator is running are
    only picked up by the next start(), because the sample buffer is built
    once per run.
    """

    def __init__(self, channel: str = "myDAQ2/ao0", rate: float = 1000):
        """Store the output channel name and sample clock rate.

        Args:
            channel: Physical channel passed to ``add_ao_voltage_chan``.
            rate: Sample clock rate passed to ``cfg_samp_clk_timing``.
        """
        self.channel = channel
        self.rate = rate
        self.running = False
        self.thread = None
        self.signal_type = "sine"
        self.amplitude = 1.0
        self.frequency = 1.0
        self.duty_cycle = 0.5
        # Set by the worker thread while its task is open, None otherwise.
        self.task = None

    def configure(
        self,
        signal_type: str = "sine",
        amplitude: float = 1.0,
        frequency: float = 1.0,
        duty_cycle: float = 0.5,
    ) -> None:
        """Set the waveform parameters, clamping them to usable ranges.

        Args:
            signal_type: "sine" or "pwm"; anything else produces zeros.
            amplitude: Clamped to [-10.0, 10.0], the range the output
                channel is created with.
            frequency: Raised to at least 0.01 so the period stays finite.
            duty_cycle: Clamped to [0.0, 1.0]; only used for "pwm".
        """
        self.signal_type = signal_type
        self.amplitude = max(min(amplitude, 10.0), -10.0)
        self.frequency = max(frequency, 0.01)
        self.duty_cycle = min(max(duty_cycle, 0.0), 1.0)

    def start(self) -> None:
        """Start generation in a daemon thread; no-op if already running.

        Raises:
            RuntimeError: If the ``nidaqmx`` package could not be imported.
        """
        if self.running:
            return
        if nidaqmx is None:
            raise RuntimeError(
                "nidaqmx is not installed; cannot start signal generation"
            )
        self.running = True
        self.thread = threading.Thread(target=self._generate, daemon=True)
        self.thread.start()

    def stop(self) -> None:
        """Ask the worker thread to finish and wait until it has.

        The worker owns the DAQmx task and closes it when leaving its
        ``with`` block, so nothing is closed here; closing it a second time
        would act on an already-closed task.
        """
        self.running = False
        worker = self.thread
        # Thread.join() raises RuntimeError when a thread joins itself, so
        # only wait when stop() is called from a different thread.
        if worker is not None and worker is not threading.current_thread():
            worker.join()
            self.thread = None

    def _build_samples(self, samples_per_period: int) -> list:
        """Return one period of the configured waveform as a list of floats."""
        if self.signal_type == "sine":
            return self._generate_sine_wave(samples_per_period)
        if self.signal_type == "pwm":
            return self._generate_pwm_wave(samples_per_period)
        return np.zeros(samples_per_period)

    def _generate(self) -> None:
        """Worker body: configure the task, write one period, keep it alive.

        Any exception is reported on stdout.  The ``finally`` clause resets
        ``task`` and ``running`` so that the generator can be started again
        after either a normal stop or a failure.
        """
        try:
            # At least two samples are needed to describe a period.
            samples_per_period = max(2, int(self.rate / self.frequency))
            samples = self._build_samples(samples_per_period)

            with nidaqmx.Task() as task:
                self.task = task
                task.ao_channels.add_ao_voltage_chan(
                    self.channel, min_val=-10.0, max_val=10.0
                )
                task.timing.cfg_samp_clk_timing(
                    rate=self.rate,
                    sample_mode=AcquisitionType.CONTINUOUS,
                    samps_per_chan=samples_per_period,
                )
                task.write(samples, auto_start=False)
                task.start()

                # Keep the task open while generation is requested.
                while self.running:
                    time.sleep(0.1)
        except Exception as e:
            print(f"Błąd generatora: {e}")
        finally:
            # The task is closed by now (or was never opened); drop the
            # reference and mark the generator as stopped.  stop() is not
            # called here because it would try to join this very thread.
            self.task = None
            self.running = False

    def _generate_sine_wave(self, samples_per_period: int) -> list:
        """Return exactly one sine cycle spread over ``samples_per_period``.

        The output frequency is set by the buffer length (``rate /
        frequency`` samples replayed at ``rate``), so the phase must sweep a
        single turn over the buffer.  Multiplying the phase by
        ``self.frequency`` as well would put ``frequency`` cycles into one
        buffer and make the real output frequency ``frequency ** 2``.
        """
        phase = np.linspace(0, 1, samples_per_period, endpoint=False)
        return (self.amplitude * np.sin(2 * np.pi * phase)).tolist()

    def _generate_pwm_wave(self, samples_per_period: int) -> list:
        """Return one PWM period: ``amplitude`` for the duty part, then 0.0.

        The number of high samples is rounded to the nearest integer rather
        than truncated, so float error (100 * 0.29 == 28.999...) does not
        drop a sample, and is clamped to the buffer length.
        """
        high_samples = round(samples_per_period * self.duty_cycle)
        high_samples = min(max(high_samples, 0), samples_per_period)
        low_samples = samples_per_period - high_samples
        return [self.amplitude] * high_samples + [0.0] * low_samples