11.11大促主会场
新人页面
精选商品
首月0月租体验,领12个月京东PLUS
自营热卖

122.Spark大型电商项目-广告点击流量实时统计-生产环境测试

旧梦拾遗 1年前   阅读数 152 0

目录

广告点击实时代码

MockRealTimeData.java


本篇文章记录广告点击流量实时统计-生产环境测试。

广告点击实时代码

MockRealTimeData.java

package graduation.java.test;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * FileName: MockRealTimeData
 * Author:   hadoop
 * Email:    3165845957@qq.com
 * Date:     19-4-5 下午10:40
 * Description:
 *模拟广告点击实时数据
 */
public class MockRealTimeData extends Thread {

    private static final Random random = new Random();
    private static final String[] provinces = new String[]{"Jiangsu", "Hubei", "Hunan", "Henan", "Hebei"};
    private static final Map<String, String[]> provinceCityMap = new HashMap<String, String[]>();

    private Producer<Integer, String> producer;

    public MockRealTimeData() {
        provinceCityMap.put("Jiangsu", new String[] {"Nanjing", "Suzhou"});
        provinceCityMap.put("Hubei", new String[] {"Wuhan", "Jingzhou"});
        provinceCityMap.put("Hunan", new String[] {"Changsha", "Xiangtan"});
        provinceCityMap.put("Henan", new String[] {"Zhengzhou", "Luoyang"});
        provinceCityMap.put("Hebei", new String[] {"Shijiazhuang", "Tangshan"});

        producer = new Producer<Integer, String>(createProducerConfig());
    }

    private ProducerConfig createProducerConfig() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "172.17.0.1:9092,172.17.0.1:9092,172.17.0.1:9092");
        return new ProducerConfig(props);
    }

    public void run() {
        while(true) {
            String province = provinces[random.nextInt(5)];
            String city = provinceCityMap.get(province)[random.nextInt(2)];

            String log = new Date().getTime() + " " + province + " " + city + " "
                    + random.nextInt(1000) + " " + random.nextInt(10);
            producer.send(new KeyedMessage<Integer, String>("AdRealTimeLog", log));

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 启动Kafka Producer
     * @param args
     */
    public static void main(String[] args) {
        MockRealTimeData producer = new MockRealTimeData();
        producer.start();
    }

}

 


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: