How to Build a Robust Advanced Neural AI Agent with Stable Training, Adaptive Learning, and Intelligent Decision-Making?

Ledger
How to Build a Robust Advanced Neural AI Agent with Stable Training, Adaptive Learning, and Intelligent Decision-Making?
Paxful


class AdvancedNeuralAgent:
    """Fully connected regression network with a small experience memory.

    Hidden layers use leaky-ReLU and the output layer is linear. Training
    uses mini-batch gradient descent with momentum, weight decay, norm-based
    gradient clipping, a performance-driven learning-rate schedule, early
    stopping and an automatic re-initialisation when the loss blows up.

    Attributes:
        lr: Current learning rate (changed by ``adapt_learning_rate``).
        initial_lr: Learning rate given at construction time.
        layers: One dict per layer with ``weights``, ``bias``,
            ``momentum_w`` and ``momentum_b`` arrays.
        memory: List of experience dicts written by ``store_experience``.
        performance_history: Validation loss recorded once per trained epoch.
        lr_history: Learning rate recorded once per trained epoch.
        epsilon: Small constant guarding divisions.
    """

    # Optimiser / architecture constants shared by forward and backward.
    HIDDEN_ACTIVATION = 'leaky_relu'
    OUTPUT_ACTIVATION = 'linear'
    LEAKY_SLOPE = 0.01
    MOMENTUM = 0.9
    WEIGHT_DECAY = 0.0001
    MEMORY_CAPACITY = 1000

    def __init__(self, input_size, hidden_layers=(64, 32), output_size=1, learning_rate=0.001):
        """Advanced AI Agent with stable training and decision making capabilities.

        Args:
            input_size: Number of input features.
            hidden_layers: Sequence with the width of each hidden layer.
            output_size: Number of output units.
            learning_rate: Initial learning rate.
        """
        self.lr = learning_rate
        self.initial_lr = learning_rate
        self.layers = []
        self.memory = []
        self.performance_history = []
        self.lr_history = []
        self.epsilon = 1e-8

        # Copy into a list so the default is never shared or mutated and
        # tuples are accepted as well as lists.
        layer_sizes = [input_size] + list(hidden_layers) + [output_size]
        for fan_in, fan_out in zip(layer_sizes[:-1], layer_sizes[1:]):
            # Glorot/Xavier uniform initialisation bound.
            limit = np.sqrt(6.0 / (fan_in + fan_out))
            self.layers.append({
                'weights': np.random.uniform(-limit, limit, (fan_in, fan_out)),
                'bias': np.zeros((1, fan_out)),
                'momentum_w': np.zeros((fan_in, fan_out)),
                'momentum_b': np.zeros((1, fan_out)),
            })

    def activation(self, x, func="relu"):
        """Stable activation functions with clipping.

        The input is clipped to [-50, 50] before the function is applied.

        Raises:
            ValueError: If ``func`` is not a supported activation name.
        """
        x = np.clip(x, -50, 50)

        if func == 'relu':
            return np.maximum(0, x)
        elif func == 'sigmoid':
            return 1 / (1 + np.exp(-x))
        elif func == 'tanh':
            return np.tanh(x)
        elif func == 'leaky_relu':
            return np.where(x > 0, x, x * self.LEAKY_SLOPE)
        elif func == 'linear':
            return x
        raise ValueError(f"Unknown activation function: {func!r}")

    def activation_derivative(self, x, func="relu"):
        """Stable derivatives of the activations, evaluated at pre-activation ``x``.

        Raises:
            ValueError: If ``func`` is not a supported activation name.
        """
        x = np.clip(x, -50, 50)

        if func == 'relu':
            return (x > 0).astype(float)
        elif func == 'sigmoid':
            s = self.activation(x, 'sigmoid')
            return s * (1 - s)
        elif func == 'tanh':
            return 1 - np.tanh(x) ** 2
        elif func == 'leaky_relu':
            return np.where(x > 0, 1, self.LEAKY_SLOPE)
        elif func == 'linear':
            return np.ones_like(x)
        raise ValueError(f"Unknown activation function: {func!r}")

    def forward(self, X):
        """Forward pass with pre-activation clipping.

        Caches every layer's input/output in ``self.activations`` and every
        pre-activation in ``self.z_values`` for use by ``backward``.

        Returns:
            The output-layer activations (the same array object that is
            stored as ``self.activations[-1]``).
        """
        self.activations = [X]
        self.z_values = []

        current_input = X
        last_index = len(self.layers) - 1
        for i, layer in enumerate(self.layers):
            z = np.dot(current_input, layer['weights']) + layer['bias']
            z = np.clip(z, -50, 50)
            self.z_values.append(z)

            func = self.HIDDEN_ACTIVATION if i < last_index else self.OUTPUT_ACTIVATION
            a = self.activation(z, func)

            self.activations.append(a)
            current_input = a

        return current_input

    def clip_gradients(self, gradients, max_norm=1.0):
        """Gradient clipping to prevent explosion.

        Rescales ``gradients`` so its norm does not exceed ``max_norm``.
        """
        grad_norm = np.linalg.norm(gradients)
        if grad_norm > max_norm:
            gradients = gradients * (max_norm / (grad_norm + self.epsilon))
        return gradients

    def backward(self, X, y, output):
        """Stable backpropagation with gradient clipping.

        Must be called after ``forward`` on the same batch, since it reads
        the cached ``self.activations`` and ``self.z_values``. Updates every
        layer's weights, biases and momentum buffers in place.
        """
        m = X.shape[0]

        dz = (output - np.asarray(y).reshape(-1, 1)) / m
        dz = np.clip(dz, -10, 10)

        for i in reversed(range(len(self.layers))):
            layer = self.layers[i]

            dw = np.dot(self.activations[i].T, dz)
            db = np.sum(dz, axis=0, keepdims=True)

            dw = self.clip_gradients(dw, max_norm=1.0)
            db = self.clip_gradients(db, max_norm=1.0)

            # Propagate the error through the weights that produced this
            # forward pass, i.e. before they are modified below.
            upstream = np.dot(dz, layer['weights'].T) if i > 0 else None

            layer['momentum_w'] = self.MOMENTUM * layer['momentum_w'] + (1 - self.MOMENTUM) * dw
            layer['momentum_b'] = self.MOMENTUM * layer['momentum_b'] + (1 - self.MOMENTUM) * db

            layer['weights'] -= self.lr * (layer['momentum_w'] + self.WEIGHT_DECAY * layer['weights'])
            layer['bias'] -= self.lr * layer['momentum_b']

            if i > 0:
                dz = upstream * self.activation_derivative(
                    self.z_values[i - 1], self.HIDDEN_ACTIVATION)
                dz = np.clip(dz, -10, 10)

    def adapt_learning_rate(self, epoch, performance_history):
        """Adaptive learning rate with performance-based adjustment.

        After epoch 10, compares the latest loss with the one four entries
        earlier: no improvement decays the rate by 5% (floored at 1% of the
        initial rate); an improvement of more than 2% raises it by 2%
        (capped at twice the initial rate).
        """
        if epoch <= 10:
            return
        recent_performance = performance_history[-10:]
        if len(recent_performance) < 5:
            return

        latest, reference = recent_performance[-1], recent_performance[-5]
        if latest >= reference:
            self.lr = max(self.lr * 0.95, self.initial_lr * 0.01)
        elif latest < reference * 0.98:
            self.lr = min(self.lr * 1.02, self.initial_lr * 2)

    def calculate_loss(self, y_true, y_pred):
        """Stable loss calculation.

        Returns:
            Tuple ``(mse, mae)``; a non-finite value is replaced by 1e6.
        """
        y_true = np.asarray(y_true, dtype=float).reshape(-1, 1)
        y_pred = np.clip(y_pred, -1e6, 1e6)

        mse = np.mean((y_true - y_pred) ** 2)
        mae = np.mean(np.abs(y_true - y_pred))

        if not np.isfinite(mse):
            mse = 1e6
        if not np.isfinite(mae):
            mae = 1e6

        return mse, mae

    def store_experience(self, state, action, reward, next_state):
        """Experience replay for RL aspects.

        Appends one experience to ``self.memory`` and keeps only the most
        recent ``MEMORY_CAPACITY`` entries.
        """
        # Derive the timestamp from the previous entry so it keeps counting
        # up after old entries start being evicted.
        timestamp = self.memory[-1]['timestamp'] + 1 if self.memory else 0
        self.memory.append({
            'state': state,
            'action': action,
            'reward': reward,
            'next_state': next_state,
            'timestamp': timestamp,
        })

        if len(self.memory) > self.MEMORY_CAPACITY:
            del self.memory[:-self.MEMORY_CAPACITY]

    def make_decision(self, X, exploration_rate=0.1):
        """Stable decision making.

        Runs a forward pass and, with probability ``exploration_rate``, adds
        Gaussian noise scaled to 10% of the prediction spread.
        """
        prediction = self.forward(X)

        if np.random.random() < exploration_rate:
            spread = np.std(prediction)
            noise_scale = spread * 0.1 if spread > 0 else 0.1
            noise = np.random.normal(0, noise_scale, prediction.shape)
            # Build a new array: ``prediction`` is the cached output
            # activation and must not be modified in place.
            prediction = prediction + noise

        return np.clip(prediction, -1e6, 1e6)

    def reset_if_unstable(self):
        """Reset network if training becomes unstable.

        Re-initialises all weights, biases and momentum buffers and restores
        the initial learning rate.
        """
        print("🔄 Resetting network due to instability…")
        for layer in self.layers:
            fan_in, fan_out = layer['weights'].shape
            limit = np.sqrt(6.0 / (fan_in + fan_out))
            layer['weights'] = np.random.uniform(-limit, limit, (fan_in, fan_out))
            layer['bias'] = np.zeros((1, fan_out))
            layer['momentum_w'] = np.zeros((fan_in, fan_out))
            layer['momentum_b'] = np.zeros((1, fan_out))
        self.lr = self.initial_lr

    def train(self, X, y, epochs=500, batch_size=32, validation_split=0.2, verbose=True):
        """Robust training with stability checks.

        Targets are standardised before training; the statistics are stored
        on the instance as ``y_mean`` / ``y_std`` when training finishes so
        ``predict`` can undo the scaling.

        Args:
            X: Feature array, one row per sample.
            y: Target values, one per sample.
            epochs: Maximum number of epochs.
            batch_size: Mini-batch size.
            validation_split: Fraction of the data held out for validation.
            verbose: Print progress and early-stopping messages.

        Returns:
            Tuple ``(train_losses, val_losses)`` with one entry per trained
            epoch.
        """
        X = np.asarray(X)
        y = np.asarray(y, dtype=float)
        y_mean, y_std = np.mean(y), np.std(y)
        y_normalized = (y - y_mean) / (y_std + self.epsilon)

        X_trn, X_val, y_trn, y_val = train_test_split(
            X, y_normalized, test_size=validation_split, random_state=42)

        best_val_loss = float('inf')
        patience = 30
        patience_counter = 0
        max_resets = 2

        train_losses, val_losses = [], []
        reset_count = 0
        unstable = False

        for epoch in range(epochs):
            if unstable:
                if reset_count >= max_resets:
                    print("🚫 Training unstable, stopping…")
                    break
                # Re-initialise and train again in this same epoch, so the
                # stale loss from the diverged epoch cannot trigger another
                # reset before any new training has happened.
                self.reset_if_unstable()
                reset_count += 1
                unstable = False

            indices = np.random.permutation(len(X_trn))
            X_train_shuffled = X_trn[indices]
            y_train_shuffled = y_trn[indices]

            epoch_loss = 0
            batches = 0
            for i in range(0, len(X_trn), batch_size):
                batch_X = X_train_shuffled[i:i + batch_size]
                batch_y = y_train_shuffled[i:i + batch_size]

                if len(batch_X) == 0:
                    continue

                output = self.forward(batch_X)
                self.backward(batch_X, batch_y, output)

                loss, _ = self.calculate_loss(batch_y, output)
                epoch_loss += loss
                batches += 1

            avg_train_loss = epoch_loss / max(batches, 1)

            val_output = self.forward(X_val)
            val_loss, val_mae = self.calculate_loss(y_val, val_output)

            train_losses.append(avg_train_loss)
            val_losses.append(val_loss)
            self.performance_history.append(val_loss)
            # Learning rate that was in effect for this epoch.
            self.lr_history.append(self.lr)

            unstable = not np.isfinite(avg_train_loss) or avg_train_loss > 1e6

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
            else:
                patience_counter += 1

            if patience_counter >= patience:
                if verbose:
                    print(f"✋ Early stopping at epoch {epoch}")
                break

            if epoch > 0:
                self.adapt_learning_rate(epoch, self.performance_history)

            if verbose and (epoch % 50 == 0 or epoch < 10):
                print(f"Epoch {epoch:3d}: Train Loss = {avg_train_loss:.4f}, "
                      f"Val Loss = {val_loss:.4f}, LR = {self.lr:.6f}")

        self.y_mean, self.y_std = y_mean, y_std
        return train_losses, val_losses

    def predict(self, X):
        """Make predictions with denormalization.

        If ``train`` has not stored target statistics yet, the raw network
        output is returned.
        """
        normalized_pred = self.forward(X)
        if hasattr(self, 'y_mean') and hasattr(self, 'y_std'):
            return normalized_pred * self.y_std + self.y_mean
        return normalized_pred

    def evaluate_performance(self, X, y):
        """Comprehensive performance evaluation.

        Returns:
            Dict with ``mse``, ``mae``, ``r2`` (floats) and the flattened
            ``predictions`` array.
        """
        y = np.asarray(y, dtype=float)
        predictions = self.predict(X)
        mse, mae = self.calculate_loss(y, predictions)

        ss_tot = np.sum((y - np.mean(y)) ** 2)
        ss_res = np.sum((y.reshape(-1, 1) - predictions) ** 2)
        r2 = 1 - (ss_res / (ss_tot + self.epsilon))

        return {
            'mse': float(mse) if np.isfinite(mse) else float('inf'),
            'mae': float(mae) if np.isfinite(mae) else float('inf'),
            'r2': float(r2) if np.isfinite(r2) else -float('inf'),
            'predictions': predictions.flatten()
        }

    def visualize_training(self, train_losses, val_losses):
        """Visualize training progress.

        Draws three panels: train/validation loss, the recorded validation
        loss history, and the recorded learning-rate history.
        """
        plt.figure(figsize=(15, 5))

        plt.subplot(1, 3, 1)
        plt.plot(train_losses, label="Training Loss", alpha=0.8)
        plt.plot(val_losses, label="Validation Loss", alpha=0.8)
        plt.title('Training Progress')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.yscale('log')

        plt.subplot(1, 3, 2)
        if len(self.performance_history) > 0:
            plt.plot(self.performance_history)
        plt.title('Performance History')
        plt.xlabel('Epoch')
        plt.ylabel('Validation Loss')
        plt.grid(True, alpha=0.3)
        plt.yscale('log')

        plt.subplot(1, 3, 3)
        lr_history = getattr(self, 'lr_history', None)
        if lr_history:
            plt.plot(lr_history)
        plt.title('Learning Rate Schedule')
        plt.xlabel('Epoch')
        plt.ylabel('Learning Rate')
        plt.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()



Source link

fiverr

Be the first to comment

Leave a Reply

Your email address will not be published.


*