
Level 1: MapReduce Comprehensive Case Study (Telecom Data Cleaning)

guduadmin318 hours ago

Following the hints, complete the code in the editor on the right so that it cleans the data according to the rules below.

The input data file is a.txt, described as follows:

Field delimiter: , (comma)

Data location: /user/test/input/a.txt

15733218050,15778423030,1542457633,1542457678,450000,530000

Field                     Sample value
Caller phone number       15733218050
Callee phone number       15778423030
Start timestamp (s)       1542457633
End timestamp (s)         1542457678
Caller province code      450000
Callee province code      530000
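
Since each line of a.txt carries six comma-separated fields, a first step is to split the line and reject anything malformed. The following is a minimal sketch of that step; the class name `RawCallLine` and the choice to return `null` for bad lines are illustrative assumptions, not part of the exercise skeleton.

```java
import java.util.Arrays;

// Hypothetical helper for illustration; not part of the exercise skeleton.
final class RawCallLine {
    static final int FIELD_COUNT = 6;

    /** Splits one line of a.txt; returns null unless it has exactly six fields. */
    static String[] parse(String line) {
        if (line == null) {
            return null;
        }
        // A limit of -1 keeps trailing empty fields, so "a,b,c,d,e," still yields six entries.
        String[] fields = line.trim().split(",", -1);
        return fields.length == FIELD_COUNT ? fields : null;
    }

    public static void main(String[] args) {
        String[] fields = parse("15733218050,15778423030,1542457633,1542457678,450000,530000");
        System.out.println(Arrays.toString(fields));
    }
}
```

Indexes 0 and 1 hold the phone numbers, 2 and 3 the timestamps, and 4 and 5 the province codes, matching the field order listed above.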

MySQL database:

Username: root    Password: 123123

Database name: mydb

User table: userphone

Column     Type           Not null   Auto-increment   Description
id         int(11)                                    User ID
phone      varchar(255)                               Phone number
trueName   varchar(255)                               Real name

Province table: allregion

Column     Type           Not null   Auto-increment   Description
id         int(11)                                    User ID
CodeNum    varchar(255)                               Code
Address    varchar(255)                               Address
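
Both tables act as small lookup tables, so one common approach is to hold them in memory as maps keyed by phone number and by province code. The sketch below shows only the lookup side, with no database access. The class `LookupTables`, the sample rows, and the empty-string fallback for unknown keys are all assumptions made for illustration; the exercise does not say how missing entries should be handled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory view of userphone and allregion; names are illustrative.
final class LookupTables {
    private final Map<String, String> nameByPhone = new HashMap<>();
    private final Map<String, String> provinceByCode = new HashMap<>();

    /** Registers one userphone row (phone, trueName). */
    void addUser(String phone, String trueName) {
        nameByPhone.put(phone, trueName);
    }

    /** Registers one allregion row (CodeNum, Address). */
    void addRegion(String codeNum, String address) {
        provinceByCode.put(codeNum, address);
    }

    /** Returns the real name for a phone number, or "" when unknown (assumed fallback). */
    String nameOf(String phone) {
        return nameByPhone.getOrDefault(phone, "");
    }

    /** Returns the province name for a code, or "" when unknown (assumed fallback). */
    String provinceOf(String codeNum) {
        return provinceByCode.getOrDefault(codeNum, "");
    }

    public static void main(String[] args) {
        LookupTables tables = new LookupTables();
        tables.addUser("13666666666", "Deng Er");   // made-up row for illustration
        tables.addRegion("310000", "Shanghai");     // made-up row for illustration
        System.out.println(tables.nameOf("13666666666") + " / " + tables.provinceOf("310000"));
    }
}
```

Returning a fixed fallback keeps a missing row from turning into a null that surfaces later in the output.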

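Cleaning rules:

  • Convert the second-level timestamps in the data to the yyyy-MM-dd HH:mm:ss format (year-month-day hour:minute:second), for example 2017-06-21 07:01:58;

  • Convert the province codes in the data to province names by looking them up in the MySQL table;

  • Match the user phone numbers against the MySQL table to attach each user's real name;

  • Use the start and end times in the data to compute the call duration (in seconds);

  • Set the source and destination paths: the source data is read from /user/test/input/a.txt (HDFS), and the cleaned data is written to /user/test/output (HDFS).

    The cleaned data looks like this: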

    邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市

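    Field                     Sample value
    User A name               邓二
    User B name               张倩
    User A phone number       13666666666
    User B phone number       15151889601
    Start time                2018-03-29 10:58:12
    End time                  2018-03-29 10:58:42
    Call duration (s)         30
    User A province           黑龙江省
    User B province           上海市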
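
The timestamp and duration rules can be sketched without Hadoop. The example below uses `java.time` rather than `SimpleDateFormat`, and it takes the time zone as an explicit parameter; both choices are assumptions, since the exercise does not state which zone the expected output uses. The class `CallCleaningSketch` and the placeholder names and provinces are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration; not part of the exercise skeleton.
final class CallCleaningSketch {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Formats a second-level epoch timestamp in the given zone. */
    static String formatEpochSeconds(long epochSeconds, ZoneId zone) {
        return FORMAT.format(Instant.ofEpochSecond(epochSeconds).atZone(zone));
    }

    /** Call duration in seconds: end timestamp minus start timestamp. */
    static long durationSeconds(long startSeconds, long endSeconds) {
        return endSeconds - startSeconds;
    }

    /** Builds the nine-field output line in the order shown in the sample. */
    static String toOutputLine(String nameA, String nameB, String[] raw,
                               String provinceA, String provinceB, ZoneId zone) {
        long start = Long.parseLong(raw[2]);
        long end = Long.parseLong(raw[3]);
        return String.join(",", nameA, nameB, raw[0], raw[1],
                formatEpochSeconds(start, zone), formatEpochSeconds(end, zone),
                String.valueOf(durationSeconds(start, end)), provinceA, provinceB);
    }

    public static void main(String[] args) {
        String[] raw = "15733218050,15778423030,1542457633,1542457678,450000,530000".split(",");
        // Names and provinces are placeholders; real values come from the MySQL lookups.
        System.out.println(toOutputLine("nameA", "nameB", raw, "provinceA", "provinceB", ZoneId.of("UTC")));
    }
}
```

For the sample input row, the duration is 1542457678 - 1542457633 = 45 seconds. The formatted times shift with the chosen zone, so the zone should match whatever the evaluation environment expects.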

    step/com/LogMR.java

    package com;
    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class LogMR {
        /********** begin **********/
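        static class MyMapper extends Mapper<LongWritable, Text, PhoneLog, NullWritable> {
            // phone number -> real name, loaded from userphone
            Map<String, String> userMap = new HashMap<>();
            // province code -> province name, loaded from allregion
            Map<String, String> addressMap = new HashMap<>();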
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            PhoneLog pl = new PhoneLog();
            Text text = new Text();
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                // Load both lookup tables once per map task
                Connection connection = DBHelper.getConnection();
                try (Statement statement = connection.createStatement()) {
                    // phone number -> real name (columns 2 and 3 of userphone)
                    try (ResultSet users = statement.executeQuery("select * from userphone")) {
                        while (users.next()) {
                            userMap.put(users.getString(2), users.getString(3));
                        }
                    }
                    // province code -> province name (columns 2 and 3 of allregion)
                    try (ResultSet regions = statement.executeQuery("select * from allregion")) {
                        while (regions.next()) {
                            addressMap.put(regions.getString(2), regions.getString(3));
                        }
                    }
                } catch (SQLException e) {
                    // Fail the task instead of silently emitting records without names
                    throw new IOException("failed to load lookup tables from MySQL", e);
                }
            }
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String str = value.toString();
                String[] split = str.split(",");
                if (split.length == 6) {
                    String trueName1 = userMap.get(split[0]);
                    String trueName2 = userMap.get(split[1]);
                    String address1 = addressMap.get(split[4]);
                    String address2 = addressMap.get(split[5]);
                    long startTimestamp = Long.parseLong(split[2]);
                    String startTime = sdf.format(startTimestamp * 1000);
                    long endTimestamp = Long.parseLong(split[3]);
                    String endTime = sdf.format(endTimestamp * 1000);
                    long timeLen = endTimestamp - startTimestamp;
                    pl.SetPhoneLog(trueName1, trueName2, split[0], split[1], startTime, endTime, timeLen, address1,
                            address2);
                    context.write(pl, NullWritable.get());
                }
            }
        }
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(LogMR.class);
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(PhoneLog.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setNumReduceTasks(0);
            Path inPath = new Path("/user/test/input/a.txt");
            Path out = new Path("/user/test/output");
            FileInputFormat.setInputPaths(job, inPath);
            FileOutputFormat.setOutputPath(job, out);
            // Propagate job success or failure through the process exit code
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
        /********** end **********/
    }
    

    step/com/DBHelper.java

    package com;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    public class DBHelper {
        /********** begin **********/
        private static final String driver = "com.mysql.jdbc.Driver";
        private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
        private static final String username = "root"; // database username
        private static final String password = "123123"; // database password: set when the database was installed, so it differs per installation
        private static Connection conn = null; // shared database connection object
        static {
            try {
                Class.forName(driver);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        public static Connection getConnection() {
            if (conn == null) {
                try {
                    // Open the connection on first use
                    conn = DriverManager.getConnection(url, username, password);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            return conn;
        }
        /********** end **********/
    }

    step/com/PhoneLog.java

    package com;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    public class PhoneLog implements WritableComparable<PhoneLog> {
        private String userA;
        private String userB;
        private String userA_Phone;
        private String userB_Phone;
        private String startTime;
        private String endTime;
        private Long timeLen;
        private String userA_Address;
        private String userB_Address;
        public PhoneLog() {
        }
        public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone, String startTime,
                String endTime, Long timeLen, String userA_Address, String userB_Address) {
            this.userA = userA;
            this.userB = userB;
            this.userA_Phone = userA_Phone;
            this.userB_Phone = userB_Phone;
            this.startTime = startTime;
            this.endTime = endTime;
            this.timeLen = timeLen;
            this.userA_Address = userA_Address;
            this.userB_Address = userB_Address;
        }
        public String getUserA_Phone() {
            return userA_Phone;
        }
        public void setUserA_Phone(String userA_Phone) {
            this.userA_Phone = userA_Phone;
        }
        public String getUserB_Phone() {
            return userB_Phone;
        }
        public void setUserB_Phone(String userB_Phone) {
            this.userB_Phone = userB_Phone;
        }
        public String getUserA() {
            return userA;
        }
        public void setUserA(String userA) {
            this.userA = userA;
        }
        public String getUserB() {
            return userB;
        }
        public void setUserB(String userB) {
            this.userB = userB;
        }
        public String getStartTime() {
            return startTime;
        }
        public void setStartTime(String startTime) {
            this.startTime = startTime;
        }
        public String getEndTime() {
            return endTime;
        }
        public void setEndTime(String endTime) {
            this.endTime = endTime;
        }
        public Long getTimeLen() {
            return timeLen;
        }
        public void setTimeLen(Long timeLen) {
            this.timeLen = timeLen;
        }
        public String getUserA_Address() {
            return userA_Address;
        }
        public void setUserA_Address(String userA_Address) {
            this.userA_Address = userA_Address;
        }
        public String getUserB_Address() {
            return userB_Address;
        }
        public void setUserB_Address(String userB_Address) {
            this.userB_Address = userB_Address;
        }
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(userA);
            out.writeUTF(userB);
            out.writeUTF(userA_Phone);
            out.writeUTF(userB_Phone);
            out.writeUTF(startTime);
            out.writeUTF(endTime);
            out.writeLong(timeLen);
            out.writeUTF(userA_Address);
            out.writeUTF(userB_Address);
        }
        @Override
        public void readFields(DataInput in) throws IOException {
            userA = in.readUTF();
            userB = in.readUTF();
            userA_Phone = in.readUTF();
            userB_Phone = in.readUTF();
            startTime = in.readUTF();
            endTime = in.readUTF();
            timeLen = in.readLong();
            userA_Address = in.readUTF();
            userB_Address = in.readUTF();
        }
        @Override
        public String toString() {
            return userA + "," + userB + "," + userA_Phone + "," + userB_Phone + "," + startTime + "," + endTime + ","
                    + timeLen + "," + userA_Address + "," + userB_Address;
        }
        @Override
        public int compareTo(PhoneLog pl) {
            // Compare by the CSV form so the ordering is consistent in both directions
            return this.toString().compareTo(pl.toString());
        }
    }
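
A Writable must read its fields back in exactly the order it wrote them, and `DataOutput.writeUTF` throws a `NullPointerException` when handed `null`, which can happen if a phone number or province code has no match in the lookup tables. The sketch below reproduces the write/readFields pattern with plain `java.io` streams so it runs without Hadoop. The class `MiniPhoneLog`, its reduced field set, and the empty-string substitution for nulls are illustrative assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical cut-down record that mirrors the write/readFields pattern.
final class MiniPhoneLog {
    String userA;
    long timeLen;
    String userAAddress;

    /** Writes the fields in a fixed order, replacing null strings with "". */
    void write(DataOutput out) throws IOException {
        out.writeUTF(userA == null ? "" : userA);
        out.writeLong(timeLen);
        out.writeUTF(userAAddress == null ? "" : userAAddress);
    }

    /** Reads the fields back in the same order they were written. */
    void readFields(DataInput in) throws IOException {
        userA = in.readUTF();
        timeLen = in.readLong();
        userAAddress = in.readUTF();
    }

    /** Serializes to a byte array and deserializes into a fresh instance. */
    static MiniPhoneLog roundTrip(MiniPhoneLog source) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            source.write(new DataOutputStream(buffer));
            MiniPhoneLog copy = new MiniPhoneLog();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MiniPhoneLog log = new MiniPhoneLog();
        log.timeLen = 30L;              // userA stays null on purpose
        log.userAAddress = "Shanghai";
        MiniPhoneLog copy = roundTrip(log);
        System.out.println("[" + copy.userA + "] " + copy.timeLen + " " + copy.userAAddress);
    }
}
```

In a map-only job that writes text output, records are emitted through `toString()` rather than `write`, so this path matters mainly once a reducer or shuffle is introduced.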
    

    Finally, restart Hadoop with start-all.sh to complete the evaluation.
