Skip to content

AI Trading & Market Intelligence Agent

Advanced AI-powered trading analysis with real-time market intelligence

typescript
import { cronflow } from 'cronflow';
import { z } from 'zod';

// Types for better type safety
interface MarketData {
  symbol: string;
  price: number;
  volume: number;
  change24h: number;
  marketCap: number;
  volatility: number;
  technicalIndicators: {
    rsi: number;
    macd: number;
    bollinger: { upper: number; lower: number; middle: number };
    support: number;
    resistance: number;
  };
}

interface SentimentAnalysis {
  overallSentiment: number; // -1 to 1
  newsImpact: number;
  socialBuzz: number;
  fearGreedIndex: number;
  keyEvents: string[];
  confidenceScore: number;
}

interface TradingRecommendation {
  symbol: string;
  action: 'BUY' | 'SELL' | 'HOLD';
  confidence: number;
  targetPrice: number;
  stopLoss: number;
  reasoning: string;
  riskLevel: 'LOW' | 'MEDIUM' | 'HIGH';
  timeHorizon: 'SHORT' | 'MEDIUM' | 'LONG';
}

interface AIAnalysis {
  recommendations: TradingRecommendation[];
  portfolioOptimization: {
    suggestedAllocations: { [symbol: string]: number };
    riskScore: number;
    expectedReturn: number;
    sharpeRatio: number;
  };
  marketOutlook: {
    direction: 'BULLISH' | 'BEARISH' | 'NEUTRAL';
    confidence: number;
    keyFactors: string[];
  };
}

interface RiskAnalysis {
  adjustedRecommendations: Array<
    TradingRecommendation & {
      adjustedConfidence: number;
      positionSize: number;
    }
  >;
  riskMetrics: {
    portfolioVaR: number;
    maxDrawdown: number;
    correlation: number;
    diversificationScore: number;
  };
  alerts: string[];
}

interface PortfolioSimulation {
  projectedReturns: { [period: string]: number };
  riskAssessment: string;
  capitalRequired: number;
  impactAnalysis: {
    sharpeImprovement: number;
    diversificationImpact: number;
  };
}

interface ApprovalData {
  sessionId: string;
  highRiskTrades: TradingRecommendation[];
  totalCapital: number;
  projectedReturns: { [period: string]: number };
  riskMetrics: {
    portfolioVaR: number;
    maxDrawdown: number;
    correlation: number;
    diversificationScore: number;
  };
}

// Market Data & AI Analysis Functions
async function fetchMarketData(symbols: string[]): Promise<MarketData[]> {
  // Simulate real market data - replace with actual API calls
  return symbols.map(symbol => ({
    symbol,
    price: Math.random() * 50000 + 1000,
    volume: Math.random() * 1000000 + 100000,
    change24h: (Math.random() - 0.5) * 20,
    marketCap: Math.random() * 1000000000 + 10000000,
    volatility: Math.random() * 5 + 1,
    technicalIndicators: {
      rsi: Math.random() * 100,
      macd: (Math.random() - 0.5) * 100,
      bollinger: {
        upper: Math.random() * 55000 + 5000,
        lower: Math.random() * 45000 + 1000,
        middle: Math.random() * 50000 + 3000,
      },
      support: Math.random() * 45000 + 1000,
      resistance: Math.random() * 55000 + 5000,
    },
  }));
}

async function analyzeSentiment(
  newsData: any[],
  socialData: any[]
): Promise<SentimentAnalysis> {
  // Simulate advanced sentiment analysis
  const overallSentiment = (Math.random() - 0.5) * 2;
  const newsImpact = Math.random();
  const socialBuzz = Math.random();
  const fearGreedIndex = Math.random() * 100;

  return {
    overallSentiment,
    newsImpact,
    socialBuzz,
    fearGreedIndex,
    keyEvents: [
      'Federal Reserve meeting scheduled',
      'Tech earnings season begins',
    ],
    confidenceScore: Math.random() * 0.3 + 0.7, // 0.7-1.0
  };
}

async function runAITradingModel(
  marketData: MarketData[],
  sentiment: SentimentAnalysis,
  portfolio: any
): Promise<AIAnalysis> {
  // Simulate advanced AI trading analysis
  return {
    recommendations: marketData.map(data => ({
      symbol: data.symbol,
      action: ['BUY', 'SELL', 'HOLD'][Math.floor(Math.random() * 3)] as
        | 'BUY'
        | 'SELL'
        | 'HOLD',
      confidence: Math.random() * 0.4 + 0.6,
      targetPrice: data.price * (1 + (Math.random() - 0.5) * 0.2),
      stopLoss: data.price * (0.9 + Math.random() * 0.05),
      reasoning: `Technical analysis suggests ${data.technicalIndicators.rsi > 70 ? 'overbought' : 'oversold'} conditions`,
      riskLevel: ['LOW', 'MEDIUM', 'HIGH'][Math.floor(Math.random() * 3)] as
        | 'LOW'
        | 'MEDIUM'
        | 'HIGH',
      timeHorizon: ['SHORT', 'MEDIUM', 'LONG'][
        Math.floor(Math.random() * 3)
      ] as 'SHORT' | 'MEDIUM' | 'LONG',
    })),
    portfolioOptimization: {
      suggestedAllocations: Object.fromEntries(
        marketData.map(d => [d.symbol, Math.random()])
      ),
      riskScore: Math.random() * 10,
      expectedReturn: Math.random() * 20 + 5,
      sharpeRatio: Math.random() * 2 + 0.5,
    },
    marketOutlook: {
      direction: ['BULLISH', 'BEARISH', 'NEUTRAL'][
        Math.floor(Math.random() * 3)
      ] as 'BULLISH' | 'BEARISH' | 'NEUTRAL',
      confidence: Math.random() * 0.3 + 0.7,
      keyFactors: [
        'Economic indicators',
        'Geopolitical events',
        'Market liquidity',
      ],
    },
  };
}

async function executeRiskManagement(
  recommendations: TradingRecommendation[],
  portfolio: any
): Promise<RiskAnalysis> {
  // Simulate risk management calculations
  return {
    adjustedRecommendations: recommendations.map(rec => ({
      ...rec,
      adjustedConfidence: rec.confidence * 0.9, // Risk adjustment
      positionSize: Math.min(rec.confidence * 0.1, 0.05), // Max 5% position
    })),
    riskMetrics: {
      portfolioVaR: Math.random() * 0.1,
      maxDrawdown: Math.random() * 0.15,
      correlation: Math.random(),
      diversificationScore: Math.random() * 100,
    },
    alerts:
      Math.random() > 0.7
        ? ['High correlation detected between positions']
        : [],
  };
}

// News and Social Media Functions
async function fetchMarketNews(symbols: string[]): Promise<any[]> {
  // Simulate news fetching
  return [
    { title: 'Market volatility increases', sentiment: 0.3, impact: 'HIGH' },
    { title: 'Tech stocks rally continues', sentiment: 0.7, impact: 'MEDIUM' },
  ];
}

async function fetchSocialSentiment(symbols: string[]): Promise<any[]> {
  // Simulate social media sentiment
  return [
    { platform: 'twitter', mentions: 1500, sentiment: 0.6 },
    { platform: 'reddit', mentions: 800, sentiment: 0.4 },
  ];
}

// Notification Functions
async function sendTradingAlert(
  message: string,
  priority: 'HIGH' | 'MEDIUM' | 'LOW',
  data: any
) {
  console.log(`🚨 [${priority}] Trading Alert: ${message}`);
  console.log('📊 Data:', JSON.stringify(data, null, 2));
  // Replace with actual notification service (Slack, Discord, Email, SMS)
}

async function updateDashboard(analysisData: any) {
  console.log('📈 Dashboard updated with latest analysis');
  // Replace with actual dashboard/database update
}

async function logTradingDecision(decision: any) {
  console.log('📝 Trading Decision Logged:', {
    timestamp: new Date().toISOString(),
    recommendations: decision.recommendations?.length || 0,
    portfolioRisk: decision.riskMetrics?.portfolioVaR || 0,
    processingTime: decision.processingTime,
  });
}

// Portfolio simulation functions
async function simulatePortfolioImpact(
  recommendations: TradingRecommendation[],
  currentPortfolio: any
): Promise<PortfolioSimulation> {
  return {
    projectedReturns: {
      '1D': Math.random() * 2 - 1,
      '1W': Math.random() * 5 - 2.5,
      '1M': Math.random() * 15 - 7.5,
      '3M': Math.random() * 30 - 15,
    },
    riskAssessment: 'MODERATE',
    capitalRequired: recommendations.reduce(
      (sum, rec) => sum + rec.targetPrice * 100,
      0
    ),
    impactAnalysis: {
      sharpeImprovement: Math.random() * 0.5,
      diversificationImpact: Math.random() * 0.3,
    },
  };
}

// Define the AI Trading Agent Workflow
const aiTradingAgent = cronflow.define({
  id: 'ai-trading-agent',
  name: 'AI Trading & Market Intelligence Agent',
  description:
    'Advanced AI-powered trading analysis with real-time market intelligence',
  hooks: {
    onSuccess: ctx => {
      console.log('✅ Trading analysis completed successfully');
      console.log('⚡ Performance:', {
        totalSteps: ctx.steps?.length || 0,
        processingTime:
          Date.now() - (ctx.steps?.[0]?.output?.startTime || Date.now()),
        recommendationsGenerated: ctx.last?.recommendations?.length || 0,
      });
    },
    onFailure: ctx => {
      console.log('❌ Trading analysis failed:', ctx.error);
      sendTradingAlert('Trading Agent Error', 'HIGH', { error: ctx.error });
    },
  },
});

aiTradingAgent
  .onWebhook('/webhooks/trading-analysis', {
    schema: z.object({
      symbols: z.array(z.string()).min(1).max(20),
      analysisType: z.enum(['QUICK', 'DEEP', 'FULL_PORTFOLIO']).default('DEEP'),
      riskTolerance: z
        .enum(['CONSERVATIVE', 'MODERATE', 'AGGRESSIVE'])
        .default('MODERATE'),
      timeHorizon: z.enum(['DAY', 'SWING', 'POSITION']).default('SWING'),
      portfolioValue: z.number().positive().default(100000),
      userId: z.string(),
      preferences: z
        .object({
          maxPositionSize: z.number().min(0.01).max(0.3).default(0.1),
          enableAlerts: z.boolean().default(true),
          riskLevel: z.number().min(1).max(10).default(5),
        })
        .optional(),
    }),
  })

  // Step 1: Initialize analysis session
  .step('initialize-analysis', async ctx => {
    console.log('🚀 Initializing AI Trading Analysis...');
    const { symbols, analysisType, userId } = ctx.payload;

    const sessionId = `trading_${userId}_${Date.now()}`;
    const startTime = Date.now();

    console.log(
      `📊 Analyzing ${symbols.length} symbols: ${symbols.join(', ')}`
    );
    console.log(`🎯 Analysis type: ${analysisType}`);

    return {
      sessionId,
      startTime,
      symbols,
      analysisType,
      initialized: true,
    };
  })

  // Step 2: Parallel data gathering (this is where Cronflow shines!)
  .parallel([
    // Market data collection
    async ctx => {
      console.log('📈 Fetching real-time market data...');
      const marketData = await fetchMarketData(ctx.last.symbols);
      return { marketData };
    },

    // News sentiment analysis
    async ctx => {
      console.log('📰 Analyzing market news...');
      const newsData = await fetchMarketNews(ctx.last.symbols);
      return { newsData };
    },

    // Social media sentiment
    async ctx => {
      console.log('💬 Analyzing social sentiment...');
      const socialData = await fetchSocialSentiment(ctx.last.symbols);
      return { socialData };
    },
  ])

  // Step 3: Advanced sentiment analysis
  .step('analyze-market-sentiment', async ctx => {
    console.log('🧠 Running advanced sentiment analysis...');

    const [marketResult, newsResult, socialResult] = ctx.last;
    const sentimentAnalysis = await analyzeSentiment(
      newsResult.newsData,
      socialResult.socialData
    );

    return {
      marketData: marketResult.marketData,
      newsData: newsResult.newsData,
      socialData: socialResult.socialData,
      sentimentAnalysis,
      sentimentProcessed: true,
    };
  })

  // Step 4: AI Trading Model Analysis
  .step('run-ai-trading-model', async ctx => {
    console.log('🤖 Running AI trading model...');

    const portfolio = {
      value: ctx.payload.portfolioValue,
      riskTolerance: ctx.payload.riskTolerance,
      preferences: ctx.payload.preferences,
    };

    const aiAnalysis = await runAITradingModel(
      ctx.last.marketData,
      ctx.last.sentimentAnalysis,
      portfolio
    );

    return {
      ...ctx.last,
      aiAnalysis,
      modelProcessed: true,
    };
  })

  // Step 5: Risk management and validation
  .step('apply-risk-management', async ctx => {
    console.log('🛡️ Applying risk management rules...');

    const portfolio = {
      value: ctx.payload.portfolioValue,
      riskTolerance: ctx.payload.riskTolerance,
    };

    const riskAnalysis = await executeRiskManagement(
      ctx.last.aiAnalysis.recommendations,
      portfolio
    );

    return {
      ...ctx.last,
      riskAnalysis,
      riskValidated: true,
    };
  })

  // Step 6: Portfolio impact simulation
  .step('simulate-portfolio-impact', async ctx => {
    console.log('🎯 Simulating portfolio impact...');

    const currentPortfolio = {
      value: ctx.payload.portfolioValue,
      positions: [], // Would be actual positions
    };

    const simulation = await simulatePortfolioImpact(
      ctx.last.riskAnalysis.adjustedRecommendations,
      currentPortfolio
    );

    return {
      ...ctx.last,
      simulation,
      simulationComplete: true,
    };
  })

  // Step 7: Decision point - high-impact trades need approval
  .if('needs-approval', ctx => {
    const highImpactTrades =
      ctx.last.riskAnalysis.adjustedRecommendations.filter(
        rec => rec.confidence > 0.8 && rec.riskLevel === 'HIGH'
      );

    const totalCapitalRequired = ctx.last.simulation.capitalRequired;
    const portfolioPercentage =
      totalCapitalRequired / ctx.payload.portfolioValue;

    // Require approval if high-risk or large position
    return highImpactTrades.length > 0 || portfolioPercentage > 0.2;
  })

  .step('prepare-approval-request', async ctx => {
    console.log('👤 Preparing for human approval...');

    const highRiskTrades = ctx.last.riskAnalysis.adjustedRecommendations.filter(
      rec => rec.riskLevel === 'HIGH'
    );

    return {
      approvalData: {
        sessionId: ctx.steps['initialize-analysis'].output.sessionId,
        highRiskTrades,
        totalCapital: ctx.last.simulation.capitalRequired,
        projectedReturns: ctx.last.simulation.projectedReturns,
        riskMetrics: ctx.last.riskAnalysis.riskMetrics,
      },
      preparedAt: Date.now(),
    };
  })

  .humanInTheLoop({
    timeout: '30m',
    description: 'High-impact trading decisions require approval',
    onPause: async (ctx, token) => {
      console.log('🛑 High-impact trades detected - approval required');

      await sendTradingAlert(
        'High-Impact Trading Decision Requires Approval',
        'HIGH',
        {
          token,
          capitalRequired: ctx.last.approvalData.totalCapital,
          projectedReturn: ctx.last.approvalData.projectedReturns['1M'],
          approvalUrl: `https://trading.yoursite.com/approve/${token}`,
        }
      );
    },
  })

  .step('process-trading-approval', async ctx => {
    if (ctx.last.timedOut) {
      return {
        approved: false,
        reason: 'Approval timeout - trades cancelled for safety',
        finalRecommendations: [],
      };
    }

    const approvedTrades = ctx.last.approved
      ? ctx.steps['prepare-approval-request']?.output?.approvalData
          ?.highRiskTrades || []
      : [];

    return {
      approved: ctx.last.approved,
      reason: ctx.last.reason,
      finalRecommendations: approvedTrades,
    };
  })

  .else()

  .step('auto-approve-low-risk', async ctx => {
    console.log('✅ Auto-approving low-risk recommendations...');

    return {
      approved: true,
      reason: 'Low-risk trades auto-approved',
      finalRecommendations: ctx.last.riskAnalysis.adjustedRecommendations,
    };
  })

  .endIf()

  // Step 8: Generate final trading report
  .step('generate-trading-report', async ctx => {
    console.log('📋 Generating comprehensive trading report...');

    const processingTime =
      Date.now() - ctx.steps['initialize-analysis'].output.startTime;
    const recommendations = ctx.last.finalRecommendations || [];

    const report = {
      sessionId: ctx.steps['initialize-analysis'].output.sessionId,
      timestamp: new Date().toISOString(),
      processingTime,
      marketAnalysis: {
        symbols: ctx.payload.symbols,
        marketData: ctx.steps[
          'analyze-market-sentiment'
        ]?.output?.marketData?.slice(0, 3), // Truncate for display
        sentiment:
          ctx.steps['analyze-market-sentiment']?.output?.sentimentAnalysis,
      },
      recommendations: recommendations.map(rec => ({
        symbol: rec.symbol,
        action: rec.action,
        confidence: rec.adjustedConfidence || rec.confidence,
        reasoning: rec.reasoning,
        riskLevel: rec.riskLevel,
      })),
      portfolioImpact:
        ctx.steps['simulate-portfolio-impact']?.output?.simulation,
      riskMetrics:
        ctx.steps['apply-risk-management']?.output?.riskAnalysis?.riskMetrics,
      approval: {
        required: !!ctx.steps['prepare-approval-request']?.output,
        status: ctx.last.approved ? 'APPROVED' : 'PENDING/REJECTED',
      },
    };

    return { report, generated: true };
  })

  // Step 9: Execute notifications and logging
  .parallel([
    // Send alerts for high-confidence recommendations
    async ctx => {
      const highConfidenceRecs = ctx.last.report.recommendations.filter(
        rec => rec.confidence > 0.85
      );

      if (
        highConfidenceRecs.length > 0 &&
        ctx.payload.preferences?.enableAlerts
      ) {
        await sendTradingAlert(
          `${highConfidenceRecs.length} high-confidence trading opportunities detected`,
          'MEDIUM',
          { recommendations: highConfidenceRecs }
        );
      }

      return { alertsSent: highConfidenceRecs.length };
    },

    // Update dashboard
    async ctx => {
      await updateDashboard(ctx.last.report);
      return { dashboardUpdated: true };
    },

    // Log trading decision
    async ctx => {
      await logTradingDecision({
        ...ctx.last.report,
        processingTime: ctx.last.report.processingTime,
      });
      return { decisionLogged: true };
    },
  ])

  // Background action: Update ML model with results
  .action('update-ml-model', async ctx => {
    console.log('🧠 Updating ML model with analysis results...');

    // This would feed results back into the AI model for continuous learning
    console.log('ML model updated with latest market analysis');
  });

// Start the AI Trading Agent
console.log('🚀 Starting AI Trading & Market Intelligence Agent...');
cronflow.start();

console.log('📋 Available endpoints:');
console.log(
  '  POST /webhooks/trading-analysis - Analyze markets and generate trading recommendations'
);
console.log('');
console.log('📖 Example usage:');
console.log(`curl -X POST http://localhost:3000/webhooks/trading-analysis \\
  -H "Content-Type: application/json" \\
  -d '{
    "symbols": ["BTC", "ETH", "TSLA", "AAPL"],
    "analysisType": "DEEP",
    "riskTolerance": "MODERATE",
    "timeHorizon": "SWING",
    "portfolioValue": 50000,
    "userId": "trader_123",
    "preferences": {
      "maxPositionSize": 0.15,
      "enableAlerts": true,
      "riskLevel": 6
    }
  }'`);

console.log('');
console.log('🔧 Advanced features:');
console.log('  - Real-time market data analysis');
console.log('  - Multi-source sentiment analysis');
console.log('  - AI-powered trading recommendations');
console.log('  - Risk management and portfolio optimization');
console.log('  - Human approval for high-impact trades');
console.log('  - Comprehensive reporting and analytics');

// Export for external use
export { cronflow, aiTradingAgent };

Released under the Apache 2.0 License.